Mark
Mark

Reputation: 722

A generic websocket client in python using websockets library and asyncio

I am trying to hack together a basic generic client interface from the websockets documentation. I'm using this example as a starting point.

The following code sends 10 strings in a loop but either none of them are coming back from the echo server, or they are not reaching the consumer method

I have tried replacing a method in the code below with the following ...

async def consumer_handler(self, websocket, path):
    try:
        logger.debug("Beginning consumer")
        while True:
            try:
                message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
                logger.debug(f"Recieved message {message}")
                await self.consumer(message)
            except asyncio.TimeoutError:
                logger.debug("Consumer timeout")
    finally:
        logger.debug("Ended consumer")

... but that doesnt make any difference. Further, neither the "Consumer Timeout" or "Received Message" logs are printed, so it seems like this loop is not running after the producer_handler is started (consumer_handler does get started as the "Beginning consumer" string is logged). I also note that the "Ended consumer" and "Ended producer" strings are never logged.

import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time

logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)

class Application:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=8)

    def getExecutor(self):
        return self.executor
    
    def close(self):
        self.executor.shutdown()


class Client:
    
    ###########################################################################
    # Business Logic
        
    def sendMessage(self, message):
        self.messageQueue.put(message)
    
    async def consumer(self, message):
        logger.debug(f"Consumed message {message}")
        
    ###########################################################################
    
    ###########################################################################
    # General
    
    def __init__(self, app):
        
        self.app = app
        
        self.messageQueue = Queue()
        self.ws = None
    
    async def producer(self):
        logger.debug("In producer")
        return self.messageQueue.get()
    
    
    def connect(self, uri):
        self.app.getExecutor().submit(partial(self._connect, uri))
    
    def _connect(self, uri):
        asyncio.run(self.__connect(uri), debug=True)
    
    async def __connect(self, uri):
        logger.debug("Connecting")
        self.ws = await websockets.connect(uri)  
        await self.handler(self.ws, uri)
    
    def close(self):
         self.app.getExecutor().submit(self._close)
         
    def _close(self):
        asyncio.run(self.__close(), debug=True)
    
    async def __close(self):
        logger.debug("Ending client")
        await self.ws.close()

    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # remaining part of class is from 
    # https://websockets.readthedocs.io/en/stable/intro.html#both

    async def consumer_handler(self, websocket, path):
        try:
            logger.debug("Beginning consumer")
            async for message in websocket:
                logger.debug(f"Recieved message {message}")
                await self.consumer(message)
        finally:
            logger.debug("Ended consumer")

            
    async def producer_handler(self, websocket, path):
        logger.debug("Beginning producer")
        try:
            while True:
                message = await self.producer()
                await websocket.send(message)
                logger.debug(f"Sent: {message}")
        finally:
            logger.debug("Ended producer")
            
    
    async def handler(self, websocket, path):
        consumer_task = asyncio.create_task(
            self.consumer_handler(websocket, path))
        producer_task = asyncio.create_task(
            self.producer_handler(websocket, path))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()
        
def main():
    
    ADDRESS = "ws://echo.websocket.org/"
    
    logger.debug("Beginning client")
    
    app = Application()
    try:
        client = Client(app)
        try:
            client.connect(ADDRESS)
        
            time.sleep(2)
            
            for i in range(10):
                client.sendMessage(f"Hello {i}")
                time.sleep(0.5)
            
            time.sleep(3)
            
        finally:
            client.close()
            time.sleep(2)
    finally:
        app.close()

    
if __name__ == '__main__':
    main()

Upvotes: 1

Views: 4686

Answers (1)

Mark
Mark

Reputation: 722

The problem was that the producer_handler task was not giving up the event loop for some reason despite the wait on a websocket.send (which I think is the expectation and I suspect that this will be a problem for anyone following the documentation). To overcome this, I added a asyncio.sleep call which does give up the event loop, allowing the consumer_handler to be reentered.

async def producer_handler(self, websocket, path):
    logger.debug("Beginning producer")
    try:
        while True:
            message = await self.producer()
            await websocket.send(message)
            logger.debug(f"Sent: {message}")
            
            # The fix! This gives up the event loop so that consumer_handler can be revisited
            await asyncio.sleep(1)
    finally:
        logger.debug("Ended producer")

The fact that the handlers were not shutting down correctly was also an issue with threading. I changed the __connect and close functions to

async def __connect(self, uri):
    self.loop = asyncio.get_event_loop()
    logger.debug("Connecting")
    self.ws = await websockets.connect(uri)  
    await self.handler(self.ws, uri)

def close(self):
    asyncio.run_coroutine_threadsafe(self.__close(), self.loop)

and now everything is as expected.

The complete, working example....

import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time

logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)

class Application:
    def __init__(self):
        self.executor = ThreadPoolExecutor(max_workers=8)

    def getExecutor(self):
        return self.executor
    
    def close(self):
        self.executor.shutdown()


class Client:
    
    ###########################################################################
    # Business Logic
        
    def sendMessage(self, message):
        self.messageQueue.put(message)
    
    async def consumer(self, message):
        logger.debug(f"Consumed message {message}")
        
    ###########################################################################
    
    ###########################################################################
    # General
    
    def __init__(self, app):
        
        self.app = app
        
        self.messageQueue = Queue()
        self.ws = None
    
    async def producer(self):
        logger.debug("In producer")
        return self.messageQueue.get()
    
    
    def connect(self, uri):
        self.app.getExecutor().submit(partial(self._connect, uri))
    
    def _connect(self, uri):
        asyncio.run(self.__connect(uri), debug=True)
    
    async def __connect(self, uri):
        self.loop = asyncio.get_event_loop()
        logger.debug("Connecting")
        self.ws = await websockets.connect(uri)  
        await self.handler(self.ws, uri)
    
    def close(self):
        asyncio.run_coroutine_threadsafe(self.__close(), self.loop)

    async def __close(self):
        logger.debug("Ending client")
        await self.ws.close()

    #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
    # remaining part of class is from 
    # https://websockets.readthedocs.io/en/stable/intro.html#both

    async def consumer_handler(self, websocket, path):
        try:
            logger.debug("Beginning consumer")
            async for message in websocket:
                logger.debug(f"Recieved message {message}")
                await self.consumer(message)
        finally:
            logger.debug("Ended consumer")

            
    async def producer_handler(self, websocket, path):
        logger.debug("Beginning producer")
        try:
            while True:
                message = await self.producer()
                await websocket.send(message)
                logger.debug(f"Sent: {message}")
                await asyncio.sleep(1)
        finally:
            logger.debug("Ended producer")
            

    async def handler(self, websocket, path):
        '''This simply schedules the sequential execution of the producer_task
        and consumer_task. It does NOT run them in parallel!'''
        producer_task = asyncio.ensure_future(
            self.producer_handler(websocket, path))
        consumer_task = asyncio.ensure_future(
            self.consumer_handler(websocket, path))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,
        )
        for task in pending:
            task.cancel()

def main():
    
    ADDRESS = "ws://echo.websocket.org/"
    
    logger.debug("Beginning client")
    
    app = Application()
    try:
        client = Client(app)
        try:
            client.connect(ADDRESS)
        
            time.sleep(2)
            
            for i in range(10):
                client.sendMessage(f"Hello {i}")
                time.sleep(0.5)

            time.sleep(1)
            
        finally:
            client.close()
            time.sleep(3)
    finally:
        app.close()

    
if __name__ == '__main__':
    main()

Upvotes: 1

Related Questions