Reputation: 900
I have a bunch of compressed text files each line of which containing a JSON object. Simplified my workflow looks like this:
string_json = sc.textFile('/folder/with/gzip/textfiles/')
json_objects = string_json.map(make_a_json)
DataRDD = json_objects.map(extract_data_from_json)
DataDF = sqlContext.createDataFrame(DataRDD,schema).collect()
'''followed by some transformations to the dataframe'''
Now, the code works fine. The problem arises as soon as the number files can not be evenly divided between executors.
That is as far as I understand it, because spark is not extracting the files and then distributing the rows to the executors, but rather each executioner gets one file to work with.
e.g If i have 5 files and 4 executors, the first 4 files are processed in parallel and then the 5th file.
Because the 5th is not being processed in parallel with the other 4 and cannot be divided between the 4 executors, it takes the same amount of time as the first 4 together.
This happens at every stages of the program.
Is there a way to transform this kind compartmentalized RDD either into a RDD or Dataframe that is not ?
I'm using python 3.5 and spark 2.0.1
Upvotes: 0
Views: 221
Reputation: 3725
Spark operations are divided into tasks, or units of work that can be done in parallel. There are a few things to know about sc.textFile
:
Based on these two premises,your use case is going to see one task per file. You're absolutely right about how the tasks / cores ratio affects wall clock time: having 5 tasks running on 4 cores will take roughly the same time as 8 tasks on 4 cores (though not quite, because stragglers exist and the first core to finish will take on the 5th task).
A rule of thumb is that you should have roughly 2-5 tasks per core in your Spark cluster to see good performance. But if you only have 5 gzipped text files, you're not going to see this. You could try to repartition your RDD (which uses a somewhat expensive shuffle operation) if you're doing a lot downstream:
repartitioned_string_json = string_json.repartition(100, shuffle=True)
Upvotes: 2