Reputation: 3393
I have the following operators:
const prepare = (value$: Observable<string>) =>
value$.pipe(tap((x) => console.log("remove: ", x)), share());
const performTaskA = (removed$: Observable<string>) =>
removed$.pipe(tap((x) => console.log("pathA: ", x)),);
const performTaskB = (removed$: Observable<string>) =>
removed$.pipe(tap((x) => console.log("pathB: ", x)));
and I call them like this:
const prepared$ = value$.pipe(prepare);
const taskADone$ = prepared$.pipe(performTaskA);
const taskBDone$ = prepared$.pipe(performTaskB);
merge(taskADone$, taskBDone$).subscribe();
Due to the share
in prepare
I would expect 'remove' to be logged only once, however it appears twice.
Why is this not working?
Codesandbox: https://codesandbox.io/s/so-remove-fires-twice-iyk12?file=/src/index.ts
Upvotes: 2
Views: 426
Reputation: 96891
This is happening because your source Observable is of()
that just emits one next
notification and then complete
. Everything in RxJS in synchronous unless you work with time or you intentionally make your code asynchronous (eg. with Promise.resolve
or with asyncScheduler
).
In your demo, share()
receives one next
and one complete
notification immediately which makes its internal state to reset. It will also unsubscribe from its source Obserable because there are no more observers (the second source taskBDone$
you're merging has not subscribed yet). Then taskBDone$
is merged into the chain and share()
creates internally a new instance of Subject
and the whole process repeats.
These are the relevant parts in share()
:
complete
from source https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/share.ts#L120share()
resets its state https://github.com/ReactiveX/rxjs/blob/master/src/internal/operators/share.ts#L163So if your sources are going to be synchronous you should better use shareReplay()
(instead of share()
) that will just replay the entire sequence of events to every new observer.
Your updated demo: https://stackblitz.com/edit/rxjs-jawajw?devtoolsheight=60
Notice, that in your demo if you used of("TEST").pipe(delay(0))
as your source Observable it would work as you expected because delay(0)
would force asynchronous behavior and both source Observables would first subscribe and then in another JavaScript frame would emit their next
and complete
.
Upvotes: 4