Confused Coders is a place where we share lessons and thoughts with you. Feel free to fire your doubts straight at us and we will try our best to come back to you with clarifications. We also have a few PDFs which might be helpful for your interview preparation.

     Book shelf: Feel free to download and share. Cheers \m/

           



Have Fun!

How to connect/query Hive metastore on EMR cluster

Just look for the Hive config file. On EMR emr-4.7.2 it is here:

    less /etc/hive/conf/hive-site.xml

Look for the below properties in the hive-site:

    <property>
      <name>javax.jdo.option.ConnectionURL</name>
      <value>jdbc:mysql://ip-xx-xx-xx-xx:3306/hive?createDatabaseIfNotExist=true</value>
      <description>JDBC connect string for a JDBC metastore</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionUserName</name>
      <value>hive</value>
      <description>username to use against metastore database</description>
    </property>
    <property>
      <name>javax.jdo.option.ConnectionPassword</name>
      <value>xxxxxxxxxxx</value>
      <description>password to use against metastore database</description>
    </property>

Use your favourite SQL query editor to connect to the metastore. That's how it would look in IntelliJ IDEA.

    show databases;
    use information_schema;
    show tables;
    use hive;
    -- This shows the Hive metastore version --
    select * from VERSION;
    1,0.14.0,Hive release version 0.14.0

Or, to access MySQL directly:

    mysql -h ip-xx.xx.xx.xx -P 3306 -u hive […]
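If you would rather poke at the metastore from a script than from a SQL editor, here is a minimal sketch (not from the original post) using the PyMySQL package; the host and password are the placeholders from hive-site.xml above and need to be replaced with your own values.

    import pymysql

    # connect with the credentials found in hive-site.xml (placeholders here)
    conn = pymysql.connect(host="ip-xx-xx-xx-xx", port=3306, user="hive",
                           password="xxxxxxxxxxx", database="hive")
    try:
        with conn.cursor() as cur:
            # metastore schema version, e.g. (1, '0.14.0', 'Hive release version 0.14.0')
            cur.execute("SELECT * FROM VERSION")
            print(cur.fetchall())
            # metastore tables: TBLS, DBS, PARTITIONS, SDS, ...
            cur.execute("SHOW TABLES")
            print(cur.fetchall())
    finally:
        conn.close()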

How to get the Hive metastore version on EMR cluster

Quick note:

    $ /usr/lib/hive/bin/schematool -dbType mysql -info
    Metastore connection URL:    jdbc:mysql://ip-XX.XX.XX.XX:3306/hive?createDatabaseIfNotExist=true
    Metastore Connection Driver: org.mariadb.jdbc.Driver
    Metastore connection User:   hive
    Hive distribution version:   0.14.0
    Metastore schema version:    0.14.0
    schemaTool completed
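If you want the same check from a script, here is a hedged sketch (it assumes the schematool path above exists on the master node and Python 3.7+ for capture_output):

    import subprocess

    # run schematool and capture its report
    result = subprocess.run(
        ["/usr/lib/hive/bin/schematool", "-dbType", "mysql", "-info"],
        capture_output=True, text=True)
    print(result.stdout)   # should contain "Metastore schema version: ..." on success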

Hive Dynamic Partition Error: [Fatal Error] total number of created files now is 100028, which exceeds 100000. Killing the job.

[Fatal Error] total number of created files now is 900320, which exceeds 900000. Killing the job.

tl;dr quick fix, but probably not the right thing to do always:

    SET hive.exec.max.created.files=900000;

So my config increases the default limits on dynamic partitions and created files:

    set hive.exec.dynamic.partition=true;
    set hive.exec.max.dynamic.partitions=100000;
    SET hive.exec.max.dynamic.partitions.pernode=100000;
    set hive.exec.dynamic.partition.mode=nonstrict;
    SET hive.exec.max.created.files=900000;

Correct thing to do: investigate why Hive is creating that many files. Most jobs should stay well under the 100000 limit, so hitting it is suspicious. This typically happens when the wrong column is used as the partition column and bad data creates thousands of useless partitions. Verify the query and check the output data location to see what exact files and […]
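If the job is driven from Spark, a hypothetical PySpark equivalent of the same settings is sketched below (the table name events and the year/month/date columns are illustrative, not from the post), together with a quick sanity check on how many partitions the insert would really create:

    from pyspark import SparkContext
    from pyspark.sql import HiveContext

    sc = SparkContext()
    hive = HiveContext(sc)

    # the same knobs as above, applied through Spark SQL
    hive.sql("SET hive.exec.dynamic.partition=true")
    hive.sql("SET hive.exec.max.dynamic.partitions=100000")
    hive.sql("SET hive.exec.max.dynamic.partitions.pernode=100000")
    hive.sql("SET hive.exec.dynamic.partition.mode=nonstrict")
    hive.sql("SET hive.exec.max.created.files=900000")

    # before raising limits, check how many partitions the data would create
    hive.sql("SELECT COUNT(DISTINCT year, month, date) AS parts FROM events").show()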

Spark append mode for partitioned text file fails with SaveMode.Append – IOException File already Exists

Code:

    dataDF.write.partitionBy("year", "month", "date").mode(SaveMode.Append).text("s3://data/test2/events/")

Error:

    16/07/06 02:15:05 ERROR datasources.DynamicPartitionWriterContainer: Aborting task.
    java.io.IOException: File already exists:s3://path/1839dd1ed38a.gz
        at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.create(S3NativeFileSystem.java:614)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:913)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:894)
        at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:791)
        at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.create(EmrFileSystem.java:177)
        at org.apache.hadoop.mapreduce.lib.output.TextOutputFormat.getRecordWriter(TextOutputFormat.java:135)
        at org.apache.spark.sql.execution.datasources.text.TextOutputWriter.<init>(DefaultSource.scala:156)
        at org.apache.spark.sql.execution.datasources.text.TextRelation$$anon$1.newInstance(DefaultSource.scala:125)
        at org.apache.spark.sql.execution.datasources.BaseWriterContainer.newOutputWriter(WriterContainer.scala:129)
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.newOutputWriter$1(WriterContainer.scala:424)
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:356)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
    16/07/06 02:15:05 INFO output.DirectFileOutputCommitter: Nothing to clean up on abort since there are no temporary files written
    16/07/06 02:15:05 ERROR datasources.DynamicPartitionWriterContainer: Task attempt attempt_201607060215_0004_m_001709_3 aborted.
    16/07/06 02:15:05 ERROR executor.Executor: Exception in task 1709.3 in stage 4.0 (TID 12093)
    org.apache.spark.SparkException: Task failed while writing rows.
        at org.apache.spark.sql.execution.datasources.DynamicPartitionWriterContainer.writeRows(WriterContainer.scala:414)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelation$$anonfun$run$1$$anonfun$apply$mcV$sp$3.apply(InsertIntoHadoopFsRelation.scala:150)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
        at org.apache.spark.scheduler.Task.run(Task.scala:89)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
        at […]
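The excerpt stops at the stack trace, so here is only a hedged workaround sketch in PySpark, not the author's fix. The trace suggests a retried task attempt (task 1709.3) colliding with a file an earlier attempt had already written straight to S3, so two commonly suggested mitigations are: keep speculative execution off so only one attempt writes each file, and give every run its own output prefix so retries and re-runs never clash with files that already exist. Paths and columns below are placeholders.

    from uuid import uuid4
    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    # 1) avoid duplicate attempts racing on the same S3 key
    conf = SparkConf().set("spark.speculation", "false")
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # toy events: partition columns plus a single text payload column
    events = sqlContext.createDataFrame(
        [("2016", "07", "06", "event-payload-1"),
         ("2016", "07", "06", "event-payload-2")],
        ["year", "month", "date", "value"])

    # 2) unique prefix per run; readers can glob s3://data/test2/events/run_*/
    out = "s3://data/test2/events/run_%s/" % uuid4().hex
    events.write.partitionBy("year", "month", "date").mode("append").text(out)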

How to write gzip compressed JSON in a Spark data frame

A compressed output format can be specified in Spark as:

    conf = SparkConf()
    conf.set("spark.hadoop.mapred.output.compress", "true")
    conf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec")
    conf.set("spark.hadoop.mapred.output.compression.type", "BLOCK")

The same can be provided to the Spark shell as:

    $> spark-shell --conf spark.hadoop.mapred.output.compress=true \
         --conf spark.hadoop.mapred.output.compression.codec=org.apache.hadoop.io.compress.GzipCodec \
         --conf spark.hadoop.mapred.output.compression.type=BLOCK

The code for writing the JSON/text is the same as usual:

    case class C(key: String, value: String)
    val list = List(C("a", "b"), C("c", "d"), C("e", "f"))
    val rdd = sc.makeRDD(list)
    import sqlContext.implicits._
    val df = rdd.toDF
    df.write.mode("append").json("s3://work/data/tests/json")

That's it. We should now have compressed .gz files as output.
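The same recipe end-to-end from PySpark, as a sketch (the S3 path is the placeholder from the post and the Hadoop settings are the ones above):

    from pyspark import SparkConf, SparkContext
    from pyspark.sql import SQLContext

    conf = (SparkConf()
            .set("spark.hadoop.mapred.output.compress", "true")
            .set("spark.hadoop.mapred.output.compression.codec",
                 "org.apache.hadoop.io.compress.GzipCodec")
            .set("spark.hadoop.mapred.output.compression.type", "BLOCK"))
    sc = SparkContext(conf=conf)
    sqlContext = SQLContext(sc)

    # same toy data as the Scala snippet above
    df = sqlContext.createDataFrame(
        [("a", "b"), ("c", "d"), ("e", "f")], ["key", "value"])
    df.write.mode("append").json("s3://work/data/tests/json")
    # the part files should land as .gz

On newer Spark releases the writer also takes a per-write option, df.write.option("compression", "gzip").json(path), which avoids the global Hadoop settings; worth checking against your Spark version.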

Spark SQL job executing very slowly – Performance tuning

I had been facing trouble with a basic Spark SQL job which was unable to process tens of gigs in hours. That's when I demystified 'spark.sql.shuffle.partitions', which tends to slow the job down insanely. Adding the below changes to the Spark SQL code fixed the issue for me. Magic.

    // For handling a large number of smaller files
    events = sqlContext.createDataFrame(rows).coalesce(400)
    events.registerTempTable("input_events")

    // For overriding the default value of 200
    sqlContext.sql("SET spark.sql.shuffle.partitions=10")
    sqlContext.sql(sql_query)
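For reference, here are the same two tweaks as a self-contained PySpark sketch (toy data and query; 400 and 10 are the values from the post, not universal recommendations):

    from pyspark import SparkContext
    from pyspark.sql import SQLContext

    sc = SparkContext()
    sqlContext = SQLContext(sc)

    # stand-in for the real input rows
    rows = [("click", 1), ("view", 2), ("click", 3)]

    # coalesce many small input splits into fewer partitions
    events = sqlContext.createDataFrame(rows, ["event", "cnt"]).coalesce(400)
    events.registerTempTable("input_events")

    # override the default of 200 shuffle partitions
    sqlContext.sql("SET spark.sql.shuffle.partitions=10")
    sqlContext.sql("SELECT event, SUM(cnt) FROM input_events GROUP BY event").show()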

Indexing CSV data in Solr via Python – PySolr

Here is a crisp post on indexing data in Solr using Python.

1. Install prerequisites: pip, PySolr

2. Python script:

    #!/usr/bin/python
    import sys, getopt
    import pysolr
    import csv, json

    # SOLR_URL=http://54.254.192.149:8983/solr/feeddata/

    def main(args):
        solrurl = ''
        inputfile = ''
        try:
            opts, args = getopt.getopt(args, "hi:u:")
        except getopt.GetoptError:
            print 'index_data.py -i <inputfile> -u <solrurl>'
            sys.exit(2)
        for opt, arg in opts:
            if opt == '-h':
                print 'index_data.py -i <inputfile> -u <solrurl>'
                sys.exit()
            elif opt in ("-i"):
                inputfile = arg
            elif opt in ("-u"):
                solrurl = arg

        # create a connection to a solr server
        s = pysolr.Solr(solrurl, timeout=10)

        keys = ("rank", "pogid", "cat", "subcat", "question_bucketid", "brand",
                "discount", "age_grp", "gender", "inventory", "last_updated")
        record_count = 0
        for line in open(inputfile, 'r').readlines():
            splits = line.split(',') […]
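The excerpt above is cut off, so here is a small, self-contained sketch of the same idea (hypothetical file name and Solr URL; it assumes the pysolr package and a running Solr core whose schema contains the CSV's fields):

    import csv
    import pysolr

    solr = pysolr.Solr("http://localhost:8983/solr/feeddata/", timeout=10)

    docs = []
    with open("feeds.csv") as f:
        for row in csv.DictReader(f):           # header row supplies the field names
            row["id"] = row.get("pogid", "")    # Solr's uniqueKey field must be present
            docs.append(row)

    solr.add(docs)   # older pysolr versions commit by default; otherwise call solr.commit()
    print("indexed %d documents" % len(docs))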

How to get Pig Logical plan (Execution DAG) from Pig Latin script

TL;DR: A Pig logical plan is the plan DAG that is used to execute the chain of jobs on Hadoop. Here is the code snippet for obtaining a Pig Latin logical plan DAG from a Pig script: https://github.com/yssharma/pig-on-drill/blob/b2d8a23c11d03974e16eb2ff44e021b1e957f03f/exec/java-exec/src/main/java/org/apache/drill/exec/pigparser/parser/PigLatinParser.java#L53

PySolr: How to boost a field for a Solr document

Adding a quick note on how to boost a field for a Solr document with PySolr.

Index-time boosting:

    conn.add(docs, boost={'author': '2.0'})

Query-time boosting:

    qf=title^5 content^2 comments^0.5

Read: http://java.dzone.com/articles/options-tune-document%E2%80%99s
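A slightly fuller pysolr sketch (field names and URL are illustrative; index-time boost support depends on your pysolr and Solr versions, since recent Solr releases dropped index-time boosts, while query-time boosting goes through edismax's qf parameter):

    import pysolr

    solr = pysolr.Solr("http://localhost:8983/solr/feeddata/", timeout=10)

    docs = [{"id": "1", "title": "spark tuning", "author": "jdoe", "content": "..."}]
    solr.add(docs, boost={"author": "2.0"})       # index-time boost on the author field

    results = solr.search("spark", **{
        "defType": "edismax",
        "qf": "title^5 content^2 comments^0.5",   # query-time field boosts
    })
    for r in results:
        print(r["id"])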

SolrJ Exception – Exception in thread "main" org.apache.solr.common.SolrException: Bad Request

Exception:

    Exception in thread "main" org.apache.solr.common.SolrException: Bad Request
    Bad Request
    request: http://54.254.192.149:8983/solr/feeddata/update?wt=javabin&version=2

Solution: check the Solr logs.

    INFO  - 2014-11-07 07:04:42.985; org.apache.solr.update.processor.LogUpdateProcessor; [feeddata] webapp=/solr path=/update params={wt=javabin&version=2} {} 0 1
    ERROR - 2014-11-07 07:04:42.985; org.apache.solr.common.SolrException; org.apache.solr.common.SolrException: Document is missing mandatory uniqueKey field: id

Here it is: Document is missing mandatory uniqueKey field: id

Another instance:

    INFO  - 2014-11-07 07:13:21.684; org.apache.solr.update.processor.LogUpdateProcessor; [feeddata] webapp=/solr path=/update params={wt=javabin&version=2} {} 0 1
    ERROR - 2014-11-07 07:13:21.685; org.apache.solr.common.SolrException; org.apache.solr.common.SolrException: ERROR: [doc=0] unknown field 'win_hour'

Takeaway: logs are very helpful. Do have a look before searching elsewhere.
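The original error came from a Java client, but the fix is the same from any client: every document must carry the schema's uniqueKey field ("id" here), and every field you send must exist in the schema. A hedged Python sketch of the first fix (URL and field names are taken from the logs and the earlier post; the values are illustrative):

    import pysolr

    solr = pysolr.Solr("http://54.254.192.149:8983/solr/feeddata/", timeout=10)

    docs = [{"rank": "1", "pogid": "12345", "cat": "shoes"}]
    for i, doc in enumerate(docs):
        # supply the mandatory uniqueKey before indexing
        doc.setdefault("id", doc.get("pogid", str(i)))

    solr.add(docs)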