Reputation: 345
I am new to Python, parallel execution, and asyncio. Am I doing this incorrectly? My code runs slower than (or at best equal to) the time it takes the script to run in a traditional manner, without asyncio.
import asyncio, os, time, pandas as pd

start_time = time.time()

async def main():
    coroutines = list()
    for root, dirs, files in os.walk('.', topdown=True):
        for file in files:
            coroutines.append(cleaner(file))
    await asyncio.gather(*coroutines)

async def cleaner(file):
    df = pd.read_csv(file, sep='\n', header=None, engine='python', quoting=3)
    df = df[0].str.strip(' \t"').str.split('[,|;: \t]+', 1, expand=True).rename(columns={0: 'email', 1: 'data'})
    df[['email', 'data']].to_csv('x1', sep=':', index=False, header=False, mode='a', compression='gzip')

asyncio.run(main())
print("--- %s seconds ---" % (time.time() - start_time))
Upvotes: 0
Views: 1575
Reputation: 77347
Your workload appears to be read file --> process with pandas --> write file, and each work item is completely independent of the others, which makes it an ideal candidate for multiprocessing. pandas routines that read and write the file system, like any blocking operation, are not good candidates for asyncio unless you run them in asyncio's thread or process pools. These operations are, however, good candidates for true parallel execution, which asyncio by itself doesn't give you (its thread and process pools are reasonable choices here too).
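For completeness, if you want to keep the asyncio structure from your question, the usual pattern is to hand the blocking pandas calls off to an executor with loop.run_in_executor (or asyncio.to_thread on Python 3.9+ for a thread pool). A minimal sketch, assuming a synchronous clean_one(path) function that would hold your pandas logic:

import asyncio
import concurrent.futures
import os

def clean_one(path):
    # stand-in for the blocking pandas read/clean/write work
    return path

async def main(root="."):
    loop = asyncio.get_running_loop()
    files = [os.path.join(r, f) for r, _, fs in os.walk(root) for f in fs]
    # a ProcessPoolExecutor gives true parallelism for CPU-bound pandas work;
    # a ThreadPoolExecutor would do if the work were mostly I/O
    with concurrent.futures.ProcessPoolExecutor() as pool:
        tasks = [loop.run_in_executor(pool, clean_one, f) for f in files]
        results = await asyncio.gather(*tasks)
    return results

if __name__ == "__main__":
    asyncio.run(main("."))

Note that the speedup here comes from the executor, not from asyncio itself, which is why the plain multiprocessing version below is usually the simpler choice: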
import multiprocessing as mp
import os

def walk_all_files(path):
    for root, dirs, files in os.walk(path, topdown=True):
        for file in files:
            yield os.path.join(root, file)

def cleaner(path):
    # stand-in for the real per-file cleaning work
    return "sparkly"

def clean_all(path="."):
    files = list(walk_all_files(path))
    # using cpu*2 assuming that there is a lot of cpu heavy
    # work that can be done by some processes while others
    # wait on I/O. This is only a guess.
    cpu_count = min(len(files), mp.cpu_count()*2)
    with mp.Pool(cpu_count) as pool:
        # assuming processing is fairly long but also kind of random depending on
        # file contents, setting chunksize to 1 so that each subprocess gets a new
        # work item from the parent on each round. You could set it higher to have
        # fewer interactions between parent and worker.
        result = pool.map(cleaner, files, chunksize=1)

if __name__ == "__main__":
    clean_all(".")
Upvotes: 1