Reputation: 129
I have a list of directories in HDFS, each containing several files. My goal is to merge all the files of each directory into a single file, separately per directory. What is the fastest way to do this in Spark? Sequentially iterating over all directories is too slow, so I want to do it in parallel. One solution might be to use a thread pool; see the sketch below. Maybe there is a better, faster, more Spark-native approach?
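For concreteness, this is roughly the thread-pool idea I mean (only a sketch: it assumes Hadoop 2.x, where FileUtil.copyMerge still exists, and the paths are just examples):
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, FileUtil, Path}

val conf = new Configuration()
val fs = FileSystem.get(conf)

// example input directories (placeholders)
val dirs = Seq("/tmp/foo", "/tmp/bar")

// .par runs the merges concurrently on Scala's default thread pool;
// copyMerge concatenates every file under a directory into one output file
// (available in Hadoop 2.x, removed in Hadoop 3)
dirs.par.foreach { dir =>
  val name = new Path(dir).getName
  FileUtil.copyMerge(fs, new Path(dir), fs, new Path(s"/tmp/out/$name.csv"), false, conf, null)
}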
Thanks!
Upvotes: 3
Views: 716
Reputation: 4540
Consider the following test directories foo and bar, which contain these files:
cat /tmp/foo/0.csv
4
cat /tmp/foo/1.csv
3
cat /tmp/bar/0.csv
7
We can read them using the following snippet:
import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

val df = spark.read.csv("/tmp/foo", "/tmp/bar")
  .withColumn("dir", regexp_extract(input_file_name(), """([^/]*)/[^/]+\.csv$""", 1))
df.show()
/*
+---+---+
|_c0|dir|
+---+---+
|  4|foo|
|  3|foo|
|  7|bar|
+---+---+
*/
The function input_file_name gives the absolute path of the file, so we can use it to derive the directory. The function regexp_extract is used just to convert e.g. /tmp/foo/1.csv -> foo.
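As a quick sanity check of the pattern (plain Scala, no Spark required; the sample path is just an example):
// the same pattern as above, applied with Scala's Regex extractor;
// .unanchored lets it match anywhere in the string
val pattern = """([^/]*)/[^/]+\.csv$""".r.unanchored

"/tmp/foo/1.csv" match {
  case pattern(dir) => println(dir) // prints: foo
  case _            => println("no match")
}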
When Spark writes files, it outputs one file per partition. Thus, we need to repartition by the dir column so that all rows from one directory end up in a single partition, and therefore in a single output file. Finally, we can use partitionBy to get the directory name into the output file structure as well. For example,
import spark.implicits._ // for the $"dir" column syntax

df.repartition($"dir")
  .write
  .partitionBy("dir")
  .csv("/tmp/out")
would produce files
/tmp/out/._SUCCESS.crc
/tmp/out/dir=bar/.part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/_SUCCESS
/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/dir=foo/.part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
where /tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
contains
7
and /tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
contains
4
3
AFAIK it is not possible to write these output files into the same directory structure as the original input without e.g. a customised Hadoop FileSystem class. A simpler workaround is to rename the part files afterwards with the FileSystem API, sketched below.
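If you do need one named file per input directory, a post-processing step can move each part file out of its dir=<name> partition directory. This is a sketch under two assumptions: there is exactly one part file per partition directory (guaranteed here by the repartition), and /tmp/out and /tmp/merged are example paths:
import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
fs.mkdirs(new Path("/tmp/merged"))

// for each dir=<name> directory, rename its single part file to <name>.csv
fs.listStatus(new Path("/tmp/out"))
  .filter(s => s.isDirectory && s.getPath.getName.startsWith("dir="))
  .foreach { s =>
    val name = s.getPath.getName.stripPrefix("dir=")
    fs.listStatus(s.getPath)
      .map(_.getPath)
      .find(_.getName.startsWith("part-"))
      .foreach(part => fs.rename(part, new Path(s"/tmp/merged/$name.csv")))
  }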
Upvotes: 4