Durtal

Reputation: 1028

Feed ProcessPoolExecutor with results from asyncio

I have a bunch of online data that I want to download and process efficiently. Downloading already takes some time, but the CPU-bound processing takes much longer. I am struggling to implement a combination of async and ProcessPoolExecutor.

import asyncio
import time
import aiohttp
from aiohttp import ClientSession
from concurrent.futures import ProcessPoolExecutor


class WebData:

    def __init__(self, url):
        self.url = url
        self.binary = b''

    async def download(self, client):
        await asyncio.sleep(0.2)  # simulated latency; a blocking time.sleep here would stall the event loop
        try:
            async with client.get(self.url, timeout=5) as resp:
                self.binary = await resp.read()
                print(f'Downloaded {self.url}')
        except (aiohttp.ClientConnectionError,
                asyncio.exceptions.TimeoutError):
            pass
        return

    def process(self):
        print(f'Start processing {self.url}')
        time.sleep(1)
        print(f'Finished processing {self.url}')


async def main():
    list_urls = [f'https://www.google.com/search?q={i}'
                 for i in range(10)]
    list_obj = [WebData(url) for url in list_urls]

    with ProcessPoolExecutor() as executor:
        async with ClientSession() as session:
            tasks = [obj.download(session) for obj in list_obj]
            await asyncio.gather(*tasks)
            list_futures = [
                executor.submit(obj.process)
                for obj in list_obj]
    return list_futures

res = asyncio.run(main())

This works as expected but it fails to accomplish what I am looking for. It first downloads all data and starts processing it only afterwards, which leaves my cores idle during download. Is there any way I can pipe the downloaded objects to the executor while other objects are still downloading?

I found this thread but it isn't what I need.

Upvotes: 2

Views: 460

Answers (1)

Artyom Vancyan

Reputation: 5390

You should submit `self.process` to the pool as soon as that object's download coroutine finishes. For that, you can add a separate asynchronous method that awaits `download` and then submits `process` to the `ProcessPoolExecutor`.

class WebData:

    def __init__(self, url):
        """The code has not been changed"""

    async def download(self, client):
        """The code has not been changed"""

    def process(self):
        """The code has not been changed"""

    async def execute(self, session, pool):
        await self.download(session)
        # Submit as soon as this object's download completes, and return
        # the pool future so the caller can actually collect it.
        return pool.submit(self.process)


async def main():
    list_urls = [f'https://www.google.com/search?q={i}' for i in range(10)]
    list_obj = [WebData(url) for url in list_urls]

    with ProcessPoolExecutor() as pool:
        async with ClientSession() as session:
            list_futures = await asyncio.gather(*[obj.execute(session, pool) for obj in list_obj])
    return list_futures
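A closely related variant is to use `loop.run_in_executor` instead of `pool.submit`, which wraps the pool future in an awaitable so each coroutine can wait for its own processing result; `asyncio.gather` then returns the processed values directly rather than `concurrent.futures.Future` objects. The sketch below is self-contained and stands in for the real code: `cpu_work` and the `asyncio.sleep` call are placeholders for the actual `process` and `download` steps.

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def cpu_work(n):
    # Stand-in for the CPU-bound process() step; it must be a
    # module-level function so the process pool can pickle it.
    return sum(i * i for i in range(n))


async def fetch_and_process(n, pool):
    await asyncio.sleep(0.1)  # stand-in for the download
    loop = asyncio.get_running_loop()
    # Processing is submitted as soon as *this* download finishes,
    # while other downloads are still in flight.
    return await loop.run_in_executor(pool, cpu_work, n)


async def main():
    with ProcessPoolExecutor() as pool:
        return await asyncio.gather(
            *(fetch_and_process(n, pool) for n in (10, 100, 1000)))


if __name__ == '__main__':
    print(asyncio.run(main()))  # e.g. [285, 328350, 332833500]
```

The upside of `run_in_executor` is that the overlap of download and processing is preserved, but `main` can return real results instead of futures that still have to be resolved by the caller.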

Upvotes: 2
