steven ondieki

Reputation: 1

How to make asyncio stream read() wait until the current coroutine has finished reading

I'm using the example from the Python docs on asyncio streams to write data to a socket and read from it.

I am consuming messages from RabbitMQ, sending each one through the socket, and waiting for a response. I've set up the reader and writer in __init__():


    self.reader, self.writer = await asyncio.open_connection(self.host, self.port, loop=self.loop)

When consuming messages, I just send the message to the socket, read the response, and then publish the response to another queue (after some processing):

    async def process_airtime(self, message: aio_pika.IncomingMessage):
        async with message.process():
            logger.info('Send: %r' % message.body)
            self.writer.write(message.body)

            data = await self.reader.read(4096)
            logger.info('Received: %r' % data)

            await self.publish(data) # publishing to some other queue

The problem is that when I try to consume, say, 10 messages, all but the last raise this error; only the last message successfully gets a response:

RuntimeError: read() called while another coroutine is already waiting for incoming data

This is the output I get (I've truncated some of it):

2022-02-05 10:59:17,123 INFO [__main__:130] request_consumer [*] waiting for messages...
Send: b'\x00\x00\x00"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
Send: b'\x00\x00\x00"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
Send: b'\x00\x00\x00"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'

...


2022-02-05 10:59:17,194 ERROR [asyncio:1707] base_events Task exception was never retrieved
future: <Task finished name='Task-53' coro=<consumer() done, defined at /home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py:25> exception=RuntimeError('read() called while another coroutine is already waiting for incoming data')>
Traceback (most recent call last):
  File "/home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py", line 27, in consumer
    return await create_task(callback, message, loop=loop)
  File "request_consumer.py", line 122, in process_airtime
    data = await self.reader.read(4096)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
2022-02-05 10:59:17,194 ERROR [asyncio:1707] base_events Task exception was never retrieved
future: <Task finished name='Task-54' coro=<consumer() done, defined at /home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py:25> exception=RuntimeError('read() called while another coroutine is already waiting for incoming data')>
Traceback (most recent call last):
  File "/home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py", line 27, in consumer
    return await create_task(callback, message, loop=loop)
  File "request_consumer.py", line 122, in process_airtime
    data = await self.reader.read(4096)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
Received: b'\xf2>D\x95\n\xe0\x80 \x00\x00\x00\x00\x00\x00\x00"0000000000000000000000000000000000000000000000'

My question is: what should I do so that read() is called again only after the coroutine that is currently reading has finished? Will that affect performance, or is there some way I can read on, say, different threads?

I would appreciate it if someone could point me in the right direction. I use Python 3.8 on Linux.

Upvotes: 0

Views: 3434

Answers (1)

Dunes

Reputation: 40763

The simple answer is to call read() from only one task.

It sounds like you are using a callback to consume RMQ messages. If so, aio_pika will process messages asynchronously (i.e. concurrently) when it has multiple messages. That is, it creates a new task for each callback/message and leaves it to its own devices.
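The failure can be reproduced without RabbitMQ or a real socket. The following is a minimal sketch, assuming a bare asyncio.StreamReader that never receives data; the function name reproduce is made up for illustration. A second read() issued while the first is still waiting fails with the same error as in the traceback.

```python
import asyncio


async def reproduce() -> str:
    # A reader that is never fed any data, so every read() has to wait.
    reader = asyncio.StreamReader()

    # First reader: starts waiting for incoming data.
    first = asyncio.create_task(reader.read(4096))
    await asyncio.sleep(0)  # yield so the first task reaches its wait

    try:
        # Second concurrent read() on the same StreamReader.
        await reader.read(4096)
    except RuntimeError as exc:
        message = str(exc)
    else:
        message = "no error"
    finally:
        first.cancel()  # the first read is still pending; clean it up
    return message


print(asyncio.run(reproduce()))
```

This is what happens when several process_airtime tasks run at once: the first one parks inside read(), and every later one is rejected.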

Given that each message handler calls read(), handling messages concurrently doesn't really make sense for your reads: how would you know which response belongs to which message? You need some way to synchronise your reads with each message. There are a few ways you can do this:

  • Put a lock around calls to read().
  • Create a separate task that is solely responsible for calling read() and puts the results onto a queue, from which any task can read. asyncio queues are safe to share between tasks (unlike read()).
  • Or, perhaps most simply, use the RMQ queue as an iterator (and don't spawn a new task to handle each message).
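The first option might be sketched as follows. This is only an illustration: the class name SocketClient and the method request are hypothetical, and it assumes the server sends exactly one response per request, in order, and that each response arrives in a single read(4096). The lock covers the write as well as the read, so a response is always paired with the request that triggered it.

```python
import asyncio


class SocketClient:
    """Hypothetical wrapper that allows one request/response exchange at a time."""

    def __init__(self, reader, writer):
        self.reader = reader
        self.writer = writer
        # Create this object from inside a coroutine: on Python 3.8 the
        # lock binds to the current event loop when it is constructed.
        self._lock = asyncio.Lock()

    async def request(self, payload: bytes) -> bytes:
        # Only one task at a time may write a request and read its reply.
        async with self._lock:
            self.writer.write(payload)
            await self.writer.drain()
            return await self.reader.read(4096)
```

Inside process_airtime the write and read would then become a single `data = await client.request(message.body)`. Requests are effectively processed one after another, so the throughput is the same as with serial consumption.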

I am assuming your current code looks something like:

async def init_rmq_consumer():
    # connect to RMQ and create a queue
    ...
    queue = ...

    # start consuming messages off of the queue
    await queue.consume(process_airtime)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init_rmq_consumer())
    loop.run_forever()

Using the queue as an iterator might look something like:

async def main():
    # connect to RMQ and create a queue
    ...
    queue = ...

    # start consuming messages off of the queue *serially*
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            # we will not fetch a new message from the queue until we are
            # finished with this one.
            await process_airtime(message)

if __name__ == '__main__':
    asyncio.run(main())
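For completeness, the second option from the list above (a dedicated reader task) might look roughly like this. It is a sketch under assumptions: the names read_responses and responses are made up for illustration, and nothing here matches a response to the message that caused it, so you would still need your own way to correlate them if messages are handled concurrently.

```python
import asyncio


async def read_responses(reader: asyncio.StreamReader,
                         responses: asyncio.Queue) -> None:
    """The only coroutine that ever calls reader.read()."""
    while True:
        data = await reader.read(4096)
        if not data:
            # read() returns b"" once the peer has closed the connection.
            break
        # Hand the chunk over; any number of tasks may get() from the queue.
        await responses.put(data)
```

It would be started once, for example with `asyncio.create_task(read_responses(reader, responses))` right after opening the connection, and message handlers would call `await responses.get()` instead of `reader.read()`.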

Upvotes: 1
