We have a batch processing system which we are looking to modify to use multiple threads. The process takes in a delimited file and performs calculations on it via pandas.
I would like to split the DataFrame into N chunks if the total number of records exceeds a threshold. Each chunk would then be handed to a thread from a ThreadPoolExecutor to do the calculations. At the end I would wait for all threads to finish and concatenate the resulting DataFrames into one.
The problem is that I'm not sure how to split a pandas DataFrame like this. Let's say there is an arbitrary number of threads, 2 for example, and I want to start splitting when the record count is over 200000.
So the idea is: if I send a file with 200001 records, thread 1 gets 100000 and thread 2 gets 100001. If I send one with 1000000, thread 1 gets 500000 and thread 2 gets 500000.
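A minimal sketch of that split, assuming contiguous row blocks are acceptable: every chunk gets `len(df) // n` rows and the leftover rows (always fewer than `n`) go to the last chunk, which reproduces the 100000/100001 example. `split_frame` is a hypothetical helper name, not a pandas API.

```python
from typing import List

import pandas as pd

def split_frame(df: pd.DataFrame, n_chunks: int) -> List[pd.DataFrame]:
    """Split df into n_chunks contiguous row blocks (hypothetical helper).

    Each block gets len(df) // n_chunks rows; the remainder rows
    (len(df) % n_chunks) are added to the last block.
    """
    base = len(df) // n_chunks
    chunks = []
    for k in range(n_chunks):
        start = k * base
        # the last block runs to the end so no rows are dropped
        stop = len(df) if k == n_chunks - 1 else start + base
        chunks.append(df.iloc[start:stop])
    return chunks

# usage: 200001 rows across 2 threads
df = pd.DataFrame({"x": range(200_001)})
print([len(c) for c in split_frame(df, 2)])  # [100000, 100001]
```

Using `iloc` keeps the split positional, so it works regardless of what the index labels are.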
(If the total number of records does not exceed this threshold, I would just run the process on a single thread.)
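The whole flow described above (threshold check, split, thread pool, wait, concatenate) could look roughly like this. It is a sketch under assumptions: `calculate` is a hypothetical stand-in for the real calculations, and the split uses `np.array_split` on row positions, which gives the first `len(df) % NUM_THREADS` chunks one extra row (so thread 1, not thread 2, gets the 100001 rows).

```python
from concurrent.futures import ThreadPoolExecutor

import numpy as np
import pandas as pd

THRESHOLD = 200_000  # split only above this many rows
NUM_THREADS = 2      # example value from the question

def calculate(chunk: pd.DataFrame) -> pd.DataFrame:
    # hypothetical stand-in for the real pandas calculations
    return chunk.assign(y=chunk["x"] * 2)

def run(df: pd.DataFrame) -> pd.DataFrame:
    if len(df) <= THRESHOLD:
        # small input: process it on the calling thread, no pool needed
        return calculate(df)
    # near-equal blocks of row positions, one block per thread
    positions = np.array_split(np.arange(len(df)), NUM_THREADS)
    chunks = [df.iloc[pos] for pos in positions]
    with ThreadPoolExecutor(max_workers=NUM_THREADS) as pool:
        # list() waits for every chunk; map() keeps the input order
        results = list(pool.map(calculate, chunks))
    return pd.concat(results)

df = pd.DataFrame({"x": range(200_001)})
result = run(df)
print(len(result))  # 200001
```

Because `Executor.map` yields results in the order of its inputs, the concatenated frame comes back in the original row order without any re-sorting.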
I have seen related solutions, but none of them apply to my case.
Reputation: 539
Below is example code that splits the DataFrame into blocks of block_size rows once it has more than threshold rows. It then uses ThreadPoolExecutor to run the work on eight threads in my case (you could also use the threading module directly). The process_pandas function is just a dummy; replace it with whatever calculation you need:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor
threshold = 300
block_size = 100
num_threads = 8
big_list = pd.read_csv('pandas_list.csv', delimiter=';', header=None)
blocks = []
if len(big_list) > threshold:
    # fixed-size blocks of block_size rows; the last block holds the remainder
    for start in range(0, len(big_list), block_size):
        blocks.append(big_list.iloc[start:start + block_size])
else:
    blocks.append(big_list)
def process_pandas(df):
    # dummy calculation: overwrite column 2 of the block's first row
    print('Doing calculations...')
    df = df.copy()  # work on a copy, not on a slice of big_list
    df.loc[df.index[0], 2] = 'changed'
    return df
with ThreadPoolExecutor(num_threads) as ex:
    # map() returns the results in the same order as blocks
    results = list(ex.map(process_pandas, blocks))
final_dataframe = pd.concat(results, axis=0)
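A quick way to convince yourself that the concatenated result keeps the original row order: `Executor.map` returns results in the order of the input iterable even when the threads finish out of order. This sketch assumes a synthetic 1050-row DataFrame in place of `pandas_list.csv`, and `tag_block` is a hypothetical dummy that records the first index label of each block.

```python
from concurrent.futures import ThreadPoolExecutor

import pandas as pd

block_size = 100
# synthetic stand-in for pandas_list.csv (assumption)
big_list = pd.DataFrame({0: range(1050), 1: range(1050)})

# fixed-size blocks; the final block holds whatever is left (50 rows here)
blocks = [big_list.iloc[start:start + block_size]
          for start in range(0, len(big_list), block_size)]

def tag_block(df: pd.DataFrame) -> pd.DataFrame:
    # hypothetical dummy: store the block's first index label in column 2
    out = df.copy()
    out[2] = out.index.min()
    return out

with ThreadPoolExecutor(max_workers=8) as ex:
    results = list(ex.map(tag_block, blocks))

final_dataframe = pd.concat(results, axis=0)
print(len(blocks), final_dataframe.index.is_monotonic_increasing)  # 11 True
```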
Reputation: 113930
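import threading
some_threshold = 200000  # hypothetical: split anything larger than this
def actually_do_something_with_smallish_df(df):
    pass  # hypothetical placeholder for the real per-chunk calculation
def do_something(df):
    if len(df) > some_threshold:
        pivot = len(df) // 2
        # hand the second half to a new thread and recurse on the first half
        threading.Thread(target=do_something, args=(df.iloc[pivot:],)).start()
        return do_something(df.iloc[:pivot])
    actually_do_something_with_smallish_df(df)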
maybe?
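The threads in that recursive sketch are fire-and-forget: nothing joins them or collects what they return, so there is no result to concatenate at the end. One way to keep the halving idea while still getting a single DataFrame back is to do the recursive halving first and then hand the pieces to a thread pool. `halve` and `work` are hypothetical names, with `work` standing in for `actually_do_something_with_smallish_df`.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List

import pandas as pd

def halve(df: pd.DataFrame, threshold: int) -> List[pd.DataFrame]:
    """Recursively halve df until every piece has at most `threshold` rows."""
    if len(df) <= threshold:
        return [df]
    pivot = len(df) // 2
    return halve(df.iloc[:pivot], threshold) + halve(df.iloc[pivot:], threshold)

def work(chunk: pd.DataFrame) -> pd.DataFrame:
    # hypothetical placeholder for the real per-chunk calculation
    return chunk.assign(y=chunk["x"] + 1)

df = pd.DataFrame({"x": range(1000)})
pieces = halve(df, threshold=300)
with ThreadPoolExecutor() as pool:
    # map() preserves piece order, so concat restores the original row order
    result = pd.concat(list(pool.map(work, pieces)))
print([len(p) for p in pieces])  # [250, 250, 250, 250]
```

Note that halving always produces a power-of-two number of pieces, so with a fixed thread count the split-into-N approach from the other answer gives more control over how many chunks you get.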