p1ncher

Reputation: 102

How to implement single-producer multi-consumer with aioredis pub/sub

I have a web app. One endpoint pushes some object data to a Redis channel.
Another endpoint handles a websocket connection, where that data is fetched from the channel and sent to the client via the websocket.

When I connect via websocket, only the first connected client gets the messages.

How can I read messages from a Redis channel with multiple clients without creating a new subscription for each client?

The websocket handler.
Here I subscribe to the channel and save it to the app (init_tram_channel). Then I run a job that listens to the channel and sends the messages (run_tram_listening).

@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    tram_id = int(request.match_info['tram_id'])
    channel_name = f'tram_{tram_id}'

    await init_tram_channel(channel_name, request.app)
    tram_job = await run_tram_listening(
        request=request,
        ws=ws,
        channel=request.app['tram_producers'][channel_name]
    )

    request.app['websockets'].add(ws)
    try:
        async for msg in ws:
            if msg.type == aiohttp.WSMsgType.TEXT:
                if msg.data == 'close':
                    await ws.close()
                    break
            if msg.type == aiohttp.WSMsgType.ERROR:
                logging.error(f'ws connection was closed with exception {ws.exception()}')
            else:
                await asyncio.sleep(0.005)
    except asyncio.CancelledError:
        pass
    finally:
        await tram_job.close()
        request.app['websockets'].discard(ws)

    return ws

Subscribing and saving the channel.
Every channel is related to a unique object, and in order not to create many channels for the same object, I save only one per object in the app. app['tram_producers'] is a dict.

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = channel

Running the coroutine for channel listening. I run it via aiojobs:

async def run_tram_listening(
        request: web.Request,
        ws: web.WebSocketResponse,
        channel: Channel
):
    """
    :return: aiojobs._job.Job object
    """
    listen_redis_job = await spawn(
        request,
        _read_tram_subscription(
            ws,
            channel
        )
    )
    return listen_redis_job

The coroutine where I listen to the channel and send messages:

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        channel: Channel
):
    try:
        async for msg in channel.iter():
            tram_data = msg.decode()
            await ws.send_json(tram_data)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)

Upvotes: 1

Views: 2242

Answers (2)

p1ncher

Reputation: 102

I found the following code in an aioredis GitHub issue (I've adapted it to my task).

class TramProducer:
    def __init__(self, channel: aioredis.Channel):
        self._future = None
        self._channel = channel

    def __aiter__(self):
        return self

    def __anext__(self):
        # Shield the shared coroutine so that cancelling one consumer
        # does not cancel the read other consumers are waiting on.
        return asyncio.shield(self._get_message())

    async def _get_message(self):
        if self._future:
            # Another consumer is already reading from Redis - just wait
            # for the result it will publish through the future.
            return await self._future

        # This consumer becomes the reader: create a future for the others,
        # fetch the message and broadcast it by resolving the future.
        self._future = asyncio.get_event_loop().create_future()
        message = await self._channel.get_json()
        future, self._future = self._future, None
        future.set_result(message)
        return message

So, how does it work? TramProducer wraps the way we get messages.
As @Messa said:

message is received from one Redis subscription only once.

So only one client of a TramProducer actually retrieves messages from Redis, while the other clients wait for the result of a future that is set once a message is received from the channel.

If self._future is already initialized, it means that somebody is already waiting for a message from Redis, so we just wait for the result of self._future.
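To see the fan-out in action without a running Redis server, here is a minimal self-contained sketch. FakeChannel is a made-up stand-in for aioredis.Channel (only get_json is needed), and TramProducer is the class above; both consumers receive every published message.

import asyncio


class FakeChannel:
    """Hypothetical stand-in for aioredis.Channel, just enough for the demo."""

    def __init__(self):
        self._queue = asyncio.Queue()

    async def get_json(self):
        return await self._queue.get()

    def publish(self, message):
        self._queue.put_nowait(message)


async def consume(name, producer, expected):
    received = []
    async for msg in producer:
        received.append(msg)
        if len(received) == expected:
            break
    print(name, received)


async def main():
    channel = FakeChannel()
    producer = TramProducer(channel)  # TramProducer defined above

    consumers = [
        asyncio.create_task(consume('ws-1', producer, 2)),
        asyncio.create_task(consume('ws-2', producer, 2)),
    ]
    # Publish one message at a time, after the consumers are waiting:
    # TramProducer fans a message out to the clients currently awaiting it.
    await asyncio.sleep(0.01)
    channel.publish({'tram_id': 1, 'lat': 53.90, 'lon': 27.55})
    await asyncio.sleep(0.01)
    channel.publish({'tram_id': 1, 'lat': 53.91, 'lon': 27.56})
    await asyncio.gather(*consumers)


asyncio.run(main())  # both ws-1 and ws-2 print the same two messages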

TramProducer usage (I've taken the example from my question):

async def _read_tram_subscription(
        ws: web.WebSocketResponse,
        tram_producer: TramProducer
):
    try:
        async for msg in tram_producer:
            await ws.send_json(msg)
    except asyncio.CancelledError:
        pass
    except Exception as e:
        logging.error(msg=e, exc_info=e)

TramProducer initialization:

async def init_tram_channel(
        channel_name: str,
        app: web.Application
):
    if channel_name not in app['tram_producers']:
        channel, = await app['redis'].subscribe(channel_name)
        app['tram_producers'][channel_name] = TramProducer(channel)

I think it may be helpful for somebody.
The full project is here: https://gitlab.com/tram-emulator/tram-server

Upvotes: 1

Messa

Reputation: 25191

I guess a message is received from one Redis subscription only once, and if there is more than one listener in your app, then only one of them will get it.

So you need to create something like a mini pub/sub inside the application to distribute the messages to all listeners (websocket connections in this case).

Some time ago I made an aiohttp websocket chat example - not with Redis, but at least the cross-websocket distribution is there: https://github.com/messa/aiohttp-nextjs-demo-chat/blob/master/chat_web/views/api.py

The key is to have an application-wide message_subscriptions, where every websocket connection registers itself, or perhaps its own asyncio.Queue (I've used Event in my example, but that's suboptimal), and whenever a message comes from Redis, it is pushed to all relevant queues - see the sketch below.

Of course when a websocket connection ends (client unsubscribes, disconnects, fails...) its queue should be removed (and possibly the Redis subscription cancelled if it was the last connection listening to it).

Asyncio doesn't mean we should forget about queues :) It's also good to get familiar with combining multiple tasks at once (reading from the websocket, reading from the message queue, perhaps reading from some notification queue...). Using queues can also help you handle client reconnects more cleanly (without losing any messages).
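A rough sketch of that approach, adapted to the question's aiohttp + aioredis setup. The names init_subscriptions and redis_fanout and the handler body are illustrative, not taken from the chat example; spawning redis_fanout once per channel and reading from the websocket side are left out for brevity.

import asyncio
from collections import defaultdict

from aiohttp import web


def init_subscriptions(app: web.Application):
    # Run at application startup: channel name -> set of per-connection queues.
    app['message_subscriptions'] = defaultdict(set)


async def redis_fanout(app: web.Application, channel_name: str, channel):
    # The single Redis reader for a channel: copy every message
    # to all queues currently registered for that channel.
    async for msg in channel.iter():
        for queue in app['message_subscriptions'][channel_name]:
            queue.put_nowait(msg.decode())


async def tram_ws(request: web.Request):
    ws = web.WebSocketResponse()
    await ws.prepare(request)
    channel_name = f"tram_{request.match_info['tram_id']}"

    # Each websocket connection registers its own queue.
    queue = asyncio.Queue()
    request.app['message_subscriptions'][channel_name].add(queue)
    try:
        while not ws.closed:
            tram_data = await queue.get()
            await ws.send_json(tram_data)
    except asyncio.CancelledError:
        pass
    finally:
        # Unregister on disconnect; if this was the last queue for the channel,
        # the Redis subscription could be cancelled here as well.
        request.app['message_subscriptions'][channel_name].discard(queue)
    return ws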

Upvotes: 0
