Reputation: 70
I create a websocket connection using the Python websockets library.
I define an infinite background task that listens for server messages and passes them to the handler as they arrive.
To subscribe to a live quote I first need to send a channel-opening request. I send the request and, to keep track of the channel status, create an asyncio.Event.
As soon as a CHANNEL_OPENED message is received from the server, I set the event in my message handler.
Meanwhile my main coroutine is waiting on this event, but it never makes progress. My logs tell me that the websocket connection was successful and the channel is open. If I add a breakpoint in the message handler and inspect runtime variables at the CHANNEL_OPENED point, I can see the event is set, but execution still does not proceed. What am I doing wrong here?
My main is basically this:
import asyncio
from api import ApiClient

async def main():
    api = ApiClient(
        uid=_uid,
        remember_token=_remember_token
    )
    asyncio.create_task(api.message_handler())
    api.request_channel(1) # sends the open channel request and creates the event in api.open_channels (a dictionary of channel numbers to asyncio.Event)
    event = api.open_channels.get(channel)
    await event.wait() # Stuck here
    log.info("Subscribing options quotes for tracked symbols.") # this point never reached
    for symbol in symbols: # Symbols is a list of strings
        response = api.get(f"/option-chains/{symbol}")
        options = [OptionInstrument(**option) for option in response.json()["data"]["items"]]
        for option in options:
            subscription = asyncio.create_task(api.subscribe(option, channel))
            await subscription

if __name__ == "__main__":
    asyncio.run(main())
The channel was requested like this:
def request_channel(self, channel: int):
    if self.open_channels.get(channel):
        log.info(f"Channel {channel} was already requested. Skipping request.")
        return
    self.open_channels[channel] = asyncio.Event()
    self.streamer.send(
        dumps(
            {
                "type": "CHANNEL_REQUEST",
                "channel": channel,
                "service": "FEED",
                "parameters": {"contract": "AUTO"},
            }
        )
    )
    log.debug(f"Requested opening of channel {channel}")
In my ApiClient definition I have the message handling loop:
async def message_handler(self):
    while True:
        await self._process_message(self.streamer.recv())

async def _process_message(self, message: str):
    message = loads(message)
    log.debug(message)
    match message["type"]:
        case "AUTH_STATE" | "SETUP":
            if message.get("state") == "AUTHORIZED":
                log.debug("Websocket streamer setup and authorized by the server.")
        case "CHANNEL_OPENED":
            self.open_channels[message["channel"]].set()
            log.debug(f"Channel {message["channel"]} open.")
        case "KEEPALIVE":
            self.streamer.send(dumps({"type": "KEEPALIVE", "channel": message["channel"]}))
            log.debug("Extended keep alive")
        case "ERROR":
            log.error(f"Streamer had an error. Restarting. Error message:\n\t{message}")
            self.streamer.close()
            self.open_channels = dict()
            self._streamer = None
        case _:
            log.warning(f"Unexpected message type received:\n\t{message}")
The API streamer object is configured with websockets:
@property
def streamer(self) -> ClientConnection:
    if not self._streamer:
        self._streamer = self._setup_streamer_websocket()
    return self._streamer

def _setup_streamer_websocket(self) -> ClientConnection:
    websocket = connect(self.quote_streamer_url)
    setup_message_payload = {
        "type": "SETUP",
        "channel": 0,
        "keepaliveTimeout": 60,
        "acceptKeepaliveTimeout": 60,
        "version": "0.1",
    }
    websocket.send(message=dumps(setup_message_payload))
    auth_message_payload = {
        "type": "AUTH",
        "channel": 0,
        "token": self.quote_streamer_token,
    }
    websocket.send(dumps(auth_message_payload))
    return websocket
I get the following runtime log messages, including all server messages logged by api._process_message:
2024-08-03 11:57:02,405 - [INFO] [api.login:95]: Logged in.
2024-08-03 11:57:03,593 - [DEBUG] [api._get_streamer_credentials:152]: Got credentials.
2024-08-03 11:57:04,586 - [DEBUG] [api.request_channel:246]: Requested opening of channel 1
2024-08-03 11:57:04,715 - [DEBUG] [api._process_message:202]: {'type': 'SETUP', 'channel': 0, 'keepaliveTimeout': 60, 'acceptKeepaliveTimeout': 60, 'version': '1.0-1.0.1-20240426-113644'}
2024-08-03 11:57:04,716 - [DEBUG] [api._process_message:202]: {'type': 'AUTH_STATE', 'channel': 0, 'state': 'UNAUTHORIZED'}
2024-08-03 11:57:04,717 - [DEBUG] [api._process_message:202]: {'type': 'AUTH_STATE', 'channel': 0, 'state': 'AUTHORIZED', 'userId': <redacted>}
2024-08-03 11:57:04,717 - [DEBUG] [api._process_message:206]: Websocket streamer setup and authorized by the server.
2024-08-03 11:57:04,717 - [DEBUG] [api._process_message:202]: {'type': 'CHANNEL_OPENED', 'channel': 1, 'service': 'FEED', 'parameters': {'contract': 'AUTO', 'subFormat': 'LIST'}}
2024-08-03 11:57:04,718 - [DEBUG] [api._process_message:210]: Channel 1 open.
2024-08-03 11:57:34,715 - [DEBUG] [api._process_message:202]: {'type': 'KEEPALIVE', 'channel': 0}
2024-08-03 11:57:34,716 - [DEBUG] [api._process_message:214]: Extended keep alive
2024-08-03 11:58:04,715 - [DEBUG] [api._process_message:202]: {'type': 'KEEPALIVE', 'channel': 0}
2024-08-03 11:58:04,716 - [DEBUG] [api._process_message:214]: Extended keep alive
After this last log line I am stuck in the keep-alive loop, still waiting on the Event.
I have no idea what I am doing wrong. I have tried defining new event loops, changing the async logic and adding awaits to all server interactions, but none of it helped.
Miniconda environment information:
python 3.12.4
websockets 12.0
Upvotes: 0
Reputation: 13484
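Since your self.streamer.recv() is a blocking (synchronous) call, message_handler never yields control back to the asyncio event loop. Setting the event only schedules the waiting task to be woken up; the loop never gets a chance to actually resume the main task.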
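One way to keep the synchronous connection and still let the loop run is to push the blocking recv() into a worker thread with asyncio.to_thread and await the result. The following is only a minimal sketch of that idea, not your actual client: FakeBlockingStreamer, its push() method, the "STOP" message and demo() are hypothetical stand-ins invented for illustration, with a queue playing the role of the server.

```python
import asyncio
import queue
import threading


class FakeBlockingStreamer:
    """Hypothetical stand-in for a synchronous websocket connection."""

    def __init__(self) -> None:
        self._inbox = queue.Queue()

    def push(self, message: str) -> None:
        # Simulates the server delivering a message.
        self._inbox.put(message)

    def recv(self) -> str:
        # Blocks the calling thread until a message arrives,
        # the way a synchronous recv() does.
        return self._inbox.get()


async def message_handler(streamer: FakeBlockingStreamer, opened: asyncio.Event) -> None:
    while True:
        # The blocking call runs in a worker thread; awaiting it
        # hands control back to the event loop in the meantime.
        message = await asyncio.to_thread(streamer.recv)
        if message == "CHANNEL_OPENED":
            opened.set()
        elif message == "STOP":  # hypothetical shutdown message for the demo
            return


async def demo() -> bool:
    streamer = FakeBlockingStreamer()
    opened = asyncio.Event()
    handler = asyncio.create_task(message_handler(streamer, opened))

    # Simulate the server answering shortly after the request.
    threading.Timer(0.1, streamer.push, args=("CHANNEL_OPENED",)).start()

    # The main task is resumed once the handler sets the event.
    await asyncio.wait_for(opened.wait(), timeout=5)

    streamer.push("STOP")  # let the pending recv() return so the handler exits
    await handler
    return opened.is_set()
```

Running asyncio.run(demo()) should get past the wait instead of hanging. In your code the equivalent change would be awaiting asyncio.to_thread(self.streamer.recv) inside message_handler. The other route is to switch to the asyncio client that websockets also provides, where recv() and send() are awaited directly.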
Upvotes: 1