rodent_la

Reputation: 1395

RxJS: buffer emitted values for a specified time after the source emits

I have a source$ observable that emits a stream of data whenever certain events are triggered. I want to collect the data emitted within a specified time span into an array.

const eventSubject = new Subject();
eventSubject.next(data); 

const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$)).subscribe(
    data => {
      console.log(data);
    }
);

The source$ subscription above handles each emitted value immediately.

Now I want to improve this so that it waits a few seconds, collects all the data emitted during that time, and emits once. So I changed it to use bufferTime, as below:

const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$), bufferTime(2000)).subscribe(
    data => {
      console.log(data);
    }
);

After testing with bufferTime, I found that it emits every 2 seconds even when the source is not emitting anything. In that case it emits an empty array.

What I want is this: buffering should start only when source$ emits a value, run for 2 seconds, and then emit the collected values. If source$ emits nothing, nothing should be emitted.

I looked at bufferWhen, windowWhen, and windowTime, but none of them meets my requirements: they all emit at every specified interval.

Is there another operator that can do what I want?

Thanks a lot.

Upvotes: 7

Views: 1548

Answers (3)

coderrr22

Reputation: 387

The optimal solution for this case is to use the buffer operator with a notifier built from the debounceTime operator.

For example:

import { buffer, debounceTime } from 'rxjs/operators';

const source$ = eventSubject.asObservable();
source$
  .pipe(
    // buffer: accumulates emissions into an array until the closing notifier
    // (the argument below) emits.
    buffer(
      // debounceTime: emits only after 2 seconds have passed without another emission.
      source$.pipe(debounceTime(2000))
    )
  )
  .subscribe(
    // Receives an array of the values emitted since the previous flush,
    // in the order they were emitted.
    data => {
      console.log(data);
    }
  );

buffer:

Buffers the source Observable values until closingNotifier emits.

debounceTime:

Emits a notification from the source Observable only after a particular time span has passed without another source emission.

Unlike the filter operator solution, this one does not keep an interval timer running while the source is idle. It is also pretty clean and elegant, IMO.

Upvotes: 1

olivarra1

Reputation: 3409

I'd go for connect(shared$ => ...) and buffer(signal$).

I think something along these lines:

source$.pipe(
  connect(shared$ => shared$.pipe(
    buffer(shared$.pipe(
      debounceTime(2000)
    ))
  ))
)

connect creates a shared observable, so you can subscribe to it several times inside the callback while only a single subscription is opened to the source.

Inside it I run buffer, whose closing notifier is the same source passed through debounceTime, so the array is emitted once source$ has not emitted for 2 seconds.

Maybe what you need is throttleTime(2000, asyncScheduler, { leading: false, trailing: true }) instead of debounceTime (the config object is the third argument, after the scheduler). It depends on your use case.
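
To make that choice more concrete, here is a minimal sketch in plain TypeScript (no RxJS) that models two grouping rules over timestamped events: closing the buffer after a quiet gap, as debounceTime does, versus closing it a fixed time after the first event, as the question describes. The names Timed, groupByQuietGap, groupByFirstEventWindow, and sample are made up for illustration, and the model assumes the events are already sorted by time. It only approximates the timing and is not how RxJS implements any operator.

```typescript
// Plain TypeScript model of the two grouping rules; no RxJS involved.
// All names here are hypothetical and exist only for this illustration.
interface Timed<T> {
  at: number; // arrival time in milliseconds
  value: T;
}

// Debounce-style closing: a group ends once no event arrives for more than `quietMs`.
function groupByQuietGap<T>(events: Timed<T>[], quietMs: number): T[][] {
  const groups: T[][] = [];
  let current: T[] = [];
  let lastAt = 0;
  for (const e of events) {
    if (current.length > 0 && e.at - lastAt > quietMs) {
      groups.push(current);
      current = [];
    }
    current.push(e.value);
    lastAt = e.at;
  }
  if (current.length > 0) groups.push(current);
  return groups;
}

// Fixed window: a group opens at its first event and closes `windowMs` later.
function groupByFirstEventWindow<T>(events: Timed<T>[], windowMs: number): T[][] {
  const groups: T[][] = [];
  let current: T[] = [];
  let openedAt = 0;
  for (const e of events) {
    if (current.length > 0 && e.at - openedAt >= windowMs) {
      groups.push(current);
      current = [];
    }
    if (current.length === 0) openedAt = e.at;
    current.push(e.value);
  }
  if (current.length > 0) groups.push(current);
  return groups;
}

const sample: Timed<string>[] = [
  { at: 0, value: "a" },
  { at: 1500, value: "b" },
  { at: 3000, value: "c" },
  { at: 7000, value: "d" },
];

console.log(JSON.stringify(groupByQuietGap(sample, 2000)));         // [["a","b","c"],["d"]]
console.log(JSON.stringify(groupByFirstEventWindow(sample, 2000))); // [["a","b"],["c"],["d"]]
```

With the quiet-gap rule, a steady trickle of events keeps the buffer open for as long as the trickle lasts. The fixed window caps the delay at 2 seconds after the first event in each group.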

Upvotes: 2

Fan Cheung

Reputation: 11380

You can just add a filter operator to ignore the empty array emissions:

const source$ = eventSubject.asObservable();
source$.pipe(takeUntil(destroyed$), bufferTime(2000), filter(arr => arr.length > 0)).subscribe(
    data => {
      console.log(data);
    }
);

Upvotes: 1
