Reputation: 362
I am facing strange behavior while saving my parsed XML file to HDFS using Spark with one master and three worker nodes. The problem is that when I parse the XML file and try to save the result to HDFS, the file is not saved with all of the parsed records.
When I execute the same code in local mode by specifying
sc = SparkContext("local", "parser")
with the spark-submit being ./bin/spark-submit xml_parser.py,
the run gives a 117 MB parsed file on HDFS with complete records.
And in the case of executing the code in Spark client mode I did the following:
sc = SparkContext("spark://master:7077", "parser")
and the spark-submit is
./bin/spark-submit --master yarn-client --deploy-mode client --driver-memory 7g --executor-memory 4g --executor-cores 2 xml_parser.py 1000
which gives me a 19 MB file on HDFS with incomplete records.
For saving the result, in both cases I am using rdd.saveAsTextFile("hdfs://").
I am using Spark 1.6.1 (built for Hadoop 2.6) and Apache Hadoop 2.7.2.
Can anyone help me out? I don't understand why this is happening. I have the following Spark cluster:
1 - master: 8 GB RAM
2 - workerNode1: 8 GB RAM
3 - workerNode2: 8 GB RAM
4 - workerNode3: 8 GB RAM
I have configured the above cluster over Hadoop 2.7.2 with 1 master and 3 DataNodes.
Running jps on the master node gives me:
24097 Master
21652 Jps
23398 NameNode
23799 ResourceManager
23630 SecondaryNameNode
and jps on all the DataNodes gives:
8006 Worker
7819 NodeManager
27164 Jps
7678 DataNode
Checking the Hadoop NameNode UI (master:9000) shows the three live DataNodes,
and checking the Spark master UI (master:7077) shows three live workers.
Please have a look here:
sc = SparkContext("spark://master:7077", "parser")
--------------------------------------------
contains the logic of XMLParsing
--------------------------------------------
and I am appending the result to one list like
cc_list.append([final_cll, Date,Time,int(cont[i]), float(values[i]),0])
Now I am parallelizing the above cc_list like
parallel_list = sc.parallelize(cc_list)
parallel_list.saveAsTextFile("hdfs://master:9000/ some path")
Now I am doing some operations here.
new_list = sc.textFile("hdfs://localhost:9000/some path/part-00000").map(lambda line: line.split(','))
result = new_list.map(lambda x: (x[0] + ', ' + x[3], float(x[4]))).sortByKey(True).coalesce(1)
result = result.map(lambda x: x[0] + ',' + str(x[1]))
result = result.map(lambda x: x.lstrip('[').rstrip(']').replace(' ', '')).saveAsTextFile("hdfs://master:9000/some path1")
Upvotes: 2
Views: 1366
Reputation: 362
Sorry for such a foolish question. Actually, I found out two problems:
1) While running on multiple workers,
parallel_list = sc.parallelize(cc_list)
creates 4-5 part files, so parallel_list is saved in HDFS as part-00000 through part-00004, and while loading parallel_list (as you can see in the code above) with
new_list = sc.textFile(path to parallel_list/part-00000) ==> it was taking only the first part (see the sketch after this list).
2) While running in local mode,
parallel_list = sc.parallelize(cc_list) was creating only one part file, so I was able to pick up the whole file in one go.
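For illustration, a minimal sketch of why cluster mode produces several part files while local mode produces only one (the paths and the dummy record here are assumptions, not taken from my actual job):

from pyspark import SparkContext

sc = SparkContext("spark://master:7077", "parser")

# sc.parallelize() splits the data into sc.defaultParallelism slices by default,
# which on a cluster is typically the total number of executor cores (> 1),
# while in "local" mode it is 1.
cc_list = [["final_cll", "2016-01-01", "10:00", 1, 0.5, 0]]  # dummy record, illustration only
parallel_list = sc.parallelize(cc_list)
print(parallel_list.getNumPartitions())  # e.g. 4-5 on the cluster, 1 in local mode

# saveAsTextFile() writes one part-xxxxx file per partition, so reading back
# only part-00000 misses most of the records.
parallel_list.saveAsTextFile("hdfs://master:9000/some path")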
So, while running on Spark with multiple workers, I came up with two solutions (a sketch of both is below):
1) I just added part-* to the path while creating new_list from parallel_list.
2) Increased spark.akka.frameSize by passing --conf spark.akka.frameSize=1000 with spark-submit.
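A minimal sketch of both fixes together, assuming the same placeholder paths as above (the exact paths and the frame size value are assumptions):

from pyspark import SparkConf, SparkContext

# Fix 2: raise the Akka frame size; this can also be passed on the command
# line as --conf spark.akka.frameSize=1000 with spark-submit.
conf = SparkConf().set("spark.akka.frameSize", "1000")
sc = SparkContext("spark://master:7077", "parser", conf=conf)

# Fix 1: read every part file, not just part-00000, by using a glob
# (pointing textFile at the output directory itself also works).
new_list = sc.textFile("hdfs://master:9000/some path/part-*") \
             .map(lambda line: line.split(','))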
Upvotes: 1