Reputation: 16521
I have a django service (say 'brisbane'
) that is pushing updates to client browsers when database models are saved using channels and signals thus:
def invalidated_serialized_model_receiver(self, sender, **kwargs):
...
async_to_sync(get_channel_layer().group_send)(name, update)
This works and gives nice real time updates.
'brisbane'
now needs to interact with another service (same code) 'sydney'
so that it can be similarly updated in real time with respect to changes in sydney's
data. This uses a consumer running in another process looking something like this:
async def remote_site_consume(site):
socket_url = site.urn(protocol='ws', resource_name='/ws/watch/')
async with websockets.connect(socket_url) as websocket:
await websocket.send(json.dumps({'type': 'watch_messages'}))
...
async for event in websocket:
event = json.loads(event)
await get_event_handler(event)(site, websocket, event)
The signal may get sent legitimately from within the event handler which is where the problem occurs. When this happens, it throws a RuntimeError
'You cannot use AsyncToSync in the same thread as an async event loop - just await the async function directly.'
I can't just use await though because the signal is sent from threads with no event loop as well.
Upvotes: 1
Views: 542
Reputation: 16521
I'm trying with this replacement for async_to_sync which seems to work in local tests at least:
def invalidated_serialized_model_receiver(self, sender, **kwargs):
...
create_or_use_loop(self.channel_layer.group_send(name, update))
def create_or_use_loop(awaitable):
try:
event_loop = asyncio.get_event_loop()
except RuntimeError:
event_loop = None
if event_loop and event_loop.is_running():
event_loop.call_soon_threadsafe(event_loop.create_task, awaitable)
else:
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(awaitable)
finally:
loop.close()
asyncio.set_event_loop(event_loop)
This feels very clunky though.
Upvotes: 1