Robin Gruschke

Reputation: 234

RxJS pausableBuffered with multiple subscriptions

I'm trying to write an RxJS-based WebSocket wrapper, and I'm struggling with my understanding of RxJS.

I have a pause stream that is supposed to pause the pausable buffered streams when an error occurs and resume them once I get an "ok" from the websocket.

Somehow only the first subscription on my pausable buffered streams is fired. From then on, the queue just keeps growing.

I have prepared a jsbin to reproduce the issue.

https://jsbin.com/mafakar/edit?js,console

There, the "msg recived" stream fires only for the first subscription, and then the q and observer begin stacking up.

I have the feeling this is about hot and cold observables, but I cannot grasp the issue. I would appreciate any help.

Thank you in advance!

Upvotes: 0

Views: 139

Answers (2)

paulpdaniels

Reputation: 18663

As @Meir pointed out, calling dispose inside a subscribe block is a bad idea because its behavior is non-deterministic. In general I would avoid Subjects and rely on factory methods instead. You can see a refactored version here: https://jsbin.com/popixoqafe/1/edit?js,console

A quick breakdown of the changes:

class WebSocketWrapper {
  // Inject the pauser from the external state
  constructor(pauser) {

    // Input you need to keep as a subject
    this.input$ = new Rx.Subject();

    // Create the socket
    this._socket = this._connect();      

    // Create a stream for the open event
    this.open$ = Rx.Observable.fromEvent(this._socket, 'open');

    // This concats the external pauser with the 
    // open event. The result is a pauser that won't unpause until
    // the socket is open.
    this.pauser$ = Rx.Observable.concat(
      this.open$.take(1).map(true),
      pauser || Rx.Observable.empty()
    )
      .startWith(false);

    // subscribe and buffer the input
    this.input$
      .pausableBuffered(this.pauser$)
      .subscribe(msg => this._socket.send(msg));

    // Create a stream around the message event
    this.message$ = Rx.Observable.fromEvent(this._socket, 'message')

      // Buffer the messages
      .pausableBuffered(this.pauser$)

      // Create a shared version of the stream and always replay the last
      // value to new subscribers
      .shareReplay(1);
  }

  send(request) {
    // Push to input
    this.input$.onNext(request);
  }

  _connect() {
    return new WebSocket('wss://echo.websocket.org');
  }
}
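
To make the buffering behavior above easier to picture, here is a minimal plain-JavaScript sketch of what "pausable buffered" means: values are held while the pauser is false and flushed in order once it turns true. This is not how RxJS implements pausableBuffered; the PausableBuffer class and its push and setOpen methods are hypothetical names used only for illustration.

```javascript
// Plain-JavaScript sketch of pausable buffering; NOT the RxJS implementation.
// PausableBuffer, push and setOpen are hypothetical names.
class PausableBuffer {
  constructor(deliver) {
    this.deliver = deliver; // called once per value, in arrival order
    this.open = false;      // starts paused, like .startWith(false)
    this.queue = [];
  }

  // Offer a value: deliver it now if open, otherwise hold it.
  push(value) {
    if (this.open) {
      this.deliver(value);
    } else {
      this.queue.push(value);
    }
  }

  // Feed a pauser value: true resumes and flushes, false pauses.
  setOpen(isOpen) {
    this.open = isOpen;
    while (this.open && this.queue.length > 0) {
      this.deliver(this.queue.shift());
    }
  }
}

const sent = [];
const buffer = new PausableBuffer(msg => sent.push(msg));

buffer.push('a');      // held: the socket is not open yet
buffer.push('b');      // held
buffer.setOpen(true);  // flushes 'a' then 'b'
buffer.push('c');      // delivered immediately
buffer.setOpen(false); // e.g. an error arrived
buffer.push('d');      // held until the next resume
// sent is now ['a', 'b', 'c']; 'd' is still queued
```

In the class above, the same role is played by this.pauser$: the startWith(false) keeps both streams paused until the open event pushes true.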

As an aside, you should also avoid relying on internal variables like source. Although RxJS 4 is relatively stable, those are not part of the public API and could be changed out from under you.

Upvotes: 1

Meir

Reputation: 14375

This is not a cold/hot issue. In your onMessage you subscribe and then dispose, and the dispose terminates the sequence. The onmessageStream should be subscribed to only once, for example in the constructor:

this.onmessageStream.subscribe(message => console.log('--- msg --- ', message.data));

The subscribe block, including the dispose, should be removed.
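
The effect can be reproduced without RxJS. The sketch below uses a hypothetical TinyStream class (a stand-in for a hot message stream, not an RxJS type) whose subscribe returns an object with a dispose method. It assumes the original handler disposed its own subscription from inside the callback, and compares that with a handler that is subscribed once and left alone.

```javascript
// Minimal stand-in for a hot message stream; NOT RxJS.
// TinyStream and emit are hypothetical names.
class TinyStream {
  constructor() {
    this.handlers = new Set();
  }

  subscribe(handler) {
    this.handlers.add(handler);
    // The returned object mirrors the RxJS 4 "disposable" shape.
    return { dispose: () => this.handlers.delete(handler) };
  }

  emit(value) {
    // Copy first so a handler may dispose itself while we iterate.
    [...this.handlers].forEach(handler => handler(value));
  }
}

const messages = new TinyStream();

// Pattern from the question: dispose inside the subscribe block.
const disposedInside = [];
const subscription = messages.subscribe(msg => {
  disposedInside.push(msg);
  subscription.dispose(); // tears the subscription down after one message
});

// Suggested pattern: subscribe once and keep the subscription.
const subscribedOnce = [];
messages.subscribe(msg => subscribedOnce.push(msg));

['one', 'two', 'three'].forEach(m => messages.emit(m));
// disposedInside holds only 'one'; subscribedOnce holds all three messages
```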

Also, note that you used a ReplaySubject without a buffer size, which means the queue holds all previous values. Unless this is the desired behavior, consider changing it to new Rx.ReplaySubject(1).
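
As a rough illustration of the difference, here is a plain-JavaScript sketch of a replay buffer with an optional size limit. TinyReplay is a hypothetical class written for this example, not the RxJS ReplaySubject; it only mimics the idea that late subscribers first receive whatever is still in the buffer.

```javascript
// Sketch of replay buffering; NOT the RxJS ReplaySubject.
// TinyReplay is a hypothetical name; onNext mirrors the RxJS 4 method name.
class TinyReplay {
  constructor(bufferSize = Infinity) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.handlers = [];
  }

  onNext(value) {
    this.buffer.push(value);
    // Drop the oldest value once the buffer exceeds its limit.
    if (this.buffer.length > this.bufferSize) {
      this.buffer.shift();
    }
    this.handlers.forEach(handler => handler(value));
  }

  subscribe(handler) {
    // A late subscriber first receives everything still buffered.
    this.buffer.forEach(value => handler(value));
    this.handlers.push(handler);
  }
}

const unbounded = new TinyReplay();
const lastOnly = new TinyReplay(1);

[1, 2, 3].forEach(v => {
  unbounded.onNext(v);
  lastOnly.onNext(v);
});

const replayedAll = [];
const replayedLast = [];
unbounded.subscribe(v => replayedAll.push(v));
lastOnly.subscribe(v => replayedLast.push(v));
// replayedAll is [1, 2, 3]; replayedLast is [3]
```

With no limit the buffer grows with every message, which matches the ever-growing queue described in the question.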

Here is a working jsbin.

Upvotes: 1
