Reputation: 13
I'm struggling to keep the correct order of messages received from an `asyncio.Queue`.

In my case, multiple publishers (which `put` messages to the queue) send messages to a consumer (which `get`s the messages from the queue) through a single `asyncio.Queue`. In the sample code below, the assertions in `consumer.process` don't hold, i.e., the order of the received messages is violated:
```python
import asyncio
import random


async def publish(publisher_id, consumer_queue, messages):
    assert messages == sorted(messages)  # sent messages are ordered
    for message in messages:
        if random.random() < 0.1:  # simulate publisher taking a short break
            await asyncio.sleep(random.random())
        # publish message to queue
        asyncio.create_task(consumer_queue.put((publisher_id, message)))
    # signal that this publisher is finished
    asyncio.create_task(consumer_queue.put((publisher_id, None)))


class Consumer:
    def __init__(self, num_publishers):
        self.publishers = {publisher: [] for publisher in range(num_publishers)}
        self.queue = asyncio.Queue(maxsize=1)
        self.finished = asyncio.Event()

    async def consume_loop(self):
        while True:
            publisher_id, message = await self.queue.get()
            # delegate processing of the message
            asyncio.create_task(self.process(publisher_id, message))

    async def process(self, publisher_id, message):
        if message is None:  # publisher is finished
            received = self.publishers.pop(publisher_id)
            # assert that if messages sent by a publisher are ordered,
            # then the same messages received by the consumer are also ordered
            assert received == sorted(received), f'{publisher_id}: {received}'
            if not self.publishers:
                # all publishers are finished
                self.finished.set()
        else:
            # assert that the publisher is still publishing
            assert publisher_id in self.publishers, publisher_id
            self.publishers[publisher_id].append(message)


async def main():
    num_publishers = 100
    consumer = Consumer(num_publishers)
    # consumer begins listening for new messages
    asyncio.create_task(consumer.consume_loop())
    # create publishers that send messages to the consumer
    for publisher_id in range(num_publishers):
        messages = list(range(100))
        asyncio.create_task(publish(publisher_id, consumer.queue, messages))
    # wait for the consumer to receive all messages
    await consumer.finished.wait()


if __name__ == '__main__':
    asyncio.run(main())
```
Observations:

- `consumer.process` is not asynchronous.

Questions (the important ones are in bold):

- Why is the order of the received messages violated in the above `asyncio.Queue` code?
- Is it correct to expect that `asyncio.Queue` should also operate on FIFO principle for each individual publisher?
- Could there be a performance problem with the `asyncio` code due to the number of created tasks? I think not, since an unbounded queue works fine.

Upvotes: 1
Views: 1258
Reputation: 155226
`asyncio.Queue` is first-in first-out, which means items are dequeued in the same order in which they were enqueued. The problem with your code is that the consumer doesn't await the processing code, but spawns it to run in the background. Also, the producers don't await enqueueing, but run that in the background as well.

The consequence is that no backpressure is being applied because, while the queue is technically bounded, the items are dequeued as soon as they appear and are transferred to the (unbounded) queue of tasks internally maintained by the event loop. You also lose ordering because tasks spawned with `create_task()` are not guaranteed to run in a particular order.
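To see the FIFO guarantee in isolation, here is a minimal sketch (the `producer` name and the item values are illustrative, not from the question) where the producer awaits each `put()` on a bounded queue. Awaiting suspends the producer while the queue is full, which is exactly what applies backpressure and keeps the items in enqueue order:

```python
import asyncio


async def producer(queue):
    for i in range(5):
        # awaiting put() suspends the producer while the queue is full,
        # so items enter the queue in the order they were produced
        await queue.put(i)


async def main():
    queue = asyncio.Queue(maxsize=1)  # bounded: at most one buffered item
    asyncio.create_task(producer(queue))
    received = [await queue.get() for _ in range(5)]
    assert received == [0, 1, 2, 3, 4]  # FIFO order is preserved


asyncio.run(main())
```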
To fix this, you should:

- change `asyncio.create_task(consumer_queue.put(...))` to `await consumer_queue.put(...)`, and
- change `asyncio.create_task(self.process(...))` to `await self.process(...)`.

If you decide you do need backpressure, make the channel bounded.
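For reference, a minimal sketch of the question's code with both changes applied (structure, names, and the bounded `maxsize=1` queue are kept from the question; the asserts remain as a check that per-publisher ordering now holds):

```python
import asyncio
import random


async def publish(publisher_id, consumer_queue, messages):
    assert messages == sorted(messages)  # sent messages are ordered
    for message in messages:
        if random.random() < 0.1:  # simulate publisher taking a short break
            await asyncio.sleep(random.random())
        # await the put: the publisher suspends until the queue has room,
        # so its messages enter the queue in the order they were produced
        await consumer_queue.put((publisher_id, message))
    # signal that this publisher is finished
    await consumer_queue.put((publisher_id, None))


class Consumer:
    def __init__(self, num_publishers):
        self.publishers = {publisher: [] for publisher in range(num_publishers)}
        self.queue = asyncio.Queue(maxsize=1)  # bounded, so backpressure applies
        self.finished = asyncio.Event()

    async def consume_loop(self):
        while True:
            publisher_id, message = await self.queue.get()
            # await the processing instead of spawning a task,
            # so messages are processed in dequeue order
            await self.process(publisher_id, message)

    async def process(self, publisher_id, message):
        if message is None:  # publisher is finished
            received = self.publishers.pop(publisher_id)
            assert received == sorted(received), f'{publisher_id}: {received}'
            if not self.publishers:
                self.finished.set()  # all publishers are finished
        else:
            self.publishers[publisher_id].append(message)


async def main():
    num_publishers = 100
    consumer = Consumer(num_publishers)
    asyncio.create_task(consumer.consume_loop())
    for publisher_id in range(num_publishers):
        asyncio.create_task(publish(publisher_id, consumer.queue, list(range(100))))
    await consumer.finished.wait()


if __name__ == '__main__':
    asyncio.run(main())
```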
Upvotes: 2