user0204

Reputation: 261

How to process multiple pyspark dataframes in parallel

I have a pyspark dataframe with millions of records and hundreds of columns (an example below):

clm1, clm2, clm3
code1,xyz,123
code2,abc,345
code1,qwe,456

I want to divide it into multiple dataframes based on clm1, i.e. a separate dataframe for clm1=code1, another for clm1=code2, and so on, then process each one and write the results to separate files. I want to perform this operation in parallel to speed up the process. I am using the below code:

import multiprocessing

S1 = myclass("code1")
S2 = myclass("code2")


t1 = multiprocessing.Process(target=S1.processdata,args=(df,))
t2 = multiprocessing.Process(target=S2.processdata,args=(df,))
t1.start()
t2.start()

t1.join()
t2.join()

but I am getting the below error:

Method __getstate__([]) does not exist

If I use threading.Thread instead of multiprocessing.Process it works fine, but that doesn't seem to reduce the overall time.

Upvotes: 1

Views: 3961

Answers (1)

ggeop

Reputation: 1375

About the error

Method __getstate__([]) does not exist

It's a py4j.Py4JException. You get this error with multiprocessing.Process because that module uses separate processes, and the arguments have to be pickled to be sent to them; a PySpark dataframe is only a handle to an object living in the JVM, so it cannot be pickled. threading.Thread, on the other hand, uses threads which share the same memory, so they can share the dataframe object.

Also take a look at this SO question and answer: Multiprocessing vs Threading Python
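As you observed, swapping multiprocessing.Process for threading.Thread removes the error, because all the threads live in the same driver process and can share the dataframe handle. A minimal sketch of that variant, assuming the same myclass and df from your question:

import threading

S1 = myclass("code1")
S2 = myclass("code2")

# Threads share the driver's memory, so df is never pickled
t1 = threading.Thread(target=S1.processdata, args=(df,))
t2 = threading.Thread(target=S2.processdata, args=(df,))
t1.start()
t2.start()

t1.join()
t2.join()

This only removes the pickling problem; as you noticed, it does not necessarily reduce the overall time.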


General advice

I understand that you may be new to the Spark world, so let me suggest a solution for your problem. You asked how to do multiprocessing, but if you have Spark this may not be the best practice.

You already have Spark, a framework for parallel processing; you don't need to parallelize your task manually.

Spark is designed for parallel computing in a cluster, but it also works extremely well on a large single node. The multiprocessing library is useful for Python computation tasks; in Spark/PySpark all the computations already run in parallel in the JVM.

In python_code.py

import pyspark.sql.functions as f


# JOB 1
df1 = df.filter(f.col('clm1')=='code1')
... many transformations
df1.write.format('..')..

# JOB 2
df2 = df.filter(f.col('clm1')=='code2')
... many transformations
df2.write.format('..')..

Then run this code with spark-submit, using all your cores (* = all cores):

# Run application locally on all cores
./bin/spark-submit --master local[*] python_code.py

With this approach, you use the power of Spark. The jobs will be executed sequentially, BUT you will have: full CPU utilization all the time <=> parallel processing <=> lower computation time
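If you have more than two codes, you can drive the same pattern with a loop over the distinct values of clm1. A minimal sketch, assuming the df from the question; the output path and format below are placeholders:

import pyspark.sql.functions as f

# Collect the distinct codes to the driver (assumes clm1 has a manageable number of values)
codes = [row['clm1'] for row in df.select('clm1').distinct().collect()]

for code in codes:
    subset = df.filter(f.col('clm1') == code)
    # ... the per-code transformations would go here ...
    # Placeholder output location and format, adjust to your needs
    subset.write.mode('overwrite').parquet('/tmp/output/clm1=' + code)

Each iteration is still a normal Spark job, so every filter and write uses all the cores you gave to spark-submit.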

Upvotes: 2
