Reputation: 11
I wrote a script for a socket server that simply listens for incoming connections and processes the incoming data. The chosen architecture is the asyncio.start_server
for the socket management and the asyncio.Queues
for passing the data between the producer and consumer coroutines. The problem is that the consume(q1)
function is executed only once (at the first script startup). Then it is not more executed. Is the line run_until_complete(asyncio.gather())
wrong?
import asyncio
import functools
async def handle_readnwrite(reader, writer, q1): #Producer coroutine
data = await reader.read(1024)
message = data.decode()
await writer.drain()
await q1.put(message[3:20])
await q1.put(None)
writer.close() #Close the client socket
async def consume(q1): #Consumer coroutine
while True:
# wait for an item from the producer
item = await q1.get()
if item is None:
logging.debug('None items') # the producer emits None to indicate that it is done
break
do_something(item)
loop = asyncio.get_event_loop()
q1 = asyncio.Queue(loop=loop)
producer_coro = asyncio.start_server(functools.partial(handle_readnwrite, q1=q1), '0.0.0.0', 3000, loop=loop)
consumer_coro = consume(q1)
loop.run_until_complete(asyncio.gather(consumer_coro,producer_coro))
try:
loop.run_forever()
except KeyboardInterrupt:
pass
loop.close()
Upvotes: 1
Views: 1139
Reputation: 155046
handle_readnwrite
always enqueues the None
terminator, which causes consume
to break (and therefore finish the coroutine). If consume
should continue running and process other messages, the None
terminator must not be sent after each message.
Upvotes: 2