Josh Mc
Josh Mc

Reputation: 10224

Websocket epic that receives connection & message requests and emits messages & connection status updates

I am hoping to create an redux-observable epic that can sit separate to the rest of my application. It needs to:

The epic needs to listen for incoming socket connection requests, establish a socket connection and then output status updates when the connection is established or is lost. It also needs to be able to both send and receive messages which can then be processed elsewhere.

The closest I have come to this, is the answer provided in this question:

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action =>
      Observable.webSocket('ws://localhost:8081')
        .map(response => ({ type: 'RECEIVED_MESSAGE', paylod: response }))
    );

However I am unsure how to extend this to additionally emit both connection established and disconnected events, and additionally accept messages to be sent to server.

Upvotes: 2

Views: 749

Answers (2)

Josh Mc
Josh Mc

Reputation: 10224

For anyone trying to do exactly what I was, my final code was as follows. I ended up realising that really I needed an epic for connecting, emitting messages back out, and another epic for sending messages.

const notificationTypes = {
  WEBSOCKET_TRY_CONNECT: "WEBSOCKET_TRY_CONNECT",
  WEBSOCKET_TRY_DISCONNECT: "WEBSOCKET_TRY_DISCONNECT",
  WEBSOCKET_CONNECTED: "WEBSOCKET_CONNECTED",
  WEBSOCKET_DISCONNECTED: "WEBSOCKET_DISCONNECTED",
  WEBSOCKET_ERROR: "WEBSOCKET_ERROR",
  WEBSOCKET_MESSAGE_SEND: "WEBSOCKET_MESSAGE_SEND",
  WEBSOCKET_MESSAGE_SENT: "WEBSOCKET_MESSAGE_SENT",
  WEBSOCKET_MESSAGE_RECIEVED: "WEBSOCKET_MESSAGE_RECIEVED"
};

const notificationActions = {
  tryConnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_CONNECT }),
  tryDisconnect: () => ({ type: notificationTypes.WEBSOCKET_TRY_DISCONNECT }),
  sendNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SEND, message }),
  sentNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_SENT, message }),
  receivedNotification: message => ({ type: notificationTypes.WEBSOCKET_MESSAGE_RECIEVED, message }),
  connected: () => ({ type: notificationTypes.WEBSOCKET_CONNECTED }),
  disconnected: () => ({ type: notificationTypes.WEBSOCKET_DISCONNECTED }),
  error: error => ({ type: notificationTypes.WEBSOCKET_ERROR, error })
};

let webSocket$ = null;

const notificationSendEpic = (action$, state$) =>
  action$.pipe(
    ofType(notificationTypes.WEBSOCKET_MESSAGE_SEND),
    mergeMap(action => {
        if (!webSocket$) {
          return of(notificationActions.error(`Attempted to send message while no connection was open.`));
        }
        webSocket$.next(action.message);
        return of(notificationActions.sentNotification(action.message));
    })
  );

const notificationConnectionEpic = (action$, state$) =>
  action$.pipe(
    ofType(notificationTypes.WEBSOCKET_TRY_CONNECT),
    switchMap(action => {

      if (webSocket$) {
        return of(notificationActions.error(`Attempted to open connection when one was already open.`));
      }

      const webSocketOpen$ = new Subject();
      const webSocketClose$ = new Subject();

      const open$ = webSocketOpen$.pipe(take(1),map(() => of(notificationActions.connected())));
      const close$ = webSocketClose$.pipe(take(1),map(() => {
        webSocket$ = null;
        return of(notificationActions.disconnected());
      }));

      webSocket$ = webSocket({
        url: wsLocation,
        openObserver: webSocketOpen$,
        closeObserver: webSocketClose$
      });

      const message$ = webSocket$.pipe(
        takeUntil(action$.ofType(notificationTypes.WEBSOCKET_DISCONNECTED, notificationTypes.WEBSOCKET_TRY_DISCONNECT)),
        map(evt => of(notificationActions.receivedNotification(evt)))
      );

      return merge(message$, open$, close$);

    }),
    mergeMap(v => v)
  );

Upvotes: 3

jayphelps
jayphelps

Reputation: 15401

Generally speaking it sounds like you want something like this:

(note, this is untested code but should be pretty close to runnable)

const somethingEpic = action$ =>
  action$.ofType('START_SOCKET_OR_WHATEVER')
    .switchMap(action => {
      // Subjects are a combination of an Observer *and* an Observable
      // so webSocket can call openObserver$.next(event) and
      // anyone who is subscribing to openObserver$ will receive it
      // because Subjects are "hot"
      const openObserver$ = new Subject();
      const openObserver$ = new Subject();

      // Listen for our open/close events and transform them
      // to redux actions. We could also include values from
      // the events like event.reason, etc if we wanted
      const open$ = openObserver$.map((event) => ({
        type: 'SOCKET_CONNECTED'
      }));
      const close$ = openObserver$.map((event) => ({
        type: 'SOCKET_DISCONNECTED'
      }));

      // webSocket has an overload signature that accepts this object
      const options = {
        url: 'ws://localhost:8081',
        openObserver: openObserver$,
        closeObserver: openObserver$
      };
      const msg$ = Observable.webSocket(options)
        .map(response => ({ type: 'RECEIVED_MESSAGE', payload: response }))
        .catch(e => Observable.of({
          type: 'SOCKET_ERROR',
          payload: e.message
        }))

      // We're merging them all together because we want to listen for
      // and emit actions from all three. For good measure I also included
      // a generic .takeUntil() to demonstrate the most obvious way to stop
      // the websocket (as well as the open/close, which we shouldn't forget!)
      // Also notice how I'm listening for both the STOP_SOCKET_OR_WHATEVER
      // or also a SOCKET_ERROR because we want to stop subscribing
      // to open$/close$ if there is an error.  
      return Observable.merge(open$, close$, msg$)
        .takeUntil(action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR'));
    });

If this epic needs to ever support multiple sockets at a time, you'll need to come up with some sort of way of uniquely identify a particular connection, and modify the code to filter signals based on that. e.g.

.takeUntil(
  action$.ofType('STOP_SOCKET_OR_WHATEVER', 'SOCKET_ERROR')
    .filter(action => action.someHowHaveId === someHowHaveId)
);

Upvotes: 4

Related Questions