kasongoyo
kasongoyo

Reputation: 1876

How do I buffer until last stream in RxJs

So far I can buffer using buffer(observable) which buffer until provided observable emit or bufferCount(count) which wait until certain count of streams get emitted or using bufferTime(time) and bufferToggle() all of which does not solve my case which requires to buffer until the last emission and return all emissions as an array?

Upvotes: 1

Views: 1690

Answers (2)

Andrei Gătej
Andrei Gătej

Reputation: 11934

You could try this:

src$.pipe(
   buffer(notifier$.pipe(last(null, 'default value')))
)

last() will emit the latest emitted value when the source completes(ends). We're providing a default value in case the source did not emit anything, but you'd still want to get the collected values.


EDIT

if you want to buffer until the last emission of the source, you could try this:

src$ = src$.pipe(publish(), refCount());

src$.pipe(
   buffer(src$.pipe(last()))
);

publish() + refCount() will make sure the source won't be subscribed multiple times, by placing a Subject instance between the data producer and the data consumers.


EDIT 2

Here's a concrete example:

const src$ = fromEvent(document, 'click')
  .pipe(
    take(3),
    map((_, idx) => idx + 1), 
    publish(), 
    refCount(),
  );

src$.pipe(
  buffer(src$.pipe(last()))
).subscribe(console.warn); // after clicking 3 times: [1, 2, 3]

StackBlitz.

It's important that the source(src$) does not emit and does not complete synchronously. If this happens, when the observable passed to buffer completes, the outer observable will complete as well.

Upvotes: 1

Tal Ohana
Tal Ohana

Reputation: 1138

It sounds like you are looking for toArray operator.

From their docs:

Collects all source emissions and emits them as an array when the source completes.

Upvotes: 2

Related Questions