Dennis

Reputation: 3678

How to detect BinanceSocketManager websocket disconnect in Python?

The Binance API and python-binance offer async functionality for non-blocking execution, as discussed in Async basics for Binance.

I am using BinanceSocketManager to listen to live data over a websocket (async, non-blocking).

To handle scenarios such as an intermittent loss of network connection, I want to add an auto-reconnect feature to my project, but I can't seem to find any information on doing this with BinanceSocketManager. The only guide I found uses ThreadedWebsocketManager, which is not an async implementation.

Does anyone know how to implement disconnect detection and an auto-reconnect mechanism for a Binance websocket?

Here is the code I have so far:

import asyncio
from binance import AsyncClient, BinanceSocketManager


async def main():
    client = await AsyncClient.create()
    await kline_listener(client)

async def kline_listener(client):
    bm = BinanceSocketManager(client)
    async with bm.kline_socket(symbol='BTCUSDT') as stream:
        while True:
            res = await stream.recv()
            print(res)
    # TODO: find a way to detect a websocket error/disconnect here,
    # then call disconnect_callback(client)

async def disconnect_callback(client):
    await client.close_connection()
    await main()  # restart client and kline socket

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Upvotes: 7

Views: 2722

Answers (2)

Rayn0r42

Reputation: 11

I have tested the code from the other answer and it has proven rather stable. Here are some improvements I have made.

I'm not sure what happens if your internet connection is completely gone when this line is executed:

client = await AsyncClient.create()

This could probably be solved like this (I'm open to better ideas):

while True:
    try:
        client = await AsyncClient.create()
    except Exception as error_msg:
        print(f"error: {error_msg}")
        # pause before retrying; asyncio.sleep is used because
        # time.sleep would block the event loop
        await asyncio.sleep(3)
    else:
        print("finally got through the loop")
        break
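
If you want something more robust than retrying immediately or after a fixed pause, here is a minimal sketch of a retry helper with exponential backoff. The name retry_with_backoff and all of its parameters are hypothetical (they are not part of python-binance); factory stands for any zero-argument coroutine function, for example AsyncClient.create.

```python
import asyncio
import random


async def retry_with_backoff(factory, *, base_delay=1.0, max_delay=60.0,
                             max_attempts=None):
    """Await factory() until it succeeds, sleeping longer after each failure.

    factory:      zero-argument coroutine function (hypothetical parameter name).
    base_delay:   delay in seconds after the first failure.
    max_delay:    upper bound for the doubling delay.
    max_attempts: give up and re-raise after this many failures (None = never).
    """
    attempt = 0
    while True:
        try:
            return await factory()
        except Exception as error:
            attempt += 1
            if max_attempts is not None and attempt >= max_attempts:
                raise
            # double the delay on every failure, capped at max_delay
            delay = min(max_delay, base_delay * 2 ** (attempt - 1))
            # add up to 10% jitter so many clients do not retry in lockstep
            delay += random.uniform(0, delay / 10)
            print(f"attempt {attempt} failed: {error!r}; retrying in {delay:.1f}s")
            await asyncio.sleep(delay)
```

Inside an async function this could be used as client = await retry_with_backoff(AsyncClient.create), assuming AsyncClient.create raises an ordinary exception when the network is down.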

Surrounding this with a try/except is a good idea:

bm = BinanceSocketManager(client, user_timeout=60)

The call to stream.recv() should be wrapped in asyncio.wait_for() to cover the situation where no data comes in for a longer period of time, which usually means something is wrong.

async with kline_candles as stream:
    for _ in range(5):
        try:
            # give up waiting if no message arrives within 60 seconds
            res = await asyncio.wait_for(stream.recv(), timeout=60)
            await process_message(msg=res, client=client)  # process message
        # requires "import websockets"; asyncio.exceptions.TimeoutError is the same class as asyncio.TimeoutError
        except (asyncio.TimeoutError, websockets.exceptions.ConnectionClosed, asyncio.CancelledError) as error_msg_1:
            print(f"Error! in main loop 1:\n{error_msg_1}")
            await disconnect_callback(client=client)
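
The timeout idea can also be combined with a plain loop that reopens the stream, instead of restarting through a callback. Below is a minimal sketch under some assumptions: listen_with_reconnect, open_stream, handle, reconnect_on and max_reconnects are hypothetical names of my own, and the stream object is only assumed to be an async context manager with an async recv() method, as in the snippets above.

```python
import asyncio


async def listen_with_reconnect(open_stream, handle, *, recv_timeout=60.0,
                                retry_delay=5.0, max_reconnects=None,
                                reconnect_on=(asyncio.TimeoutError, OSError)):
    """Receive messages in a loop, reopening the stream after a timeout or error.

    open_stream:    zero-argument callable returning an async context manager
                    whose value has an async recv() method.
    handle:         coroutine function called with every received message.
    reconnect_on:   exception types that trigger a reconnect (ConnectionError
                    is a subclass of OSError, so it is covered by default).
    max_reconnects: re-raise the error after this many reconnects (None = never).
    """
    reconnects = 0
    while True:
        try:
            async with open_stream() as stream:
                while True:
                    # treat a long silence as a dead connection
                    msg = await asyncio.wait_for(stream.recv(), timeout=recv_timeout)
                    await handle(msg)
        except reconnect_on as error:
            reconnects += 1
            if max_reconnects is not None and reconnects > max_reconnects:
                raise
            print(f"stream lost ({error!r}); reconnect #{reconnects} in {retry_delay}s")
            await asyncio.sleep(retry_delay)
```

With python-binance this might be called as listen_with_reconnect(lambda: bm.kline_socket('BTCUSDT'), handle), with websockets.exceptions.ConnectionClosed added to reconnect_on; that combination is untested against a live connection. A loop avoids the ever-deeper call stack that builds up when main() is restarted recursively.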

Upvotes: 1

Wicked Gummy Bear

Reputation: 145

In case someone else comes across this: you should be looking at BinanceAPIException. The code could then look something like this:

import asyncio
from datetime import datetime

from binance import AsyncClient, BinanceSocketManager
from binance.exceptions import BinanceAPIException

async def main():

    client = await AsyncClient.create()
    bm = BinanceSocketManager(client, user_timeout=60)

    # create any sockets here, e.g. a kline socket
    kline_candles = bm.kline_socket('BNBUSDT', interval=client.KLINE_INTERVAL_1MINUTE)

    # start receiving messages
    try:
        status = await client.get_system_status()
        print(status['msg'])

        async with kline_candles as stream:
            for _ in range(5):
                res = await stream.recv()  # create/await response
                await process_message(msg=res, client=client)  # process message

    except BinanceAPIException as e:
        print(e)
        await disconnect_callback(client=client)

async def disconnect_callback(client):
    await client.close_connection()  # close connection
    await asyncio.sleep(60)  # wait a minute before restarting, without blocking the event loop
    await main()  # restart client and kline socket

async def process_message(msg, client):
    if msg['e'] == 'error':
        print('ERROR OCCURRED')
        await disconnect_callback(client=client)
        
    else:
        candle = msg['k']  # get only the candle info within the general dict

        start_time = datetime.utcfromtimestamp(candle['t']/1000).strftime('%Y-%m-%d %H:%M:%S')
        close_time = datetime.utcfromtimestamp(candle['T']/1000).strftime('%Y-%m-%d %H:%M:%S')

        print(f'__ start: {start_time}, close: {close_time}')
        print(msg)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

The disconnect has not been tested yet, but I assume this will work. If anyone has any additional notes, just let me know.
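
One possible variation on process_message, as a rough sketch: instead of restarting from inside the message handler, the handler can raise an exception for an error payload and leave the restart decision to the caller. StreamError and parse_kline are hypothetical names, and reading the error text from an 'm' key is an assumption about the payload, so a default is used when it is missing.

```python
from datetime import datetime, timezone


class StreamError(Exception):
    """Hypothetical exception for an error payload delivered on the stream."""


def parse_kline(msg):
    """Return (start, close) as timezone-aware UTC datetimes for a kline message.

    Raises StreamError when the message is an error payload ('e' == 'error').
    """
    if msg.get('e') == 'error':
        # assumption: the error description, if any, is stored under 'm'
        raise StreamError(msg.get('m', 'unknown stream error'))

    candle = msg['k']  # the candle info within the general dict

    def to_utc(milliseconds):
        return datetime.fromtimestamp(milliseconds / 1000, tz=timezone.utc)

    return to_utc(candle['t']), to_utc(candle['T'])
```

The receive loop can then catch StreamError next to BinanceAPIException and reconnect in one place, which keeps the parsing logic free of connection handling.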

Upvotes: 5
