Daniel

Reputation: 11162

Converting websocket events to observables

I'm using the ws library to create a websocket client that communicates with a JSON-based websocket server. Every request must have a unique requestId, and the response will carry that same requestId.

Simplified example of my code:

let requestId = 0;

const WebSocket = require('ws');
const ws = new WebSocket('ws://www.host.com/path');

ws.on('message', (json) => {
  let obj = JSON.parse(json);

  if (obj.eventType === 'SomeEvent') {
     // The server speaks JSON, so serialize the object before sending
     ws.send(JSON.stringify({
       requestId,
       command: "DoSomeStuff"
     }));
  }
});

I would like to implement a send function in my file that internally calls ws.send() but returns an Observable that emits when the websocket receives a message (via ws.on('message')) whose requestId property matches the requestId of the request.

That way I can subscribe to the Observable and change my code to:

...
  if (obj.eventType === 'SomeEvent') {
     mySendFunction({
       requestId,
       command: "DoSomeStuff"
     }).subscribe(result => {
        // Do something with result here
     });
  }
...

Also, the server occasionally (rarely) fails to respond to a request. In that case, is there a way to make the Observable time out if no response is received within a given time period?


I've seen implementations that use an array to map requestIds to promises and then resolve them manually, but I figure there must be a smarter way to do this directly with RxJS.
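For context, the manual pattern looks roughly like the sketch below. It uses a Map rather than an array, and the names createPromiseSender, pending and timeoutMs are made up for illustration, not taken from any particular implementation. It assumes the socket exposes on('message') and send() the way a ws client does.

```javascript
// Sketch of the "map requestIds to promises" pattern (hypothetical names).
// `socket` is assumed to expose on('message', ...) and send(), like a ws client.
function createPromiseSender(socket, timeoutMs) {
  // requestId -> { resolve, timer } for every request still awaiting a response
  const pending = new Map();

  socket.on('message', json => {
    const obj = JSON.parse(json);
    const entry = pending.get(obj.requestId);
    if (!entry) return; // not a response to a tracked request

    clearTimeout(entry.timer);
    pending.delete(obj.requestId);
    entry.resolve(obj);
  });

  return options => new Promise((resolve, reject) => {
    // Reject and forget the request if no response arrives in time
    const timer = setTimeout(() => {
      pending.delete(options.requestId);
      reject(new Error(`Request ${options.requestId} timed out`));
    }, timeoutMs);

    pending.set(options.requestId, { resolve, timer });
    socket.send(JSON.stringify(options));
  });
}
```

It works, but all the bookkeeping (the map, the timers, the cleanup) has to be done by hand, which is what I'd like RxJS to take care of.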

Any ideas?

Upvotes: 2

Views: 1515

Answers (1)

martin

Reputation: 96891

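A very simple approach is to wrap the message listener in new Observable():

const { Observable } = require('rxjs');

const mySendFunction = options => new Observable(observer => {
  // Keep a reference to the listener so it can be removed on teardown
  const handler = json => {
    const obj = JSON.parse(json);

    // Match the response to this request by its requestId
    if (obj.requestId === options.requestId) {
      observer.next(obj);
      observer.complete();
    }
  };

  ws.on('message', handler);
  ws.send(JSON.stringify(options));

  // Runs on completion, error, or unsubscribe
  return () => ws.removeListener('message', handler);
});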

Upvotes: 4
