Reputation: 1
I'm using this example from the Python docs on asyncio streams to write data to and read from a socket. I am consuming RabbitMQ messages, sending each one through the socket and waiting for a response. I've set up the reader and writer in __init__():
self.reader, self.writer = await asyncio.open_connection(self.host, self.port, loop=self.loop)
In the consumer, I just send each message to the socket, read the response, and then publish the response to another queue (after some processing):
async def process_airtime(self, message: aio_pika.IncomingMessage):
    async with message.process():
        logger.info('Send: %r' % message.body)
        self.writer.write(message.body)
        data = await self.reader.read(4096)
        logger.info('Received: %r' % data)
        await self.publish(data)  # publishing to some other queue
The problem is that when I consume, say, 10 messages, all the other messages raise this error, although the last message successfully gets a response:
RuntimeError: read() called while another coroutine is already waiting for incoming data
This is the output I get (I've truncated some of it):
2022-02-05 10:59:17,123 INFO [__main__:130] request_consumer [*] waiting for messages...
Send: b'\x00\x00\x00"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
Send: b'\x00\x00\x00"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
Send: b'\x00\x00\x00"000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000'
...
2022-02-05 10:59:17,194 ERROR [asyncio:1707] base_events Task exception was never retrieved
future: <Task finished name='Task-53' coro=<consumer() done, defined at /home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py:25> exception=RuntimeError('read() called while another coroutine is already waiting for incoming data')>
Traceback (most recent call last):
  File "/home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py", line 27, in consumer
    return await create_task(callback, message, loop=loop)
  File "request_consumer.py", line 122, in process_airtime
    data = await self.reader.read(4096)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
2022-02-05 10:59:17,194 ERROR [asyncio:1707] base_events Task exception was never retrieved
future: <Task finished name='Task-54' coro=<consumer() done, defined at /home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py:25> exception=RuntimeError('read() called while another coroutine is already waiting for incoming data')>
Traceback (most recent call last):
  File "/home/steven/workspace/python/onfon/cheka/venv/lib/python3.8/site-packages/aio_pika/queue.py", line 27, in consumer
    return await create_task(callback, message, loop=loop)
  File "request_consumer.py", line 122, in process_airtime
    data = await self.reader.read(4096)
  File "/usr/lib/python3.8/asyncio/streams.py", line 684, in read
    await self._wait_for_data('read')
  File "/usr/lib/python3.8/asyncio/streams.py", line 503, in _wait_for_data
    raise RuntimeError(
RuntimeError: read() called while another coroutine is already waiting for incoming data
Received: b'\xf2>D\x95\n\xe0\x80 \x00\x00\x00\x00\x00\x00\x00"0000000000000000000000000000000000000000000000'
My question is: what should I do so that read() is only called again after the current coroutine has finished reading? Will that affect performance, or is there some way I can read on, say, different threads?
I'd appreciate it if someone could point me in the right direction. I'm using Python 3.8 on Linux.
Upvotes: 0
Views: 3434
Reputation: 40763
The simple answer is to only call read() from one task.
It sounds like you are using a callback to consume RMQ messages. If so, aio_pika will process messages asynchronously (i.e. concurrently) when multiple messages are available. That is, it creates a new task for each callback/message and leaves it to its own devices.
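To see why concurrent callbacks fail, here is a minimal, self-contained sketch that needs no network: a bare asyncio.StreamReader stands in for the socket's reader, and demo_concurrent_reads is a hypothetical name. Two tasks call read() on the same reader at once, much as two aio_pika callbacks would; the second one gets the same RuntimeError as in the question, because a StreamReader allows only one pending read().

```python
import asyncio


async def demo_concurrent_reads() -> list:
    # A bare StreamReader stands in for the reader from open_connection().
    reader = asyncio.StreamReader()

    # Two tasks read at the same time, like two concurrent message callbacks.
    first = asyncio.create_task(reader.read(4096))
    second = asyncio.create_task(reader.read(4096))
    await asyncio.sleep(0)  # let both tasks start; the second raises immediately

    # Deliver a "response" so the first, legitimately waiting read can finish.
    reader.feed_data(b"response")
    reader.feed_eof()

    # return_exceptions=True collects the RuntimeError instead of raising it.
    return await asyncio.gather(first, second, return_exceptions=True)


if __name__ == "__main__":
    # First result is b'response'; the second is the RuntimeError.
    print(asyncio.run(demo_concurrent_reads()))
```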
Given that you call read() while processing each message, concurrent processing doesn't really work for your read calls: how would you know which read is for which message? You need some way to sync your reads to each message. There are a few ways you can do this:
... read() ... read(), it puts the results onto a queue, from which any task can read. asyncio queues are task safe (unlike read()).
I am assuming your current code looks something like this:
async def init_rmq_consumer():
    # connect to RMQ and create a queue
    ...
    queue = ...
    # start consuming messages off of the queue
    await queue.consume(process_airtime)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(init_rmq_consumer())
    loop.run_forever()
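If you would rather keep a callback-based consumer like this, one more option (not from the original answer; the SerialClient class and its request() method are hypothetical names) is to hold an asyncio.Lock around each write/read round trip, which is exactly "only call read() again after the current read has finished". This sketch assumes the peer sends exactly one reply per request and that a single read() returns all of it; TCP does not guarantee that, so a real protocol should use its own framing, e.g. readexactly() with a length prefix.

```python
import asyncio


class SerialClient:
    """Hypothetical wrapper: one write/read round trip at a time over a shared stream."""

    def __init__(self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter):
        self.reader = reader
        self.writer = writer
        # Only the task holding this lock may write and then read, so at most
        # one read() is ever pending on the reader.
        self._lock = asyncio.Lock()

    async def request(self, payload: bytes) -> bytes:
        async with self._lock:
            self.writer.write(payload)
            await self.writer.drain()
            # Assumption: the whole reply arrives in a single read().
            return await self.reader.read(4096)
```

Each callback would then call `data = await client.request(message.body)` instead of touching the reader and writer directly. The trade-off for performance is that requests on one connection go strictly one at a time; if that is too slow, you could open several connections rather than read from threads.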
Using the queue as an iterator might look something like:
async def main():
    # connect to RMQ and create a queue
    ...
    queue = ...
    # start consuming messages off of the queue *serially*
    async with queue.iterator() as queue_iter:
        async for message in queue_iter:
            # we will not fetch a new message from the queue until we are
            # finished with this one.
            await process_airtime(message)

if __name__ == '__main__':
    asyncio.run(main())
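The single-reader approach (one task owns read() and puts results on an asyncio.Queue that other tasks consume) might be sketched as follows. reader_loop and demo are hypothetical names, and using None as the end-of-stream marker is an assumption of this sketch. Note that the queue alone does not tell a consumer which reply belongs to which request; that still needs in-order replies or some correlation id from your protocol.

```python
import asyncio


async def reader_loop(reader: asyncio.StreamReader, responses: asyncio.Queue) -> None:
    """The only coroutine that ever calls reader.read(); forwards each chunk to a queue."""
    while True:
        data = await reader.read(4096)
        if not data:  # b"" means EOF: the peer closed the connection
            await responses.put(None)  # assumed end-of-stream marker
            return
        await responses.put(data)


async def demo() -> list:
    reader = asyncio.StreamReader()  # stands in for the socket's reader
    responses: asyncio.Queue = asyncio.Queue()
    # Start the single reader task once, e.g. right after open_connection().
    task = asyncio.create_task(reader_loop(reader, responses))
    reader.feed_data(b"reply")
    reader.feed_eof()
    # Several tasks may await responses.get() concurrently without an error.
    got = await asyncio.gather(responses.get(), responses.get())
    await task
    return list(got)


if __name__ == "__main__":
    print(asyncio.run(demo()))
```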
Upvotes: 1