Deno George

Reputation: 362

Spark Saving Results to HDFS

I am seeing strange behavior while saving my parsed XML file to HDFS using Spark with one master and three worker nodes. The problem is that when I parse the XML file and try to save the result to HDFS, the file is not saved with all of the parsed records.

When I run the same code in local mode by specifying

 sc = SparkContext("local", "parser")

and submit it with ./bin/spark-submit xml_parser.py,

the run produces a 117 MB parsed file on HDFS with the complete set of records.

But when I run the code in client mode on the cluster, I do the following:

 sc = SparkContext("spark://master:7077", "parser")

and the spark-submit is

./bin/spark-submit --master yarn-client --deploy-mode client --driver-memory 7g --executor-memory 4g  --executor-cores 2  xml_parser.py 1000

which gives me a 19 MB file on HDFS with incomplete records.

To save the result in both cases I am using rdd.saveAsTextFile("hdfs://").
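For reference, saveAsTextFile() writes a directory on HDFS rather than a single file, with one part-NNNNN file per partition of the RDD. A minimal sketch of the save in isolation (the sample data and output path are placeholders, not my real ones):

    from pyspark import SparkContext

    sc = SparkContext("local", "parser")

    # Each partition of the RDD is written as its own file inside the
    # target directory: part-00000, part-00001, ...
    rdd = sc.parallelize(["a,1", "b,2", "c,3"], 3)
    rdd.saveAsTextFile("hdfs://master:9000/tmp/example_output")  # placeholder path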

I am using Spark 1.6.1 built for Hadoop 2.6, and Apache Hadoop 2.7.2.

Can anyone help me out? I do not understand why this is happening. I have the following Spark cluster:

1. master: 8 GB RAM

2. workerNode1: 8 GB RAM

3. workerNode2: 8 GB RAM

4. workerNode3: 8 GB RAM

I have configured the above cluster over Hadoop 2.7.2 with 1 master and 3 DataNodes.

Running jps on the master node gives me:

24097 Master

21652 Jps

23398 NameNode

23799 ResourceManager

23630 SecondaryNameNode

and jps on each of the DataNodes gives:

8006 Worker

7819 NodeManager

27164 Jps

7678 DataNode

Checking the Hadoop NameNode UI at master:9000 shows three live DataNodes, and checking the Spark Master UI at master:7077 shows three live workers.

Please have a look at the code:

sc = SparkContext("spark://master:7077", "parser")
--------------------------------------------
 contains the logic of XML parsing
--------------------------------------------
and I am appending the results to one list like

 cc_list.append([final_cll, Date, Time, int(cont[i]), float(values[i]), 0])

Now I am parallelizing the above cc_list like

 parallel_list = sc.parallelize(cc_list)
 parallel_list.saveAsTextFile("hdfs://master:9000/some path")

Now I am doing some operations here:

 new_list = sc.textFile("hdfs://localhost:9000/some path/part-00000").map(lambda line: line.split(','))

 result = new_list.map(lambda x: (x[0] + ',   ' + x[3], float(x[4]))).sortByKey(True).coalesce(1)
 result = result.map(lambda x: x[0] + ',' + str(x[1]))
 result = result.map(lambda x: x.lstrip('[').rstrip(']').replace(' ', '')).saveAsTextFile("hdfs://master:9000/some path1")

Upvotes: 2

Views: 1366

Answers (1)

Deno George

Reputation: 362

Sorry for such a foolish question. I actually found two problems:

1) While running on multiple workers,

 parallel_list = sc.parallelize(cc_list)

creates 4-5 part files, so parallel_list is saved to HDFS as part-00000 through part-00004. When loading parallel_list back (as you can see in the code above),

 new_list = sc.textFile("<path to parallel_list>/part-00000")

was picking up only the first part file.

2) While running in local mode,

 parallel_list = sc.parallelize(cc_list)

was creating only one part file, so I was able to pick up the whole result in one go.
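To make this concrete, the number of part files simply follows the number of partitions that parallelize() creates by default, which is easy to check (a sketch, with a made-up sample record standing in for the parsed rows):

    from pyspark import SparkContext

    sc = SparkContext("spark://master:7077", "parser")

    # Hypothetical sample record standing in for a parsed XML row.
    cc_list = [["final_cll", "2016-01-01", "10:00", 1, 0.5, 0]]

    parallel_list = sc.parallelize(cc_list)
    # In local mode ("local") the default is a single partition, so
    # saveAsTextFile() writes one part-00000 file; on the cluster the
    # default parallelism is higher, so several part files are written.
    print(parallel_list.getNumPartitions())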

So, when running on Spark with multiple workers, I came up with two fixes:

1) I simply added part-* to the path when creating new_list from parallel_list (see the sketch below).

2) I increased spark.akka.frameSize to 1000 by passing --conf spark.akka.frameSize=1000 with spark-submit.
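For fix 1, the corrected load looks roughly like this (the path is a placeholder):

    # Read every part file, not just part-00000:
    new_list = sc.textFile("hdfs://master:9000/some path/part-*") \
                 .map(lambda line: line.split(','))

    # Equivalent: pass the output directory itself and textFile() will
    # read all of the part files inside it.
    # new_list = sc.textFile("hdfs://master:9000/some path").map(lambda line: line.split(','))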

Upvotes: 1
