Reputation: 115
The dataset has billions of data points for each pair. I tried the multiprocessing loop to make it faster. Why multiprocessing/for loop is skipping some elements from the Pairs? Once I run again, this skips some other names randomly and the code ends.
import pandas as pd
import pickle
import time
import concurrent.futures
start = time.perf_counter()
pairs = ['GBPUSD', 'AUDUSD', 'EURUSD', 'EURJPY', 'GBPJPY', 'USDJPY', 'USDCAD', 'EURGBP']
def pickling_joined(p):
df = pd.read_csv(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_joined\\{p}.csv')
df['LTP'] = (df['Bid'] + df['Ask']) / 2
print(f'\n=====>> Converting Date format for {p} ....')
df['Date'] = df['Date'].apply(pd.to_datetime)
print(f'\n=====>> Date format converted for {p} ....')
df.set_index('Date', inplace=True)
df = pd.DataFrame(df)
with open(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_pickled\\{p}.pkl', 'wb') as pickle_file:
pickle.dump(df, pickle_file)
print(f'\n=====>> Pickling done for {p} !!!')
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(pickling_joined, pairs)
finish = time.perf_counter()
print(f'Finished in {finish - start} seconds')
Upvotes: 6
Views: 476
Reputation: 59
Java will be a better choice for such action, python will always skip steps if large DF.
Upvotes: 1
Reputation: 416
Python doesn't handle Thread/Multiprocessing well with heavy files, I would recommend DASK here. DASK uses clustering which works like multiprocessing which takes lesser time, and then you can use multiprocessing, in addition, to run it faster.
def pickling_joined(p):
df = dd.read_csv(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_joined\\{p}.csv')
df['LTP'] = (df['Bid'] + df['Ask']) / 2
print(f'\n=====>> Converting Date format for {p} ....')
df['Date'] = dd.to_datetime(df.Date)
print(f'\n=====>> Date format converted for {p} ....')
df = df.set_index('Date', sorted=True)
df = df.compute()
with open(f'C:\\Users\\Ghosh\\Downloads\\dataset\\data_pickled\\{p}.pkl', 'wb') as pickle_file:
pickle.dump(df, pickle_file)
print(f'\n=O=O=O=O=O>> Pickling done for {p} !!!')
if __name__ == '__main__':
with concurrent.futures.ProcessPoolExecutor() as executor:
executor.map(pickling_joined, pairs)
finish = time.perf_counter()
print(f'\nFinished in {finish - start} seconds')
Upvotes: 6