Alexander Mattoni

Reputation: 455

Reconnect websocket rxjs

I'm trying to get my websocket code to automatically attempt a reconnect (indefinitely) until it succeeds. By sending a "ping" message every x seconds, I can detect when the pipe is broken, at which point the closeObserver is called.

However, I'm not sure how to get a reconnect sequence to initiate.


const notificationConnectionEpic: Epic<ActionTypes, any, RootState> = (
  action$,
  state$
) =>
  action$.pipe(
    filter(isActionOf(actions.connectNotificationPipeline.request)),
    switchMap(async action => {
      const resp = await requireValidToken(action$, state$, params =>
        AdminHubs.getHubNotificationsToken({
          ...params,
          id: action.payload.hubId
        })
      );

      return resp.pipe(
        switchMap(v => {
          if (isAction(v)) {
            return of(v);
          }
          if (!v.ok) {
            return of(
              actions.connectNotificationPipeline.failure({
                hubId: action.payload.hubId,
                error: v.error
              })
            );
          }

          const webSocketOpen$ = new Subject();
          const webSocketClose$ = new Subject();
          const webSocket$ = webSocket<AdminHubs.HubNotification>({
            url: v.value,
            openObserver: webSocketOpen$,
            closeObserver: webSocketClose$
          });

          const message$ = webSocket$.pipe(
            map(message => actions.receiveNotification({ message })),
            takeUntil(action$.ofType(HubActionConsts.NOTIFICATION_PIPE_CLOSED))
          );

          // Send a keep-alive ping every second; tap is used because this is a pure side effect.
          const ping$ = interval(1000).pipe(
            tap(_ => webSocket$.next("ping" as any)),
            ignoreElements()
          );

          const open$ = webSocketOpen$.pipe(
            take(1),
            map(_ =>
              actions.connectNotificationPipeline.success({
                hubId: action.payload.hubId
              })
            )
          );

          const close$ = webSocketClose$.pipe(
            // called when a network drop happens. handle reconnect?
          ); // also happens on net error
          return merge(message$, open$, ping$, close$);
        })
      );
    }),
    mergeMap(v => v)
  );

Upvotes: 2

Views: 1400

Answers (2)

BaptisteC

Reputation: 106

I have been struggling with this for the last few weeks and decided to create a package called SuperSocket, in case it helps anyone! It should work as a drop-in replacement for the WebSocket observer implementation you are using.

SuperSocket sits on top of the existing WebSocket implementation and, amongst other features, makes sure it keeps reconnecting until it succeeds. Of course, you can set up a maximum number of retries to avoid an infinite loop and unnecessary CPU load :)
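
The retry cap itself does not depend on any particular package. As a rough sketch (this is not SuperSocket's API, which isn't shown in this thread), a reconnect policy can be a small function that returns how long to wait before the next attempt, or null once the cap is reached. The names `ReconnectPolicy` and `nextReconnectDelay` and all the default numbers are assumptions made up for illustration:

```typescript
// Hypothetical reconnect policy; names and defaults are illustrative only.
interface ReconnectPolicy {
  maxRetries: number;  // use Infinity to retry forever
  baseDelayMs: number; // wait before the first retry
  maxDelayMs: number;  // upper bound for the backoff
}

const defaultPolicy: ReconnectPolicy = {
  maxRetries: 10,
  baseDelayMs: 500,
  maxDelayMs: 30000
};

// Returns the wait in ms before retry number `attempt` (1-based),
// or null when the retry budget is exhausted.
function nextReconnectDelay(
  attempt: number,
  policy: ReconnectPolicy = defaultPolicy
): number | null {
  if (attempt > policy.maxRetries) {
    return null;
  }
  // Exponential backoff: base, 2x base, 4x base, ... capped at maxDelayMs.
  const delay = policy.baseDelayMs * Math.pow(2, attempt - 1);
  return Math.min(delay, policy.maxDelayMs);
}
```

With `maxRetries: Infinity` this retries forever, as the question asks, while still capping the wait between attempts. In RxJS 7 the result could plausibly feed the `delay` option of `retry`, but that wiring is left out here.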

Upvotes: 0

Kevin Ghadyani

Reputation: 7297

When the WebSocket connection closes, just dispatch actions.connectNotificationPipeline.request again. That will re-run this code and create a new WebSocket connection.
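
A minimal sketch of that idea, assuming the action creators behave like the ones in the question. The action type string and the `reconnectOnClose` helper are stand-ins I made up so the snippet compiles on its own; in the real epic you would call `actions.connectNotificationPipeline.request` directly:

```typescript
// Simplified stand-in for the question's request action; the shape
// and the type string are assumptions, not the real definitions.
interface ConnectRequestAction {
  type: "CONNECT_NOTIFICATION_PIPELINE_REQUEST";
  payload: { hubId: string };
}

// Hypothetical helper: turn a socket close into a fresh connect request.
function reconnectOnClose(hubId: string): ConnectRequestAction {
  return {
    type: "CONNECT_NOTIFICATION_PIPELINE_REQUEST",
    payload: { hubId }
  };
}

// Inside the epic from the question, the empty close$ pipe could become:
//
//   const close$ = webSocketClose$.pipe(
//     take(1),
//     map(() =>
//       actions.connectNotificationPipeline.request({
//         hubId: action.payload.hubId
//       })
//     )
//   );
//
// The emitted request action goes back through the store, passes the
// epic's filter(...) again, and a new WebSocket connection is created.
```

Two caveats with the question's code, as far as I can tell: the old `ping$` interval keeps running unless it is also stopped on close (for example with `takeUntil(webSocketClose$)`), and an unclean close makes `webSocket$` error, so `message$` probably needs a `catchError` to keep the epic alive.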

Upvotes: 1
