darkmame

Reputation: 11

Stream producer and consumer with asyncio gather python

I wrote a script for a socket server that listens for incoming connections and processes the incoming data. It uses asyncio.start_server for socket management and an asyncio.Queue to pass data between the producer and consumer coroutines. The problem is that consume(q1) is executed only once (right after the script first starts) and never again. Is the line loop.run_until_complete(asyncio.gather(...)) wrong?

import asyncio
import functools
import logging

async def handle_readnwrite(reader, writer, q1): #Producer coroutine
    data = await reader.read(1024)
    message = data.decode()
    await writer.drain()
    await q1.put(message[3:20])
    await q1.put(None)
    writer.close() #Close the client socket

async def consume(q1): #Consumer coroutine
    while True:
        # wait for an item from the producer
        item = await q1.get()
        if item is None:
            logging.debug('None items') # the producer emits None to indicate that it is done
            break
        do_something(item)

loop = asyncio.get_event_loop()
q1 = asyncio.Queue(loop=loop)
producer_coro = asyncio.start_server(functools.partial(handle_readnwrite, q1=q1), '0.0.0.0', 3000, loop=loop)
consumer_coro = consume(q1)
loop.run_until_complete(asyncio.gather(consumer_coro,producer_coro))

try:
    loop.run_forever()
except KeyboardInterrupt:
    pass

loop.close()

Upvotes: 1

Views: 1139

Answers (1)

user4815162342

Reputation: 155046

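handle_readnwrite enqueues the None terminator after every message, which causes consume to break out of its loop and finish the coroutine after the first message. Once consume has returned, nothing reads from the queue any more, so later messages are never processed. If consume should keep running and process further messages, the None terminator must not be sent after each message.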
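As a rough illustration, here is a minimal sketch of that change: the per-connection handler only enqueues the message, and None is sent a single time at shutdown. The names handle_client, send, demo and results are made up for this example, the server binds to 127.0.0.1 on an OS-chosen port, and appending to a list stands in for do_something. Treat it as one possible arrangement, not the only way to structure it.

```python
import asyncio
import functools


async def handle_client(reader, writer, queue):
    """Producer: runs once per connection and enqueues one message."""
    data = await reader.read(1024)
    # Same slice as in the question; note that no None sentinel is sent here.
    await queue.put(data.decode()[3:20])
    writer.close()
    await writer.wait_closed()


async def consume(queue, results):
    """Consumer: keeps running until it receives the None sentinel."""
    while True:
        item = await queue.get()
        if item is None:  # sent once, at shutdown only
            break
        results.append(item)  # stand-in for do_something(item)


async def send(port, payload):
    """Hypothetical test client: send one message, then wait for the server to close."""
    reader, writer = await asyncio.open_connection("127.0.0.1", port)
    writer.write(payload)
    await writer.drain()
    await reader.read()  # returns at EOF, i.e. after the handler closed its side
    writer.close()
    await writer.wait_closed()


async def demo(payloads):
    """Start the server and consumer, send each payload, then shut down cleanly."""
    queue = asyncio.Queue()
    results = []
    server = await asyncio.start_server(
        functools.partial(handle_client, queue=queue), "127.0.0.1", 0
    )
    port = server.sockets[0].getsockname()[1]  # port picked by the OS
    consumer = asyncio.create_task(consume(queue, results))
    for payload in payloads:
        await send(port, payload)
    await queue.put(None)  # the only place the sentinel is sent
    await consumer
    server.close()
    await server.wait_closed()
    return results
```

Calling asyncio.run(demo([b"id:first", b"id:second"])) drives two connections through the same long-lived consumer. In a real server the client loop would presumably be replaced by await server.serve_forever(), with the sentinel put on the queue only when the program is shutting down.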

Upvotes: 2
