geoff111

Reputation: 137

AsyncIO and concurrent.futures.ThreadPoolExecutor

I'm building a web scraping API, and most of my scraping is done with AsyncIO coroutines, like this:

async def parse_event(self):
    ...  # do scraping

# call the func
asyncio.run(b.parse_event())

This works perfectly fine. However, because I'm scraping multiple websites at the same time, I was originally using concurrent.futures.ThreadPoolExecutor to scrape with multiple threads. Now that I've implemented the coroutine logic, I can't call asyncio.run directly in my threads.

Before (without coroutine):

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    w1_future = executor.submit(self.w1.parse_event)
    w2_future = executor.submit(self.w2.parse_event)
    w3_future = executor.submit(self.w3.parse_event)

After adding the coroutines, I expected something like this to work:

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    w1_future = executor.submit(asyncio.run(self.w1.parse_event))
    w2_future = executor.submit(asyncio.run(self.w2.parse_event))
    w3_future = executor.submit(asyncio.run(self.w3.parse_event))

Unfortunately it is not working.

Upvotes: 7

Views: 17600

Answers (1)

MisterMiyagi

Reputation: 50076

In CPython, asyncio and threading are both means of running concurrent operations on a single core. However, they work via different mechanisms: asyncio uses the cooperative concurrency of async/await, where a task gives up control only at an await, whereas threading uses preemptive concurrency, where threads can be switched at almost any point and the GIL lets only one of them execute Python bytecode at a time.
Mixing the two does not speed up execution, since both still run Python code on the same single core. Instead, the overhead of both mechanisms slows the program down, and their interaction makes correct code harder to write.
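To illustrate the cooperative model, here is a minimal sketch. The coroutine `fake_fetch` is hypothetical, with `asyncio.sleep` standing in for real network I/O. Three coroutines wait concurrently, yet all of them run on the same thread, so the total time should be roughly one delay rather than three.

```python
import asyncio
import threading
import time


async def fake_fetch(name, delay):
    # Hypothetical stand-in for a network request: awaiting asyncio.sleep
    # hands control back to the event loop, just as awaiting real I/O would.
    await asyncio.sleep(delay)
    # Record which OS thread actually ran this coroutine.
    return name, threading.get_ident()


async def main():
    start = time.perf_counter()
    results = await asyncio.gather(
        fake_fetch("w1", 0.2),
        fake_fetch("w2", 0.2),
        fake_fetch("w3", 0.2),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results)
print(f"elapsed: {elapsed:.2f}s")
```

All three thread identifiers are the same, because one event loop in one thread interleaves the coroutines at their `await` points.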

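To achieve concurrency between multiple tasks, submit them all to a single asyncio event loop. The equivalent of executor.submit is asyncio.create_task; multiple coroutines can be submitted at once using asyncio.gather. Note that both are called inside the running event loop, whereas executor.submit is called outside the executor's worker threads. Also note that they take coroutine objects, so the coroutine functions must be called, e.g. parse_event() rather than parse_event.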

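import asyncio

async def parse_all():
    # defined inside the method, so self is the scraper object from the question
    return await asyncio.gather(
        # all the parsing coroutines that should run concurrently;
        # call each method, since gather expects awaitables, not functions
        self.w1.parse_event(),
        self.w2.parse_event(),
        self.w3.parse_event(),
    )

asyncio.run(parse_all())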
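For comparison with executor.submit, here is a self-contained sketch of the same pattern written with asyncio.create_task. The `Website` class and its return values are hypothetical placeholders for the scrapers in the question.

```python
import asyncio


class Website:
    """Hypothetical scraper; parse_event stands in for real scraping code."""

    def __init__(self, name):
        self.name = name

    async def parse_event(self):
        await asyncio.sleep(0.1)  # placeholder for awaiting HTTP requests
        return f"events from {self.name}"


async def parse_all(sites):
    # create_task schedules each coroutine on the running loop right away,
    # much like executor.submit schedules a callable on the thread pool.
    tasks = [asyncio.create_task(site.parse_event()) for site in sites]
    # Awaiting a task plays the role of future.result() for a thread future.
    return [await task for task in tasks]


results = asyncio.run(parse_all([Website("w1"), Website("w2"), Website("w3")]))
print(results)
```

All three tasks are started before the first one is awaited, so they run concurrently on one loop; asyncio.gather is a shorthand for this create-then-await pattern.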

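If you absolutely do want to use a separate, threaded event loop for each parse, you must use executor.submit(func, *args) instead of executor.submit(func(args)). The latter calls func immediately in the current thread and submits only its return value to the executor.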

import asyncio
import concurrent.futures

with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # each worker thread calls asyncio.run(coro), which creates its own event loop
    w1_future = executor.submit(asyncio.run, self.w1.parse_event())
    w2_future = executor.submit(asyncio.run, self.w2.parse_event())
    w3_future = executor.submit(asyncio.run, self.w3.parse_event())
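A runnable sketch of the threaded variant, assuming a hypothetical `Website` class in place of the question's scrapers. Each coroutine object is created in the main thread, but asyncio.run executes it in a worker thread with a fresh event loop; future.result() collects the return value (or re-raises the exception).

```python
import asyncio
import concurrent.futures
import threading


class Website:
    """Hypothetical scraper used only for this illustration."""

    def __init__(self, name):
        self.name = name

    async def parse_event(self):
        await asyncio.sleep(0.1)  # placeholder for real network I/O
        # Report which worker thread's private event loop ran this coroutine.
        return self.name, threading.get_ident()


sites = [Website("w1"), Website("w2"), Website("w3")]

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Pass asyncio.run itself plus its argument, not the result of calling it.
    futures = [executor.submit(asyncio.run, site.parse_event()) for site in sites]
    results = [future.result() for future in futures]

print([name for name, _ in results])
```

This works, but it spins up one event loop per thread for work that a single loop could handle on its own.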

Note that mixing asyncio and threading adds complexity and constraints. You might want to use asyncio's debug mode (for example, asyncio.run(main(), debug=True)) to detect some thread- and context-safety issues. However, thread- and context-safety constraints and guarantees are often not documented, so test or inspect the operations for safety manually if needed.
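As a sketch of one thing debug mode can catch (the callback and variable names are illustrative only): with debug enabled, calling the non-thread-safe loop.call_soon from a foreign thread raises RuntimeError, while loop.call_soon_threadsafe is the supported way to hand work to a loop from another thread. Debug mode only flags some misuse like this; it is not a general thread-safety checker.

```python
import asyncio
import threading


async def main():
    loop = asyncio.get_running_loop()
    errors = []
    done = asyncio.Event()

    def from_other_thread():
        try:
            # Not thread-safe: in debug mode asyncio checks the calling
            # thread and raises RuntimeError instead of silently misbehaving.
            loop.call_soon(done.set)
        except RuntimeError as exc:
            errors.append(str(exc))
        # The supported way to schedule a callback from another thread.
        loop.call_soon_threadsafe(done.set)

    worker = threading.Thread(target=from_other_thread)
    worker.start()
    await done.wait()
    worker.join()
    return (errors[0] if errors else None), loop.get_debug()


message, debug = asyncio.run(main(), debug=True)
print("debug mode:", debug)
print("caught:", message)
```

Without debug=True, the unsafe call_soon would typically go through without any error, which is exactly the kind of silent bug debug mode helps surface.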

Upvotes: 17
