Glueon

Reputation: 4017

AIORedis and PUB/SUB aren't async

I used aioredis to write an async service that listens on a certain channel and runs some commands asynchronously.

Basically, I took the code from the examples page to write a small test app and removed the unnecessary parts:

import asyncio
import aioredis

async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        print('Got Message:', msg)
        i = int(msg['sleep_for'])
        print('Sleep for {}'.format(i))
        await asyncio.sleep(i)
        print('End sleep')


async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    tsk = await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()

There is also another test app, which publishes JSON blobs with a sleep_for field. The subscriber app uses that field to emulate some work inside the reader coroutine with a sleep statement.

I expected the "sleeps" to run in "parallel", but in fact they appear on the screen synchronously, one after the other.

My guess was that as soon as I hit the await ch.get_json(..) (or maybe even await ch.wait_message()) line I should be able to handle the next message. In practice it runs like synchronous code. Where am I wrong? This could be handled using connection pools, but that would mean something here is not async, and I have no idea what exactly.

Upvotes: 4

Views: 4739

Answers (1)

Jashandeep Sohi

Reputation: 5153

My guess was that as soon as I hit the await ch.get_json(..) (or maybe even await ch.wait_message()) line I should be able to handle the next message.

That's not how the async/await syntax works. Every time a coroutine hits an await, that coroutine is suspended until the awaited coroutine finishes. The event loop is free to run other tasks in the meantime, but the suspended coroutine itself does not move on, so reader will not pick up the next message while it is awaiting asyncio.sleep.
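
To make that concrete, here is a minimal sketch that does not need Redis. The names reader, other and demo are made up for illustration, and the short delays are arbitrary. It records the order of events to show that an await suspends only the coroutine that contains it, and that this coroutine still runs its own steps strictly one after another:

```python
import asyncio

async def reader(log):
    # Two "messages" handled inline: the second cannot start
    # until the first await has completed.
    for i in range(2):
        log.append(f"reader: start {i}")
        await asyncio.sleep(0.05)  # reader is suspended here
        log.append(f"reader: end {i}")

async def other(log):
    # A separate task that the event loop runs while reader is suspended.
    await asyncio.sleep(0.01)
    log.append("other: ran while reader was suspended")

async def demo():
    log = []
    # gather wraps both coroutines in tasks, so they can interleave.
    await asyncio.gather(reader(log), other(log))
    return log

if __name__ == "__main__":
    for line in asyncio.run(demo()):
        print(line)
```

The loop is never blocked, but nothing inside reader overlaps with anything else inside reader.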

What you should do is use ensure_future to handle each message in a separate task, so that reader can go straight back to waiting for the next message:

import asyncio
import aioredis

async def handle_msg(msg):
    print('Got Message:', msg)
    i = int(msg['sleep_for'])
    print('Sleep for {}'.format(i))
    await asyncio.sleep(i)
    print('End sleep')

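async def reader(ch):
    while (await ch.wait_message()):
        msg = await ch.get_json()
        # Schedule the handler as its own task and return to
        # waiting for the next message without awaiting it here.
        asyncio.ensure_future(handle_msg(msg))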

async def main():
    sub = await aioredis.create_redis(('localhost', 6379))
    res = await sub.subscribe('chan:1')
    ch1 = res[0]
    await reader(ch1)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
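
If you want to try the pattern without a Redis server, the following is a rough sketch under a few assumptions: an asyncio.Queue stands in for the channel, None is a made-up end-of-stream marker, and the delays are arbitrary. It also keeps a reference to each task, which the asyncio documentation recommends so that a task is not garbage-collected mid-execution, and waits for the outstanding handlers before returning:

```python
import asyncio
import time

async def handle_msg(msg, done):
    # Stand-in for the real per-message work.
    await asyncio.sleep(msg["sleep_for"])
    done.append(msg["sleep_for"])

async def reader(queue, done):
    tasks = set()
    while True:
        msg = await queue.get()
        if msg is None:  # hypothetical end-of-stream marker
            break
        task = asyncio.ensure_future(handle_msg(msg, done))
        tasks.add(task)                        # keep a strong reference
        task.add_done_callback(tasks.discard)  # drop it once finished
    # Wait for handlers that are still running.
    await asyncio.gather(*tasks)

async def demo():
    queue = asyncio.Queue()
    for delay in (0.3, 0.1, 0.2):
        queue.put_nowait({"sleep_for": delay})
    queue.put_nowait(None)
    done = []
    start = time.monotonic()
    await reader(queue, done)
    return done, time.monotonic() - start

if __name__ == "__main__":
    done, elapsed = asyncio.run(demo())
    print("finished in order:", done)
    print("elapsed: {:.2f}s".format(elapsed))
```

Because the sleeps overlap, the handlers finish in order of their delay rather than arrival order, and the total time is close to the longest delay instead of the sum.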

Upvotes: 7
