Reputation: 61
I have a big list of folders (10,000 folders) with .gz file(s) inside, and I'm trying to do something on a per-folder basis, e.g. split each file into smaller pieces.
To achieve this I decided to do:
foldersRDD.foreach(folderName =>
  ...
  sc.textFile(folderName)
  ...
)
It works locally, but on the cluster it leads to a NullPointerException (I guess the SparkContext is null on each executor node, so we can't use it inside code that runs on the workers at all).
How can I redo this example in a way that ensures a one-folder-per-worker execution mode, or otherwise avoids / minimizes heavy operations like shuffles?
Upvotes: 1
Views: 1562
Reputation: 2095
You could combine your solution with the wholeTextFiles() command. It's just a trick, but it might serve you well.
According to the official Spark documentation, wholeTextFiles() lets you read a directory containing multiple small text files, returning each of them as filename/content pairs. This is in contrast with textFile(), which would return one record per line in each file.
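For illustration, a minimal sketch of the two return shapes (sc is assumed to be an existing SparkContext and the path is just a placeholder):

import org.apache.spark.rdd.RDD

// One (fileName, fileContent) pair per file in the directory
val perFile: RDD[(String, String)] = sc.wholeTextFiles("hdfs://some/dir")

// One record per line, across all files in the directory
val perLine: RDD[String] = sc.textFile("hdfs://some/dir")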
Starting from the original array of folder paths, you could create a set of key/value RDDs, each one representing the name and content of an entire folder in a filename/content format.
Consider the following starting scenario:
Folder 1 (location > hdfs://Folder1)
- File01 (location > hdfs://Folder1/File01) > Hello this is the content of file 01
- File02 (location > hdfs://Folder1/File02) > Hello this is the content of file 02
Folder 2 (location > hdfs://Folder2)
- File03 (location > hdfs://Folder2/File03) > Hello this is the content of file 03
- File04 (location > hdfs://Folder2/File04) > Hello this is the content of file 04
Assuming you have an array of strings containing the name of each folder, it would look like:
DirArray[0]: "hdfs://Folder1"
DirArray[1]: "hdfs://Folder2"
The next step would be to create a single RDD per folder. Each RDD would represent the entire list of filenames and their content in a filename/content format. To achieve this, you could iterate through the path array and invoke wholeTextFiles() for each element. It would consist of something like the following:
For each element in DirArray > wholeTextFiles("hdfs://FolderN")
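A minimal Scala sketch of that loop, assuming an existing SparkContext sc and the DirArray of folder paths from above:

// Build one RDD of (fileName, fileContent) pairs per folder path in DirArray
val folderRDDs = DirArray.map(folderPath => sc.wholeTextFiles(folderPath))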
Each of the resulting RDDs would look something like this:
firstFolderRDD (key/value):
- "hdfs://Folder1/File01" > "Hello this is the content of file 01"
- "hdfs://Folder1/File02" > "Hello this is the content of file 02"
At this point, there would be two alternatives:
a) Store each RDD in an array-like structure and compute its elements later on, or
b) Compute the elements of each RDD as they are generated (in the previous for-each step); a rough sketch of this variant follows below.
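As a sketch of alternative (b), where splitIntoPieces and the output location are hypothetical placeholders for whatever the real per-file work is:

// Hypothetical splitter: naive fixed-size chunks, for illustration only
def splitIntoPieces(content: String): Seq[String] =
  content.grouped(1024 * 1024).toSeq

// Process each folder's RDD as soon as it is created
DirArray.foreach { folderPath =>
  sc.wholeTextFiles(folderPath)
    .flatMapValues(splitIntoPieces)
    .saveAsTextFile(folderPath + "_pieces")   // hypothetical output path
}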
It is important to note that this approach is recommended only for sets of small files, mainly because each record of the newly created RDDs contains the entire content of the file it represents.
Upvotes: 1
Reputation: 4515
In your example the code inside foreach is serialized and transferred to the workers. You're right, there is no SparkContext on them.
My suggestion: use a plain foldersList instead of an RDD and pray that your RDDs are created on the same workers where the data is. In the ideal case, where you have small files (not split by HDFS across many nodes) and enough memory on each worker, there would be no shuffling. In the real case, YARN will reduce the cost for you - that's its job, not yours.
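A rough sketch of that idea, assuming foldersList is an ordinary driver-side collection of paths and the actual splitting is left as a placeholder:

// foldersList lives on the driver, so sc is available in every iteration
val foldersList: Seq[String] = Seq("hdfs://Folder1", "hdfs://Folder2")   // placeholder paths

foldersList.foreach { folderName =>
  val lines = sc.textFile(folderName)           // one RDD per folder, created on the driver
  // ... split / transform the lines here ...
  lines.saveAsTextFile(folderName + "_split")   // hypothetical output location
}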
P.S. There could be some tricks, and other, more experienced folks may answer you better. I just recommend trusting the Hadoop magic under the hood and spending your time on the actual implementation of your algorithms. Good luck!
Upvotes: 0