Reputation: 11
I have a large Python project in which the driver program has a function that uses a for loop to traverse each file in my GCP (Google Cloud Platform) bucket. I'm using the CLI to submit the job to GCP and let it run there.
For each file traversed in this for loop, I invoke a function parse_file(...) that parses the file and calls a series of other functions that deal with it.
The whole project takes a few minutes to run, which is slow, and the driver program hasn't used much PySpark yet. The issue is that each parse_file(...) call in that file-level for loop executes sequentially. Is it possible to use PySpark to parallelize that file-level for loop so that parse_file(...) runs in parallel for all these files, reducing execution time and improving efficiency? If so, since the program isn't using PySpark yet, how much code modification is needed to parallelize it?
The relevant part of the program looks like this:
# ... some other code
attributes_table = ....
for obj in gcp_bucket.objects(path):
    if obj.key.endswith('sys_data.txt'):
        # ... some other code
        file_data = (d for d in obj.download().decode('utf-8').split('\n'))
        parse_file(file_data, attributes_table)
        # ... some other code ...
How do I use PySpark to parallelize this part instead of using a for loop to traverse the files one at a time?
Upvotes: 1
Views: 1738
Reputation: 788
Thank you for asking your question.
I would recommend creating an RDD based on your gcp_bucket.objects(path).
You have your SparkContext, so creating the RDD should be as simple as:
my_rdd = sc.parallelize(gcp_bucket.objects(path))
For the uninitiated: by convention, the SparkContext is assigned to the variable sc. The contents of your for loop will have to be put into a function; let's call it my_function. You now have all your pieces.
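For illustration, here is a minimal sketch of what my_function might look like, built from the loop body in your question; gcp_bucket, obj.key, obj.download(), parse_file, and attributes_table are taken as-is from your code, so adjust them to whatever your client library actually provides. Note that attributes_table is captured from the driver via the closure, so it must be picklable for Spark to ship it to the workers.
def my_function(obj):
    # Only process the sys_data.txt files, exactly as the original for loop does
    if not obj.key.endswith('sys_data.txt'):
        return None
    # Download the object, decode it, and split it into lines
    file_data = (d for d in obj.download().decode('utf-8').split('\n'))
    # attributes_table is captured from the driver program's closure
    return parse_file(file_data, attributes_table)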
Your next step will be mapping your function as such:
results_dag = my_rdd.map(my_function)
results = results_dag.collect()
Recall that Spark performs lazy evaluation; this is why we need to perform the collect operation at the end.
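As a tiny illustration of that laziness, independent of the GCP code:
nums = sc.parallelize([1, 2, 3, 4])
doubled = nums.map(lambda x: x * 2)   # nothing runs yet; this only builds the lineage
print(doubled.collect())              # [2, 4, 6, 8] -- the work actually happens here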
A few other recommendations. The first is to run your code on a small set of objects from your GCP bucket to get a sense of the timings. The other, to encourage good coding practices, is to consider breaking the operations inside your for loop down even further into additional RDDs. You can always chain them together...
my_rdd = sc.parallelize(gcp_bucket.objects(path))
dag1 = my_rdd.map(function1)
dag2 = dag1.map(function2)
dag3 = dag2.map(function3)
results = dag3.collect()
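Applied to your case, that chain might look something like the sketch below. The helper names download_text and split_lines are hypothetical placeholders, while parse_file, attributes_table, and gcp_bucket come from your question; here the keys are parallelized rather than the object handles, on the assumption that the bucket's object handles may not pickle cleanly for shipping to the workers.
# Parallelize just the keys of the files we care about
keys_rdd = sc.parallelize([obj.key for obj in gcp_bucket.objects(path)
                           if obj.key.endswith('sys_data.txt')])
dag1 = keys_rdd.map(download_text)   # hypothetical: key -> decoded file contents
dag2 = dag1.map(split_lines)         # hypothetical: text -> list of lines
dag3 = dag2.map(lambda lines: parse_file(lines, attributes_table))
results = dag3.collect()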
Upvotes: 1