Reputation: 11819
I am sure that I should be able to find the answer to my problem in the documentation of rxjs, but I am getting nowhere...
Problem: I have two streams, one of which is more important than the other. Both provide the same information. But as soon as stream A provides a value for the first time, I want to completely dismiss stream B, because it is now outdated.
It is kind of a cache thing: provide a value from the cache, but as soon as there is a live value, update the cache and do not read from it anymore.
I have tried several combinations piping the two streams, but I cannot cancel B after A has published...
Which rx-operator could provide the functionality that I need?
Upvotes: 3
Views: 128
Reputation: 6234
If I understand correctly, you can merge the two streams and take the first that answers:
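import { interval, merge } from 'rxjs';
import { map, take } from 'rxjs/operators';

// Two timers with different periods stand in for the two streams.
const interval1$ = interval(1000);
const interval2$ = interval(800);
// Tag each value with the id of the stream it came from.
const source1 = interval1$.pipe(
  map(x => ({ id: 1, value: x })),
  take(5)
);
const source2 = interval2$.pipe(
  map(x => ({ id: 2, value: x })),
  take(5)
);
const s = merge(source1, source2);
// take(1) completes after the first value from either stream.
const subscribe = s.pipe(take(1)).subscribe(x => console.log(x));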
This prints only the first value to arrive, which in this case is the first element of the second stream ({ id: 2, value: 0 }), because it emits after 800 ms while the first stream needs 1000 ms.
Upvotes: 1
Reputation: 96889
It looks like you can just use merge and takeUntil:
merge(
  streamA$.pipe(
    takeUntil(streamB$)
  ),
  streamB$
)
When streamB$ emits anything, streamA$ is ignored from then on, because takeUntil completes that branch of the merge.
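Note that in the snippet above it is streamB$ that silences streamA$, whereas the question names A as the stream that should win. A minimal sketch of the same pattern with the roles matched to the question could look like this. The names live$, cache$, value$ and seen are made up for illustration, and Subjects are used only so the emissions can be triggered by hand:

```typescript
import { Subject, merge } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Hypothetical names: live$ plays the role of stream A, cache$ of stream B.
const live$ = new Subject<string>();
const cache$ = new Subject<string>();

// Cached values pass through only until live$ emits for the first time.
const value$ = merge(
  live$,
  cache$.pipe(takeUntil(live$))
);

const seen: string[] = [];
value$.subscribe(v => seen.push(v));

cache$.next('cached-1'); // forwarded: no live value yet
live$.next('live-1');    // forwarded, and takeUntil ends the cache branch
cache$.next('cached-2'); // ignored: the cache branch has completed
live$.next('live-2');    // forwarded

console.log(seen); // [ 'cached-1', 'live-1', 'live-2' ]
```

One thing to watch for: takeUntil(live$) subscribes to live$ a second time. With a Subject that is harmless, but if the live stream is a cold observable (an HTTP request, for example), it may be worth sharing it first, for instance with share(), so the source is not started twice.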
Upvotes: 4