growling_egg
growling_egg

Reputation: 317

Trying to understand how to use multithreaded websockets in Python but seem to be stuck with one thread

I have this basic exchange monitor script. I'm trying to create one thread per symbol, apart from the main thread which is handling other work, and have them listen to public Gemini websocket endpoints. I'm getting the first thread running and printing exchange data to the console, but not the second one. I had expected to see data from both threads being printed at approximately the same time. I've tried using the threading library instead of asyncio and encountered the same situation.

I realize my two public API MarketWebsocket classes could be combined to be cleaner, I'm still trying to work out a way to easily add other symbols to the list. Thanks for any nudges in the right direction!

import asyncio
from websockets import connect

symbols_to_watch = [
    "BTCUSD",
    "ETHUSD"
]

class BTCMarketWebsocket:
    disable = False

    async def __aenter__(self):
        symbol = symbols_to_watch[0]
        self._conn = connect("wss://api.gemini.com/v1/marketdata/{}".format(symbol))
        self.websocket = await self._conn.__aenter__()
        return self

    async def __aexit__(self, *args, **kwargs):
        await self._conn.__aexit__(*args, **kwargs)

    async def receive(self):
        return await self.websocket.recv()

class ETHMarketWebsocket:
    disable = False

    async def __aenter__(self):
        symbol = symbols_to_watch[1]
        self._conn = connect("wss://api.gemini.com/v1/marketdata/{}".format(symbol))
        self.websocket = await self._conn.__aenter__()
        return self

    async def __aexit__(self, *args, **kwargs):
        await self._conn.__aexit__(*args, **kwargs)

    async def receive(self):
        return await self.websocket.recv()

async def btcMarketWebsocket():
    async with BTCMarketWebsocket() as btcMarketWebsocket:
        while not btcMarketWebsocket.disable:
            print(await btcMarketWebsocket.receive())

async def ethMarketWebsocket():
    async with ETHMarketWebsocket() as ethMarketWebsocket:
        while not ethMarketWebsocket.disable:
            print(await ethMarketWebsocket.receive())

if __name__ == '__main__':
    asyncio.run(btcMarketWebsocket())
    asyncio.run(ethMarketWebsocket())

Upvotes: 0

Views: 171

Answers (1)

InfoLearner
InfoLearner

Reputation: 15598

You can do

async def multiple_tasks():
  Tasks =[] 
  Tasks.append(btcMarketWebsocket())
  Tasks.append(ethMarketWebsocket())
  await asyncio.gather(*Tasks, return_exceptions=True)
  

if __name__ == '__main__':
 asyncio.get_event_loop().run_until_complete(multiple_tasks())

Upvotes: 1

Related Questions