codenivorous

Reputation: 115

Multiprocessing/for loop is skipping elements randomly

The dataset has billions of data points for each pair, so I tried a multiprocessing loop to speed things up. Why does the multiprocessing/for loop skip some elements from the pairs list? Each time I run it, it skips different names at random and then the code ends.


import pandas as pd
import pickle
import time
import concurrent.futures

start = time.perf_counter()
pairs = ['GBPUSD', 'AUDUSD', 'EURUSD', 'EURJPY', 'GBPJPY', 'USDJPY', 'USDCAD', 'EURGBP']


def pickling_joined(p):
    df = pd.read_csv(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_joined\\{p}.csv')
    df['LTP'] = (df['Bid'] + df['Ask']) / 2
    print(f'\n=====>> Converting Date format for {p} ....')
    df['Date'] = df['Date'].apply(pd.to_datetime)
    print(f'\n=====>> Date format converted for {p} ....')
    df.set_index('Date', inplace=True)
    df = pd.DataFrame(df)
    with open(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_pickled\\{p}.pkl', 'wb') as pickle_file:
        pickle.dump(df, pickle_file)
    print(f'\n=====>> Pickling done for {p} !!!')


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(pickling_joined, pairs)
    finish = time.perf_counter()
    print(f'Finished in {finish - start} seconds')
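
This behaviour is consistent with how `executor.map` handles worker failures: it returns a lazy iterator, and an exception raised inside a worker process is only re-raised when that task's result is consumed. Since the code above never iterates the results, a pair whose task fails (for example on a bad CSV) just looks "skipped". A minimal sketch of that effect, using a hypothetical `shaky_worker` in place of `pickling_joined`, and `ThreadPoolExecutor` only so the snippet is self-contained (`ProcessPoolExecutor.map` behaves the same way):

```python
import concurrent.futures

def shaky_worker(p):
    # Hypothetical stand-in for pickling_joined: one pair fails.
    if p == 'EURUSD':
        raise ValueError(f'bad data for {p}')
    return f'{p} done'

pairs = ['GBPUSD', 'AUDUSD', 'EURUSD']

# 1) Results never consumed: the ValueError is swallowed and
#    EURUSD simply appears to have been skipped.
with concurrent.futures.ThreadPoolExecutor() as executor:
    executor.map(shaky_worker, pairs)  # no error visible here

# 2) Consuming the iterator re-raises the worker's exception.
caught = None
with concurrent.futures.ThreadPoolExecutor() as executor:
    try:
        results = list(executor.map(shaky_worker, pairs))
    except ValueError as exc:
        caught = exc

print(caught)
```

So wrapping the `map` call in `list(...)` (or looping over its results) is a quick way to see whether "skipped" pairs are actually raising exceptions.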

Upvotes: 6

Views: 476

Answers (2)

eternal

Reputation: 59

Java would be a better choice for this kind of task; Python will always skip steps with a large DataFrame.

Upvotes: 1

Sudipto Ghosh

Reputation: 416

Python doesn't handle threading/multiprocessing well with heavy files, so I would recommend Dask here. Dask uses clustering, which works like multiprocessing but takes less time, and on top of that you can still use multiprocessing to run it even faster.

import pickle
import time
import concurrent.futures

import dask.dataframe as dd

start = time.perf_counter()
pairs = ['GBPUSD', 'AUDUSD', 'EURUSD', 'EURJPY', 'GBPJPY', 'USDJPY', 'USDCAD', 'EURGBP']


def pickling_joined(p):
    df = dd.read_csv(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_joined\\{p}.csv')
    df['LTP'] = (df['Bid'] + df['Ask']) / 2
    print(f'\n=====>> Converting Date format for {p} ....')
    df['Date'] = dd.to_datetime(df.Date)
    print(f'\n=====>> Date format converted for {p} ....')
    df = df.set_index('Date', sorted=True)
    df = df.compute()  # materialise the Dask DataFrame as a pandas DataFrame
    with open(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_pickled\\{p}.pkl', 'wb') as pickle_file:
        pickle.dump(df, pickle_file)
    print(f'\n=O=O=O=O=O>> Pickling done for {p} !!!')


if __name__ == '__main__':
    with concurrent.futures.ProcessPoolExecutor() as executor:
        executor.map(pickling_joined, pairs)
    finish = time.perf_counter()
    print(f'\nFinished in {finish - start} seconds')
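
Whichever approach you pick, it also helps to know which pair actually failed instead of letting `executor.map` drop worker exceptions silently. A sketch of one way to do that, using `submit` plus `as_completed`; `pickle_pair` here is a hypothetical stand-in for the real `pickling_joined`, and `ThreadPoolExecutor` is used only so the snippet runs as-is:

```python
import concurrent.futures

def pickle_pair(p):
    # Hypothetical worker: simulate one pair failing (e.g. missing CSV).
    if p == 'EURJPY':
        raise OSError(f'{p}.csv missing')
    return p

pairs = ['GBPUSD', 'AUDUSD', 'EURJPY', 'USDJPY']

done, failed = [], []
with concurrent.futures.ThreadPoolExecutor() as executor:
    # Map each future back to its pair so failures can be attributed.
    futures = {executor.submit(pickle_pair, p): p for p in pairs}
    for future in concurrent.futures.as_completed(futures):
        p = futures[future]
        try:
            future.result()       # re-raises any exception from the worker
            done.append(p)
        except Exception as exc:
            failed.append((p, str(exc)))

print(sorted(done), failed)
```

With this pattern a failing pair shows up in `failed` with its error message rather than vanishing from the output.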

Upvotes: 6
