michaelForb

Reputation: 57

Kucoin websocket latency

I collect tick data from KuCoin's API using Python and a websocket. The topic is ticker:all, which means I receive every tick for every symbol.

import asyncio
import time

# imports assumed to come from the kucoin-python SDK
from kucoin.client import WsToken
from kucoin.ws_client import KucoinWsClient

async def websocketConnect():
    async def event(msg):
        print(time.time())
        print(msg["data"]["time"])

    # Subscribe to ALL tickers
    topic = "/market/ticker:all"
    publicClient = WsToken(url="https://api.kucoin.com")
    wsClientTick = await KucoinWsClient.create(None, publicClient, event, private=False)
    await wsClientTick.subscribe(topic)

    # keep the coroutine alive so messages keep arriving
    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(websocketConnect())

When I compare the current timestamp with the tick time, I see a large fluctuation, from 100 ms to more than 30 seconds.

Is there anything I can do to make it more stable? Is it due to the large amount of data I receive?

EDIT:

So I have added a task, following the asyncio docs, to process each message from the socket separately. Now the delta between the current time in milliseconds and the tick time moves up and down but no longer climbs from 100 ms to 30 s. That is better, but I would like to make it more stable.

async def compute(msg):
    # delay in ms between now and the tick's own timestamp
    print(int(time.time() * 1000) - int(msg["data"]["time"]), flush=True)

async def websocketConnect():
    async def event(msg):
        # the task is awaited right away, so event() still waits for compute() to finish
        task = asyncio.create_task(compute(msg))
        await task

    # Subscribe to ALL tickers
    topic = "/market/ticker:all"
    publicClient = WsToken(url="https://api.kucoin.com")
    wsClientTick = await KucoinWsClient.create(None, publicClient, event, private=False)
    await wsClientTick.subscribe(topic)

    while True:
        await asyncio.sleep(1)


if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    loop.run_until_complete(websocketConnect())

EDIT 2:

So in fact the delay is under 100 ms most of the time, but as soon as I start doing some processing, with pandas for example, the delay in handling incoming ticks increases progressively.

Can someone help?

Upvotes: 1

Views: 1932

Answers (2)

Matthieu

Reputation: 11

I was finally able to make it work using multiprocessing.

Here is a brief outline:

import multiprocessing as mp
from kucoin.asyncio import KucoinSocketManager
from kucoin.client import Client
import asyncio

# function to get ticker using asyncio
async def websocket_get_tickers(client, pipe_send):

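    async def compute(msg):

        # fill your df with the data needed
        # (placeholder so the snippet runs: the raw payload stands in for the DataFrame)
        my_dataframe = msg["data"]

        # send the df through the pipe
        pipe_send.send(my_dataframe)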
        
    # callback function that receives messages from the socket
    async def handle_evt(msg):
        task = asyncio.create_task(compute(msg))
        await task

    loop = asyncio.get_event_loop()
    ksm = await KucoinSocketManager.create(loop, client, handle_evt)

    topic = '/market/ticker:all'
    await ksm.subscribe(topic)

# function to process data
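def process_data(pipe_recv):

    # get dataframe (drain the pipe, keeping only the newest one)
    my_dataframe = None
    while pipe_recv.poll():
        my_dataframe = pipe_recv.recv()

    if my_dataframe is None:
        return  # nothing new has arrived yet

    # do something with the dataframe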

def run_asyncio_functions(client, pipe_send):
    loop = asyncio.new_event_loop() 
    kucoin_task = loop.create_task(websocket_get_tickers(client, pipe_send))
    loop.run_forever()

def run_process_function(pipe_recv):
    while True:
        process_data(pipe_recv)

if __name__ == "__main__":
    # create Kucoin client (api_key, api_secret and api_passphrase are your own credentials)
    client = Client(api_key, api_secret, api_passphrase)
    # define a pipe between your 2 processes to share data
    pipe_recv, pipe_send = mp.Pipe(duplex = False)
    # start() returns None, so keep the Process object before starting it
    p1 = mp.Process(target=run_process_function, args=(pipe_recv,))
    p1.start()
    run_asyncio_functions(client, pipe_send)

With this approach, the delay no longer builds up in the app.
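One detail on the receiving side: `pipe_recv.poll()` with no argument returns immediately, so a `while True` loop around it keeps a CPU core busy even when no ticks arrive. Below is a minimal sketch of an alternative that waits for the first message and then drains the pipe to keep only the newest one. The names `latest_message` and `demo` are hypothetical and not part of the code above, and both pipe ends are used in a single process only to keep the example small.

```python
import multiprocessing as mp


def latest_message(pipe_recv, timeout=None):
    """Wait for one message, then drain the pipe and return the newest one.

    Returns None if nothing arrives within `timeout` seconds
    (timeout=None waits indefinitely).
    """
    if not pipe_recv.poll(timeout):
        return None
    latest = pipe_recv.recv()
    while pipe_recv.poll():  # no argument: returns immediately
        latest = pipe_recv.recv()
    return latest


def demo():
    # Both ends live in one process here purely for illustration.
    pipe_recv, pipe_send = mp.Pipe(duplex=False)
    for price in (1, 2, 3):
        pipe_send.send({"price": price})
    # Three messages are queued; only the last one is returned.
    return latest_message(pipe_recv, timeout=1)
```

In the two-process setup, the consumer would call something like `latest_message(pipe_recv)` inside its loop, so it sleeps while the pipe is empty and always works on the most recent data.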

Upvotes: 0

Alan Marx

Reputation: 11

You're using async, so everything is still running on the same thread. Try the threading or multiprocessing modules to create and update a file that your main code can read.
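A minimal sketch of the threading idea, with two assumptions stated up front: it hands messages over through an in-memory `queue.Queue` instead of a file, and `fake_feed` is a hypothetical stand-in for the websocket client so the example runs without a connection. The callback only enqueues the message and returns, and the slow work happens in the worker thread.

```python
import asyncio
import queue
import threading


def worker(inbox, results):
    """Runs in its own thread, so slow work here does not stall the event loop."""
    while True:
        msg = inbox.get()  # blocks this thread only
        if msg is None:  # sentinel: no more messages
            break
        results.append(msg["price"] * 2)  # stand-in for the heavy processing


async def fake_feed(handler, n):
    """Hypothetical stand-in for the websocket client: calls handler n times."""
    for i in range(n):
        await handler({"price": i})


def run_demo(n=5):
    inbox = queue.Queue()
    results = []
    thread = threading.Thread(target=worker, args=(inbox, results))
    thread.start()

    async def handle(msg):
        inbox.put_nowait(msg)  # cheap hand-off; the callback returns at once

    asyncio.run(fake_feed(handle, n))
    inbox.put(None)  # tell the worker to finish
    thread.join()
    return results
```

Threads help most when the processing waits on I/O or calls into libraries that release the GIL. For CPU-heavy pure-Python work, a separate process is probably the safer choice.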

Upvotes: 1
