Achyuta nanda sahoo

Reputation: 455

How to pass a DataFrame by reference in Python pandas

  1. Manager code:

    import pandas as pd
    import multiprocessing
    import time
    import MyDF
    import WORKER

    class Manager():
        'Common base class for all Managers'
        def __init__(self, Name):
            print('Hello Manager..')
            self.MDF = MyDF.MYDF(Name)
            self.Arg = self.MDF.display()
            self.WK = WORKER.Worker(self.Arg)

    MGR = Manager('event_wise_count')

    if __name__ == '__main__':
        jobs = []
        for i in range(5):
            x = 10 * i
            print('Manager : ', i)
            p = multiprocessing.Process(target=MGR.WK.DISPLAY)
            jobs.append(p)
            p.start()
            time.sleep(x)
    
  2. Worker code:

    import pandas as pd
    import time

    class Worker():
        'Common base class for all Workers'
        empCount = 0
        def __init__(self, DF):
            self.DF = DF
            print('Hello worker..', self.DF.count())
        def DISPLAY(self):
            self.DF = self.DF.head(10)
            return self.DF
    

Hi, I am trying to do multiprocessing, and I want to share a DataFrame reference with all sub-processes.

So in the code above, the Manager class spawns 5 processes, and each sub-process needs to use the Worker class's DataFrame. I expected each sub-process to share a reference to the Worker's DataFrame, but unfortunately that is not happening.

Any answer welcome.

Thanks in advance! :)

Upvotes: 1

Views: 5072

Answers (2)

ChronosZeJanitor

Reputation: 11

Sorry about the necromancy.

The issue is that the workers must have unique DataFrame instances. Almost all attempts to slice or chunk a pandas DataFrame produce aliases of the original DataFrame, and those aliases still cause resource contention between workers.
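As a quick sketch of the distinction (not from the original answer; the variable names are made up): handing every worker the same object versus giving each one its own `.copy()` can be checked with plain identity tests.

```python
import pandas as pd

df = pd.DataFrame({'a': [1, 2, 3, 4]})

# Repeating one DataFrame in a list hands every worker the exact same object:
shared = [df] * 3
assert all(d is df for d in shared)

# .copy() gives each worker an independent instance with the same contents:
independent = [df.copy() for _ in range(3)]
assert all(d is not df for d in independent)
assert independent[0].equals(df)
```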

There are two things that should improve performance. The first is to make sure you are working with pandas idiomatically: iterating row by row, with iloc or iterrows, fights against the design of DataFrames. Using a new-style class and applying one of its methods with DataFrame.apply is one option.

import pandas as pd
import numpy as np

def get_example_df():
    return pd.DataFrame(np.random.randint(10, 100, size=(5, 5)))

class Math(object):
    def __init__(self):
        self.summation = 0

    def operation(self, row):
        row_result = 0
        for elem in row:
            if elem % 2:
                row_result += elem
            else:
                row_result += 1
        self.summation += row_result
        if row_result % 2:
            return row_result
        else:
            return 1

    def get_summation(self):
        return self.summation

Custom = Math()
df = get_example_df()
df['new_col'] = df.apply(Custom.operation, axis=1)  # apply row-wise
print(Custom.get_summation())

The second option is to read in, or generate, a separate DataFrame for each worker, then recombine them if desired.

workers = 5
# A list comprehension creates distinct instances; `[df] * workers` would
# just repeat aliases of one object.
df_list = [get_example_df() for _ in range(workers)]
...
# worker code
...
aggregated = pd.concat(df_list, axis=0)
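Filling in the elided worker step, the whole split-and-recombine pattern looks roughly like this (a minimal sketch: the chunk size and the `* 2` stand-in for real worker logic are illustrative assumptions):

```python
import pandas as pd
import numpy as np

df = pd.DataFrame(np.arange(20).reshape(10, 2), columns=['a', 'b'])

# Give each of 5 workers an independent chunk (explicit copies, not views)
chunks = [df.iloc[i:i + 2].copy() for i in range(0, len(df), 2)]

# Stand-in for the per-worker processing step
processed = [chunk * 2 for chunk in chunks]

# Recombine the per-worker results into one DataFrame
aggregated = pd.concat(processed, axis=0)
assert aggregated.equals(df * 2)
```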

However, multiprocessing will not be necessary in most cases. I've processed more than 6 million rows of data without multiprocessing in a reasonable amount of time (on a laptop).

Note: I did not time the above code and there is probably room for improvement.

Upvotes: 1

S Anand

Reputation: 11948

This answer suggests using Namespaces to share large objects between processes by reference.

Here's an example of an application where 4 different processes can read from the same DataFrame. (Note: you can't run this on an interactive console -- save it as program.py and run it.)

import pandas as pd
import numpy as np
from multiprocessing import Manager, Pool


def get_slice(namespace, column, rows):
    '''Return the first `rows` rows from column `column` in namespace.data'''
    return namespace.data[column].head(rows)

if __name__ == '__main__':
    # Create a namespace and place our DataFrame in it
    manager = Manager()
    namespace = manager.Namespace()
    namespace.data = pd.DataFrame(np.random.rand(1000, 10))

    # Create 4 processes
    pool = Pool(processes=4)
    for column in namespace.data.columns:
        # Each worker can access the same DataFrame object
        result = pool.apply_async(get_slice, [namespace, column, 5])
        print(result._job, column, result.get().tolist())

While reading from the DataFrame is perfectly fine, it gets a little tricky if you want to write back to it. It's better to just stick to immutable objects unless you really need large writable objects.

Upvotes: 2
