Reputation: 1395
I have a source$ observable that emits data whenever certain events are triggered. I want to collect the values that arrive within a specified time span into an array.
const eventSubject = new Subject();
eventSubject.next(data);
const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$)).subscribe(
data => {
console.log(data);
}
);
The above source$ handles each emitted value immediately.
Now I want to improve this so that it waits a few seconds, collects all the data that arrived during that time, and emits once. So I changed it to use bufferTime, like below:
const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$), bufferTime(2000)).subscribe(
data => {
console.log(data);
}
);
After testing with bufferTime, I found that it emits every 2 seconds even when the source is not receiving data. If the source receives no data, it emits an empty array.
What I want is this: only when source$ receives data should it start buffering for 2 seconds and then emit the buffered values. If source$ receives no data, it shouldn't emit anything.
I checked bufferWhen, windowWhen and windowTime, but none of them meets my requirements: they all emit on every specified time interval.
Is there another operator that can do what I want?
Thanks a lot.
Upvotes: 7
Views: 1548
Reputation: 387
The best solution for this case is to use the buffer operator with a debounceTime notifier.
For example:
const source$ = eventSubject.asObservable();
source$
  .pipe(
    // buffer: accumulates emissions into an array until the closing notifier (the argument below) emits.
    buffer(
      // debounceTime: emits once 2 seconds have passed without another source emission.
      source$.pipe(debounceTime(2000))
    )
  )
  .subscribe(
    // Receives an array of the values emitted since the previous flush, in the order they were emitted.
    data => {
      console.log(data);
    }
  );
buffer: Buffers the source Observable values until closingNotifier emits.
debounceTime: Emits a notification from the source Observable only after a particular time span has passed without another source emission.
Unlike the filter operator solution, this one does not keep an interval/timer running while the source is idle. It's also pretty clean and elegant, IMO.
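One caveat worth spelling out: because the notifier is a debounce, the buffer is only flushed after a quiet gap, so a source that keeps emitting more often than every 2 seconds keeps growing the buffer. The sketch below is a dependency-free model of that timing, not RxJS itself. `TimedEvent`, `Flush` and `bufferUntilQuiet` are hypothetical names, and it assumes the events are sorted by time and that the source never completes.

```typescript
// Hypothetical model types, not part of RxJS.
interface TimedEvent<T> {
  at: number; // emission time in ms
  value: T;
}

interface Flush<T> {
  flushedAt: number; // time at which the buffered array would be emitted
  values: T[];
}

// Models buffer(source$.pipe(debounceTime(quietMs))) for a source that stays open.
function bufferUntilQuiet<T>(events: TimedEvent<T>[], quietMs: number): Flush<T>[] {
  const result: Flush<T>[] = [];
  let current: T[] = [];
  for (let i = 0; i < events.length; i++) {
    current.push(events[i].value);
    const isLast = i === events.length - 1;
    // The debounce timer fires only if no further event arrives within quietMs.
    if (isLast || events[i + 1].at - events[i].at >= quietMs) {
      result.push({ flushedAt: events[i].at + quietMs, values: current });
      current = [];
    }
  }
  return result;
}

// A burst followed by a pause: flushed 2 s after the last event of the burst.
const burst = bufferUntilQuiet(
  [
    { at: 0, value: "a" },
    { at: 500, value: "b" },
    { at: 1000, value: "c" },
    { at: 4000, value: "d" },
  ],
  2000
);
console.log(JSON.stringify(burst));
// [{"flushedAt":3000,"values":["a","b","c"]},{"flushedAt":6000,"values":["d"]}]

// A steady stream with 1 s gaps: nothing is flushed until the stream goes quiet.
const steady = bufferUntilQuiet(
  [0, 1000, 2000, 3000, 4000].map(at => ({ at, value: at })),
  2000
);
console.log(JSON.stringify(steady));
// [{"flushedAt":6000,"values":[0,1000,2000,3000,4000]}]
```

If the source can emit continuously, a notifier that fires a fixed time after the first buffered value may fit better than a debounce.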
Upvotes: 1
Reputation: 3409
I'd go for connect(shared$ => ...) and buffer(signal$).
I think something along these lines:
source$.pipe(
connect(shared$ => shared$.pipe(
buffer(shared$.pipe(
debounceTime(2000)
))
))
)
connect creates a shared observable, so that you can subscribe to the source several times inside the selector without opening several subscriptions to the actual source.
In there I run a buffer whose closing notifier is the debounceTime of the same source, so it emits the array once source$ hasn't emitted for 2 seconds.
Maybe what you need is throttleTime(2000, asyncScheduler, { leading: false, trailing: true }) instead of debounceTime (the config object is the third argument, after the scheduler). It depends on your use case.
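To make the "depends on your use case" part concrete, here is a dependency-free sketch of the behaviour described in the question: the first value opens a 2 second window, and everything that arrives inside it is emitted together when the window closes. It models the requirement rather than the exact timer behaviour of throttleTime, which is worth checking against the RxJS version in use. `TimedEvent`, `Flush` and `bufferFromFirstEvent` are hypothetical names, and the events are assumed to be sorted by time.

```typescript
// Hypothetical model types, not part of RxJS.
interface TimedEvent<T> {
  at: number; // emission time in ms
  value: T;
}

interface Flush<T> {
  flushedAt: number; // time at which the buffered array would be emitted
  values: T[];
}

// The first event with no window open starts a window of windowMs;
// later events join it until it closes.
function bufferFromFirstEvent<T>(events: TimedEvent<T>[], windowMs: number): Flush<T>[] {
  const result: Flush<T>[] = [];
  let windowEnd = -Infinity; // no window is open initially
  for (const e of events) {
    if (e.at >= windowEnd) {
      // No window is open: this event opens a new one.
      windowEnd = e.at + windowMs;
      result.push({ flushedAt: windowEnd, values: [e.value] });
    } else {
      // A window is open: the event joins the current buffer.
      result[result.length - 1].values.push(e.value);
    }
  }
  return result;
}

const windows = bufferFromFirstEvent(
  [
    { at: 0, value: "a" },
    { at: 1000, value: "b" },
    { at: 2500, value: "c" },
    { at: 3000, value: "d" },
    { at: 4000, value: "e" },
  ],
  2000
);
console.log(JSON.stringify(windows));
// [{"flushedAt":2000,"values":["a","b"]},{"flushedAt":4500,"values":["c","d","e"]}]
```

With a debounce-based notifier the same input would produce a single array at 6000 ms, because no gap between events reaches 2 seconds.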
Upvotes: 2
Reputation: 11380
You can just add a filter operator to ignore the empty array emissions:
const source$ = eventSubject.asObservable();
source$.pipe(
  takeUntil(destroyed$),
  bufferTime(2000),
  // Drop the empty arrays that bufferTime emits for idle intervals.
  filter(arr => arr.length > 0)
).subscribe(
  data => {
    console.log(data);
  }
);
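Note that with this approach the windows are tied to the subscription time, not to the first value. A rough dependency-free model of that timing is sketched below; it is not RxJS itself. `TimedEvent`, `Flush` and `bufferFixedWindows` are hypothetical names, times are measured from the moment of subscription, and the events are assumed to be sorted and not to fall exactly on a window boundary.

```typescript
// Hypothetical model types, not part of RxJS.
interface TimedEvent<T> {
  at: number; // emission time in ms, measured from subscription
  value: T;
}

interface Flush<T> {
  flushedAt: number; // time at which the buffered array would be emitted
  values: T[];
}

// Models bufferTime(windowMs) followed by filter(arr => arr.length > 0):
// fixed back-to-back windows, with the empty ones dropped.
function bufferFixedWindows<T>(events: TimedEvent<T>[], windowMs: number): Flush<T>[] {
  const result: Flush<T>[] = [];
  let lastIndex = -1;
  for (const e of events) {
    const index = Math.floor(e.at / windowMs); // which fixed window the event falls into
    if (index !== lastIndex) {
      result.push({ flushedAt: (index + 1) * windowMs, values: [] });
      lastIndex = index;
    }
    result[result.length - 1].values.push(e.value);
  }
  return result;
}

const fixed = bufferFixedWindows(
  [
    { at: 1900, value: "a" },
    { at: 2100, value: "b" },
    { at: 9000, value: "c" },
  ],
  2000
);
console.log(JSON.stringify(fixed));
// [{"flushedAt":2000,"values":["a"]},{"flushedAt":4000,"values":["b"]},{"flushedAt":10000,"values":["c"]}]
```

Here "a" and "b" arrive only 200 ms apart but end up in different arrays, and the idle windows in between produce nothing after filtering.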
Upvotes: 1