Reputation: 23
I would like to store the results of functions run with multiprocessing in specific variables, as shown below.
Alternatively, I would like to save the results as a CSV file. How can I do this?
This is my code:
(I want to get the 'df4' and 'df7' data and save them to a CSV file.)
import pandas as pd
from pandas import DataFrame
import time
import multiprocessing
df2 = pd.DataFrame()
df3 = pd.DataFrame()
df4 = pd.DataFrame()
df5 = pd.DataFrame()
df6 = pd.DataFrame()
df7 = pd.DataFrame()
df8 = pd.DataFrame()
date = '2011-03', '2011-02' ........ '2021-03' # There are 120 dates.
list1 = df1['resion'].drop_duplicates() # There are 20 regions. 'df1' is the original data
# I'd like to split the list and process each half separately.
list11 = list1.iloc[0:10]
list12 = list1.iloc[10:20]
# This function uses 'list11'.
def cal1():
    global df2
    global df3
    global df4
    start = time.time()
    for i, t in enumerate(list11):
        df2 = pd.DataFrame(df1[df1['resion'] == t]) # 'df1' is the original data
        if i%2 == 0:
            print ("cal1 function processing: ", i)
            end = time.time()
            print (end-start)
        else:
            pass
        for n, d in enumerate(date):
            df3 = pd.DataFrame(df2[df2['date'] == d])
            df3['number'] = df3['price'].rank(pct=True, ascending = False )
            df4 = df4.append(pd.DataFrame(df3))
        return df4
# This function uses 'list12'.
def cal2():
    global df5
    global df6
    global df7
    start = time.time()
    for i, t in enumerate(list12):
        df5 = pd.DataFrame(df1[df1['resion'] == t]) # 'df1' is the original data
        if i%2 == 0:
            print ("cal1 function processing: ", i)
            end = time.time()
            print (end-start)
        else:
            pass
        for n, d in enumerate(date):
            df6 = pd.DataFrame(df5[df5['date'] == d])
            df6['number'] = df6['price'].rank(pct=True, ascending = False )
            df7 = df7.append(pd.DataFrame(df6))
        return df7
## Multiprocessing code
if __name__ == "__main__":
    # creating processes
    p1 = multiprocessing.Process(target=cal1, args=())
    p2 = multiprocessing.Process(target=cal2, args=())
    # starting process 1
    p1.start()
    # starting process 2
    p2.start()
    # wait until process 1 is finished
    p1.join()
    # wait until process 2 is finished
    p2.join()
    # both processes finished
    print("Done!")
Upvotes: 2
Views: 87
Reputation: 23356
It looks like your functions cal1 and cal2 are identical except for the list they iterate over and the global variables they assign their results to. This is not going to work: when you run them in a subprocess, they assign to those global variables in the subprocess, and that has no effect whatsoever on the main process from which you started them.
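To see the effect in isolation, here is a minimal sketch that is not taken from your code: a child process appends to a module-level list, and the parent's copy stays empty. The names results, worker, and run_in_child are made up for illustration.

```python
import multiprocessing

# Module-level global, playing the role of df4/df7 in the question.
results = []


def worker():
    # Runs in the child process and mutates the child's own copy of `results`.
    results.append("written in child")


def run_in_child():
    # Start one child process, wait for it, then report the parent's view.
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()
    return list(results)


if __name__ == "__main__":
    # The parent's list is still empty after the child has finished.
    print(run_in_child())
```

Run as a script, this prints an empty list. The child did its work, but only on its own copy of the global.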
If you want to map a function to multiple input ranges across multiple processes you can use a process Pool and Pool.map.
For example:
def cal(input_list):
    # Local accumulator; a subprocess cannot update the parent's globals anyway.
    df4 = pd.DataFrame()
    start = time.time()
    for i, t in enumerate(input_list):
        df2 = pd.DataFrame(df1[df1['resion'] == t])  # 'df1' is the original data
        if i % 2 == 0:
            print("cal function processing: ", i)
            end = time.time()
            print(end - start)
        for n, d in enumerate(date):
            df3 = pd.DataFrame(df2[df2['date'] == d])
            df3['number'] = df3['price'].rank(pct=True, ascending=False)
            # DataFrame.append is gone in pandas 2.0, so concatenate instead.
            df4 = pd.concat([df4, df3])
        # I kept your original logic as it was but I'm not really sure this
        # is what to do, because you are returning after one pass through the
        # outer loop. I haven't scrutinized what you are actually trying to
        # do but I suspect this is wrong too.
        return df4
Then create a process pool and divide up the input however you want (or, with a bit of tweaking, you can let Pool.map chunk the input for you, and then reduce the outputs from map into a single output):
if __name__ == "__main__":
    with multiprocessing.Pool(2) as pool:
        dfs = pool.map(cal, [list1.iloc[0:10], list1.iloc[10:20]])
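Here is a rough, self-contained sketch of the "let Pool.map chunk the input and then reduce" variant, including the CSV step you asked about. It makes several assumptions that are not in your code: each worker receives one region's rows directly instead of reading a global df1, the helper run_parallel and the file name ranked.csv are hypothetical, and the tiny DataFrame only stands in for your real data. The return sits after the loop here, which is my guess at what you intended.

```python
import multiprocessing

import pandas as pd


def cal(region_df):
    # Rank prices within each date for the rows of a single region.
    frames = []
    for _, by_date in region_df.groupby("date"):
        by_date = by_date.copy()
        by_date["number"] = by_date["price"].rank(pct=True, ascending=False)
        frames.append(by_date)
    return pd.concat(frames)


def run_parallel(df, processes=2):
    # One task per region; Pool.map distributes the task list over the workers.
    tasks = [group for _, group in df.groupby("resion")]
    with multiprocessing.Pool(processes) as pool:
        results = pool.map(cal, tasks)
    # Reduce the per-region outputs to one DataFrame in the parent process.
    return pd.concat(results)


if __name__ == "__main__":
    # Made-up stand-in for df1.
    df1 = pd.DataFrame({
        "resion": ["A", "A", "B", "B"],
        "date": ["2011-02", "2011-02", "2011-02", "2011-02"],
        "price": [10, 20, 5, 7],
    })
    combined = run_parallel(df1)
    combined.to_csv("ranked.csv", index=False)  # hypothetical output path
```

The important part is that the workers return their DataFrames and the parent combines and saves them. Nothing relies on globals being shared between processes.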
This is just to get you started. I would probably do a number of other things differently as well.
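One possibility, and only a guess at what could be done differently: the nested loops over regions and dates compute a percentile rank of price within each (region, date) group, which pandas can do in a single groupby call. The helper name add_rank is made up, and whether this is fast enough to skip multiprocessing entirely depends on your data.

```python
import pandas as pd


def add_rank(df):
    # Percentile rank of price within each (region, date) group, highest price first.
    out = df.copy()
    out["number"] = out.groupby(["resion", "date"])["price"].rank(
        pct=True, ascending=False
    )
    return out


if __name__ == "__main__":
    # Made-up stand-in for df1.
    df1 = pd.DataFrame({
        "resion": ["A", "A", "B"],
        "date": ["2011-02", "2011-02", "2011-02"],
        "price": [10, 20, 5],
    })
    print(add_rank(df1))
```

Unlike the loop version, this keeps the rows in their original order instead of grouping them by region and date, but the values in the number column are computed the same way.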
Upvotes: 1