Reputation: 386
I have outward stream of drawing from an user and I need wait till data of the user profile come.
So in general I want buffer data till an event happens and then play this data and skip buffering.
I can do it with external array like the follow code show (stackblitz):
import { of, interval } from 'rxjs';
import { map, take } from 'rxjs/operators';
const numbers = interval(1000);
const source = numbers.pipe(
take(4)
);
let allowPerform = false;
setTimeout(_=>{allowPerform = true},2001);
const fifo=[];
source.subscribe(x => {
fifo.push(x);
if (!allowPerform) {
console.log('skip perform for:', x);
return;
}
let item ;
while ( fifo.length) {
item = fifo.shift();
console.log('perform for:', item);
}
});
it's output:
skip perform for: 0
skip perform for: 1
perform for: 0
perform for: 1
perform for: 2
perform for: 3
However how to do it in RXjs's way?
Upvotes: 1
Views: 741
Reputation: 11934
Here could be a way to do this:
// Buffer until the `notifier` emits
// Then do not buffer anymore, just send the values
const src$ = interval(1000).pipe(take(10), publish(), refCount());
const notifier$ = timer(2001);
concat(
src$.pipe(buffer(notifier$), mergeAll()),
src$
).subscribe(console.log, null, () => console.warn('complete'));
Using publish(), refCount()
will multicast the emitted values to all the consumers. This is achieved by placing a Subject
between the source and the data consumers, meaning that the source won't be subscribed multiple times.
src$.pipe(buffer(notifier$), mergeAll()),
will buffer until notifier$
emits. But, because notifier$
also completes, the entire observable passed will complete, allowing for the next observable(src$
) to be subscribed.
mergeAll()
is used because buffer
will emit an array of the collected values and with mergeAll()
we can get the values separately.
Upvotes: 2