Reputation: 722
I am trying to hack together a basic generic client interface from the websockets documentation. I'm using this example as a starting point.
The following code sends 10 strings in a loop but either none of them are coming back from the echo server, or they are not reaching the consumer
method
I have tried replacing a method in the code below with the following ...
async def consumer_handler(self, websocket, path):
try:
logger.debug("Beginning consumer")
while True:
try:
message = await asyncio.wait_for(websocket.recv(), timeout=1.0)
logger.debug(f"Recieved message {message}")
await self.consumer(message)
except asyncio.TimeoutError:
logger.debug("Consumer timeout")
finally:
logger.debug("Ended consumer")
... but that doesnt make any difference. Further, neither the "Consumer Timeout" or "Received Message" logs are printed, so it seems like this loop is not running after the producer_handler is started (consumer_handler does get started as the "Beginning consumer" string is logged). I also note that the "Ended consumer" and "Ended producer" strings are never logged.
import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time
logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
class Application:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=8)
def getExecutor(self):
return self.executor
def close(self):
self.executor.shutdown()
class Client:
###########################################################################
# Business Logic
def sendMessage(self, message):
self.messageQueue.put(message)
async def consumer(self, message):
logger.debug(f"Consumed message {message}")
###########################################################################
###########################################################################
# General
def __init__(self, app):
self.app = app
self.messageQueue = Queue()
self.ws = None
async def producer(self):
logger.debug("In producer")
return self.messageQueue.get()
def connect(self, uri):
self.app.getExecutor().submit(partial(self._connect, uri))
def _connect(self, uri):
asyncio.run(self.__connect(uri), debug=True)
async def __connect(self, uri):
logger.debug("Connecting")
self.ws = await websockets.connect(uri)
await self.handler(self.ws, uri)
def close(self):
self.app.getExecutor().submit(self._close)
def _close(self):
asyncio.run(self.__close(), debug=True)
async def __close(self):
logger.debug("Ending client")
await self.ws.close()
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# remaining part of class is from
# https://websockets.readthedocs.io/en/stable/intro.html#both
async def consumer_handler(self, websocket, path):
try:
logger.debug("Beginning consumer")
async for message in websocket:
logger.debug(f"Recieved message {message}")
await self.consumer(message)
finally:
logger.debug("Ended consumer")
async def producer_handler(self, websocket, path):
logger.debug("Beginning producer")
try:
while True:
message = await self.producer()
await websocket.send(message)
logger.debug(f"Sent: {message}")
finally:
logger.debug("Ended producer")
async def handler(self, websocket, path):
consumer_task = asyncio.create_task(
self.consumer_handler(websocket, path))
producer_task = asyncio.create_task(
self.producer_handler(websocket, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
def main():
ADDRESS = "ws://echo.websocket.org/"
logger.debug("Beginning client")
app = Application()
try:
client = Client(app)
try:
client.connect(ADDRESS)
time.sleep(2)
for i in range(10):
client.sendMessage(f"Hello {i}")
time.sleep(0.5)
time.sleep(3)
finally:
client.close()
time.sleep(2)
finally:
app.close()
if __name__ == '__main__':
main()
Upvotes: 1
Views: 4686
Reputation: 722
The problem was that the producer_handler
task was not giving up the event loop for some reason despite the wait on a websocket.send
(which I think is the expectation and I suspect that this will be a problem for anyone following the documentation). To overcome this, I added a asyncio.sleep
call which does give up the event loop, allowing the consumer_handler
to be reentered.
async def producer_handler(self, websocket, path):
logger.debug("Beginning producer")
try:
while True:
message = await self.producer()
await websocket.send(message)
logger.debug(f"Sent: {message}")
# The fix! This gives up the event loop so that consumer_handler can be revisited
await asyncio.sleep(1)
finally:
logger.debug("Ended producer")
The fact that the handlers were not shutting down correctly was also an issue with threading. I changed the __connect
and close
functions to
async def __connect(self, uri):
self.loop = asyncio.get_event_loop()
logger.debug("Connecting")
self.ws = await websockets.connect(uri)
await self.handler(self.ws, uri)
def close(self):
asyncio.run_coroutine_threadsafe(self.__close(), self.loop)
and now everything is as expected.
The complete, working example....
import asyncio
import websockets
from queue import Queue
import logging
from functools import partial
from concurrent.futures import ThreadPoolExecutor
import time
logging.basicConfig(format='%(asctime)-15s %(threadName)s %(message)s')
logger = logging.getLogger("")
logger.setLevel(logging.DEBUG)
class Application:
def __init__(self):
self.executor = ThreadPoolExecutor(max_workers=8)
def getExecutor(self):
return self.executor
def close(self):
self.executor.shutdown()
class Client:
###########################################################################
# Business Logic
def sendMessage(self, message):
self.messageQueue.put(message)
async def consumer(self, message):
logger.debug(f"Consumed message {message}")
###########################################################################
###########################################################################
# General
def __init__(self, app):
self.app = app
self.messageQueue = Queue()
self.ws = None
async def producer(self):
logger.debug("In producer")
return self.messageQueue.get()
def connect(self, uri):
self.app.getExecutor().submit(partial(self._connect, uri))
def _connect(self, uri):
asyncio.run(self.__connect(uri), debug=True)
async def __connect(self, uri):
self.loop = asyncio.get_event_loop()
logger.debug("Connecting")
self.ws = await websockets.connect(uri)
await self.handler(self.ws, uri)
def close(self):
asyncio.run_coroutine_threadsafe(self.__close(), self.loop)
async def __close(self):
logger.debug("Ending client")
await self.ws.close()
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# remaining part of class is from
# https://websockets.readthedocs.io/en/stable/intro.html#both
async def consumer_handler(self, websocket, path):
try:
logger.debug("Beginning consumer")
async for message in websocket:
logger.debug(f"Recieved message {message}")
await self.consumer(message)
finally:
logger.debug("Ended consumer")
async def producer_handler(self, websocket, path):
logger.debug("Beginning producer")
try:
while True:
message = await self.producer()
await websocket.send(message)
logger.debug(f"Sent: {message}")
await asyncio.sleep(1)
finally:
logger.debug("Ended producer")
async def handler(self, websocket, path):
'''This simply schedules the sequential execution of the producer_task
and consumer_task. It does NOT run them in parallel!'''
producer_task = asyncio.ensure_future(
self.producer_handler(websocket, path))
consumer_task = asyncio.ensure_future(
self.consumer_handler(websocket, path))
done, pending = await asyncio.wait(
[consumer_task, producer_task],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
def main():
ADDRESS = "ws://echo.websocket.org/"
logger.debug("Beginning client")
app = Application()
try:
client = Client(app)
try:
client.connect(ADDRESS)
time.sleep(2)
for i in range(10):
client.sendMessage(f"Hello {i}")
time.sleep(0.5)
time.sleep(1)
finally:
client.close()
time.sleep(3)
finally:
app.close()
if __name__ == '__main__':
main()
Upvotes: 1