Reputation: 3592
I have a folder that contains n files.
I am creating an RDD that contains all the filenames in that folder with the code below:
fnameRDD = spark.read.text(filepath).select(input_file_name()).distinct().rdd
I want to iterate through these RDD elements and, for each one, perform the following steps:
I already have a function written with these steps; I've tested it on a single file and it works fine. But I've tried various things syntactically to do the first two steps, and I just get invalid syntax every time.
I know I am not supposed to use map(), since I want to read a file in each iteration, which requires sc, but map() is executed inside a worker node where sc can't be referenced.
Also, I know I can use wholeTextFiles() as an alternative, but that means I'll be holding the text of all the files in memory throughout the process, which doesn't seem efficient to me.
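For reference, the wholeTextFiles() approach I mean would look roughly like this (process_text is just a placeholder for my per-file logic):
pairsRDD = sc.wholeTextFiles(filepath)  # RDD of (filename, full file content) pairs
resultsRDD = pairsRDD.map(lambda pair: process_text(pair[0], pair[1]))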
I am open to suggestions for different approaches as well.
Upvotes: 3
Views: 889
Reputation: 9425
There are possibly other, more efficient ways to do it, but assuming you already have a function SomeFunction(df: DataFrame[value: string]), the easiest would be to use toLocalIterator() on your fnameRDD to process one file at a time. For example:
for x in fnameRDD.toLocalIterator():
    # x[0] is the file name produced by input_file_name()
    fileContent = spark.read.text(x[0])
    # fileContent.show(truncate=False)
    SomeFunction(fileContent)
A couple of thoughts regarding efficiency:
Unlike .collect(), .toLocalIterator() brings data to driver memory one partition at a time. But in your case, after you call .distinct(), all the data will reside in a single partition, and so it will be moved to the driver all at once. Hence, you may want to add .repartition(N) after .distinct() to break that single partition into N smaller ones and avoid the need for a large heap on the driver.
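As a rough sketch (the 8 here is just an illustrative value for N):
fnameRDD = (spark.read.text(filepath)
            .select(input_file_name())
            .distinct()
            .repartition(8)   # break the single post-distinct partition into smaller ones
            .rdd)
for x in fnameRDD.toLocalIterator():
    SomeFunction(spark.read.text(x[0]))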
(Of course, this is only relevant if your list of input files is really long.)
Upvotes: 2
Reputation: 146
I believe you're looking for recursive file lookup:
spark.read.option("recursiveFileLookup", "true").text(filepathroot)
If you point this at the root directory of your files, Spark will traverse the directory and pick up all the files that sit under the root and its child folders. This reads the files into a single dataframe.
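If you also need to know which file each row came from, a small extension of the above (just a sketch) is to keep the file name as a column:
from pyspark.sql.functions import input_file_name
df = (spark.read.option("recursiveFileLookup", "true")
      .text(filepathroot)
      .withColumn("filename", input_file_name()))
# df now has two columns: value (a line of text) and filename (the source file path)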
Upvotes: 1