Reputation: 749
I am working with a streaming API from Kucoin (a crypto exchange) which sends order_book updates. I am trying to process inbound order_book websocket messages using a message handler method and only return once all messages are processed, but I think I am designing the pattern wrong or otherwise misunderstanding async & websockets.
What I want is something like this. The code below doesn't work, but for illustrative purposes it's what I'm trying to achieve, so please bear with my lizard brain on this.
async for message in websocket:
    message_handled = handler_method(message)  # this updates self.order_book
    if websocket.index(message) + 1 == len(websocket):  # check if this is final message
        return self.order_book
What is the correct way to do this so all the messages are processed by handler_method() before returning?
EDIT:
Please see example debug logs for the websocket.
client - event = data_received(<1016 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741271,"change":"2346.05,buy,120","timestamp":1627660760709},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741272,"change":"2345.1,buy,33","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741273,"change":"2346.1,buy,3360","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741274,"change":"2346.15,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741275,"change":"2346.85,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741276,"change":"2346.05,buy,540","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<849 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741277,"change":"2344.8,buy,2145","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741278,"change":"2344.8,buy,1667","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741279,"change":"2346.1,buy,3780","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741280,"change":"2344.8,buy,1189","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741281,"change":"2344.8,buy,711","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<1690 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741282,"change":"2346.05,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741283,"change":"2338.1,buy,407","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741284,"change":"2360.25,sell,9681","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741285,"change":"2337.8,buy,20","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741286,"change":"2338.1,buy,6","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741287,"change":"2334.1,buy,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741288,"change":"2346.1,buy,3360","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741289,"change":"2346.2,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741290,"change":"2346.1,buy,3780","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741291,"change":"2354.3,sell,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
Upvotes: 1
Views: 2685
Reputation: 749
Not sure if this will be helpful to anyone else in the future, but I ended up solving this by calling websocket.recv() and then using a while loop to process the ws.messages queue of messages directly. See the example below:
message = json.loads(await ws.recv())
handler_method(message)
while len(ws.messages) > 0:
    message = json.loads(ws.messages.popleft())
    handler_method(message)
return something
What this does is: recv() receives the messages that have arrived on the websocket connection, loads them into the ws.messages queue (a collections.deque object), and calls ws.messages.popleft() once, which yields an individual message to handle. Then we simply call ws.messages.popleft() inside a while loop until there are no remaining messages, and finally return after the while loop finishes.
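Since ws.messages does not appear to be part of the documented API, a variant of the same idea that only uses recv() may be worth considering. The sketch below is an illustration rather than a definitive implementation: it assumes the connection object exposes an awaitable recv(), and both `drain_burst` and `idle_timeout` are hypothetical names introduced here. It handles one message, then keeps receiving until nothing arrives within the timeout. `FakeSocket` is only a stand-in so the example runs without a network connection.

```python
import asyncio
import json


async def drain_burst(ws, handler, idle_timeout=0.05):
    """Handle one message, then keep going until the socket is quiet.

    `ws` is assumed to expose an awaitable `recv()`; `idle_timeout` is a
    hypothetical tuning knob, not something the exchange defines.
    """
    # Block until at least one message is available.
    handler(json.loads(await ws.recv()))
    handled = 1
    while True:
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=idle_timeout)
        except asyncio.TimeoutError:
            return handled  # nothing arrived within idle_timeout
        handler(json.loads(raw))
        handled += 1


class FakeSocket:
    """Stand-in for a websocket connection, used only to demo the loop."""

    def __init__(self, messages):
        self._queue = asyncio.Queue()
        for m in messages:
            self._queue.put_nowait(m)

    async def recv(self):
        return await self._queue.get()


def demo():
    seen = []
    ws_msgs = [json.dumps({"sequence": n}) for n in (1, 2, 3)]

    async def main():
        # Build the fake socket inside the running event loop.
        return await drain_burst(FakeSocket(ws_msgs), seen.append)

    return asyncio.run(main()), seen
```

The trade-off is that "all messages processed" now means "no message for `idle_timeout` seconds", which is a judgment call on a stream that never really ends.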
Upvotes: 1
Reputation: 56517
It seems you are designing this thing, so let me give you some hints. First of all, it is typical in networking to send one message in separate chunks. But in order for that to work correctly you need to do some additional work. Most importantly, you need boundaries: a way to communicate to your server the beginning and end of the chunked message.
For example, you may assume that messages come in strict order: for a given data set X with chunks A_1,...,A_k you design your protocol to first send a "new data set with k chunks" message and then each chunk A_i sequentially. That would be similar to HTTP and its "Content-Length:" header, which is followed by content of exactly that length. A variant is to send a "new data set" message, then all A_i sequentially, and finally an "end of data set" message. This is roughly how HTTP with the "Transfer-Encoding: chunked" header works.
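The end-marker variant could look roughly like the sketch below. It is only an illustration under stated assumptions: the "start" / "chunk" / "end" type names and the "payload" field are made up for this example, and `recv` stands for any coroutine function that returns one JSON text message.

```python
import asyncio
import json


async def read_data_set(recv):
    """Collect chunks between a "start" and an "end" marker.

    `recv` is any coroutine function returning one JSON text message.
    The "start" / "chunk" / "end" type names are made up for this sketch.
    """
    first = json.loads(await recv())
    if first.get("type") != "start":
        raise ValueError("expected a 'start' message")
    chunks = []
    while True:
        message = json.loads(await recv())
        if message.get("type") == "end":
            return chunks  # boundary reached: the data set is complete
        chunks.append(message["payload"])


def demo():
    # A canned sequence of messages standing in for the wire.
    wire = iter(json.dumps(m) for m in [
        {"type": "start"},
        {"type": "chunk", "payload": "A_1"},
        {"type": "chunk", "payload": "A_2"},
        {"type": "end"},
    ])

    async def recv():
        return next(wire)

    return asyncio.run(read_data_set(recv))
```

With an explicit end marker the receiver does not need to know k in advance, at the cost of one extra message per data set.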
An alternative is to send a "new data set of Id T with k chunks" message and then add the id "T" to each chunk. This allows you to process things asynchronously (for example, interleaving two data sets at the same time). That is clearly beneficial, but harder to implement correctly.
Once you design your protocol you have to implement your server side. The implementation will differ depending on the approach. For the sequential scenario it is quite easy. Here's some pseudocode:
while True:
    message = await websocket.receive()  # <-- assumed to be the initial "new data set" message
    # TODO: break if disconnected or invalid message
    total_chunks_length = get_chunks_length(message)
    total_processed_data = []
    while total_chunks_length > 0:
        message = await websocket.receive()
        processed_data = await handler_method(message)
        total_processed_data.append(processed_data)
        total_chunks_length -= 1
    await websocket.send(total_processed_data)
This becomes more difficult with the second, asynchronous design, because you have to keep track of ids. I won't go into details here; hopefully you can come up with a solution.
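As a starting point for the id-tracking part, one possible shape is a small assembler that keeps a dictionary of in-progress data sets. This is a minimal sketch, not a full solution: the `DataSetAssembler` class and the message shapes ("new" with id and count, "chunk" with id and payload) are assumptions made for this example.

```python
import json


class DataSetAssembler:
    """Reassemble interleaved chunked data sets keyed by id.

    Message shapes ("new" with id/count, "chunk" with id/payload) are
    assumptions made for this sketch.
    """

    def __init__(self):
        self._pending = {}  # id -> (expected_count, chunks so far)

    def feed(self, raw):
        """Consume one message; return (id, chunks) when a set completes."""
        message = json.loads(raw)
        set_id = message["id"]
        if message["type"] == "new":
            self._pending[set_id] = (message["count"], [])
            return None
        expected, chunks = self._pending[set_id]
        chunks.append(message["payload"])
        if len(chunks) < expected:
            return None
        del self._pending[set_id]
        return set_id, chunks


def demo():
    assembler = DataSetAssembler()
    # Two data sets, T and U, interleaved on the same connection.
    wire = [
        {"type": "new", "id": "T", "count": 2},
        {"type": "new", "id": "U", "count": 1},
        {"type": "chunk", "id": "T", "payload": "t1"},
        {"type": "chunk", "id": "U", "payload": "u1"},
        {"type": "chunk", "id": "T", "payload": "t2"},
    ]
    done = [assembler.feed(json.dumps(m)) for m in wire]
    return [d for d in done if d is not None]
```

A real implementation would also need to decide what to do with chunks for an unknown id (here that raises KeyError), with data sets announced with zero chunks, and with data sets that never complete.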
Upvotes: 1