bawbaw44

Reputation: 69

How to process websocket messages sequentially

I receive dozens of messages over a WebSocket, and they can arrive only a few milliseconds apart. I need to process this data with operations that can sometimes take a while (database insertions, for example). A new message must not start processing until the previous one has finished.

My first idea was to set up a queue with the Node.js library Bull (backed by Redis), but I'm afraid it would add too much latency. The processing of these messages must remain fast.

I tried using JS iterators/generators (something I had never used before) and tested something like this:

const ws = new WebSocket(`${this.baseUrl}${this.path}`)
const duplex = WebSocket.createWebSocketStream(ws, { encoding: 'utf8' })

const messageGenerator = async function* (duplex) {
  for await (const message of duplex) {
    yield message
  }
}

for await (let msg of messageGenerator(socketApi.duplex)) {
  console.log('start process')
  await this.messageHandler.handleMessage(msg, user)
  console.log('end process')
}

Log:

  1. start process
  2. start process
  3. end process
  4. end process

Unfortunately, as you can see, a new message starts processing before the previous one has finished. Is there a solution to this problem? Should I use a Redis-backed queue after all?

Thanks

Upvotes: 2

Views: 1677

Answers (1)

Python Hunter

Reputation: 2156

I am not a Node.js guy, but I have thought about the same issue multiple times in other languages. My conclusion is that what really matters is how slow the message-processing operations are. If each one takes longer than the average gap between incoming messages, a backlog forms on the websocket connection, and as that backlog grows, later messages are handled with increasingly long delays.

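If async and await behave as they do in Python, awaiting a handler only suspends the current coroutine; everything else scheduled on the event loop keeps running. So if messages are dispatched from more than one place, or the handler starts internal work without awaiting it, processing of a new message can indeed begin before the previous one has finished.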

So far I have thought of two options:

  1. Keep receiving the messages asynchronously, but add logic to the processing code that enforces the order. For example, confirm that the previous message has been fully processed before proceeding with the current one. This logic can afford to be complex and slow, since it runs separately from the receive path and does not block incoming websocket messages.
  2. Handle the messages synchronously, one by one, but extremely fast, by doing a single operation: storing them in Redis. This is much faster than writing them to the database and in most cases will be fast enough not to cause a bottleneck on the WS connection. Then use a separate process to read the messages from Redis and process them.
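
For option 1, here is a minimal in-process sketch in JavaScript, assuming a single Node.js process is enough. It is not taken from any library: createSerialQueue, sleep, and demo are hypothetical names, and the handler is a stand-in for slow work such as a DB insert. Each task is chained onto the promise of the previous one, so a handler only starts once the one before it has settled.

```javascript
// Hypothetical helper: runs async tasks strictly one after another
// by chaining each new task onto the promise of the previous one.
function createSerialQueue() {
  let tail = Promise.resolve()
  return function enqueue(task) {
    const result = tail.then(() => task())
    // Swallow errors on the chain itself so one failed task does not
    // block later ones; the caller still sees the failure via `result`.
    tail = result.catch(() => {})
    return result
  }
}

// Stand-in for slow work such as a database insert (an assumption).
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms))

async function demo() {
  const log = []
  const enqueue = createSerialQueue()

  const handleMessage = async (msg) => {
    log.push(`start ${msg}`)
    // The first message is deliberately the slowest one.
    await sleep(msg === 'a' ? 20 : 5)
    log.push(`end ${msg}`)
  }

  // Messages "arrive" back to back, without waiting for each other.
  const pending = ['a', 'b', 'c'].map((msg) => enqueue(() => handleMessage(msg)))
  await Promise.all(pending)
  return log
}
```

With the ws package, the same idea could be wired in as ws.on('message', (data) => enqueue(() => handleMessage(data))). The receive callback returns immediately, while the handlers run in arrival order. The queue lives in memory and grows without bound if handlers are slower than the arrival rate, so the bottleneck concern above still applies.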

Upvotes: 2
