Reputation: 57
I collect tick data from KuCoin's API using Python and a websocket. The topic is `/market/ticker:all`, which means I receive every tick for every symbol.
async def websocketConnect():
    """Subscribe to the all-tickers topic and print two timestamps per message.

    For every message delivered to the callback, the local ``time.time()``
    value is printed, followed by the message's ``["data"]["time"]`` field,
    so the two can be compared by eye.

    The coroutine never returns: after subscribing it sleeps in one-second
    steps forever (presumably to keep the event loop alive so the websocket
    client can keep delivering messages).
    """

    async def on_tick(msg):
        # Local clock first, then the time carried inside the tick itself.
        print(time.time())
        print(msg["data"]["time"])

    # Subscribe to ALL tickers
    ticker_topic = "/market/ticker:all"
    token_client = WsToken(url="https://api.kucoin.com")
    tick_socket = await KucoinWsClient.create(
        None, token_client, on_tick, private=False
    )
    await tick_socket.subscribe(ticker_topic)

    # Idle forever, yielding control to the event loop once per second.
    while True:
        await asyncio.sleep(1)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs the coroutine on it and
    # closes the loop when the coroutine finishes or is interrupted.
    asyncio.run(websocketConnect())
When I compare the current timestamp with the tick time, I see a large fluctuation, from 100 ms to more than 30 seconds.
Is there anything I can do to make it more stable? Is it due to the large amount of data I receive?
EDIT :
So I have added a "task", following the asyncio documentation, to process each message from the socket separately. Now the delta between the current time in milliseconds and the tick time moves up and down, but it no longer climbs from 100 ms to 30 s. That is better, but I would like to make it more stable.
async def compute(msg):
    """Print how far the local clock is ahead of the tick's own timestamp.

    The local time is taken from ``time.time()`` and converted to integer
    milliseconds; ``msg["data"]["time"]`` is converted with ``int()`` and
    subtracted from it. (Assumes the tick time is also expressed in epoch
    milliseconds -- TODO confirm against the exchange's message format.)
    """
    now_ms = int(time.time() * 1000)
    tick_ms = int(msg["data"]["time"])
    # Flush on every call so each delta shows up immediately.
    print(now_ms - tick_ms, flush=True)
async def websocketConnect():
    """Subscribe to the all-tickers topic and hand each message to compute().

    Every message received by the callback is wrapped in its own asyncio
    task running ``compute(msg)``. The coroutine never returns: after
    subscribing it sleeps in one-second steps forever.
    """

    async def on_tick(msg):
        # compute() runs as a separate task, but the callback still waits
        # for that task to finish before it returns.
        await asyncio.create_task(compute(msg))

    # Subscribe to ALL tickers
    ticker_topic = "/market/ticker:all"
    token_client = WsToken(url="https://api.kucoin.com")
    tick_socket = await KucoinWsClient.create(
        None, token_client, on_tick, private=False
    )
    await tick_socket.subscribe(ticker_topic)

    # Idle forever, yielding control to the event loop once per second.
    while True:
        await asyncio.sleep(1)
if __name__ == "__main__":
    # asyncio.run() creates a fresh event loop, runs the coroutine on it and
    # closes the loop when the coroutine finishes or is interrupted.
    asyncio.run(websocketConnect())
EDIT 2 :
So in fact the delay is under 100 ms most of the time, but as soon as I start doing some processing, with pandas for example, the delay in handling incoming ticks increases progressively.
Can someone help?
Upvotes: 1
Views: 1932
Reputation: 11
I was finally able to make it work using multiprocessing.
Here is a brief explanation:
import multiprocessing as mp
from kucoin.asyncio import KucoinSocketManager
from kucoin.client import Client
import asyncio
# function to get ticker using asyncio
async def websocket_get_tickers(client, pipe_send):
    """Subscribe to the all-tickers topic and push data into a pipe.

    Args:
        client: Client object handed to ``KucoinSocketManager.create``.
        pipe_send: Object whose ``send()`` method is called once per
            received message.

    NOTE: ``my_dataframe`` is not defined in this snippet; it stands for the
    DataFrame you are expected to build from ``msg`` before sending.
    """

    async def forward(msg):
        # fill your df with the data needed
        # send the df through the pipe
        pipe_send.send(my_dataframe)

    # callback function that receives messages from the socket
    async def on_message(msg):
        # forward() runs as its own task; the callback waits for it to end.
        await asyncio.create_task(forward(msg))

    socket_manager = await KucoinSocketManager.create(
        asyncio.get_event_loop(), client, on_message
    )
    await socket_manager.subscribe('/market/ticker:all')
# function to process data
def process_data(pipe_recv):
    """Receive every item that is currently waiting on *pipe_recv*.

    ``pipe_recv.poll()`` is checked before each ``recv()``; the function
    returns as soon as ``poll()`` reports that nothing is pending.
    """
    # get dataframe
    while True:
        if not pipe_recv.poll():
            break
        my_dataframe = pipe_recv.recv()
        # do something with the dataframe
def run_asyncio_functions(client, pipe_send):
    """Run ``websocket_get_tickers`` on a newly created event loop, forever.

    Args:
        client: Passed through to ``websocket_get_tickers``.
        pipe_send: Passed through to ``websocket_get_tickers``.
    """
    event_loop = asyncio.new_event_loop()
    # Schedule the subscription coroutine; it starts once the loop runs.
    kucoin_task = event_loop.create_task(
        websocket_get_tickers(client, pipe_send)
    )
    event_loop.run_forever()
def run_process_function(pipe_recv):
    """Consume data from *pipe_recv* forever.

    Args:
        pipe_recv: Receiving connection; must support ``poll(timeout)`` and
            is handed to ``process_data`` whenever data is available.
    """
    while True:
        # Block until something is readable. poll(None) waits indefinitely,
        # so the loop does not spin at full CPU while the pipe is empty.
        pipe_recv.poll(None)
        process_data(pipe_recv)
if __name__ == "__main__":
    # create Kucoin client
    # (api_key, api_secret and api_passphrase are not defined in this
    # snippet; supply your own credentials.)
    client = Client(api_key, api_secret, api_passphrase)
    # define a pipe between your 2 processes to share data
    pipe_recv, pipe_send = mp.Pipe(duplex=False)
    # Process.start() returns None, so keep the Process object itself and
    # start it in a separate statement.
    p1 = mp.Process(target=run_process_function, args=(pipe_recv,))
    p1.start()
    try:
        run_asyncio_functions(client, pipe_send)
    finally:
        # The consumer loops forever; stop it once the producer side exits.
        p1.terminate()
        p1.join()
Using this approach, there is no more latency in the app.
Upvotes: 0
Reputation: 11
You're using async, so everything is still running on the same thread. Try the threading or multiprocessing modules to create and update a file that your main code can read.
Upvotes: 1