Reputation: 67
I have a use case where I need to read and process all the files from a directory and create a separate output file for each of them after applying some transformations in Spark. I want to apply the required transformations to the files currently present in the landing directory in parallel.
Below is the sample code that I tried, but it is not working.
def fileList = .... //to fetch file names from a directory
def businessLogic() //where I am doing all the operations. (read from a file, transformation, etc)
fileList().map(businessLogic) // calling business logic in parallel
Could you please let me know how I can achieve parallel processing?
Note: The number of files can be N. Reading all the files into a single DataFrame is not an option, as I have to create an output file for each input file; triggering multiple Spark jobs is also not an option.
Thanks, Sourav
Upvotes: 0
Views: 1027
Reputation: 2821
Here's an example which is basically what Srinivas has suggested in the comments.
The key here is the function input_file_name, which provides the original file name.
Note that if fileList is a standard Scala collection rather than a distributed data structure (such as a DataFrame/Dataset/RDD), then operations such as foreach and map are not executed in parallel. If you would like to use native Scala to achieve parallel execution, you can look into Futures (see the sketch after the example below).
// spark is a SparkSession
import org.apache.spark.sql.functions.{input_file_name, substring_index}

// Read all the files in the directory into a single DataFrame
val df = spark.read.option("header", "true").option("inferSchema", "true").csv("path/to/data")
// Keep only the file name (the part after the last "/") of the source file for each row
val df2 = df.withColumn("input", substring_index(input_file_name(), "/", -1))
// Partitioning the write by "input" produces one output directory per source file
df2.write.partitionBy("input").option("header", "true").csv("output")
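As for the Futures mentioned above, here is a minimal sketch of that approach. It assumes fileList is a plain Scala collection of file paths and reuses the question's businessLogic placeholder, here taken to accept a file name and to read, transform, and write that single file:
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration

// Launch one Future per file; each Future submits its own Spark actions,
// so several per-file jobs can run on the cluster concurrently
val jobs = fileList.map { name =>
  Future { businessLogic(name) } // hypothetical per-file read/transform/write
}

// Block until every file has been processed
Await.result(Future.sequence(jobs), Duration.Inf)
Keep in mind that the default global execution context is sized to the number of available cores, so it caps how many Futures run at once; use a dedicated ExecutionContext if you need a different level of concurrency.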
Upvotes: 1
Reputation: 40500
Something like this, maybe?
import org.apache.spark.sql.functions.{col, lit}

// Read each file, tag its rows with the file name, and union everything into one DataFrame
val combinedDF = fileList.map { name =>
  spark.read.whatever(name).withColumn("file_name", lit(name))
}.reduce { _ union _ }

// Apply the transformations once, group the rows by file, and cache the result
val result = applyBusinessLogic(combinedDF).repartition(col("file_name")).persist
// Write one output per original file; persist avoids recomputing the transformation each time
fileList.foreach { name =>
  createOutput(name, result.filter(col("file_name") === name))
}
Upvotes: 1