Reputation: 11162
I'm using the ws library to create a websocket client that communicates with a JSON-based websocket server. Every request must have a unique requestId, and the response will carry that same requestId.
let requestId = 0;
const WebSocket = require('ws');
const ws = new WebSocket('ws://www.host.com/path');
ws.on('message', (json) => {
  const obj = JSON.parse(json);
  if (obj.eventType === 'SomeEvent') {
    // ws.send() takes a string or binary data, so serialize the object first.
    ws.send(JSON.stringify({
      requestId,
      command: "DoSomeStuff"
    }));
  }
});
I would like to implement a send method in my file that internally calls ws.send() but returns an Observable that emits when the websocket receives a message (via ws.on('message')) whose requestId property matches the requestId of the request. That way I can subscribe to the Observable and change my code to:
...
if (obj.eventType === 'SomeEvent') {
  mySendFunction({
    requestId,
    command: "DoSomeStuff"
  }).subscribe(result => {
    // Do something with result here
  });
}
...
Also, sometimes (rarely) the server does not respond to a request. In that case, is there a way to make the Observable time out if no response is received within a given period?
I've seen implementations that use an array to map requestIds to promises and then resolve them manually, but I figure there must be a smarter way to do this directly with RxJS.
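For reference, the map-based approach can be sketched roughly as follows. This is only an illustration, not code from any particular implementation: it uses a Map rather than an array, createRequestTracker, pending and the 5000 ms default are hypothetical, and the socket is assumed to expose on('message') and send() like a ws client.

```javascript
// Hypothetical helper: tracks in-flight requests by requestId.
function createRequestTracker(socket, timeoutMs = 5000) {
  const pending = new Map(); // requestId -> { resolve, timer }

  // Route each incoming message to the promise waiting for its requestId.
  socket.on('message', (json) => {
    const obj = JSON.parse(json);
    const entry = pending.get(obj.requestId);
    if (!entry) return; // unsolicited event or a response that arrived too late
    clearTimeout(entry.timer);
    pending.delete(obj.requestId);
    entry.resolve(obj);
  });

  function send(payload) {
    return new Promise((resolve, reject) => {
      // Reject and forget the request if no response arrives in time.
      const timer = setTimeout(() => {
        pending.delete(payload.requestId);
        reject(new Error(`Request ${payload.requestId} timed out`));
      }, timeoutMs);
      pending.set(payload.requestId, { resolve, timer });
      socket.send(JSON.stringify(payload));
    });
  }

  return { send, pending };
}
```

The bookkeeping (storing, clearing and timing out entries) is all manual here, which is the part I'd hope RxJS can take over.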
Any ideas?
Upvotes: 2
Views: 1515
Reputation: 96891
A very simple thing you can do is just use new Observable():
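const { Observable } = require('rxjs');

const mySendFunction = options => new Observable(observer => {
  // Keep a reference to the handler so it can be removed on teardown.
  const handler = json => {
    const obj = JSON.parse(json);
    // Only react to the response that belongs to this request.
    if (obj.requestId === options.requestId) {
      observer.next(obj);
      observer.complete();
    }
  };
  ws.on('message', handler);
  // ws.send() takes a string or binary data, so serialize the payload.
  ws.send(JSON.stringify(options));
  // Teardown: runs on completion, error or unsubscribe.
  return () => {
    ws.removeListener('message', handler);
  };
});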
Upvotes: 4