Reputation: 9024
I have a huge CSV to parse in chunks and write out to multiple files.
I am using pandas' read_csv function to read it chunk by chunk. It was working fine, but slower than the performance we need, so I decided to do the parsing in threads:
import random
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor
import pandas as pd

pool = ThreadPoolExecutor(2)
with ThreadPoolExecutor(max_workers=2) as executor:
    futures = executor.map(process, [df for df in pd.read_csv(
        downloaded_file, chunksize=chunksize, compression='gzip',
        low_memory=False, skipinitialspace=True, encoding='utf-8')], file_index)
    for future in concurrent.futures.as_completed(futures):
        pass
Here is the function responsible for parsing a chunk and writing it to CSV:
def process(df, file_index):
    """
    Process a csv chunk in a separate thread
    :param df:
    :param file_index:
    """
    chunk_index = random.randint(1, 200)
    print "start processing chunk"
    # some heavy processing...
    handle = open(outfile_name)
    df.to_csv(outfile_name, index=False,
              compression='gzip', sep='\t', quoting=1, encoding='utf-8')
    handle.close()
    del df
    print "end processing chunk"
    return True
I never see my print debug lines, and CPU and memory reach 100% and my script gets killed.
It looks like read_csv itself keeps yielding chunks while executor.map is still waiting for its first argument (the full list) to be built.
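To illustrate what I think is happening, the list comprehension is roughly equivalent to this, so every chunk has to be read and held in memory before map ever runs:

chunks = [df for df in pd.read_csv(downloaded_file, chunksize=chunksize,
                                   compression='gzip', low_memory=False,
                                   skipinitialspace=True, encoding='utf-8')]
# executor.map only starts once the whole list above has been built
futures = executor.map(process, chunks, file_index)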
Thanks
Upvotes: 0
Views: 5560
Reputation: 1039
Have you considered keeping the second argument to executor.map lazy, i.e. passing it a generator?
df_generator = pd.read_csv(downloaded_file,
                           chunksize=chunksize,
                           compression='gzip',
                           low_memory=False,
                           skipinitialspace=True,
                           encoding='utf-8')

with ThreadPoolExecutor(max_workers=2) as executor:
    # executor.map yields results lazily (in order), so iterating it is
    # enough; as_completed() expects Future objects returned by submit()
    for result in executor.map(process, df_generator, file_index):
        pass
pd.read_csv with a given chunksize returns an iterator over the chunks (a TextFileReader), so the file is consumed lazily. This should not cause memory overflow, provided your chunksize is chosen well.
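As a minimal sketch of that lazy behaviour (the file name and chunk size below are made-up placeholders), iterating the reader directly only ever materialises one chunk at a time:

import pandas as pd

reader = pd.read_csv('data.csv.gz', chunksize=100000,
                     compression='gzip', encoding='utf-8')
for i, chunk in enumerate(reader):
    # only this one chunk is in memory during each iteration
    process(chunk, i)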
Upvotes: 1