Reputation: 4017
I used aioredis to write an async service that listens on a certain channel and runs some commands asynchronously.
I took the code from the examples page to write a small test app and removed the unnecessary parts:
import asyncio
import aioredis


async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print('Got Message:', msg)
        i = int(msg['sleep_for'])
        print('Sleep for {}'.format(i))
        await asyncio.sleep(i)
        print('End sleep')


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
There is also another test app that publishes JSON blobs with a sleep_for field, which the subscriber app then uses to emulate some work inside the reader coroutine with a sleep statement.
I expected the "sleeps" to run in "parallel", but in fact they appear on the screen synchronously, one after the other.
My guess was that as soon as I hit the await ch.get_json(..) (or maybe even await ch.wait_message()) line, I should be able to handle the next message. In practice it runs like synchronous code. Where am I wrong? This could be handled using connection pools, but that would mean something is not async, and I have no idea what exactly.
Upvotes: 4
Views: 4739
Reputation: 5153
My guess was that as soon as I hit the await ch.get_json(..) (or maybe even await ch.wait_message()) line, I should be able to handle the next message.
That's not how the async/await syntax works. Every time you hit an await in a coroutine, that coroutine is "paused" until the awaited coroutine finishes. The event loop can run other tasks in the meantime, but your reader does not move on to the next message while it is sleeping.
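To make the pausing behaviour concrete, here is a minimal sketch that uses only asyncio and no Redis. The names reader, ticker and demo, and the list of delays standing in for incoming messages, are assumptions made for illustration. While reader is suspended on its sleep, the unrelated ticker task keeps running, but reader itself never starts the second item before the first one has finished.

```python
import asyncio


async def reader(delays, log):
    # Stand-in for the subscriber loop: one "message" per delay.
    for delay in delays:
        log.append(('start', delay))
        await asyncio.sleep(delay)  # reader is suspended here
        log.append(('end', delay))


async def ticker(log, count, interval):
    # An independent coroutine that the event loop can run
    # while reader is suspended.
    for i in range(count):
        await asyncio.sleep(interval)
        log.append(('tick', i))


async def demo():
    log = []
    # gather schedules both coroutines on the same event loop.
    await asyncio.gather(
        reader([0.2, 0.2], log),
        ticker(log, 2, 0.05),
    )
    return log


if __name__ == '__main__':
    for entry in asyncio.run(demo()):
        print(entry)
```

The ticks land between the first start and the first end, while the reader entries stay strictly in start, end, start, end order.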
What you should do is use ensure_future to handle each message in a separate task, so the reader can go straight back to waiting for the next message:
import asyncio
import aioredis


async def handle_msg(msg):
    # Runs as its own task, so a long sleep here does not block the reader.
    print('Got Message:', msg)
    i = int(msg['sleep_for'])
    print('Sleep for {}'.format(i))
    await asyncio.sleep(i)
    print('End sleep')


async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        # Schedule the handler and return to waiting for messages immediately.
        asyncio.ensure_future(handle_msg(msg))


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
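One possible refinement, sketched here under stated assumptions rather than taken from the answer above: keep a reference to each scheduled task and wait for the outstanding ones before the reader returns, so handlers are not dropped when the loop is closed. An asyncio.Queue stands in for the Redis channel, and the None sentinel, the id field and the done list are hypothetical names used only for this demo.

```python
import asyncio


async def handle_msg(msg, done):
    # Emulate work, then record which message finished.
    await asyncio.sleep(msg['sleep_for'])
    done.append(msg['id'])


async def reader(queue, done):
    pending = set()
    while True:
        msg = await queue.get()
        if msg is None:  # hypothetical sentinel: "channel closed"
            break
        task = asyncio.ensure_future(handle_msg(msg, done))
        # Hold a reference until the task completes.
        pending.add(task)
        task.add_done_callback(pending.discard)
    # Wait for handlers that are still in flight.
    await asyncio.gather(*pending)


async def demo():
    queue = asyncio.Queue()
    done = []
    messages = (
        {'id': 'slow', 'sleep_for': 0.2},
        {'id': 'fast', 'sleep_for': 0.05},
        None,
    )
    for msg in messages:
        queue.put_nowait(msg)
    await reader(queue, done)
    return done


if __name__ == '__main__':
    print(asyncio.run(demo()))
```

Because both handlers run concurrently, the message with the shorter sleep finishes first even though it was received second.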
Upvotes: 7