Lanklaas
Lanklaas

Reputation: 2892

rxjs websocket - internet connection loss

I cannot find a way where the pipline will retry if the client lost connection with the rxjs websocket class.

this.cexSocket = webSocket({ url: 'wss://ws.cex.io/ws/', WebSocketCtor: websocketModule.default });

    this.socket = this.cexSocket
        .pipe(retryWhen(errors => {
            delay(1000);
            return errors;
        }))
        .pipe(repeatWhen(complete => {
            delay(1000);
            return complete;
        }))
        .pipe(groupBy(({ e }) => e));

    this.socket.subscribe(this.addEvents.bind(this));

    setTimeout(this.checkTicks.bind(this), 5000);

If the client does not receive ticks from the server it should close the connection and authenticate again. Check ticks function:

static checkTicks () {
    if (!this.lastTick)
        return setTimeout(this.checkTicks.bind(this), 5000);
    if (this.lastTick + 10000 > Date.now())
        return setTimeout(this.checkTicks.bind(this), 5000);

    this.lastTick = null;
    //??? this.socket.error() does not work as expected
}

Upvotes: 3

Views: 985

Answers (1)

Lanklaas
Lanklaas

Reputation: 2892

I was able to get a solution by forcing the socket to close if no more ticks were found:

static checkTicks () {
    if (!this.lastTick)
        return setTimeout(this.checkTicks.bind(this), 5000);
    if (this.lastTick + 10000 > Date.now())
        return setTimeout(this.checkTicks.bind(this), 5000);

    this.lastTick = null;
    this.socket.unsubscribe();
    /* eslint no-underscore-dangle: 0 */
    if (!this.cexSocket._socket)
        this.cexSocket._connectSocket();
    this.cexSocket._socket.close();
    this.socket.subscribe(this.addEvents.bind(this));
    setTimeout(this.checkTicks.bind(this), 5000);
}

I also had to unsubscribe in order to get the authentication to run again. Here is my add events function in case someone was wondering about the flow:

static addEvents (eventGroup) {
    ({
        disconnecting: grp => grp.subscribe(msg => console.warn(msg)),
        connected: grp => grp.subscribe(() => {
            this.authenticate();
        }),
        ping: grp => grp.subscribe(() => this.send({ e: 'pong' })),
        auth: () => this.registerTicker(),
        tick: grp => {
            this.ticker = grp
                .pipe(
                    map(
                        ({ 
                            data: { 
                                symbol1: base, 
                                symbol2: quote, 
                                price
                            }
                        }) => ({
                            base,
                            quote,
                            price
                        }))
                )
                .pipe(share());
            grp.subscribe(() => {
                this.lastTick = Date.now();
            });
        }
    })[eventGroup.key](eventGroup);
}

Update:

I rewrote the whole thing. Now I just try to send to the server and the retryWhen will catch it if connection is lost:

export const getTicker = () => {
const ticker = cexSocket
    .pipe(retryWhen(errors => {
        delay(1000);
        return errors;
    }))
    .pipe(repeatWhen(complete => {
        delay(1000);
        return complete;
    }))
    .pipe(groupBy(({ e }) => e))
    .pipe(flatMap(grp => {
        const o = addEvents(grp);
        if (!o)
            return Observable.create();
        return o;
    }));

//This is to make sure the server is still there
const check = interval(15000).pipe(switchMap(() => {
    cexSocket.next(JSON.stringify({ e: 'ping' }));
    return ticker;
}));

return check;
};

Upvotes: 3

Related Questions