Reputation: 561
I use the following code:
csv.saveAsTextFile(pathToResults, classOf[GzipCodec])
The pathToResults directory ends up with many files like part-0000, part-0001, etc. I can use FileUtil.copyMerge(), but it's really slow: it downloads all the files to the driver program and then uploads them back to Hadoop. Even so, FileUtil.copyMerge() is faster than:
csv.repartition(1).saveAsTextFile(pathToResults, classOf[GzipCodec])
How can I merge Spark result files without repartition and FileUtil.copyMerge()?
Upvotes: 7
Views: 24787
Reputation: 81
coalesce(1) works just fine. There is also a hadoop-streaming option that can merge HDFS files on the fly, if you would like to run this script:
$ hadoop jar /usr/hdp/2.3.2.0-2950/hadoop-mapreduce/hadoop-streaming-2.7.1.2.3.2.0-2950.jar \
-Dmapred.reduce.tasks=1 \
-input "/hdfs/input/dir" \
-output "/hdfs/output/dir" \
-mapper cat \
-reducer cat
Upvotes: 0
Reputation: 14939
I had exactly the same question and ended up writing PySpark code (with calls to the Hadoop API) that implements copyMerge:
https://github.com/Tagar/stuff/blob/master/copyMerge.py
Unfortunately copyMerge as a standalone Hadoop API call is going to be deprecated and removed in Hadoop 3.0. So this implementation doesn't depend on Hadoop's copyMerge (it re-implements it).
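For readers working in Scala like the question, here is a minimal sketch of the same idea using only the Hadoop FileSystem API. It is not the linked script: the object name MergeParts, the function name, and the "part-" prefix filter are assumptions made for illustration. It relies on the gzip format allowing several compressed members to be concatenated into one file, so the merged output should still be readable by standard gzip tools. The bytes still stream through the process that runs this code, so it avoids the removed API rather than the copy cost.

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.io.IOUtils

// Hypothetical helper, not part of Hadoop or Spark.
object MergeParts {
  // Concatenates every part-* file under srcDir into dstFile, in file-name order.
  // dstFile should live outside srcDir so it is not picked up as an input.
  def copyMerge(fs: FileSystem, srcDir: Path, dstFile: Path, conf: Configuration): Unit = {
    val parts = fs.listStatus(srcDir)
      .filter(s => s.isFile && s.getPath.getName.startsWith("part-"))
      .sortBy(_.getPath.getName)

    val out = fs.create(dstFile, true) // true: overwrite an existing file
    try {
      parts.foreach { status =>
        val in = fs.open(status.getPath)
        // close = false: streams are closed explicitly in the finally blocks
        try IOUtils.copyBytes(in, out, conf, false)
        finally in.close()
      }
    } finally out.close()
  }
}
```

From a Spark driver this could be called with conf taken from sc.hadoopConfiguration and fs from FileSystem.get(conf), passing new Path(pathToResults) as srcDir.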
Upvotes: 0
Reputation: 5018
Unfortunately, there is no other option to get a single output file in Spark. Instead of repartition(1) you can use coalesce(1), but with parameter 1 their behavior would be the same. Spark would collect your data in a single partition in memory, which might cause an OOM error if your data is too big.
Another option for merging files on HDFS is to write a simple MapReduce job (or a Pig job, or a Hadoop Streaming job) that takes the whole directory as input and uses a single reducer to generate a single output file. Be aware, though, that with the MapReduce approach all the data is first copied to the reducer's local filesystem, which might cause an "out of space" error.
Upvotes: 8