Reputation: 41
import { of, Subject, interval } from 'rxjs';
import {
tap,
startWith,
map,
delay,
publishReplay,
publish,
refCount,
withLatestFrom,
switchMap,
take
} from 'rxjs/operators';
const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publishReplay(),
refCount()
);
const c$ = b$.pipe(withLatestFrom(a$));
b$.subscribe(b => console.log(`b$`));
c$.subscribe(c => console.log(`c$`)); // my problem, why c$ not emit value
of(0).subscribe(a$);
I don't know why 'c$' is not printed here is my online code, https://stackblitz.com/edit/rxjs-pc5y8d?devtoolsheight=60&file=index.ts
Upvotes: 4
Views: 905
Reputation: 41
Thanks for your answer!I still have some questions:
const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);
c$.subscribe(observer('c$') /* (2) */);
in this example, can i explain like this?
when b$ is subscribe, it add a first subscriber in b$'s subscriber array
when c$ is subscribe, it first add a subscriber of a$, and add a second subscriber in b$'s subscriber array
when a$ emit an value, it first notify b$'s subscriber array, then notify withLatestFrom observable
when b$'s second subscriber is notified, withLatestFrom observable have not emit value, so it not print anything
@Andrei Gătej
Upvotes: 0
Reputation: 11944
I will start with an interesting observation: if you comment the line with where b$
is subscribed(b$.subscribe(observer('b$'));
), the next
callback of the c$
observer will be executed:
const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publish(),
refCount()
);
function observer(name: string) {
return {
next: (value: number) => {
console.log(`observer ${name}: ${value}`);
},
complete: () => {
console.log(`observer ${name}: complete`);
}
};
}
const c$ = b$.pipe(withLatestFrom(a$));
// b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
of(0).subscribe(a$);
/*
Console output:
expensive calculation
observer c$: 0,0
observer c$: complete
*/
Another observation is that the same next
callback will be invoked also if you change the order of the subscribers: first you subscribe to c$
, then to b$
:
const a$ = new Subject();
const b$ = a$.pipe(
map(a => {
console.log('expensive calculation');
return a;
}),
publish(),
refCount()
);
function observer(name: string) {
return {
next: (value: number) => {
console.log(`observer ${name}: ${value}`);
},
complete: () => {
console.log(`observer ${name}: complete`);
}
};
}
const c$ = b$.pipe(
withLatestFrom(a$)
);
c$.subscribe(observer('c$'));
b$.subscribe(observer('b$'));
of(0).subscribe(a$);
/*
Console output:
expensive calculation
observer c$: 0,0
observer b$: 0
observer c$: complete
observer b$: complete
*/
We will first understand the why behind these observations and then we will come up with a solution.
Firstly, it's important to know that a Subject
keeps track of its subscribers by using a subscribers list. When the subject emits a value(subject.next()
), all of the Subject's subscribers will receive that value, based on the order in which they have subscribed to the Subject.
In the first example:
const a$ = new Subject();
const b$ = a$.pipe(/* ... */);
const c$ = b$/* subscriber 2 */.pipe(withLatestFrom(a$/* subscriber 1 */));
// b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
when c$
is subscribed, it will first subscribe to each argument passed to withLatestFrom
, then to b$
. But since b$
is based on a$
, it means that the a$
Subject will end up having 2 subscribers.
In order for withLatestFrom
to emit a value, 2 conditions must be fulfilled at the same time:
b$
) must emit a valuewithLatestFrom
's arguments(which are observables) have emitted at least onceIn this case, since the only withLatestFrom
's argument is subscribed first to a$
, all of the above conditions will be fulfilled and this is why observer c$: 0,0
will be printed to the console.
In the second example:
const a$ = new Subject();
const b$ = a$.pipe(/* ... */);
const c$ = b$.pipe(
withLatestFrom(a$)
);
c$.subscribe(observer('c$')); /* b$'s first subscription */
b$.subscribe(observer('b$')); /* b$'s second subscription */
of(0).subscribe(a$);
until b$.subscribe(observer('b$'));
, the same thing that I described above happens. What b$.subscribe(observer('b$'));
does it does is to add another subscriber to a Subject instance, but this time it won't be a$
, it will be subject that belongs to publish()
. When the first subscription takes place, the publish
will create a Subject instance and will add that new subscriber to it, but it will also subscribe to the source(this happens internally). Once again, this happens only on the first subscription. On subsequent subscriptions, the subscribers will be added to the Subject maintained by publish
.
The a$
Subject will have 2 subscribers too. One from withLatestFrom
and one from the b$
's first subscription.
So, in this case the console will output: observer c$: 0,0
and then observer b$: 0
.
The initial problem was that observer c$: 0,0
won't be in the console output.
const c$ = b$.pipe(withLatestFrom(a$) /* (3) */);
b$.subscribe(observer('b$') /* (1) */);
c$.subscribe(observer('c$') /* (2) */);
The reason it happens is because b$
is subscribed first, which means that a$
will have its first subscriber(here is where it happens). Also the publish
's Subject will be (1)
. Next, when c$
is subscribed, (2)
will become the second subscriber of the publish
's Subject. It's important to notice that a$
won't be subscribed again. However, a$
will get its second subscriber, which is (3)
.
When the a$
emits, the first subscriber to receive the value will be the ones caused by b$
, to that second condition of withLatestFrom
won't be fulfilled, since its source has emitted, but none of the withLatestFrom
's observables have emitted anything yet.
A solution would be this:
/* ... */
const c$ = b$.pipe(
delay(0),
withLatestFrom(a$)
);
b$.subscribe(observer('b$'));
c$.subscribe(observer('c$'));
of(0).subscribe(a$);
By using delay(0),
, we ensure that no matter the order of the subscribers, withLatestFrom
's observable will be the first to receive the value. This was needed because, in this situation, b$
is of the form a$.pipe(...)
and withLatestFrom
's argument is a$
. With delay(0)
, the withLatestFrom
's observable will always receive values first.
As a side note, observeOn(asyncScheduler)
could also be used, instead of delay(0)
. In both cases, the output is:
expensive calculation
observer b$: 0
observer b$: complete
observer c$: 0,0
observer c$: complete
Upvotes: 2
Reputation: 795
withLatestFrom
must emit a value first
so If you use BehaviorSubject you can do the following:
const _a = new BehaviorSubject<string>('Hello');
const a$ = _a.asObservable();
a$.subscribe();
and it should work.
Upvotes: 0