ZackDeRose

Reputation: 2256

rxjs: how to order responses via Observables

enter image description here

I am using socket.io to send a series of responses to my front-end. The responses are meant to be sequential, but depending on the connection socket.io creates, they are not guaranteed to arrive in the correct order (https://github.com/josephg/ShareJS/issues/375).

Assuming each response had a sequence field that held a number (shown as the number in the picture above), the observable should emit these responses in order.

If a response is received out of order and a certain amount of time (n) passes without any further response, I would like my observable to emit an error, signalling my front-end to reset the connection.

Upvotes: 3

Views: 389

Answers (1)

artur grzesiak

Reputation: 20348

A really nice problem. Below is a snippet with the most important parts commented.

// mock ordered values
const mockMessages = Rx.Observable.fromEvent(document.querySelector('#emit'), 'click')
  .map((e, index) => ({
    index,
    timestamp: e.timeStamp
  }))
  .delayWhen(() => Rx.Observable.timer(Math.random() * 2000)) // distort order

// there is a lot of mutability in `keepOrder`, but all of it
// is sealed and does not leak to outside environment
const keepOrder = timeoutMs => stream =>
  Rx.Observable.defer(() => // need defer to support retries on error
    stream.scan((acc, v) => {
      acc.buffer.push(v)
      acc.buffer.sort((v1, v2) => v1.index - v2.index)
      return acc
    }, {
      lastEmitted: -1,
      buffer: []
    })
    .mergeMap(info => {
      const emission = []
      while (info.buffer.length && info.lastEmitted + 1 === info.buffer[0].index) {
        emission.push(info.buffer.shift())
        info.lastEmitted += 1
      }
      return Rx.Observable.of(emission)
    })
    .switchMap(emissions => {
      if (!emissions.length) { // this condition indicates out of order
        return Rx.Observable.timer(timeoutMs)
          .mergeMapTo(Rx.Observable
            .throw(new Error('ORDER_TIMEOUT')))
      } else {
        return Rx.Observable.from(emissions)
      }
    })
  )


mockMessages
  .do(x => console.log('mocked', x.index))
  .let(keepOrder(1000)) // decrease timeoutMs to increase error probability
  .do(x => console.log('ORDERED', x.index))
  .retryWhen(es => es
    .do(e => console.warn('ERROR', e)))
  .subscribe()
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>

<button id="emit">EMIT</button>
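
To see the buffering step in isolation, here is a minimal sketch of what the `scan` + `mergeMap` pair above does, written as a plain function with no RxJS and no timeout handling. `createReorderer` and `push` are hypothetical names used only for illustration; they are not part of the snippet above or of any library. It assumes, like the snippet, that indices start at 0 and increase by 1.

```javascript
// Hypothetical helper (not from the snippet above): the reordering step of
// `keepOrder` as a plain closure, so it can be checked without RxJS.
function createReorderer() {
  let lastEmitted = -1; // index of the last message released in order
  const buffer = [];    // early arrivals, kept sorted by index

  // Accepts one message and returns the (possibly empty) run that is now ready.
  return function push(message) {
    buffer.push(message);
    buffer.sort((a, b) => a.index - b.index);

    const ready = [];
    // Release messages for as long as the next expected index is at the front.
    while (buffer.length && buffer[0].index === lastEmitted + 1) {
      ready.push(buffer.shift());
      lastEmitted += 1;
    }
    return ready;
  };
}

// Feed indices in a scrambled arrival order and record what each push releases.
const push = createReorderer();
const released = [2, 0, 1, 4, 3].map(index => push({ index }).map(m => m.index));
console.log(JSON.stringify(released)); // [[],[0],[1,2],[],[3,4]]
```

An empty array is the "out of order" case: in the snippet above, that is what makes `switchMap` start the timeout timer instead of emitting.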

Upvotes: 1
