Raheel

Reputation: 9024

Multiprocessing with pandas read csv and threadpool executor

I have a huge CSV to parse chunk by chunk and write to multiple files.

I am using the pandas read_csv function to read the file chunk by chunk. It was working fine, but slower than the performance we need, so I decided to do the parsing in threads:

with ThreadPoolExecutor(max_workers=2) as executor:
    futures = executor.map(process, [df for df in pd.read_csv(
        downloaded_file, chunksize=chunksize, compression='gzip',
        low_memory=False, skipinitialspace=True, encoding='utf-8')], file_index)
    for future in concurrent.futures.as_completed(futures):
        pass

Here is the function that is responsible for parsing a chunk and writing it to CSV:

def process(df, file_index):
    """
    Process a csv chunk in a separate thread
        :param df:
        :param file_index:
    """
    chunk_index = random.randint(1, 200)
    print("start processing chunk")
    # some heavy processing...
    # outfile_name is built elsewhere from file_index and chunk_index;
    # to_csv opens the file itself, so no separate handle is needed
    df.to_csv(outfile_name, index=False, compression='gzip',
              sep='\t', quoting=1, encoding='utf-8')
    del df
    print("end processing chunk")
    return True

I never see my print debug lines, the CPU and memory reach 100%, and my script gets killed.

It looks like read_csv itself keeps yielding chunks, and executor.map is still waiting for the first argument to be fully built.

Thanks

Upvotes: 0

Views: 5560

Answers (1)

havanagrawal

Reputation: 1039

Have you considered keeping the second argument to executor.map lazy (a generator)?

df_generator = pd.read_csv(downloaded_file, 
                           chunksize=chunksize,
                           compression='gzip', 
                           low_memory=False, 
                           skipinitialspace=True, 
                           encoding='utf-8')

with ThreadPoolExecutor(max_workers=2) as executor:
    # executor.map returns an iterator over the results (not Future objects),
    # so iterate it directly to drive the work
    for result in executor.map(process, df_generator, file_index):
        pass

pd.read_csv with a given chunksize returns an iterator, so iteration over the chunks is lazy. This should ideally not cause memory overflow, if your chunksize is chosen well.
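To illustrate the lazy behaviour, here is a minimal, self-contained sketch (the file name and chunk size are made up for the example): iterating the object returned by pd.read_csv keeps only one chunk in memory at a time.

import pandas as pd

# Hypothetical file and chunk size, for illustration only
reader = pd.read_csv('huge_file.csv.gz',
                     chunksize=100_000,
                     compression='gzip',
                     encoding='utf-8')

for i, chunk in enumerate(reader):
    # Each `chunk` is a DataFrame of at most `chunksize` rows;
    # the next chunk is only read from disk when the loop asks for it.
    print(i, len(chunk))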

Upvotes: 1
