Enilonn

Why don't messages received from an asyncio.Queue keep the order in which they were sent?

I'm struggling to keep the correct order of messages received from an asyncio.Queue.

In my case, multiple publishers (which put messages to the queue) send messages to a consumer (which gets the messages from the queue) through a single asyncio.Queue. In the sample code below, the assertions in Consumer.process don't hold, i.e., the order of the received messages is violated:

import asyncio
import random


async def publish(publisher_id, consumer_queue, messages):
    assert messages == sorted(messages)  # sent messages are ordered
    for message in messages:
        if random.random() < 0.1:  # simulate publisher taking a short break
            await asyncio.sleep(random.random())
        # publish message to queue
        asyncio.create_task(consumer_queue.put((publisher_id, message)))
    # signal that this publisher is finished
    asyncio.create_task(consumer_queue.put((publisher_id, None)))


class Consumer:
    def __init__(self, num_publishers):
        self.publishers = {publisher: [] for publisher in range(num_publishers)}
        self.queue = asyncio.Queue(maxsize=1)
        self.finished = asyncio.Event()

    async def consume_loop(self):
        while True:
            publisher_id, message = await self.queue.get()
            # delegate processing of the message
            asyncio.create_task(self.process(publisher_id, message))

    async def process(self, publisher_id, message):
        if message is None:  # publisher is finished
            received = self.publishers.pop(publisher_id)
            # assert that if messages sent by a publisher are ordered,
            # then the same messages received by the consumer are also ordered
            assert received == sorted(received), f'{publisher_id}: {received}'
            if not self.publishers:
                # all publishers are finished
                self.finished.set()
        else:
            # assert that the publisher is still publishing
            assert publisher_id in self.publishers, publisher_id
            self.publishers[publisher_id].append(message)


async def main():
    num_publishers = 100
    consumer = Consumer(num_publishers)
    # consumer begins listening for new messages
    asyncio.create_task(consumer.consume_loop())
    # create publishers that send messages to the consumer
    for publisher_id in range(num_publishers):
        messages = list(range(100))
        asyncio.create_task(publish(publisher_id, consumer.queue, messages))
    # wait for the consumer to receive all messages
    await consumer.finished.wait()

if __name__ == '__main__':
    asyncio.run(main())

Observations:

Questions (important are bold):


Answers (1)

user4815162342

asyncio.Queue is first-in first-out, which means items are dequeued in the same order in which they were enqueued. The problem with your code is that the consumer doesn't await the processing code, but spawns it to run in the background. Also, the producers don't await enqueueing, but run that in the background as well.
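A minimal sketch of the FIFO guarantee when each put() is awaited before the next one starts (the function name fifo_demo is illustrative, not part of the question's code):

```python
import asyncio


async def fifo_demo():
    queue = asyncio.Queue()
    for item in ['first', 'second', 'third']:
        # awaiting put() means each item is in the queue before the next put starts
        await queue.put(item)
    # items come out in the order they went in
    return [await queue.get() for _ in range(3)]


print(asyncio.run(fifo_demo()))  # ['first', 'second', 'third']
```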

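The consequence is that no backpressure is applied: although the queue is technically bounded, items are dequeued as soon as they appear and transferred to the (unbounded) queue of tasks internally maintained by the event loop. You also lose ordering, because tasks spawned with create_task() are not guaranteed to complete in the order in which they were created. For example, with maxsize=1, a put() task that finds the queue full suspends; when a slot frees up, a put() task that was created later but has not started yet can take that slot before the suspended one resumes.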
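The ordering loss can be reproduced with a few lines. This is a minimal sketch (reorder_demo is an illustrative name) that spawns put() calls in the background on a queue with maxsize=1, the same pattern the question's publishers use, and then schedules one more put while earlier ones are still waiting:

```python
import asyncio


async def reorder_demo():
    queue = asyncio.Queue(maxsize=1)
    # Spawn puts for 0, 1, 2 in the background, as the question's publishers do.
    tasks = [asyncio.create_task(queue.put(i)) for i in range(3)]
    await asyncio.sleep(0)  # put(0) fills the queue; put(1) and put(2) suspend
    # A later background put is scheduled before any slot frees up.
    tasks.append(asyncio.create_task(queue.put(3)))
    received = [queue.get_nowait()]  # takes 0 and wakes the suspended put(1)
    # put(3) runs before put(1) resumes, so it grabs the free slot first;
    # put(1) then finds the queue full again and waits behind put(2).
    for _ in range(3):
        received.append(await queue.get())
    await asyncio.gather(*tasks)
    return received


print(asyncio.run(reorder_demo()))
```

On CPython's asyncio this prints [0, 3, 2, 1]: every item arrives, but not in the order the put() tasks were created.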

To fix this, you should:

  • make your queue unbounded,
  • change asyncio.create_task(consumer_queue.put(...)) to await consumer_queue.put(...), and
  • change asyncio.create_task(self.process(...)) to await self.process(...).
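A sketch of the question's program with those three changes applied. Two assumptions are not in the original: the random break is shortened so the sketch runs quickly, and a hypothetical `completed` dict keeps each publisher's received list so the result can be inspected. `main` awaits the consumer task itself, so a failed assertion surfaces as an exception instead of a hang.

```python
import asyncio
import random


async def publish(publisher_id, consumer_queue, messages):
    assert messages == sorted(messages)  # sent messages are ordered
    for message in messages:
        if random.random() < 0.1:  # simulate publisher taking a short break
            # shortened from the question's up-to-1s break so the sketch runs fast
            await asyncio.sleep(random.random() / 10)
        # await the put: the next message is enqueued only after this one is in
        await consumer_queue.put((publisher_id, message))
    # signal that this publisher is finished
    await consumer_queue.put((publisher_id, None))


class Consumer:
    def __init__(self, num_publishers):
        self.publishers = {publisher: [] for publisher in range(num_publishers)}
        self.completed = {}  # hypothetical addition: final lists, for inspection
        self.queue = asyncio.Queue()  # unbounded
        self.finished = asyncio.Event()

    async def consume_loop(self):
        while not self.finished.is_set():
            publisher_id, message = await self.queue.get()
            # await processing so messages are handled in dequeue order
            await self.process(publisher_id, message)

    async def process(self, publisher_id, message):
        if message is None:  # publisher is finished
            received = self.publishers.pop(publisher_id)
            assert received == sorted(received), f'{publisher_id}: {received}'
            self.completed[publisher_id] = received
            if not self.publishers:
                self.finished.set()  # all publishers are finished
        else:
            # assert that the publisher is still publishing
            assert publisher_id in self.publishers, publisher_id
            self.publishers[publisher_id].append(message)


async def main(num_publishers=100, num_messages=100):
    consumer = Consumer(num_publishers)
    consume_task = asyncio.create_task(consumer.consume_loop())
    publishers = [
        asyncio.create_task(
            publish(publisher_id, consumer.queue, list(range(num_messages))))
        for publisher_id in range(num_publishers)
    ]
    # consume_loop returns after the last None marker, or raises on a failed assertion
    await consume_task
    await asyncio.gather(*publishers)
    return consumer.completed


if __name__ == '__main__':
    asyncio.run(main())
```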

If you decide you do need backpressure, make the queue bounded; because put() is now awaited, a producer will then wait whenever the queue is full.
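A minimal sketch of the bounded variant, assuming awaited put() calls as above (produce and bounded_demo are illustrative names). Producers are throttled once the queue reaches maxsize, and each producer's items still arrive in order because it never has more than one put() outstanding:

```python
import asyncio


async def produce(producer_id, queue, count):
    for i in range(count):
        # With a bounded queue, put() suspends while the queue is full,
        # which throttles a producer that runs ahead of the consumer.
        await queue.put((producer_id, i))


async def bounded_demo(num_producers=3, count=20, maxsize=2):
    queue = asyncio.Queue(maxsize=maxsize)
    producers = [asyncio.create_task(produce(p, queue, count))
                 for p in range(num_producers)]
    received = {p: [] for p in range(num_producers)}
    peak = 0  # largest queue size observed by the consumer
    for _ in range(num_producers * count):
        peak = max(peak, queue.qsize())
        producer_id, i = await queue.get()
        await asyncio.sleep(0)  # simulate a consumer that does some async work
        received[producer_id].append(i)
    await asyncio.gather(*producers)
    return received, peak


received, peak = asyncio.run(bounded_demo())
print(peak)  # never more than maxsize
```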
