alt-f4

Reputation: 2326

Why is my consumer working separately from my producer in the queue?

My objective is to call an API asynchronously and write the result of each call to its own file (1 call -> 1 file). I thought one way to implement this is with a queue: producers push each response into the queue as soon as it is ready, and consumers write the files as soon as responses are available.

Confusion: Looking at the print statements when I run my code, I see that all the producers finish first, and only then do the consumers start consuming the output. This does not match my intention of having consumers work on tasks as soon as they become available. I have also considered using multiple processes (one for consumers, one for producers), but I am not sure whether that would overcomplicate things.

I have created an illustration of the current status:

import aiohttp
import asyncio


async def get_data(session, day):
    async with session.post(url=SOME_URL, json=SOME_FORMAT, headers=HEADERS) as response:
        return await response.text()


async def producer(q, day):
    async with aiohttp.ClientSession() as session:
        result = await get_data(session, day)
        await q.put(result)


async def consumer(q):
    while True:
        outcome = await q.get()
        print("Consumed:", outcome) # assuming I write files here
        q.task_done()


async def main():
    queue = asyncio.Queue()
    days = [day for day in range(20)]  # Here I normally use calendar dates instead of range
    producers = [asyncio.create_task(producer(queue, day) for day in days]
    consumer = asyncio.create_task(consumer(queue)
    await asyncio.gather(*producers)
    await queue.join()
    consumer.cancel()

if __name__ == '__main__':
    asyncio.run(main())

Am I on the right track?

Upvotes: 0

Views: 146

Answers (1)

Roy2012

Reputation: 12503

Your code is generally fine (apart from a couple of syntax errors, such as the missing closing parentheses in the create_task calls, which I guess are the result of a bad copy-paste). All the producer tasks are indeed created, and start running, before the consumer starts working, because nothing makes them wait until they reach their first real await. But if the producers have real work to do (such as a network request), you'll see that they complete that work only after the consumer has started, and then things work fine: the consumer handles each result as it arrives.
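
To see the interleaving without any network access, here is a minimal sketch that replaces the HTTP request with `asyncio.sleep`. The names `fake_api_call` and `events`, and the delays, are made up for illustration and are not part of the original code.

```python
import asyncio

events = []  # hypothetical log of (kind, day) tuples, in the order they happen


async def fake_api_call(day):
    # Stand-in for the HTTP request: later days take longer (assumed delays).
    await asyncio.sleep(0.01 * (day + 1))
    return f"payload for day {day}"


async def producer(q, day):
    result = await fake_api_call(day)
    events.append(("produced", day))
    await q.put((day, result))


async def consumer(q):
    while True:
        day, _ = await q.get()
        events.append(("consumed", day))
        q.task_done()


async def main():
    q = asyncio.Queue()
    producers = [asyncio.create_task(producer(q, day)) for day in range(5)]
    consumer_task = asyncio.create_task(consumer(q))
    await asyncio.gather(*producers)
    await q.join()          # wait until every queued item was marked done
    consumer_task.cancel()  # the consumer loops forever, so stop it explicitly


if __name__ == "__main__":
    asyncio.run(main())
    print(events)
```

With these delays, the consumer should handle day 0 well before the producer for day 4 has finished, which is the behaviour the question was after.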

Here's an edited version of your code, plus output that demonstrates that things are indeed working.

import aiohttp
import asyncio

async def get_data(session, day):
    print(f"get data, day {day}")
    async with session.get(url="https://www.google.com") as response:
        res = await response.text()
    print(f"got data, day {day}")
    return res[:100]

async def producer(q, day):
    async with aiohttp.ClientSession() as session:
        result = await get_data(session, day)
        await q.put(result)

async def consumer(q):
    print("Consumer stated")
    while True:
        outcome = await q.get()
        print("Consumed:", outcome) # assuming I write files here
        asyncio.sleep(1)  # not awaited, so it never sleeps; this causes the RuntimeWarning in the output
        q.task_done()

async def main():
    queue = asyncio.Queue()
    days = [day for day in range(20)]  # Here I normally use calendar dates instead of range
    producers = [asyncio.create_task(producer(queue, day)) for day in days]
    print("main: producer tasks created")
    consumer_task = asyncio.create_task(consumer(queue))
    print("main: consumer task created")
    await asyncio.gather(*producers)
    print("main: gathered producers")
    await queue.join()
    consumer_task.cancel()

if __name__ == '__main__':
    asyncio.run(main())

output:

main: producer tasks created
main: consumer task created
get data, day 0
get data, day 1
get data, day 2
get data, day 3
...
get data, day 19
Consumer stated
got data, day 1
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
queue_so.py:21: RuntimeWarning: coroutine 'sleep' was never awaited
  asyncio.sleep(1)
RuntimeWarning: Enable tracemalloc to get the object allocation traceback
got data, day 10
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 19
got data, day 11
got data, day 14
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 15
got data, day 17
got data, day 6
got data, day 18
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 7
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 8
got data, day 9
got data, day 2
got data, day 12
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 0
got data, day 5
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 4
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 3
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 13
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
got data, day 16
Consumed: <!doctype html><html itemscope="" itemtype="http://schema.org/WebPage" lang="en"><head><meta content
main: gathered producers
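
The RuntimeWarning in the output appears because `asyncio.sleep(1)` was called without `await`, so the consumer never actually paused. Real work in the consumer, such as writing a file, has to be awaited or handed off so that it does not block the event loop. Below is a rough sketch of a file-writing consumer, assuming Python 3.9+ for `asyncio.to_thread`. The names `write_result`, `file_consumer`, and `demo`, the `day_N.txt` naming scheme, and the placeholder payloads are all made up for illustration.

```python
import asyncio
import tempfile
from pathlib import Path


def write_result(out_dir, day, text):
    # Plain blocking write: one file per API response.
    path = Path(out_dir) / f"day_{day}.txt"
    path.write_text(text, encoding="utf-8")
    return path


async def file_consumer(q, out_dir):
    while True:
        day, text = await q.get()
        try:
            # Run the blocking write in a worker thread so the event loop stays free.
            await asyncio.to_thread(write_result, out_dir, day, text)
        finally:
            q.task_done()


async def demo(out_dir, n_days=3):
    q = asyncio.Queue()
    consumer_task = asyncio.create_task(file_consumer(q, out_dir))
    for day in range(n_days):
        await q.put((day, f"response for day {day}"))  # placeholder payloads
    await q.join()
    consumer_task.cancel()
    return sorted(p.name for p in Path(out_dir).iterdir())


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        print(asyncio.run(demo(tmp)))
```

Putting `(day, text)` tuples on the queue, rather than the bare response text, is one way to let the consumer pick a file name per call.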

Upvotes: 2
