Awaish Kumar

Reputation: 557

How to use asyncio with ProcessPoolExecutor

I am searching for a huge number of addresses on the web, and I want to use both asyncio and ProcessPoolExecutor in my task to search the addresses quickly.

    async def main():
        n_jobs = 3
        addresses = [list of addresses]
        _addresses = list_splitter(data=addresses, n=n_jobs)
        with ProcessPoolExecutor(max_workers=n_jobs) as executor:
            futures_list = []
            for _address in _addresses:
                futures_list += [asyncio.get_event_loop().run_in_executor(executor, execute_parallel, _address)]

                for f in tqdm(as_completed(futures_list, loop=asyncio.get_event_loop()), total=len(_addresses)):
                    results = await f

    asyncio.get_event_loop().run_until_complete(main())

Expected: I want the execute_parallel function to run in parallel.

error:

    Traceback (most recent call last):
      File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 228, in <module>
        asyncio.run(main())
      File "/usr/local/lib/python3.7/asyncio/runners.py", line 43, in run
        return loop.run_until_complete(main)
      File "/usr/local/lib/python3.7/asyncio/base_events.py", line 584, in run_until_complete
        return future.result()
      File "/home/awaish/danamica/scraping/skraafoto/aerial_photos_scraper.py", line 224, in main
        results = await f
      File "/usr/local/lib/python3.7/asyncio/tasks.py", line 533, in _wait_for_one
        return f.result()  # May raise f.exception().
    TypeError: can't pickle coroutine objects

Upvotes: 4

Views: 6268

Answers (1)

Jason Shaffner

Reputation: 99

I'm not sure I'm answering the correct question, but it appears the intent of your code is to run your execute_parallel function across several processes using asyncio. Instead of using ProcessPoolExecutor, why not try a normal multiprocessing Pool and set up a separate asyncio event loop to run in each worker? You might start one process per core and let asyncio work its magic within each process.

    import asyncio
    import multiprocessing

    def run_loop(addresses):
        # Each worker process gets its own event loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        tasks = [loop.create_task(execute_parallel(address)) for address in addresses]
        loop.run_until_complete(asyncio.wait(tasks))
        loop.close()

    def main():
        n_jobs = 3
        addresses = [list of addresses]
        _addresses = list_splitter(data=addresses, n=n_jobs)
        with multiprocessing.Pool(processes=n_jobs) as pool:
            # imap_unordered is lazy, so consume it to make sure every batch runs
            list(pool.imap_unordered(run_loop, _addresses))
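One usage note: with multiprocessing it's good practice (and required on platforms that spawn rather than fork worker processes) to call main() from under an entry-point guard, so child processes importing the module don't re-run the top-level code:

    if __name__ == '__main__':
        main()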

I've used Pool.imap_unordered with great success, but depending on your needs you may prefer Pool.map or some other functionality. You can play around with chunksize or with the number of addresses in each list to achieve optimal results (i.e., if you're getting a lot of timeouts, you may want to reduce the number of addresses being processed concurrently).
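As a rough sketch of that tuning (using the same run_loop, list_splitter, and execute_parallel as above, and assuming list_splitter's n is the number of batches — the n_jobs * 4 factor is just an illustrative guess), you could split into more, smaller batches than you have workers and pass an explicit chunksize:

    def main():
        n_jobs = 3
        addresses = [list of addresses]
        # More, smaller batches than workers: a slow or timing-out batch
        # holds up fewer addresses, and the pool keeps all workers busy.
        _addresses = list_splitter(data=addresses, n=n_jobs * 4)
        with multiprocessing.Pool(processes=n_jobs) as pool:
            for _ in pool.imap_unordered(run_loop, _addresses, chunksize=1):
                pass  # consume the lazy iterator so every batch is processed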

Upvotes: 3
