catcharecap

Reputation: 61

Python asyncio & websockets: how to avoid a global variable?

I have a Python socket server using asyncio and websockets. When the websocket server is active, 100+ devices connect and hold their connections open, waiting for commands/messages.

There are two threads. The first thread accepts connections, adds their details to a global variable, and then waits for messages from the device:

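import asyncio
import websockets

CONNECTIONS = {}  # client address -> websocket

async def thread1(websocket, path):
    client_address = await websocket.recv()  # first message identifies the client
    CONNECTIONS[client_address] = websocket
    async for message in websocket:
        ...  # do something with message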

start_server = websockets.serve(thread1, host, port)
asyncio.get_event_loop().run_until_complete(start_server)
asyncio.ensure_future(thread2())
asyncio.get_event_loop().run_forever()

The second thread processes some user data and, once it needs to send a command, looks up the websocket in the global variable:

async def thread2():
    ...  # some data processing
    soc = CONNECTIONS[ipaddress]
    await soc.send("some message")

My question: What's the best way to allow another thread to send messages?

I can keep the global variable safe using thread locking and a function dedicated to processing that data, but global variables aren't ideal. I cannot send information between the threads, since thread1 is stuck waiting to receive messages.

Upvotes: 4

Views: 1122

Answers (1)

alex_noname

Reputation: 32233

First, the term "thread" is used incorrectly here. You are using asyncio, where the relevant concept is the coroutine (a coroutine is wrapped in an asyncio task). How coroutines differ from threads is explained, for example, here.
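To illustrate the distinction, here is a minimal sketch (the names worker and main are made up for this illustration) in which several asyncio tasks each report the identifier of the thread they run in. Under the default event loop they should all report the same one:

```python
import asyncio
import threading


async def worker(name):
    # Yield control to the event loop once, as a real coroutine would on I/O
    await asyncio.sleep(0)
    return name, threading.get_ident()


async def main():
    # Three tasks scheduled concurrently on one event loop
    return await asyncio.gather(*(worker(f"task-{i}") for i in range(3)))


results = asyncio.run(main())
thread_ids = {ident for _, ident in results}
print(len(thread_ids))  # 1: every task ran in the same thread
```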

The websockets server spawns a new task for each incoming connection, so the number of spawned tasks equals the number of connections. I don't see anything wrong with a global object, at least in a small script. Still, in the example below I put the connections into a separate class.

Also, no special synchronization between coroutines is required in this case, since they run under cooperative multitasking: all of them execute in one thread and hand over control only at certain points (the await expressions).
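A small sketch of why no lock is needed, assuming a shared dictionary named counter and a coroutine named increment (both invented for this example). Ten tasks update the same value, and because there is no await between the read and the write, no other task can run in between:

```python
import asyncio

counter = {"value": 0}  # shared state, deliberately without a lock


async def increment(times):
    for _ in range(times):
        # No await between read and write, so no other task can interleave here
        current = counter["value"]
        counter["value"] = current + 1
        # Control can pass to another task only at this await
        await asyncio.sleep(0)


async def main():
    await asyncio.gather(*(increment(1000) for _ in range(10)))


asyncio.run(main())
print(counter["value"])  # 10000
```

If an await were placed between the read and the write, updates could be lost, and an asyncio.Lock around that section would then be worth considering.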

Here is a simple example in which the server stores a dictionary of incoming connections and starts a task that, every 2 seconds, sends the current time to all clients. The server also prints the clients' confirmations to the console.

# ws_server.py
import asyncio
import websockets
import datetime


class Server:
    def __init__(self, host, port):
        self.host = host
        self.port = port
        self.connections = {}
        self.is_active = False
        self.server = None

    async def start(self):
        self.is_active = True
        self.server = await websockets.serve(self.handler, self.host, self.port)
        # Keep a reference so the notifier task is not garbage-collected
        self.notifier_task = asyncio.create_task(self.periodic_notifier())

    async def stop(self):
        self.is_active = False
        self.server.close()
        await self.wait_closed()

    async def wait_closed(self):
        await self.server.wait_closed()

    async def handler(self, websocket, path):
        self.connections[websocket.remote_address] = websocket
        try:
            async for message in websocket:
                print(message)
        except websockets.ConnectionClosedError:
            pass
        finally:
            # Remove the connection even if the handler exits with an error
            del self.connections[websocket.remote_address]
        print(f"Connection {websocket.remote_address} is closed")

    async def periodic_notifier(self):
        while self.is_active:
            await asyncio.gather(
                *[ws.send(f"Hello time {datetime.datetime.now()}") for ws in self.connections.values()],
                return_exceptions=True)
            await asyncio.sleep(2)


async def main():
    server = Server("localhost", 8080)
    await server.start()
    await server.wait_closed()

asyncio.run(main())

# ws_client.py
import asyncio
import websockets


async def client():
    uri = "ws://localhost:8080"
    async with websockets.connect(uri) as websocket:
        async for message in websocket:
            print(message)
            await websocket.send(f"ACK {message}")


asyncio.run(client())
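
For the case in the question, where a command has to reach one specific device rather than all of them, the same idea could be sketched roughly as follows. ConnectionRegistry, send_to and FakeConnection are hypothetical names for this illustration and not part of the websockets library. The only assumption about a connection is that it has an awaitable send() method, so a stand-in object is used here to keep the example runnable without a network:

```python
import asyncio


class ConnectionRegistry:
    """Hypothetical helper: maps a client key to its connection object."""

    def __init__(self):
        self._connections = {}

    def register(self, key, connection):
        self._connections[key] = connection

    def unregister(self, key):
        # pop with a default avoids KeyError if the key is already gone
        self._connections.pop(key, None)

    async def send_to(self, key, message):
        connection = self._connections.get(key)
        if connection is None:
            return False  # client not connected (or already disconnected)
        await connection.send(message)
        return True


class FakeConnection:
    """Stand-in for a websocket; only the async send() method is assumed."""

    def __init__(self):
        self.sent = []

    async def send(self, message):
        self.sent.append(message)


async def main():
    registry = ConnectionRegistry()
    device = FakeConnection()
    registry.register("10.0.0.5", device)
    delivered = await registry.send_to("10.0.0.5", "some message")
    missing = await registry.send_to("10.0.0.9", "some message")
    return device.sent, delivered, missing


sent, delivered, missing = asyncio.run(main())
print(sent, delivered, missing)
```

In a real server, the connection handler would call register() when a client connects and unregister() in a finally block, and any other coroutine holding the registry could call send_to() without a global variable.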

Upvotes: 6
