Reputation: 2892
I cannot find a way to make the pipeline retry when the client loses its connection while using the RxJS webSocket class.
// Open the CEX.IO websocket using the WebSocket constructor exported by websocketModule.
this.cexSocket = webSocket({ url: 'wss://ws.cex.io/ws/', WebSocketCtor: websocketModule.default });
this.socket = this.cexSocket.pipe(
  // The notifier must itself be delayed: calling delay(1000) as a bare statement
  // only builds an operator and discards it, so the retry would fire immediately.
  retryWhen(errors => errors.pipe(delay(1000))),
  // Same one-second pause before resubscribing after the stream completes.
  repeatWhen(complete => complete.pipe(delay(1000))),
  // Split incoming messages into one group per event name (the `e` field).
  groupBy(({ e }) => e)
);
// Each emitted group is handed to addEvents.
this.socket.subscribe(this.addEvents.bind(this));
// Schedule the first tick watchdog run in five seconds.
setTimeout(this.checkTicks.bind(this), 5000);
If the client does not receive ticks from the server it should close the connection and authenticate again. Check ticks function:
static checkTicks () {
if (!this.lastTick)
return setTimeout(this.checkTicks.bind(this), 5000);
if (this.lastTick + 10000 > Date.now())
return setTimeout(this.checkTicks.bind(this), 5000);
this.lastTick = null;
//??? this.socket.error() does not work as expected
}
Upvotes: 3
Views: 985
Reputation: 2892
I was able to get a solution by forcing the socket to close if no more ticks were found:
static checkTicks () {
if (!this.lastTick)
return setTimeout(this.checkTicks.bind(this), 5000);
if (this.lastTick + 10000 > Date.now())
return setTimeout(this.checkTicks.bind(this), 5000);
this.lastTick = null;
this.socket.unsubscribe();
/* eslint no-underscore-dangle: 0 */
if (!this.cexSocket._socket)
this.cexSocket._connectSocket();
this.cexSocket._socket.close();
this.socket.subscribe(this.addEvents.bind(this));
setTimeout(this.checkTicks.bind(this), 5000);
}
I also had to unsubscribe in order to get the authentication to run again. Here is my add events function in case someone was wondering about the flow:
static addEvents (eventGroup) {
({
disconnecting: grp => grp.subscribe(msg => console.warn(msg)),
connected: grp => grp.subscribe(() => {
this.authenticate();
}),
ping: grp => grp.subscribe(() => this.send({ e: 'pong' })),
auth: () => this.registerTicker(),
tick: grp => {
this.ticker = grp
.pipe(
map(
({
data: {
symbol1: base,
symbol2: quote,
price
}
}) => ({
base,
quote,
price
}))
)
.pipe(share());
grp.subscribe(() => {
this.lastTick = Date.now();
});
}
})[eventGroup.key](eventGroup);
}
Update:
I rewrote the whole thing. Now I just try to send a message to the server, and retryWhen will catch the error if the connection is lost:
/**
 * Build the ticker observable on top of `cexSocket`.
 *
 * The socket stream is resubscribed one second after an error or a
 * completion, grouped by event name (`e`), and each group is mapped through
 * `addEvents`. A ping is sent every 15 seconds so that a lost connection
 * surfaces as an error that retryWhen can react to.
 *
 * @returns {Observable} Stream that, on every 15 s interval tick, sends a ping and switches to the ticker stream.
 */
export const getTicker = () => {
  const ticker = cexSocket.pipe(
    // The notifier itself has to be delayed; a bare delay(1000) call only builds
    // an operator and discards it, which made the retry fire immediately.
    retryWhen(errors => errors.pipe(delay(1000))),
    repeatWhen(complete => complete.pipe(delay(1000))),
    groupBy(({ e }) => e),
    // Groups for which addEvents returns nothing contribute a silent observable.
    flatMap(grp => addEvents(grp) || Observable.create())
  );
  //This is to make sure the server is still there
  // NOTE(review): switchMap subscribes to `ticker` anew on every interval tick
  // and nothing is emitted before the first 15 s have elapsed — confirm intended.
  const check = interval(15000).pipe(switchMap(() => {
    cexSocket.next(JSON.stringify({ e: 'ping' }));
    return ticker;
  }));
  return check;
};
Upvotes: 3