Anna Ignashkina

Reputation: 497

Writing to csv from dataframe using multiprocessing and not messing up the output

import numpy as np
import pandas as pd
from concurrent import futures
from multiprocessing import Pool
import threading

#Load the data
df = pd.read_csv('crsp_short.csv', low_memory=False)

def funk(date):
    ...
    # For each date in df.date.unique(), do the work that produces the
    # DataFrame called sample, then append it to the output file.

    sample.to_csv('crsp_full.csv', mode='a')

def evaluation(f_list):
    with futures.ProcessPoolExecutor() as pool:
        return pool.map(funk, f_list)

# list_s is a list of dates I want to calculate function funk for   

evaluation(list_s)

The output CSV file has some lines garbled because several processes append to it at the same time. I suspect I need to use queues, but I was not able to modify the code so that it worked. Any ideas on how to do this? Without multiprocessing it takes ages to get the results.

Upvotes: 3

Views: 2839

Answers (1)

Anna Ignashkina

Reputation: 497

The question below solved the problem. Pool does the queuing for you: the workers return their results and only the parent process writes to the file.

Python: Writing to a single file with queue while using multiprocessing Pool

My version of the code that didn't mess up the output csv file:

import numpy as np
import pandas as pd
from multiprocessing import Pool
import threading

#Load the data
df = pd.read_csv('crsp_short.csv', low_memory=False)

def funk(date):
    ...
    # For each date in df.date.unique(), do the work that produces the
    # DataFrame called sample, and return it instead of writing it here.

    return sample

# list_s is a list of dates I want to calculate function funk for   

def mp_handler():
    # 28 is the number of worker processes I want to run
    with Pool(28) as p:
        # imap yields results in input order; only this process writes the file
        for result in p.imap(funk, list_s):
            result.to_csv('crsp_full.csv', mode='a')


if __name__=='__main__':
    mp_handler()
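
For anyone who wants to try the pattern without the original data, here is a minimal sketch of the same single-writer idea using only the standard library. Everything in it is an assumption for illustration: `compute_rows` is a hypothetical stand-in for `funk`, `write_chunks` and `run` are made-up helper names, and the column names and dates are invented. The workers only compute and return rows, and the parent process is the only one that opens the file.

```python
import csv
import os
import tempfile
from multiprocessing import Pool


def compute_rows(date):
    """Hypothetical stand-in for funk: return the rows computed for one date."""
    return [(date, i, i * i) for i in range(3)]


def write_chunks(chunks, path, header):
    """Write the header once, then every chunk, from a single process."""
    with open(path, "w", newline="") as fh:
        writer = csv.writer(fh)
        writer.writerow(header)
        for rows in chunks:
            writer.writerows(rows)


def run(dates, path, processes=2):
    # Workers only compute. imap hands results back to the parent in input
    # order, so the parent is the only process that touches the file.
    with Pool(processes) as pool:
        write_chunks(pool.imap(compute_rows, dates), path,
                     ("date", "i", "value"))


if __name__ == "__main__":
    # Illustrative dates and a temporary output location (both assumptions).
    out = os.path.join(tempfile.mkdtemp(), "crsp_full.csv")
    run(["2020-01-02", "2020-01-03"], out)
    with open(out, newline="") as fh:
        print(fh.read())
```

One thing to watch in the pandas version: `to_csv(..., mode='a')` writes the column header on every call by default, so the header row is repeated for each date unless `header=False` is passed for every chunk after the first.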

Upvotes: 3
