Reputation: 234
I'm trying to write a websocket rxjs based wrapper.
And I'm struggling with my rxjs understanding.
I have a pause stream which is supposed to pause the pausable buffered streams when an error occures and resume them once i get a "ok" form the websocket.
Somehow only the first subscription on my pauseable buffered streams are fired. From then on only the queue stacks up higher.
I have prepared a jsbin to reproduce the issue.
https://jsbin.com/mafakar/edit?js,console
There the "msg recived" stream only fires for the first subscription. And then the q and observer begin stacking up.
I somehow have the feeling this is about hot and cold obserables but I cannot grasp the issues. I would appreciate any help.
Thank you in advance!
Upvotes: 0
Views: 139
Reputation: 18663
As @Meir pointed out dispose
in a subscribe block is a no no since its behavior is non-deterministic. In general I would avoid the use of Subjects
and rely on factory methods instead. You can see a refactored version here: https://jsbin.com/popixoqafe/1/edit?js,console
A quick breakdown of the changes:
class WebSocketWrapper {
// Inject the pauser from the external state
constructor(pauser) {
// Input you need to keep as a subject
this.input$ = new Rx.Subject();
// Create the socket
this._socket = this._connect();
// Create a stream for the open event
this.open$ = Rx.Observable.fromEvent(this._socket, 'open');
// This concats the external pauser with the
// open event. The result is a pauser that won't unpause until
// the socket is open.
this.pauser$ = Rx.Observable.concat(
this.open$.take(1).map(true)
pauser || Rx.Observable.empty()
)
.startWith(false);
// subscribe and buffer the input
this.input$
.pausableBuffered(this.pauser$)
.subscribe(msg => this._socket.send(msg));
// Create a stream around the message event
this.message$ = Rx.Observable.fromEvent(this._socket, 'message')
// Buffer the messages
.pausableBuffered(this.pauser$)
// Create a shared version of the stream and always replay the last
// value to new subscribers
.shareReplay(1);
}
send(request) {
// Push to input
this.input$.onNext(request);
}
_connect() {
return new WebSocket('wss://echo.websocket.org');
}
}
As an aside you should also avoid relying on internal variables like source
which are not meant for external consumption. Although RxJS 4 is relatively stable, since those are not meant for public consumption they could be changed out from under you.
Upvotes: 1
Reputation: 14375
It is not the cold/hot issue. What you do in your onMessage is subscribe, then dispose. The dispose terminates the sequence. The onMessageStream should be subscribed to only once, for example, in the constructor:
this.onmessageStream.subscribe(message => console.log('--- msg --- ', message.data));
The subscribe block, including the dispose should be removed.
Also, note that you used replaySubject without a count, this means that the queue holds all previous values. Unless this is a desired behavior, considered changing it to .replaySubject(1)
Here is a working jsbin.
Upvotes: 1