Sourav Chatterjee

Reputation: 67

How to process all the files in a directory in parallel, without reading them into a single dataframe, in Spark?

I have a use case where I need to read and process every file in a directory and create a separate output file for each one after applying some transformations in Spark. I want to process all the files currently present in the landing directory in parallel.

Below is the sample code I tried, but it is not working.

def fileList = .... // fetch file names from the directory

def businessLogic(file: String) = .... // where I do all the operations (read from a file, transformations, etc.)

fileList.map(businessLogic) // calling the business logic in parallel

Could you please let me know how I can achieve this parallel processing?

Note: The number of files can be N. Reading all the files into a single dataframe is not an option, as I have to create an output file for each input file, and triggering multiple Spark jobs is also not an option.

Thanks, Sourav

Upvotes: 0

Views: 1027

Answers (2)

ELinda

Reputation: 2821

Here's an example which is basically what Srinivas has suggested in the comments.

The key here is the function input_file_name, which provides the original file name.

Note that if fileList is a standard (not distributed) data structure, rather than a DataFrame/Dataset/RDD, then operations such as foreach are not run in parallel. If you would like to use native Scala to achieve parallel execution, you can look into Futures.

// spark is a SparkSession
import org.apache.spark.sql.functions.{input_file_name, substring_index}

// read every file in the directory into one dataframe
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/data")
// keep only the file name, i.e. everything after the last "/"
val df2 = df.withColumn("input", substring_index(input_file_name(), "/", -1))
// write one output directory per distinct input file name
df2.write.partitionBy("input").option("header", "true").csv("output")
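Since partitionBy("input") writes one subdirectory per distinct file name (output/input=&lt;file name&gt;/), this gives you a separate output per input file.

For completeness, here is a minimal sketch of the Futures approach mentioned above. It assumes fileList is a plain Scala collection of input paths and businessLogic reads, transforms, and writes a single file (both names are taken from the question; their bodies are up to you):

import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// start one Future per file; each runs businessLogic on its own thread
val futures = fileList.map(path => Future(businessLogic(path)))

// block until every file has been processed
Await.result(Future.sequence(futures), Duration.Inf)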

Upvotes: 1

Dima

Reputation: 40500

Something like this, maybe?

    val combinedDF = fileList.map { name =>
      // tag each row with the file it came from
      spark.read.whatever(name).withColumn("file_name", lit(name))
    }.reduce { _ union _ }

    val result = applyBusinessLogic(combinedDF)
      .repartition(col("file_name"))
      .persist

    fileList.foreach { name =>
      createOutput(name, result.filter(col("file_name") === name))
    }
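The persist matters here: without it, each filter in the final loop would recompute the whole union and re-apply the business logic once per output file. Repartitioning by file_name first groups each file's rows into the same partition before the result is cached and filtered.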

Upvotes: 1
