elthwi

Reputation: 749

Returning only once all websocket messages are processed in Python asyncio

I am working with a streaming API from Kucoin (a crypto exchange) that sends order_book updates. I am trying to process inbound order_book websocket messages with a message handler method and return only once all messages have been processed, but I think I am either designing the pattern wrong or misunderstanding async and websockets.

What I want is something like the code below. It doesn't work, but it illustrates what I'm trying to achieve, so please bear with my lizard brain on this.

async for message in websocket:
    message_handled = handler_method(message)  # this updates self.order_book
    if websocket.index(message) + 1 == len(websocket): # check if this is final message
        return self.order_book

What is the correct way to do this so all the messages are processed by handler_method() before returning?

EDIT:

Here are example debug logs from the websocket:

client - event = data_received(<1016 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741271,"change":"2346.05,buy,120","timestamp":1627660760709},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741272,"change":"2345.1,buy,33","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741273,"change":"2346.1,buy,3360","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741274,"change":"2346.15,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741275,"change":"2346.85,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741276,"change":"2346.05,buy,540","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<849 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741277,"change":"2344.8,buy,2145","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741278,"change":"2344.8,buy,1667","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741279,"change":"2346.1,buy,3780","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741280,"change":"2344.8,buy,1189","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741281,"change":"2344.8,buy,711","timestamp":1627660760724},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client - event = data_received(<1690 bytes>)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741282,"change":"2346.05,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741283,"change":"2338.1,buy,407","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741284,"change":"2360.25,sell,9681","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741285,"change":"2337.8,buy,20","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741286,"change":"2338.1,buy,6","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741287,"change":"2334.1,buy,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741288,"change":"2346.1,buy,3360","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741289,"change":"2346.2,buy,120","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741290,"change":"2346.1,buy,3780","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
client < Frame(fin=True, opcode=1, data=b'{"data":{"sequence":1627402741291,"change":"2354.3,sell,0","timestamp":1627660760755},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}', rsv1=False, rsv2=False, rsv3=False)
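For reference, each frame in the log carries a `change` string of the form `price,side,size` plus a consecutive `sequence` number. Below is a minimal sketch of what a `handler_method` that "updates self.order_book" might do with one such message. The `OrderBook` class is hypothetical, and two things are assumptions not confirmed by the logs: that `size` is the new total at that price with 0 meaning "remove the level", and that a jump in `sequence` means an update was missed.

```python
import json
from decimal import Decimal


class OrderBook:
    """Hypothetical local order book fed by level2 messages like those in the log."""

    def __init__(self):
        self.bids = {}  # Decimal price -> size
        self.asks = {}  # Decimal price -> size
        self.last_sequence = None

    def handler_method(self, raw):
        data = json.loads(raw)["data"]
        seq = data["sequence"]
        # The log shows consecutive sequence numbers; a jump suggests a missed update.
        if self.last_sequence is not None and seq != self.last_sequence + 1:
            raise ValueError(f"sequence gap: expected {self.last_sequence + 1}, got {seq}")
        self.last_sequence = seq

        price_str, side, size_str = data["change"].split(",")
        price, size = Decimal(price_str), int(size_str)
        book = self.bids if side == "buy" else self.asks
        if size == 0:
            book.pop(price, None)  # assumption: size 0 removes the price level
        else:
            book[price] = size  # assumption: size is the new total at this price


# Payloads copied from the first four frames of the log.
book = OrderBook()
for raw in [
    '{"data":{"sequence":1627402741271,"change":"2346.05,buy,120","timestamp":1627660760709},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}',
    '{"data":{"sequence":1627402741272,"change":"2345.1,buy,33","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}',
    '{"data":{"sequence":1627402741273,"change":"2346.1,buy,3360","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}',
    '{"data":{"sequence":1627402741274,"change":"2346.15,sell,0","timestamp":1627660760710},"subject":"level2","topic":"/contractMarket/level2:ETHUSDTM","type":"message"}',
]:
    book.handler_method(raw)
```

Using `Decimal` keys avoids floating-point surprises when the same price string shows up again in a later update.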

Upvotes: 1

Views: 2685

Answers (2)

elthwi

Reputation: 749

Not sure if this will be helpful to anyone else in the future, but I ended up solving this by calling websocket.recv() and then using a while loop to process the ws.messages queue directly. See the example below:

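import json

async def process_messages(ws, handler_method):
    # Wait for (at least) one message and handle it.
    message = json.loads(await ws.recv())
    handler_method(message)

    # Drain messages that have already been buffered. ws.messages is the internal
    # collections.deque of the legacy websockets protocol, not a documented API.
    while len(ws.messages) > 0:
        message = json.loads(ws.messages.popleft())
        handler_method(message)

    return something  # placeholder from the original answer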

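What this does: in the (legacy) websockets implementation, a background task reads incoming frames and appends each complete message to ws.messages, a collections.deque. ws.recv() waits until that deque holds at least one message and then returns it via popleft(). The while loop then calls ws.messages.popleft() directly until the deque is empty, so every message that has already arrived gets handled before the function returns. Note that "no remaining messages" only means nothing is buffered at that moment; the order book stream keeps sending updates, so more messages will arrive afterwards.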
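Because `ws.messages` is an internal attribute (not part of the documented websockets API, and it may not exist in newer versions of the library), a similar "handle everything that has arrived" loop can be sketched using only `recv()` and `asyncio.wait_for`. This is a swapped-in technique, not the answer's method: `drain_batch` is a hypothetical name, and the `idle` timeout is an assumption. Since the stream never marks a "last" message, a short quiet period is treated as the end of a batch. The websockets documentation states that cancelling `recv()` is safe (the next message is not lost), which is what `wait_for` does on timeout.

```python
import asyncio
import json


async def drain_batch(ws, handler_method, idle=0.05):
    """Handle one message (waiting as long as needed), then keep handling
    messages until none arrives within `idle` seconds."""
    handler_method(json.loads(await ws.recv()))
    while True:
        try:
            raw = await asyncio.wait_for(ws.recv(), timeout=idle)
        except asyncio.TimeoutError:
            return  # quiet for `idle` seconds: treat the batch as complete
        handler_method(json.loads(raw))
```

The trade-off is latency versus completeness: a larger `idle` groups more updates per batch but delays the return.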

Upvotes: 1

freakish

Reputation: 56517

It seems you are designing this protocol yourself, so let me give you some hints. First of all, it is typical in networking to send one message in separate chunks. But for that to work correctly you need some additional machinery. Most importantly, you need boundaries: a way to communicate to your server the beginning and end of the chunked message.

For example, you may assume that messages arrive in strict order: for a given data set X with chunks A_1, ..., A_k, you design your protocol to first send a "new data set with k chunks" message and then each chunk A_i sequentially. That is similar to HTTP and its "Content-Length:" header, which is followed by content of exactly that length. A variant is to send a "new data set" message, then all A_i sequentially, and finally an "end of data set" message. This is roughly how HTTP with the "Transfer-Encoding: chunked" header works.
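A minimal sketch of the two sequential framings, using plain Python dicts as stand-ins for websocket messages. The message shapes (`{"type": "start", "chunks": k}`, `{"type": "chunk", "data": ...}`, `{"type": "end"}`) and function names are hypothetical, chosen only to illustrate the count-prefixed and terminator-based variants.

```python
from typing import Iterator, List


def read_counted(messages: Iterator[dict]) -> List[str]:
    """Count-prefixed framing (like Content-Length): the header announces k chunks."""
    header = next(messages)
    if header.get("type") != "start":
        raise ValueError(f"expected a start message, got {header!r}")
    return [next(messages)["data"] for _ in range(header["chunks"])]


def read_until_end(messages: Iterator[dict]) -> List[str]:
    """Terminator framing (like chunked transfer encoding): read until an "end" message."""
    header = next(messages)
    if header.get("type") != "start":
        raise ValueError(f"expected a start message, got {header!r}")
    chunks = []
    for msg in messages:
        if msg["type"] == "end":
            return chunks
        chunks.append(msg["data"])
    raise EOFError("stream ended before the end-of-data-set message")
```

The counted form lets the receiver know up front how much to expect; the terminator form lets the sender start before it knows k.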

An alternative is to send a "new data set with id T and k chunks" message and then attach the id T to each chunk. This lets you process things asynchronously (for example, interleave two data sets at the same time). It is more flexible, but harder to implement correctly.

Once you have designed your protocol, you have to implement the server side. The implementation depends on the approach; for the sequential scenario it is quite easy. Here's some pseudocode:

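import json

async def serve(websocket):
    # Assumes a websockets-style connection with recv()/send();
    # get_chunks_length() and handler_method() are hypothetical helpers.
    while True:
        message = await websocket.recv()  # <-- assumed to be the initial "new data set" message

        # TODO: break if disconnected or invalid message

        total_chunks_length = get_chunks_length(message)
        total_processed_data = []
        for _ in range(total_chunks_length):
            message = await websocket.recv()  # one chunk A_i
            processed_data = await handler_method(message)
            total_processed_data.append(processed_data)

        # A Python list cannot be sent as-is; serialize it first.
        await websocket.send(json.dumps(total_processed_data))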
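To exercise the sequential loop without a real server, the websocket can be swapped for an in-memory stand-in. In this sketch, `QueueSocket`, the JSON header `{"chunks": k}`, the uppercase "processing", and the `None` sentinel (standing in for a disconnect) are all hypothetical; only the control flow mirrors the pseudocode.

```python
import asyncio
import json


class QueueSocket:
    """Hypothetical in-memory websocket: recv() reads one queue, send() writes another."""

    def __init__(self):
        self.incoming = asyncio.Queue()
        self.outgoing = asyncio.Queue()

    async def recv(self):
        return await self.incoming.get()

    async def send(self, data):
        await self.outgoing.put(data)


async def handler_method(chunk):
    return chunk.upper()  # placeholder processing


async def serve(websocket):
    while True:
        message = await websocket.recv()  # "new data set with k chunks" header
        if message is None:  # sentinel standing in for a disconnect
            break
        total_chunks_length = json.loads(message)["chunks"]
        total_processed_data = []
        for _ in range(total_chunks_length):
            chunk = await websocket.recv()  # one chunk A_i
            total_processed_data.append(await handler_method(chunk))
        await websocket.send(json.dumps(total_processed_data))


async def main():
    ws = QueueSocket()  # created inside the running loop
    for item in ['{"chunks": 2}', "a1", "a2", '{"chunks": 1}', "b1", None]:
        ws.incoming.put_nowait(item)
    await serve(ws)
    return [ws.outgoing.get_nowait() for _ in range(ws.outgoing.qsize())]


results = asyncio.run(main())
print(results)  # ['["A1", "A2"]', '["B1"]']
```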

This becomes more difficult with the second, asynchronous design, because you have to keep track of ids. I won't go into details here; hopefully you can come up with a solution.
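One possible starting point for the id-based design (a sketch, not the only approach) is to keep per-id state in dicts: a header registers how many chunks data set `T` expects, each chunk is appended under its id, and a data set is complete once its count is reached. The `Reassembler` class and the message fields (`id`, `chunks`, `data`) are hypothetical, and the sketch assumes chunks of the same data set arrive in order; otherwise each chunk would also need an index.

```python
from typing import Dict, List, Optional, Tuple


class Reassembler:
    """Collects interleaved chunks keyed by data-set id."""

    def __init__(self):
        self.expected: Dict[str, int] = {}
        self.received: Dict[str, List[str]] = {}

    def feed(self, msg: dict) -> Optional[Tuple[str, List[str]]]:
        """Process one message; return (id, chunks) when a data set completes."""
        set_id = msg["id"]
        if "chunks" in msg:  # header: "new data set with id T and k chunks"
            self.expected[set_id] = msg["chunks"]
            self.received[set_id] = []
        else:  # a chunk tagged with its data-set id (assumed in order)
            if set_id not in self.expected:
                raise ValueError(f"chunk for unknown data set {set_id!r}")
            self.received[set_id].append(msg["data"])
        if len(self.received[set_id]) == self.expected[set_id]:
            del self.expected[set_id]
            return set_id, self.received.pop(set_id)
        return None
```

A server loop would call `feed()` for every received message and process or reply to each completed data set as it is returned, so one slow data set no longer blocks the others.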

Upvotes: 1
