Reputation: 1912
I started using rxjs
yesterday. i would like to achieve the following behavior:
I want my buffer to be produced with both time and custom events.
Imagine, every 2000 ms my buffer is produced (like standard bufferWithTime
function behavior) and for some special event, a buffer is produced and the timer, is re initialized.
Here is a basic representation :
Content of buffer A
: [1, 2]
, B
: [3, 4, 5]
, C
: [6, 7]
, D
: [8, 9]
.
Time segments A
, B
, D
have the same duration, 2000 ms.
Time segment C
was shorter because special event 7
was triggered and buffer was produced.
How can i do this ?
Upvotes: 0
Views: 352
Reputation: 18665
Can you try this (jsfiddle)?
// Helper functions
function randomDelay(bottom, top) {
return Math.floor( Math.random() * ( 1 + top - bottom ) ) + bottom;
}
// Simulation of random sequence
var source$ = Rx.Observable
.range(1, 30)
.concatMap(function (x) {
return Rx.Observable.of(x).delay(randomDelay(10,500));
});
var intervalBetween = 1000; // should be 2000ms in your case
var dummyStart$ = Rx.Observable.return({});
var specialEvent$ = Rx.Observable.fromEvent(document, 'click');
var opening$ = dummyStart$.concat(specialEvent$).flatMapLatest(function(x){return Rx.Observable.timer(0, intervalBetween)});
var buffers$ = source$.buffer(opening$).skip(1);
source$.subscribe(function(x){console.log('source', x)})
buffers$.subscribe(function(buffer){
console.log('buffer', buffer);
});
The idea here is to use the buffer
operator with the signature Rx.Observable.prototype.window(windowBoundaries);
. That should give you non-overlapping buffers which emit synchronized with the windowBoundaries
signal.
There is a small trick (dummyStart
) to kickoff the timer immediately with a false specialEvent
. That produce an empty buffer as first value, which is removed by skip(1)
.
Upvotes: 1