Raza

Reputation: 1222

Close websocket connection in redux-observable

I'm currently using redux-observable to handle a websocket connection, but I can't figure out the best way to handle the close event.

const webSocket$ = Observable.webSocket({
  url: 'ws://localhost:8080'
});

export const fetchMessagesEpic = (action$) =>
  action$.ofType(FETCH.MESSAGES)
    .switchMap((action) =>
      webSocket$
        .catch((err) => { console.log(err); return true; }) // <-- I can see the close event here
        .map((msg) => addMessage(msg)));

Should I be handling it in catch (sending a new action from there) and then using takeUntil in the same epic?

Upvotes: 2

Views: 2233

Answers (1)

jayphelps

Reputation: 15401

The Observable.webSocket (aka WebSocketSubject) from rxjs only sends the close event down the error path if the CloseEvent's wasClean value is false.

From the rxjs source code:

socket.onclose = (e: CloseEvent) => {
  this._resetState();
  const closeObserver = this.closeObserver;
  if (closeObserver) {
    closeObserver.next(e);
  }

  //  v------------------------ Right here
  if (e.wasClean) {
    observer.complete();
  } else {
    observer.error(e);
  }
};

So in your case the connection is not closing cleanly. The CloseEvent sometimes carries a reason field that may give you some insight, and the Network and Console tabs in Chrome Dev Tools may show more detail.
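To make the routing in the source excerpt concrete, here is a minimal sketch in plain JavaScript with no rxjs dependency. routeCloseEvent and makeRecorder are hypothetical helpers written only for this illustration, and the CloseEvents are plain objects standing in for real ones; this mirrors the excerpt above rather than reproducing the library's implementation.

```javascript
// Hypothetical stand-in for the branch inside WebSocketSubject's onclose
// handler; NOT part of rxjs, just the same decision written as a function.
function routeCloseEvent(event, observer, closeObserver) {
  // closeObserver, when provided, sees every CloseEvent, clean or not
  if (closeObserver) {
    closeObserver.next(event);
  }
  // the main stream completes on a clean close and errors otherwise
  if (event.wasClean) {
    observer.complete();
  } else {
    observer.error(event);
  }
}

// Hypothetical observer that records which callbacks were invoked
function makeRecorder() {
  const calls = [];
  return {
    calls,
    next: (event) => calls.push('next:' + event.code),
    error: (event) => calls.push('error:' + event.code),
    complete: () => calls.push('complete')
  };
}

// Clean close (code 1000): main stream completes
const cleanMain = makeRecorder();
const cleanClose = makeRecorder();
routeCloseEvent({ wasClean: true, code: 1000, reason: '' }, cleanMain, cleanClose);

// Unclean close (code 1006): main stream errors
const uncleanMain = makeRecorder();
const uncleanClose = makeRecorder();
routeCloseEvent({ wasClean: false, code: 1006, reason: '' }, uncleanMain, uncleanClose);

console.log(cleanMain.calls, cleanClose.calls);     // complete / next:1000
console.log(uncleanMain.calls, uncleanClose.calls); // error:1006 / next:1006
```

Note that the close observer is notified in both cases, while only the unclean close reaches the error path that catch sees.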

When the socket is closed cleanly by the remote server, the correct way to react is to pass in a closeObserver: either a { next: func } observer directly or, more likely, a Subject (which is both an Observable and an Observer).

// a stream of CloseEvents
const webSocketClose$ = new Subject();
const webSocket$ = Observable.webSocket({
  url: 'ws://localhost:8080',
  closeObserver: webSocketClose$
});

export const fetchMessagesEpic = (action$) =>
  action$.ofType(FETCH.MESSAGES)
    .switchMap((action) => {
      const message$ = webSocket$
        .map((msg) => addMessage(msg))
        .catch((e) => Observable.of({
          type: 'SOCKET_ERROR',
          error: e
        }));

      const close$ = webSocketClose$
        .take(1) // you probably only want one
        .map((event) => ({
          type: 'SOCKET_CLOSE_EVENT',
          wasClean: event.wasClean,
          code: event.code,
          reason: event.reason
        }));

      return Observable.merge(message$, close$);
    });

This code makes a couple of assumptions:

  • That you want to handle clean closes (event.wasClean === true) as well as errors. This example uses both closeObserver and catch, but you don't have to; it's not clear whether you care only about unclean closes or about all of them. Also keep in mind that unclean CloseEvents are not the only errors WebSocketSubject can emit: errors from socket.onerror and JS exceptions go down the error path too.
  • That our subscription to webSocket$ will complete() when the socket closes, which it does. If it didn't, we'd want to use webSocket$.takeUntil(webSocketClose$).map(...) instead.
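For completeness, here is one way the two actions emitted by the epic might be consumed. This is only a sketch: the reducer name socketStatus and the state shape are assumptions made for illustration, not something redux-observable or rxjs prescribes. Only the action types and fields come from the epic above.

```javascript
// Hypothetical state shape, assumed for this example only
const initialState = { connected: true, lastClose: null, lastError: null };

// Hypothetical reducer that tracks why the socket went away
function socketStatus(state = initialState, action) {
  switch (action.type) {
    case 'SOCKET_CLOSE_EVENT':
      // emitted via closeObserver for every close, clean or not
      return {
        ...state,
        connected: false,
        lastClose: {
          wasClean: action.wasClean,
          code: action.code,
          reason: action.reason
        }
      };
    case 'SOCKET_ERROR':
      // emitted from catch: unclean closes, socket.onerror, JS exceptions
      return { ...state, connected: false, lastError: action.error };
    default:
      return state;
  }
}

// Going by the source excerpt, an unclean close notifies closeObserver
// first and then errors, so both actions would arrive in this order.
const closeAction = { type: 'SOCKET_CLOSE_EVENT', wasClean: false, code: 1006, reason: '' };
const errorAction = { type: 'SOCKET_ERROR', error: { code: 1006 } };
const finalState = [closeAction, errorAction].reduce(socketStatus, undefined);
console.log(finalState.connected, finalState.lastClose.code);
```

Because an unclean close reaches both paths, a reducer like this should tolerate receiving both actions for the same event.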

Separately, note that rxjs's catch operator requires you to return an Observable, but your example returns a boolean, which would itself produce an error. If you only want to log the error rather than handle it, use the do operator.

// BAD
.catch((err) => { console.log(err);return true; })

// GOOD
.do({ error: (err) => console.log(err) })

Upvotes: 3
