Reputation: 12491
I want to buffer events sent to my server. The buffer should be flushed when any of the following happens: the buffer size is reached, the buffer period elapses, or the window is unloaded.
I buffer the events by creating a Subject and using buffer with a closing notifier. For the closing notifier I use race, racing the buffer period against the window.beforeunload event.
this.event$ = new Subject();
this.bufferedEvent$ = this.event$
  .buffer(
    Observable.race(
      Observable.interval(bufferPeriodMs),
      Observable.fromEvent(window, 'beforeunload')
    )
  )
  .filter(events => events.length > 0)
  .switchMap(events =>
    ajax.post(
      this.baseUrl + RESOURCE_URL,
      {
        entries: events,
      },
      {
        'Content-Type': 'application/json',
      }
    )
  );
The question is: how do I now also limit the size of the buffer? That is, I want the buffer to be flushed as soon as it holds 10 items, so it never grows beyond that.
Upvotes: 1
Views: 842
Reputation: 23553
One thing that bothers me about the solution using independent triggers is that fullBufferTrigger doesn't know when timeoutTrigger has flushed some of its buffered values, so given the right event sequence, fullBufferTrigger will fire early when it follows a timeout.
Ideally, fullBufferTrigger would reset when timeoutTrigger fires, but that proves tricky to do.
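To make the early-trigger problem concrete, here is a minimal sketch in plain JavaScript (no RxJS). The `simulate` helper and its timeline format are hypothetical, invented only for illustration: `'e'` stands for an incoming event and `'T'` for a timeout tick. It only models the counting logic, not the exact emission order of the real operators.

```javascript
// Hypothetical model, not RxJS: replays a timeline of 'e' (event) and
// 'T' (timeout tick) and returns the buffers that were flushed.
function simulate(timeline, size, resetOnTimeout) {
  const flushed = [];
  let buffer = [];
  let seen = 0; // events counted by the size trigger since it last fired
  let id = 0;   // events are numbered 1, 2, 3, ... in arrival order

  for (const step of timeline) {
    if (step === 'T') {
      // Timeout flush: emit whatever is buffered (skip empty buffers).
      if (buffer.length > 0) flushed.push(buffer);
      buffer = [];
      // An independent size trigger keeps its old count here.
      if (resetOnTimeout) seen = 0;
    } else {
      buffer.push(++id);
      seen += 1;
      if (seen === size) {
        // Size flush: the trigger has counted `size` events.
        flushed.push(buffer);
        buffer = [];
        seen = 0;
      }
    }
  }
  return flushed; // anything still in `buffer` has not been flushed yet
}

const timeline = ['e', 'T', 'e', 'e'];
const independent = simulate(timeline, 2, false); // [[1], [2]]
const resetting = simulate(timeline, 2, true);    // [[1], [2, 3]]
```

With the independent counter, the second event is flushed alone because the trigger still remembers the event that the timeout already flushed. With the resetting counter, events 2 and 3 go out together as a full buffer.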
bufferTime()
In RxJS v4 there was an operator bufferWithTimeOrCount(timeSpan, count, [scheduler]), which in RxJS v5 was rolled up into an additional signature of bufferTime() (arguably a mistake from the perspective of clarity).
bufferTime<T>(
  bufferTimeSpan: number,
  bufferCreationInterval: number,
  maxBufferSize: number,
  scheduler?: IScheduler
): OperatorFunction<T, T[]>;
The only remaining question is how to incorporate the window.beforeunload trigger. Looking at the source code for bufferTime, it should flush its buffer when receiving onComplete.
So, we can handle window.beforeunload by sending an onComplete to the buffered event stream.
The spec for bufferTime does not have an explicit test for onComplete, but I think I've managed to put one together.
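As a rough mental model of the behaviour relied on here, the following plain-JavaScript sketch flushes on size, on a timer, and on completion. `SizeOrTimeBuffer` is a hypothetical class written for illustration, not the RxJS implementation, and it assumes the timer restarts after a size-triggered flush.

```javascript
// Hypothetical stand-in for event$.bufferTime(periodMs, null, maxSize);
// it only mirrors the three flush conditions discussed above.
class SizeOrTimeBuffer {
  constructor(maxSize, periodMs, onFlush) {
    this.maxSize = maxSize;
    this.periodMs = periodMs;
    this.onFlush = onFlush;
    this.items = [];
    this.done = false;
    this.timer = null;
    this.restartTimer();
  }

  // (Re)start the period timer; each expiry flushes and schedules the next one.
  restartTimer() {
    clearTimeout(this.timer);
    this.timer = setTimeout(() => {
      this.flush();
      this.restartTimer();
    }, this.periodMs);
  }

  // Emit the current batch, skipping empty ones (like the .filter() step).
  flush() {
    if (this.items.length > 0) {
      const batch = this.items;
      this.items = [];
      this.onFlush(batch);
    }
  }

  next(item) {
    if (this.done) return;
    this.items.push(item);
    if (this.items.length >= this.maxSize) {
      this.flush();        // size limit reached
      this.restartTimer(); // assumption: the period starts over
    }
  }

  // Equivalent of completing the source: stop the timer, flush the remainder.
  complete() {
    if (this.done) return;
    this.done = true;
    clearTimeout(this.timer);
    this.flush();
  }
}

const batches = [];
const buf = new SizeOrTimeBuffer(2, 7000, batch => batches.push(batch));
buf.next('event1');
buf.next('event2'); // size flush: ['event1', 'event2']
buf.next('event3');
buf.complete();     // completion flush: ['event3']
```

In a browser, `complete()` would be the call to make from a beforeunload handler, which corresponds to calling `event$.complete()` in the RxJS version.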
Notes:
Test:
const bufferPeriodMs = 7000 // Set high for this test
const bufferSize = 2
const event$ = new Rx.Subject()
/*
Create bufferedEvent$
*/
const bufferedEvent$ = event$
  .bufferTime(bufferPeriodMs, null, bufferSize)
  .filter(events => events.length > 0)
const subscription = bufferedEvent$.subscribe(console.log)
/*
Simulate window destroy
*/
const destroy = setTimeout(() => {
  subscription.unsubscribe()
}, 4500)
/*
Simulate Observable.fromEvent(window, 'beforeunload')
*/
const beforeunloadTrigger = new Rx.Subject()
// Comment out the following line, observe that event7 does not emit
beforeunloadTrigger.subscribe(x => event$.complete())
setTimeout(() => {
  beforeunloadTrigger.next('unload')
}, 4400)
/*
Test sequence
Event stream: '(123)---(45)---6---7-----8--|'
Destroy window: '-----------------------x'
window.beforeunload: '---------------------y'
Buffered output: '(12)---(34)---(56)---7'
*/
event$.next('event1')
event$.next('event2')
event$.next('event3')
setTimeout( () => { event$.next('event4'); event$.next('event5') }, 1000)
setTimeout( () => { event$.next('event6') }, 3000)
setTimeout( () => { event$.next('event7') }, 4000)
setTimeout( () => { event$.next('event8') }, 5000)
Working example: CodePen
Upvotes: 1
Reputation: 23553
This is the working solution I have. Extra console.log() calls are added to show the sequence of events.
The only thing that's a bit bothersome is the .skip(1) in fullBufferTrigger. It is needed because the trigger fires when its own buffer is full (naturally), but at that moment the buffer in bufferedEvent$ does not yet seem to contain the latest event.
Luckily, with the timeoutTrigger in place, the last event still gets emitted. Without the timeout, fullBufferTrigger by itself will not emit the final event.
Also, I changed buffer to bufferWhen, as the former did not seem to work with two triggers, although you'd expect it to from the documentation.
Footnote: with buffer(race()), the race is decided only once, so whichever trigger emits first is used from then on and the other triggers are disregarded. In contrast, bufferWhen(x => race()) sets up a fresh race each time a new buffer is opened.
const bufferPeriodMs = 1000
const event$ = new Subject()
event$.subscribe(event => console.log('event$ emit', event))
// Define triggers here for testing individually
const beforeunloadTrigger = Observable.fromEvent(window, 'beforeunload')
const fullBufferTrigger = event$.skip(1).bufferCount(2)
const timeoutTrigger = Observable.interval(bufferPeriodMs).take(10)
const bufferedEvent$ = event$
  .bufferWhen(x =>
    Observable.race(
      fullBufferTrigger,
      timeoutTrigger
    )
  )
  .filter(events => events.length > 0)
// output
fullBufferTrigger.subscribe(x => console.log('fullBufferTrigger', x))
timeoutTrigger.subscribe(x => console.log('timeoutTrigger', x))
bufferedEvent$.subscribe(events => {
  console.log('subscription', events)
})
// Test sequence
const delayBy = n => (bufferPeriodMs * n) + 500
event$.next('event1')
event$.next('event2')
event$.next('event3')
setTimeout(() => {
  event$.next('event4')
}, delayBy(1))
setTimeout(() => {
  event$.next('event5')
}, delayBy(2))
setTimeout(() => {
  event$.next('event6')
  event$.next('event7')
}, delayBy(3))
Working example: CodePen
Since the combination of bufferWhen and race might be a bit inefficient (the race is set up again for every buffer), an alternative is to merge the triggers into one stream and use a simple buffer:
const bufferTrigger$ = timeoutTrigger
  .merge(fullBufferTrigger)
  .merge(beforeunloadTrigger)
const bufferedEvent$ = event$
  .buffer(bufferTrigger$)
  .filter(events => events.length > 0)
Upvotes: 1