Reputation: 261
I have a pyspark dataframe with millions of records and hundreds of columns (an example below)
clm1, clm2, clm3
code1,xyz,123
code2,abc,345
code1,qwe,456
I want to divide it into multiple dataframes based on clm1, i.e. a separate dataframe for clm1=code1, another for clm1=code2, and so on, then process each one and write the results to separate files. I want to perform this operation in parallel to speed up the process. I am using the code below:
S1 = myclass("code1")
S2 = myclass("code2")
t1 = multiprocessing.Process(target=S1.processdata,args=(df,))
t2 = multiprocessing.Process(target=S2.processdata,args=(df,))
t1.start()
t2.start()
t1.join()
t2.join()
but I am getting the error below:
Method __getstate__([]) does not exist
If I use threading.Thread instead of multiprocessing.Process, it works fine, but that doesn't seem to reduce the overall time.
Upvotes: 1
Views: 3961
Reputation: 1375
Method __getstate__([]) does not exist
It's a py4j.Py4JException. You get this error with multiprocessing.Process because that module uses separate processes, and the arguments passed to them must be pickled; a Spark DataFrame is backed by a JVM object behind a py4j gateway and cannot be pickled. threading.Thread, on the other hand, uses threads that share the same memory, so they can share the DataFrame object.
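For illustration, here is a minimal sketch of the thread-based variant that avoids the pickling error (it reuses the myclass and df from your question):
import threading

# Threads run inside the same driver process and share its memory,
# so the DataFrame reference can be passed around without pickling it.
S1 = myclass("code1")
S2 = myclass("code2")

t1 = threading.Thread(target=S1.processdata, args=(df,))
t2 = threading.Thread(target=S2.processdata, args=(df,))

t1.start()
t2.start()
t1.join()
t2.join()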
Also take a look at this SO question and answer: Multiprocessing vs Threading Python
I understand that you may be new to the Spark world, so let me suggest a solution for your problem. You asked how to do multiprocessing, but if you already have Spark, that is probably not the best practice.
You have Spark, a framework for parallel processing; you don't need to parallelize your task manually.
Spark is designed for parallel computing on a cluster, but it also works extremely well on a large single node. The multiprocessing library is useful for pure Python computation tasks; in Spark/PySpark, all the computations already run in parallel in the JVM.
In python_code.py
import pyspark.sql.functions as f

# JOB 1
df1 = df.filter(f.col('clm1') == 'code1')
# ... many transformations
df1.write.format('..')..

# JOB 2
df2 = df.filter(f.col('clm1') == 'code2')
# ... many transformations
df2.write.format('..')..
Then run this code with spark-submit, using all your cores (local[*] means all cores):
# Run application locally on all cores
./bin/spark-submit --master local[*] python_code.py
With this approach, you use the power of Spark. The jobs will be executed sequentially, BUT you will have: CPU utilization all the time <=> parallel processing <=> lower computation time.
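If there are many distinct codes, the same sequential pattern can be written as a loop; the output path and the Parquet format below are only assumptions for the sketch:
import pyspark.sql.functions as f

# Collect the distinct codes on the driver (assuming clm1 has a manageable number of values).
codes = [row['clm1'] for row in df.select('clm1').distinct().collect()]

for code in codes:
    out = df.filter(f.col('clm1') == code)
    # ... many transformations
    # Output path and format are assumptions; adjust them to your storage.
    out.write.mode('overwrite').parquet('/tmp/output/clm1=' + code)
Each iteration is still a full Spark job that uses all your cores, so you keep the same CPU utilization as the two hard-coded jobs above.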
Upvotes: 2