Amadan
Amadan

Reputation: 198476

Contacting another WebSocket server from inside Django Channels

I have two websocket servers, call them Main and Worker, and this is the desired workflow:

Is this doable? I couldn't find any WS client functionality in Channels. I tried naively to do this (in consumers.py):

import websockets

class SampleConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        async with websockets.connect(url) as worker_ws:
            await worker_ws.send(json.dumps({ 'to': 'Worker' }))
            result = json.loads(await worker_ws.recv())
        await self.send(text_data=json.dumps({ 'to': 'Client' })

However, it seems that the with section blocks (Main doesn't seem to accept any further messages until the response is received from Worker). I suspect it is because websockets runs its own loop, but I don't know for sure. (EDIT: I compared id(asyncio.get_running_loop()) and it seems to be the same loop. I have no clue why it is blocking then.)

The response { "to": "Client" } does not need to be here, I would be okay even if it is in a different method, as long as it triggers when the response from Worker is received.

Is there a way to do this, or am I barking up the wrong tree?

If there is no way to do this, I was thinking of having a thread (or process? or a separate application?) that communicates with Worker, and uses channel_layer to talk to Main. Would this be viable? I would be grateful if I could get a confirmation (and even more so for a code sample).

EDIT I think I see what is going on (though still investigating), but — I believe one connection from Client instantiates one consumer, and while different instances can all run at the same time, within one consumer instance it seems the instance doesn't allow a second method to start until one method has finished. Is this correct? Looking now if moving the request-and-wait-for-response code into a thread would work.

Upvotes: 8

Views: 7857

Answers (3)

Scorpionk
Scorpionk

Reputation: 478

I was in the same position where I wanted to process the message in my Django app whenever I receive it from another WebSocket server.

I took the idea of using the WebSockets client library and keeping it running as a separate process using the manage.py command from this post on the Django forum.

You can define an async coroutine client(websocket_url) to listen to messages received from the WebSocket server.

import asyncio
import websockets


async def client(websocket_url):
    async for websocket in websockets.connect(uri):
        print("Connected to Websocket server")
        try:
            async for message in websocket:
            # Process message received on the connection.
                print(message)
        except websockets.ConnectionClosed:
            print("Connection lost! Retrying..")
            continue #continue will retry websocket connection by exponential back off 

In the above code connect() acts as an infinite asynchronous iterator. More on that here.

You can run the above coroutine inside handle() method of the custom management command class.

runwsclient.py

from django.core.management.base import BaseCommand

class Command(BaseCommand):

    def handle(self, *args, **options):
        URL = "ws://example.com/messages"
        print(f"Connecting to websocket server {URL}")
        asyncio.run(client(URL))

Finally, run the manage.py command.

python manage.py runwsclient

You can also pass handler to client(ws_url, msg_handler) which will process the message so that processing logic will remain outside of the client.

Update 31/05/2022:

I have created a django package to integrate the above functionality with the minimal setup: django-websocketclient

Upvotes: 6

Amadan
Amadan

Reputation: 198476

It seems I managed to do it using the latest idea I posted — launching a thread to handle the connection to Worker. Something like this:

class SampleConsumer(AsyncWebsocketConsumer):
    async def receive(self, text_data):
        threading.Thread(
            target=asyncio.run,
            args=(self.talk_to_worker(
                url,
                { 'to': 'Worker' },
            ),)
        ).start()

    async def talk_to_worker(self, url, message):
        async with websockets.connect(url) as worker_ws:
            await worker_ws.send(json.dumps(message))
            result = json.loads(await worker_ws.recv())
        await self.send(text_data=json.dumps({ 'to': 'Client' })

It may actually be smarter to do it with HTTP requests in each direction (since both endpoints can be HTTP servers), but this seems to work.

Upvotes: 0

Ken4scholars
Ken4scholars

Reputation: 6296

Yes, Django Channels does not provide a websocket client as it is used as a server mainly. From your code, it doesn't seem like you really need a websocket communication between the Main and Worker, as you just fire up a socket, send a single message, receive the response and close the socket. This is the classical use case for regular HTTP, so if you do not really need to keep the connection alive, I suggest you use a regular HTTP endpoint instead and use aioHTTP as a client.

However, if you do really need a client, then you should open the socket once on client connection and close it when the client disconnects. You can do something like this.

import websockets

async def create_ws(on_create, on_message):
    uri = "wss://localhost:8765"
    async with websockets.connect(uri) as websocket:
        await on_create(websocket)
        while True:
            message = await websocket.recv()
            if message:
                await on_message(message)



class WebsocketClient:
    asyn def __init__(self, channel):
        self.channel = channel
        self.ws = None
        await creat_ws(self.on_message)
    
    async def on_create(self, was):
        self.ws = ws

    async def on_message(self, ws, message):
        await self.channel.send(text_data=json.dumps(message)
    
    async def send(self, message):
        self.ws.send(message)

    asunc def close(self):
        self.ws.close()

Then in your consumer, you can use the client as follows:

class SampleConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.ws_client = WebsocketClient(self)
    
    async def receive(self, text_data):
        await self.ws_client.send(text_data)
    
    async def disconnect(self, code):
        await self.ws_client.close()

Upvotes: 1

Related Questions