Reputation: 102
I have the web app. That app has endpoint to push some object data to redis
channel.
And another endpoint handles websocket
connection, where that data is fetched from channel and send to client via ws
.
When i connect via ws, messages gets only first connected client.
How to read messages from redis
channel with multiple clients and not create a new subscription?
Websocket handler.
Here i subscribe to channel, save it to app (init_tram_channel
). Then run job where i listen channel and send messages(run_tram_listening
).
@routes.get('/tram-state-ws/{tram_id}')
async def tram_ws(request: web.Request):
ws = web.WebSocketResponse()
await ws.prepare(request)
tram_id = int(request.match_info['tram_id'])
channel_name = f'tram_{tram_id}'
await init_tram_channel(channel_name, request.app)
tram_job = await run_tram_listening(
request=request,
ws=ws,
channel=request.app['tram_producers'][channel_name]
)
request.app['websockets'].add(ws)
try:
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
if msg.data == 'close':
await ws.close()
break
if msg.type == aiohttp.WSMsgType.ERROR:
logging.error(f'ws connection was closed with exception {ws.exception()}')
else:
await asyncio.sleep(0.005)
except asyncio.CancelledError:
pass
finally:
await tram_job.close()
request.app['websockets'].discard(ws)
return ws
Subscribing and saving channel.
Every channel is related to unique object, and in order not to create many channels that related to the same object, i save only one to app.
app['tram_producers']
is dict.
async def init_tram_channel(
channel_name: str,
app: web.Application
):
if channel_name not in app['tram_producers']:
channel, = await app['redis'].subscribe(channel_name)
app['tram_producers'][channel_name] = channel
Running coro for channel listening. I run it via aiojobs:
async def run_tram_listening(
request: web.Request,
ws: web.WebSocketResponse,
channel: Channel
):
"""
:return: aiojobs._job.Job object
"""
listen_redis_job = await spawn(
request,
_read_tram_subscription(
ws,
channel
)
)
return listen_redis_job
Coro where i listen and send messages:
async def _read_tram_subscription(
ws: web.WebSocketResponse,
channel: Channel
):
try:
async for msg in channel.iter():
tram_data = msg.decode()
await ws.send_json(tram_data)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error(msg=e, exc_info=e)
Upvotes: 1
Views: 2242
Reputation: 102
The following code has been found in some aioredis github issue (I've adopted it to my task).
class TramProducer:
def __init__(self, channel: aioredis.Channel):
self._future = None
self._channel = channel
def __aiter__(self):
return self
def __anext__(self):
return asyncio.shield(self._get_message())
async def _get_message(self):
if self._future:
return await self._future
self._future = asyncio.get_event_loop().create_future()
message = await self._channel.get_json()
future, self._future = self._future, None
future.set_result(message)
return message
So, how it works? TramProducer wraps the way we get messages.
As said @Messa
message is received from one Redis subscription only once.
So only one client of TramProducer is retrieving messages from redis, while other clients are waiting for future result that will be set after receiving message from channel.
If self._future
initialized it means that somebody is waiting for message from redis, so we will just wait for self._future
result.
TramProducer usage (i've taken an example from my question):
async def _read_tram_subscription(
ws: web.WebSocketResponse,
tram_producer: TramProducer
):
try:
async for msg in tram_producer:
await ws.send_json(msg)
except asyncio.CancelledError:
pass
except Exception as e:
logging.error(msg=e, exc_info=e)
TramProducer initialization:
async def init_tram_channel(
channel_name: str,
app: web.Application
):
if channel_name not in app['tram_producers']:
channel, = await app['redis'].subscribe(channel_name)
app['tram_producers'][channel_name] = TramProducer(channel)
I think it maybe helpfull for somebody.
Full project here https://gitlab.com/tram-emulator/tram-server
Upvotes: 1
Reputation: 25191
I guess a message is received from one Redis subscription only once, and if there is more than one listeners in your app, then only one of them will get it.
So you need to create something like mini pub/sub inside the application to distribute the messages to all listeners (websocket connections in this case).
Some time ago I've made an aiohttp websocket chat example - not with Redis, but at least the cross-websocket distribution is there: https://github.com/messa/aiohttp-nextjs-demo-chat/blob/master/chat_web/views/api.py
The key is to have an application-wide message_subcriptions
, where every websocket connection registers itself, or perhaps its own asyncio.Queue (I've used Event in my example, but that's suboptimal), and whenever message comes from Redis, it is pushed to all relevant queues.
Of course when websocket connection ends (client unsubscribe, disconnect, failure...) the queue should be removed (and possibly Redis subscription cancelled if it was the last connection listening to it).
Asyncio doesn’t mean we should forget about queues :) Also it’s good to get familiar with combining multiple tasks at once (reading from websocket, reading from message queue, perhaps reading from some notification queue...). Using queues can also help you to handle client reconnects more cleanly (without loss of any messages).
Upvotes: 0