everestial

Reputation: 7255

How to read multiple files into multiple threads/processes to optimize data analysis?

I am trying to read 3 different files in Python and do something to extract the data out of them. Then I want to merge the data into one big file.

Since each individual file is already big and takes some time for the data processing, I am thinking that reading and processing all three files in parallel (in multiple threads/processes) would speed things up.

Can someone suggest some improvements to this code to do what I want?

import pandas as pd
from functools import reduce

file01_output = ''
file02_output = ''
file03_output = ''

# I want to do all these three "with open(..)" at once.
with open('file01.txt', 'r') as file01:
    for line in file01:
        something01 = do_something(line)  # placeholder for the per-line processing
        file01_output += something01

with open('file02.txt', 'r') as file02:
    for line in file02:
        something02 = do_something(line)  # placeholder for the per-line processing
        file02_output += something02

with open('file03.txt', 'r') as file03:
    for line in file03:
        something03 = do_something(line)  # placeholder for the per-line processing
        file03_output += something03

def merge(a, b, c):
    a = file01_output
    b = file02_output
    c = file03_output

    # compile the list of dataframes you want to merge
    data_frames = [a, b, c]

    df_merged = reduce(lambda left, right: pd.merge(left, right,
                       on=['common_column'], how='outer'), data_frames).fillna('.')
    return df_merged

Upvotes: 0

Views: 1525

Answers (1)

Paul

Reputation: 5935

There are many ways to use multiprocessing for your problem, so I'll just propose one. Since, as you mentioned, the processing done on each file's data is CPU bound, you can run it in a separate process and expect to see some improvement (how much improvement, if any, depends on the problem, the algorithm, the number of cores, etc.). For example, the overall structure could be a pool over which you map a list of all the filenames you need to process, with the computing done inside the mapped function.

It's easier with a concrete example. Let's pretend we have a list of CSVs 'file01.csv', 'file02.csv', 'file03.csv', each of which has a NUMBER column, and we want to compute whether each number is prime (CPU bound). For example, file01.csv:

NUMBER
1
2
3
...

And the other files look similar but with different numbers to avoid duplicating work. The code to compute the primes could then look like this:

import pandas as pd
from multiprocessing import Pool
from sympy import isprime

def compute(filename):
    # IO (probably not faster)
    my_data_df = pd.read_csv(filename)

    # do some computing (CPU)
    my_data_df['IS_PRIME'] = my_data_df.NUMBER.map(isprime)

    return my_data_df

if __name__ == '__main__':
    filenames = ['file01.csv', 'file02.csv', 'file03.csv']

    # construct the pool and map to the workers
    with Pool(2) as pool:
        results = pool.map(compute, filenames)
    print(pd.concat(results))

I've used the sympy package for a convenient isprime function, and I'm sure the structure of your data is quite different, but hopefully that example illustrates a structure you could use too. Performing your CPU-bound computations in a pool (or a list of Processes) and then merging/reducing/concatenating the results is a reasonable approach to the problem.
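If the final step should be the key-based merge from your original snippet rather than a simple concat, a minimal sketch (assuming each per-file DataFrame shares a 'common_column' key, as in your question) could look like this:

from functools import reduce

# 'results' is the list of DataFrames returned by pool.map(compute, filenames);
# 'common_column' is a placeholder key name taken from the question.
df_merged = reduce(
    lambda left, right: pd.merge(left, right, on=['common_column'], how='outer'),
    results,
).fillna('.')
print(df_merged)

The pd.merge on the shared key simply replaces the pd.concat call above; the pool and the compute function stay the same.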

Upvotes: 1
