Reputation: 2256
I am using socket.io to send a series of responses to my front-end. The responses are intended to be sequential, but depending on the connection created by socket.io they are not guaranteed to arrive in the correct order (https://github.com/josephg/ShareJS/issues/375).
Assuming each response has a sequence field that holds a number (shown as the number in the picture above), the observable should emit the responses in the order of that field.
If a response is received out of order and a certain amount of time (n) then passes without any further response, I would like my observable to emit an error, signalling my front-end to reset the connection.
Upvotes: 3
Views: 389
Reputation: 20348
A really nice problem. Below is a snippet with the most important parts commented.
// mock ordered values
const mockMessages = Rx.Observable.fromEvent(document.querySelector('#emit'), 'click')
  .map((e, index) => ({
    index,
    timestamp: e.timeStamp
  }))
  .delayWhen(() => Rx.Observable.timer(Math.random() * 2000)) // distort order
// there is a lot of mutability in `keepOrder`, but all of it
// is sealed and does not leak to outside environment
const keepOrder = timeoutMs => stream =>
  Rx.Observable.defer(() => // need defer to support retries on error
    stream.scan((acc, v) => {
      acc.buffer.push(v)
      acc.buffer.sort((v1, v2) => v1.index - v2.index)
      return acc
    }, {
      lastEmitted: -1,
      buffer: []
    })
    .mergeMap(info => {
      const emission = []
      while (info.buffer.length && info.lastEmitted + 1 === info.buffer[0].index) {
        emission.push(info.buffer.shift())
        info.lastEmitted += 1
      }
      return Rx.Observable.of(emission)
    })
    .switchMap(emissions => {
      if (!emissions.length) { // this condition indicates out of order
        return Rx.Observable.timer(timeoutMs)
          .mergeMapTo(Rx.Observable
            .throw(new Error('ORDER_TIMEOUT')))
      } else {
        return Rx.Observable.from(emissions)
      }
    })
  )
mockMessages
  .do(x => console.log('mocked', x.index))
  .let(keepOrder(1000)) // decrease timeoutMs to increase error probability
  .do(x => console.log('ORDERED', x.index))
  .retryWhen(es => es
    .do(e => console.warn('ERROR', e)))
  .subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
<button id="emit">EMIT</button>
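If the RxJS plumbing hides the core idea, the buffering step (the `scan` plus `mergeMap` pair above) can be sketched in plain JavaScript. This is only an illustration, not part of the snippet: `createReorderBuffer` and `push` are names I made up, it uses a `Map` instead of a sorted array, and it assumes indices start at 0 and never repeat. An empty result from `push` corresponds to the out-of-order case in which `keepOrder` starts its timeout.

```javascript
// Plain-JS sketch of the reordering step only (no timeout, no RxJS).
// `createReorderBuffer` and `push` are hypothetical names for illustration.
function createReorderBuffer(firstIndex = 0) {
  let nextExpected = firstIndex // index we are currently waiting for
  const pending = new Map()     // held-back messages, keyed by index

  return {
    // Accept one message and return every message that can now be
    // delivered in order (possibly none, possibly several).
    push(message) {
      pending.set(message.index, message)
      const ready = []
      while (pending.has(nextExpected)) {
        ready.push(pending.get(nextExpected))
        pending.delete(nextExpected)
        nextExpected += 1
      }
      return ready
    }
  }
}

// Feed messages in a scrambled order and record what each push releases.
const buffer = createReorderBuffer()
const delivered = []
for (const index of [1, 0, 3, 2, 4]) {
  delivered.push(buffer.push({ index }).map(m => m.index))
}
console.log(JSON.stringify(delivered)) // [[],[0,1],[],[2,3],[4]]
```

Each empty array is a moment where a message arrived ahead of its turn; in the RxJS version that is where `switchMap` swaps in the timer, and any later arrival cancels and replaces it.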
Upvotes: 1