GANdalf85

Reputation: 129

Process multiple directories in spark separately

I have a list of directories in HDFS, each containing several files. My goal is to merge all files from each directory into a single file, but separately for each directory. What is the fastest way to do this in Spark? Sequentially iterating over the directories is too slow, so I want to do it in parallel. One solution might be to use a thread pool. Maybe there is a better, faster, more Spark-native one?
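By a thread pool I mean something roughly like this (just a sketch; the directory list, output paths and pool size are placeholders, and spark is the usual SparkSession):

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// run several small Spark jobs concurrently, one per directory
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(4))

val dirs = Seq("/tmp/foo", "/tmp/bar")   // placeholder input directories

val jobs = dirs.map { dir =>
  Future {
    spark.read.csv(dir)
      .coalesce(1)                       // collapse each directory to a single output file
      .write
      .csv(s"/tmp/merged/${dir.split('/').last}")
  }
}

Await.result(Future.sequence(jobs), Duration.Inf)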

Thanks!

Upvotes: 3

Views: 716

Answers (1)

ollik1

Reputation: 4540

Consider test directories foo and bar containing the following files:

cat /tmp/foo/0.csv
4
cat /tmp/foo/1.csv
3
cat /tmp/bar/0.csv
7

We can read them using the following snippet:

import org.apache.spark.sql.functions.{input_file_name, regexp_extract}

val df = spark.read.csv("/tmp/foo", "/tmp/bar")
  // derive the parent directory name from the full path of each input file
  .withColumn("dir", regexp_extract(input_file_name(), """([^/]*)/[^/]+\.csv$""", 1))
df.show()
/*
+---+---+
|_c0|dir|
+---+---+
|4  |foo|
|3  |foo|
|7  |bar|
+---+---+
*/

Function input_file_name gives the absolute path of the file each row was read from, so we can use it to derive the directory. Function regexp_extract is used just to convert e.g. /tmp/foo/1.csv -> foo.
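To see what the expression does, it can be checked on a literal path (illustration only):

import org.apache.spark.sql.functions.{lit, regexp_extract}

// capture group 1 is the last directory component before the file name
spark.range(1)
  .select(regexp_extract(lit("/tmp/foo/1.csv"), """([^/]*)/[^/]+\.csv$""", 1).as("dir"))
  .show()
// +---+
// |dir|
// +---+
// |foo|
// +---+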

When Spark writes files, it outputs one file per partition, so we repartition by the dir column to gather all rows from each directory into a single partition. Finally, we can use partitionBy to carry the directory name into the output file structure as well. For example,

df.repartition($"dir")
  .write
  .partitionBy("dir")
  .csv("/tmp/out")

would produce files

/tmp/out/._SUCCESS.crc
/tmp/out/dir=bar/.part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc
/tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/_SUCCESS
/tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv
/tmp/out/dir=foo/.part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv.crc

where /tmp/out/dir=bar/part-00067-d780b550-785f-416c-b090-8d93694ba65c.c000.csv contains

7

and /tmp/out/dir=foo/part-00110-d780b550-785f-416c-b090-8d93694ba65c.c000.csv contains

4
3
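A side note: because the output follows the dir=... layout, reading the output root back restores the dir column through partition discovery (row order may vary):

spark.read.csv("/tmp/out").show()
// +---+---+
// |_c0|dir|
// +---+---+
// |7  |bar|
// |4  |foo|
// |3  |foo|
// +---+---+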

AFAIK it is not possible to write these output files directly into the same directory structure as the original input without, for example, a customised Hadoop FileSystem implementation.
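If the merged file really has to end up under the original input directory, one workaround is a post-processing step with the Hadoop FileSystem API that moves each part file back after the write above. This is only a sketch; the target paths and the merged.csv name are assumptions:

import org.apache.hadoop.fs.{FileSystem, Path}

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

// for each dir=<name> folder under /tmp/out, move its part file to /tmp/<name>/merged.csv
fs.listStatus(new Path("/tmp/out"))
  .filter(_.isDirectory)
  .foreach { partition =>
    val dirName = partition.getPath.getName.stripPrefix("dir=")
    fs.listStatus(partition.getPath)
      .filter(_.getPath.getName.startsWith("part-"))
      .foreach { part =>
        fs.rename(part.getPath, new Path(s"/tmp/$dirName/merged.csv"))
      }
  }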

Upvotes: 4
