Ketoziger

Reputation: 23

How do I get multiprocessing results?

I would like to store the results of the multiprocessing work in specific variables, as shown below.

Alternatively, I want to save the results of the job as a CSV file. How can I do that?

This is my code:

(I want to get the 'df4' and 'df7' data and save them to a CSV file.)

import pandas as pd
from pandas import DataFrame
import time
import multiprocessing

df2 = pd.DataFrame()
df3 = pd.DataFrame()
df4 = pd.DataFrame()
df5 = pd.DataFrame()
df6 = pd.DataFrame()
df7 = pd.DataFrame()
df8 = pd.DataFrame()

date = '2011-03', '2011-02' ........ '2021-03'    # there are 120 dates in this list
list1 = df1['resion'].drop_duplicates()  # there are 20 unique values; 'df1' is the original data

#I'd like to divide the list and work on it. 
list11 = list1.iloc[0:10]
list12 = list1.iloc[10:20]

# This function works on 'list11'.
def cal1():
    global df2
    global df3
    global df4

    start = time.time()

    for i, t in enumerate(list11):    
        df2 = pd.DataFrame(df1[df1['resion'] == t])  #'df1' is original data

        if i%2 == 0:
            print ("cal1 function processing: ", i)
            end = time.time()
            print (end-start)

        else:
            pass

        for n, d in enumerate(date):               
            df3 = pd.DataFrame(df2[df2['date'] == d])
            df3['number'] = df3['price'].rank(pct=True, ascending = False )
            df4 = df4.append(pd.DataFrame(df3))

        return df4

# This function works on 'list12'.

def cal2():
    global df5
    global df6
    global df7

    start = time.time()

    for i, t in enumerate(list12):    
        df5 = pd.DataFrame(df1[df1['resion'] == t])  #'df1' is original data

        if i%2 == 0:
            print ("cal1 function processing: ", i)
            end = time.time()
            print (end-start)

        else:
            pass

        for n, d in enumerate(date):               
            df6 = pd.DataFrame(df5[df5['date'] == d])
            df6['number'] = df6['price'].rank(pct=True, ascending = False )
            df7 = df7.append(pd.DataFrame(df6))

        return df7

## Multiprocessing code

if __name__ == "__main__":
    # creating processes
    p1 = multiprocessing.Process(target=cal1, args=())
    p2 = multiprocessing.Process(target=cal2, args=())
  
    # starting process 1
    p1.start()
    # starting process 2
    p2.start()
  
    # wait until process 1 is finished
    p1.join()
    
    # wait until process 2 is finished
    p2.join()
  
    # both processes finished
    print("Done!")

Upvotes: 2

Views: 87

Answers (1)

Iguananaut

Reputation: 23356

It looks like your functions cal1 and cal2 are identical except that they assign their results to different global variables. This is not going to work: when you run them in a subprocess, they assign those globals in the subprocess, which has no effect whatsoever on the main process from which you started them.
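
To see why, here is a minimal, self-contained sketch (the names are made up rather than taken from your code): the child process assigns the global, but the parent's copy never changes.

import multiprocessing

result = 0

def work():
    global result
    result = 42  # only changes the copy of the module loaded in the child process

if __name__ == "__main__":
    p = multiprocessing.Process(target=work)
    p.start()
    p.join()
    print(result)  # prints 0: the parent never sees the child's assignment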

If you want to map a function over multiple input ranges across multiple processes, you can use a process Pool and Pool.map.

For example:

def cal(input_list):
    # Accumulate results in a local DataFrame: a worker process cannot usefully
    # assign to the parent's globals, and without this line the df4 assignment
    # below would raise an UnboundLocalError.
    df4 = pd.DataFrame()

    start = time.time()

    for i, t in enumerate(input_list):
        df2 = pd.DataFrame(df1[df1['resion'] == t])  # 'df1' is the original data

        if i % 2 == 0:
            print("cal function processing:", i)
            end = time.time()
            print(end - start)

        for n, d in enumerate(date):
            df3 = pd.DataFrame(df2[df2['date'] == d])
            df3['number'] = df3['price'].rank(pct=True, ascending=False)
            # Note: DataFrame.append was removed in pandas 2.x; pd.concat is
            # the modern replacement.
            df4 = df4.append(df3)

        # Apart from the local df4 above, I kept your original code unmodified,
        # but I'm not really sure this is what to do, because you are returning
        # after one pass through the outer loop.  I haven't scrutinized what you
        # are actually trying to do but I suspect this is wrong too.
        return df4

Then create a process pool and divide up the input however you want (or, with a bit of tweaking, let Pool.map chunk the input for you and then reduce the outputs from map into a single result):

pool = multiprocessing.Pool(2)
dfs = pool.map(cal, [list1.iloc[0:10], list1.iloc[10:20]])
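
From there you can do the reduce step yourself, which also covers saving the result to a CSV file as you asked; a minimal sketch, assuming dfs comes from the snippet above and using a placeholder output filename:

result = pd.concat(dfs, ignore_index=True)  # combine the per-process DataFrames
result.to_csv("result.csv", index=False)    # 'result.csv' is just a placeholder path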

This is just to get you started. I would probably do a number of other things differently as well.

Upvotes: 1
