EatSleepCode

Reputation: 462

Python Multi-Processing and Combining DFs

I am reading a large data source into pandas and breaking it up into 3 chunks. I want to use multiprocessing so that I can run an analysis function on each chunk simultaneously. Each function returns a dataframe, and I then need to combine those three small dataframes.

import multiprocessing
import pandas as pd

# conn (the database connection) is created elsewhere in the script

#This part creates an empty dataframe with the correct column names
d = {'ID': [''], 'Title': [''],'Organization': [''], 'PI': [''],'PI_Phone': [''], 'PI_Email': [''],
     'Start_Date': [''], 'End_Date': [''],'FY': [''], 'Funding': [''], 'Abstract': [''],
     'URL': [''],'Street': [''], 'City': [''],'State': [''], 'Zip': [''],'Country': ['']}
data = pd.DataFrame(data=d)

def algorithm(df):
    print('Alg Running')
    df['Abstract'] = df['Abstract'].fillna(value='Abstract')
    df['Abstract'] = df['Title'] + ' : ' + df['Abstract']
    # build one regex alternation from all the keyword lists and keep matching rows
    terms = tissue + te_abstract + temp_abstract + tx_abstract + armi_abstract + ['cell ', 'tissue', 'organ ']
    wide_net = df[df['Abstract'].str.lower().str.contains('|'.join(terms), na=False)]
    return wide_net

def chunk1():
    print('chunk1')
    therange = 0
    df1 = pd.read_sql(('SELECT * FROM Clean_SBIR LIMIT {},1000;').format(therange), con=conn)
    return algorithm(df1)
def chunk2():
    print('chunk2')
    therange = 1000
    df2 = pd.read_sql(('SELECT * FROM Clean_SBIR LIMIT {},1000;').format(therange), con=conn)
    algorithm(df2)
def chunk3():
    print('chunk3')
    therange = 2000
    df3 = pd.read_sql(('SELECT * FROM Clean_SBIR LIMIT {},1000;').format(therange), con=conn)
    algorithm(df3)

# creating processes
p1 = multiprocessing.Process(target=chunk1())
p2 = multiprocessing.Process(target=chunk2())
p3 = multiprocessing.Process(target=chunk3())

# starting process 1
p1.start()
# starting process 2
p2.start()
# starting process 3
p3.start() 

#This is where I am struggling
results = pd.concat([chunk1(),chunk2(),chunk3()])

# wait until process 1 is finished 
p1.join() 
# wait until process 2 is finished 
p2.join()
# wait until process 3 is finished 
p3.join()


print('done')

My algorithm function returns the correct data, and chunk1 returns the correct data too, but I can't figure out how to combine the three results, because the multiprocessing is getting in the way.
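
For reference, here is a minimal sketch of the plumbing I think I'd need to get each worker's DataFrame back to the parent, assuming all three chunk functions actually return their DataFrames. multiprocessing.Process discards its target's return value, so the workers would have to send their results through something like a Queue (the worker helper here is illustrative, not part of my current code):

import multiprocessing
import pandas as pd

def worker(chunk_fn, queue):
    # run one chunk function and ship its DataFrame back to the parent
    queue.put(chunk_fn())

if __name__ == '__main__':
    queue = multiprocessing.Queue()
    # pass the functions themselves, not chunk1() etc., which would
    # run them in the parent before any process is created
    procs = [multiprocessing.Process(target=worker, args=(fn, queue))
             for fn in (chunk1, chunk2, chunk3)]
    for p in procs:
        p.start()
    # drain the queue before joining; large unread results can deadlock
    results = pd.concat(queue.get() for _ in procs)
    for p in procs:
        p.join()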

Upvotes: 0

Views: 1737

Answers (1)

Sam Mason

Reputation: 16214

the above looks somewhat strange, maybe refactor to something like:

from multiprocessing import Pool

import pandas as pd

# %s placeholders assume a MySQL-style driver; sqlite3 would use ? instead
SQL = 'SELECT * FROM Clean_SBIR LIMIT %s, %s'

def process_data(offset, limit):
    # each worker reads its own slice of rows and filters it
    # (note each forked worker inherits conn; one connection per
    # process is generally safer)
    df = pd.read_sql(SQL, conn, params=(offset, limit))
    return algorithm(df)

with Pool(3) as pool:
    limit = 1000
    jobs = [(offset, limit) for offset in range(0, 3000, limit)]
    # starmap fans the (offset, limit) pairs out to the workers and
    # collects the filtered DataFrames for the final concat
    final_df = pd.concat(pool.starmap(process_data, jobs))

basically you were duplicating code unnecessarily, and not returning results from your chunk-processing functions.

that said, you probably don't want to be doing anything like this. all the data gets pickled between processes, which is part of the point @Serge was making.
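
for example, a single-process version avoids that pickling entirely (a sketch; chunksize is a standard read_sql parameter that yields the table in pieces):

import pandas as pd

# stream the table 1000 rows at a time and filter each chunk in-process;
# nothing needs to be pickled between processes this way
chunks = pd.read_sql('SELECT * FROM Clean_SBIR', conn, chunksize=1000)
final_df = pd.concat(algorithm(chunk) for chunk in chunks)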

Upvotes: 1
