Harshil Doshi

Reputation: 3592

Loop through RDD elements, read their contents for further processing

I have a folder that contains n files.

I am creating an RDD that contains all the filenames in the above folder with the code below:

fnameRDD = spark.read.text(filepath).select(input_file_name()).distinct().rdd

I want to iterate through these RDD elements and perform the following steps:

  1. Read the content of each element (each element is a file path, so I need to read its content through the SparkContext)
  2. The content read above should become another RDD, which I want to pass as an argument to a function
  3. Perform certain steps on the RDD passed as an argument inside the called function

I already have a function written whose steps I've tested on a single file, and it works fine. But I've tried various things syntactically to do the first two steps, and I just get invalid syntax every time.

I know I am not supposed to use map(), since reading a file in each iteration requires sc, but map() is executed on the worker nodes, where sc can't be referenced.

Also, I know I can use wholeTextFiles() as an alternative, but that means I'd be holding the text of all the files in memory throughout the process, which doesn't seem efficient to me.
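For reference, this is roughly what I mean by the wholeTextFiles() alternative (just a sketch, using the same sc and filepath as above):

# Each record is a (filePath, fileContent) pair, so the full text of every file becomes RDD data
pairsRDD = sc.wholeTextFiles(filepath)
# e.g. pairsRDD.take(1) -> [('hdfs://.../file1.txt', '...entire file contents...')]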

I am open to suggestions for different approaches as well.

Upvotes: 3

Views: 889

Answers (2)

mazaneicha

Reputation: 9425

There are possibly other, more efficient ways to do it, but assuming you already have a function SomeFunction(df: DataFrame[value: string]), the easiest would be to use toLocalIterator() on your fnameRDD to process one file at a time. For example:

for x in fnameRDD.toLocalIterator():
  fileContent = spark.read.text(x[0])  # x[0] is the file path collected via input_file_name()
  # fileContent.show(truncate=False)   # uncomment to inspect the contents of each file
  SomeFunction(fileContent)

A couple of thoughts regarding efficiency:

  1. Unlike .collect(), .toLocalIterator() brings data into driver memory one partition at a time. But in your case, after you call .distinct(), all the data will reside in a single partition, and so will be moved to the driver all at once. Hence, you may want to add .repartition(N) after .distinct() (see the first sketch after this list) to break that single partition into N smaller ones and avoid the need for a large heap on the driver. (Of course, this is only relevant if your list of input files is REALLY long.)
  2. The method used to list the file names itself seems less than efficient. Perhaps you'd want to consider something more direct, using the FileSystem API, for example as in this article (see the second sketch after this list).
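For illustration, point 1 might look like this (a minimal sketch; N=8 is just an example value):

from pyspark.sql.functions import input_file_name

fnameRDD = (spark.read.text(filepath)
            .select(input_file_name())
            .distinct()
            .repartition(8)  # break the single post-distinct partition into smaller ones
            .rdd)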
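And for point 2, a sketch of listing the file paths through the Hadoop FileSystem API (this goes through PySpark's internal _jvm/_jsc gateway, so treat it as an assumption to verify against the linked article):

# List file paths directly, without reading every file just to extract its name
jvm = spark.sparkContext._jvm
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
path = jvm.org.apache.hadoop.fs.Path(filepath)
fs = path.getFileSystem(hadoop_conf)
file_names = [str(status.getPath()) for status in fs.listStatus(path) if status.isFile()]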

Upvotes: 2

smfjaw

Reputation: 146

I believe you're looking for recursive file lookup:

spark.read.option("recursiveFileLookup", "true").text(filepathroot)

If you point this at the root directory of your files, Spark will traverse the directory and pick up all the files that sit under the root and its child folders, reading them all into a single DataFrame.
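If you still need to know which file each row came from (as in your input_file_name() snippet), a minimal sketch of this approach, assuming the per-file logic can be expressed with DataFrame operations, would be:

from pyspark.sql.functions import input_file_name

# Read every file under the root (and its subfolders) into one DataFrame,
# tagging each row with its source file so per-file logic can use groupBy/filter
df = (spark.read
      .option("recursiveFileLookup", "true")
      .text(filepathroot)
      .withColumn("source_file", input_file_name()))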

Upvotes: 1
