Leon

Reputation: 12491

Reactive Extensions Buffer on count, interval and event

I want to buffer events sent to my server. The buffer should be flushed when any of three things happens: the buffer size is reached, the buffer period elapses, or the window is unloaded.

I buffer events sent to my server by creating a Subject and using buffer with a closing notifier. The closing notifier is a race between the buffer period and the window.beforeunload event.

this.event$ = new Subject();
this.bufferedEvent$ = this.event$
    .buffer(
        Observable.race(
            Observable.interval(bufferPeriodMs),
            Observable.fromEvent(window, 'beforeunload')
        )
    )
    .filter(events => events.length > 0)
    .switchMap(events =>
        ajax.post(
            this.baseUrl + RESOURCE_URL,
            {
                entries: events,
            },
            {
                'Content-Type': 'application/json',
            }
       )
    );

The question is: how do I now also limit the size of the buffer? I.e., I want the buffer to be flushed whenever it reaches 10 items.

Upvotes: 1

Views: 842

Answers (2)

Richard Matsen

Reputation: 23553

One thing that bothers me about the solution using independent triggers is that fullBufferTrigger doesn't know when timeoutTrigger has emitted one of its buffered values, so given the right event sequence, fullBufferTrigger will fire early when it follows a timeout.

Ideally, we would want fullBufferTrigger to reset when timeoutTrigger fires, but that proves tricky to do.
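
To make the drift concrete, here is a minimal sketch in plain JavaScript (no RxJS) that replays a timeline twice: once with a count trigger that ignores timeouts, once with a count that resets on each timeout. The names simulate, TIMEOUT and resetCountOnTimeout are made up for this illustration, and the model is simplified (it ignores the .skip(1) quirk of the real triggers).

```javascript
// Plain-JavaScript model, not RxJS. A timeline step is either an event
// value or the TIMEOUT marker, which stands for the timer trigger firing.
const TIMEOUT = 'TIMEOUT';

function simulate(timeline, size, resetCountOnTimeout) {
  const flushed = [];  // every buffer that was emitted, in order
  let buffer = [];     // events waiting to be flushed
  let counted = 0;     // events seen by the count-based trigger

  const flush = () => {
    if (buffer.length > 0) flushed.push(buffer);
    buffer = [];
  };

  for (const step of timeline) {
    if (step === TIMEOUT) {
      flush();
      if (resetCountOnTimeout) counted = 0;
      continue;
    }
    buffer.push(step);
    counted += 1;
    if (counted === size) {  // count trigger fires
      flush();
      counted = 0;
    }
  }
  return flushed;
}

const timeline = ['e1', TIMEOUT, 'e2', 'e3', 'e4'];
const independent = simulate(timeline, 2, false);
const coordinated = simulate(timeline, 2, true);
console.log(independent); // [['e1'], ['e2'], ['e3', 'e4']]
console.log(coordinated); // [['e1'], ['e2', 'e3']]  ('e4' is still buffered)
```

With the independent counter, e2 is flushed on its own right after the timeout, because the counter still remembers e1. With the reset, the buffer after the timeout fills to its full size of 2.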

Using bufferTime()

In RxJS v4 there was an operator bufferWithTimeOrCount(timeSpan, count, [scheduler]), which in RxJS v5 was rolled up into an additional signature of bufferTime() (arguably a mistake from the perspective of clarity).

bufferTime<T>(
  bufferTimeSpan: number, 
  bufferCreationInterval: number, 
  maxBufferSize: number, 
  scheduler?: IScheduler
): OperatorFunction<T, T[]>;

The only remaining question is how to incorporate the window.beforeunload trigger. Looking at the source code for bufferTime, it should flush its buffer when it receives onComplete.
So we can handle window.beforeunload by sending an onComplete to the buffered event stream.
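
The idea can be modelled without RxJS. The following is only a sketch of the behaviour described above (flush when full, flush on demand, flush the remainder on completion), not the bufferTime implementation; createEventBuffer and its methods are hypothetical names for this illustration.

```javascript
// Plain-JavaScript model, not RxJS: a buffer that flushes when it is full,
// when flush() is called, or when it is completed.
function createEventBuffer(maxSize, onFlush) {
  let items = [];
  let completed = false;

  // Emit whatever is buffered. Empty buffers are skipped, mirroring the
  // .filter(events => events.length > 0) step in the question.
  function flush() {
    if (items.length === 0) return;
    const batch = items;
    items = [];
    onFlush(batch);
  }

  return {
    push(item) {
      if (completed) return;  // events after completion are dropped
      items.push(item);
      if (items.length >= maxSize) flush();
    },
    flush,                    // would be called by the period timer
    complete() {              // would be called on beforeunload
      if (completed) return;
      completed = true;
      flush();                // the leftover events go out here
    },
  };
}

const batches = [];
const buffer = createEventBuffer(2, batch => batches.push(batch));
['event1', 'event2', 'event3'].forEach(e => buffer.push(e));
buffer.complete();      // flushes the leftover 'event3'
buffer.push('event4');  // ignored: the buffer has completed
console.log(batches);   // [['event1', 'event2'], ['event3']]
```

In a browser, one would presumably drive flush from setInterval and complete from a beforeunload listener; that wiring is left out so the sketch runs anywhere.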

The spec for bufferTime does not have an explicit test for onComplete, but I think I've managed to put one together.

Notes:

  • the timeout is set large to take it out of the picture for the test.
  • the source event sequence is left unchanged; to illustrate, event8 is added but never emits, because the window is destroyed before it occurs.
  • to see the output stream without the beforeunloadTrigger, comment out the line that emits onComplete. Event7 is in the buffer, but will not emit.

Test:

const bufferPeriodMs = 7000  // Set high for this test
const bufferSize = 2
const event$ = new Rx.Subject()

/*
  Create bufferedEvent$
*/
const bufferedEvent$ = event$
  .bufferTime(bufferPeriodMs, null, bufferSize)
  .filter(events => events.length > 0)
const subscription = bufferedEvent$.subscribe(console.log)  

/*
  Simulate window destroy
*/
const destroy = setTimeout( () => {
  subscription.unsubscribe()
}, 4500)

/*
  Simulate Observable.fromEvent(window, 'beforeunload')
*/
const beforeunloadTrigger = new Rx.Subject()
// Comment out the following line, observe that event7 does not emit
beforeunloadTrigger.subscribe(x=> event$.complete())
setTimeout( () => {
  beforeunloadTrigger.next('unload')
}, 4400)

/*
  Test sequence
  Event stream:        '(123)---(45)---6---7-----8--|'
  Destroy window:      '-----------------------x'
  window.beforeunload: '---------------------y'
  Buffered output:     '(12)---(34)---(56)---7'
*/
event$.next('event1')
event$.next('event2')
event$.next('event3')
setTimeout( () => { event$.next('event4'); event$.next('event5') }, 1000)
setTimeout( () => { event$.next('event6') }, 3000)
setTimeout( () => { event$.next('event7') }, 4000)
setTimeout( () => { event$.next('event8') }, 5000)

Working example: CodePen

Upvotes: 1

Richard Matsen

Reputation: 23553

This is the working solution I have. Extra console.log() calls are added to show the sequence of events.

The only thing that's a bit bothersome is the .skip(1) in fullBufferTrigger. It's needed because fullBufferTrigger fires when its own buffer is full (natch), but at that point the buffer in bufferedEvent$ does not seem to have received the latest event yet.

Luckily, with the timeoutTrigger in place, the last event gets emitted. Without timeout, fullBufferTrigger by itself will not emit the final event.

Also, I changed buffer to bufferWhen, as the former did not seem to work with two triggers, although you'd expect it to from the documentation.
Footnote: with buffer(race()), the race is decided only once, so whichever trigger fires first is used from then on and the other triggers are disregarded. In contrast, bufferWhen(x => race()) sets up a new race each time a buffer is emitted.
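
The difference can be sketched as a small plain-JavaScript model (no RxJS). It assumes that a race locks onto whichever trigger fires first and ignores the others from then on; closingSteps and newRacePerBuffer are made-up names, and the model ignores that restarting a race also restarts timers such as interval.

```javascript
// Plain-JavaScript model, not RxJS. Each timeline entry names the trigger
// that fires at that step; the result lists the steps where a buffer closes.
function closingSteps(triggerTimeline, newRacePerBuffer) {
  const closes = [];
  let winner = null;  // trigger the current race has locked onto, if any

  triggerTimeline.forEach((trigger, step) => {
    if (winner === null) winner = trigger;  // first to fire wins the race
    if (trigger !== winner) return;         // losing triggers are ignored
    closes.push(step);
    if (newRacePerBuffer) winner = null;    // next buffer gets a fresh race
  });
  return closes;
}

const triggerTimeline = ['timeout', 'full', 'full', 'timeout'];
const singleRace = closingSteps(triggerTimeline, false);    // like buffer(race())
const racePerBuffer = closingSteps(triggerTimeline, true);  // like bufferWhen(() => race())
console.log(singleRace);    // [0, 3]
console.log(racePerBuffer); // [0, 1, 2, 3]
```

With a single race, the full-buffer trigger never closes a buffer once the timeout has won; with a race per buffer, either trigger can close each buffer.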

const bufferPeriodMs = 1000

const event$ = new Subject()
event$.subscribe(event => console.log('event$ emit', event))

// Define triggers here for testing individually
const beforeunloadTrigger = Observable.fromEvent(window, 'beforeunload')
const fullBufferTrigger = event$.skip(1).bufferCount(2)
const timeoutTrigger = Observable.interval(bufferPeriodMs).take(10)

const bufferedEvent$ = event$
  .bufferWhen( x => 
    Observable.race(
      fullBufferTrigger,
      timeoutTrigger
    )
  )
  .filter(events => events.length > 0)

// output
fullBufferTrigger.subscribe(x => console.log('fullBufferTrigger', x))
timeoutTrigger.subscribe(x => console.log('timeoutTrigger', x))
bufferedEvent$.subscribe(events => {
  console.log('subscription', events)
})

// Test sequence
const delayBy = n => (bufferPeriodMs * n) + 500 
event$.next('event1')
event$.next('event2')
event$.next('event3')

setTimeout( () => {
  event$.next('event4')
}, delayBy(1))

setTimeout( () => {
  event$.next('event5')
}, delayBy(2))

setTimeout( () => {
  event$.next('event6')
  event$.next('event7')
}, delayBy(3))

Working example: CodePen

Edit: Alternative way to trigger the buffer

Since the combination of bufferWhen and race might be a bit inefficient (the race is restarted on every buffer emission), an alternative is to merge the triggers into one stream and use a simple buffer:

const bufferTrigger$ = timeoutTrigger
  .merge(fullBufferTrigger)
  .merge(beforeunloadTrigger)

const bufferedEvent$ = event$
  .buffer(bufferTrigger$)
  .filter(events => events.length > 0)

Upvotes: 1
