rogerwhite

Reputation: 345

Python asyncio runs slower

I am new to Python, parallel execution, and asyncio. Am I doing this incorrectly? My code runs slower than (or at best about as fast as) the same script written in a traditional way, without asyncio.

import asyncio, os, time, pandas as pd
start_time = time.time()

async def main():
    coroutines = list()
    for root, dirs, files in os.walk('.', topdown=True):
        for file in files:
            coroutines.append(cleaner(file))
        await asyncio.gather(*coroutines)

async def cleaner(file):
    df = pd.read_csv(file, sep='\n', header=None, engine='python', quoting=3)
    df = df[0].str.strip(' \t"').str.split('[,|;: \t]+', 1, expand=True).rename(columns={0: 'email', 1: 'data'})
    df[['email', 'data']].to_csv('x1', sep=':', index=False, header=False, mode='a', compression='gzip')


asyncio.run(main())
print("--- %s seconds ---" % (time.time() - start_time))

Upvotes: 0

Views: 1575

Answers (1)

tdelaney

Reputation: 77347

Your workload appears to be read file --> process with pandas --> write file. This is an ideal candidate for multiprocessing because each work item is very much independent. pandas routines that read/write the file system, like any blocking operation, are not good candidates for asyncio unless you run them in asyncio's thread or process pools.
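
If you do want to stick with asyncio, the blocking pandas calls need to be handed off to an executor so they don't serialise the event loop. Here is a minimal sketch of that approach; clean_one is a hypothetical stand-in for the pandas read/clean/write step, and everything else uses only the standard library:

import asyncio
import os

def clean_one(path):
    # hypothetical stand-in for the blocking pandas read/clean/write step
    with open(path, 'rb') as fh:
        return len(fh.read())

async def main(top='.'):
    loop = asyncio.get_running_loop()
    tasks = []
    for root, dirs, files in os.walk(top):
        for name in files:
            path = os.path.join(root, name)
            # None selects asyncio's default ThreadPoolExecutor; pass a
            # concurrent.futures.ProcessPoolExecutor here to use processes
            tasks.append(loop.run_in_executor(None, clean_one, path))
    results = await asyncio.gather(*tasks)
    print("processed %d files" % len(results))

if __name__ == "__main__":
    asyncio.run(main("."))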

Conversely, these operations are good candidates for true parallel execution, which asyncio by itself doesn't give you (its thread and process pool executors are reasonable choices as well).

import multiprocessing as mp
import os

def walk_all_files(path):
    for root, dirs, files in os.walk(path, topdown=True):
        for file in files:
            yield os.path.join(root, file)

def cleaner(path):
    # placeholder -- the pandas read/clean/write from the question goes here
    return "sparkly"

def clean_all(path="."):
    files = list(walk_all_files(path))
    # using cpu*2 assuming that there is a lot of cpu heavy
    # work that can be done by some processes while others
    # wait on I/O. This is only a guess.
    cpu_count = min(len(files), mp.cpu_count()*2)
    with mp.Pool(cpu_count) as pool:
        # assuming processing is fairly long but also kindof random depending on
        # file contents, setting chunksize to 1 so that subprocess gets new work
        # item from parent on each round. You could set it higher to have fewer
        # interactions between parent and worker.
        result = pool.map(cleaner, files, chunksize=1)

if __name__ == "__main__":
    clean_all(".")

Upvotes: 1
