Reputation: 6188
I'm trying to subscribe to an observable and log it's value.
Flow:
this.myObservable$ = new Subject<string>();
this.myObservable$.next('hello world') //in (angular) service A
wait a few seconds
let newObservable$ = this.myObservable$.asObservable().subscribe(message => console.log(message)); //subscribe in service B, nothing happens
But this didn't work since I subscribed too late. But even with a shareReplay this does not work.
let newObservable$ = this.myObservable$.asObservable()
.pipe(
shareReplay(1),
)
.subscribe(message => console.log(message)); //also nothing happens
Can anyone help me understand why I can't 'shareReplay' this observable and get the last emitted value?
Upvotes: 5
Views: 1620
Reputation: 14750
The reason shareReplay
isn't working for you is because you are applying it to the derived observable, rather than the source.
You should apply it to the source:
this.mySubject$ = new Subject<string>();
this.myObservable$ = this.mySubject.asObservable.pipe(shareReplay(1));
Now, your new observable will get the replayed value:
let newObservable$ = this.myObservable$.subscribe(
message => console.log(message)
);
In your case, you were applying the shareReplay
in the definition of newObservable
, which will not achieve what you wanted.
As others have mentioned, since you already have a subject as a source, you could simply use a ReplaySubject
:
this.mySubject$ = new ReplaySubject<string>(1);
Normally, in the context of an Angular service, you would keep the Subject
private on a service, and expose a readonly observable publicly to consumers. Something like this:
Service:
private subject$ = new Subject();
public readonly value$ = this.subject$.pipe(shareReplay()) // if you use .pipe(), you don't really need .asObservable()
Component:
data$ = this.service.value$;
Since the shareReplay
is defined on the source (value$
) observable, the component will always get the replayed emission.
Upvotes: 3
Reputation: 31145
I see multiple issues here
You could use asObservable
method from a multicast like Subject
to get the observable. See here for usage of asObservable
from RxJS' core dev.
In the assignment newObservable$ = this.myObservable$.subscribe(...)
you're actually assigning the subscription rather than the observable.
The issue at hand, shareReplay
doesn't appear to work because it obviously isn't piped to the source initially but rather later. It'll work for the subsequent subscriptions.
Check the following illustration
var { Subject, interval, fromEvent, of } = rxjs;
var { shareReplay } = rxjs.operators;
// Scenario 1 (analogous to OP's)
var subSource1 = new Subject();
var myObservable1$ = subSource1.asObservable();
myObservable1$.subscribe(m => console.log('Scenario 1: Initial subscription'));
subSource1.next('Hello world');
newObservable1$ = myObservable1$.pipe(
shareReplay(1),
)
newObservable1$.subscribe(m => console.log(`Scenario 1: ${m}`));
// Scenario 2
var subSource2 = new Subject();
var myObservable2$ = subSource2.asObservable().pipe(
shareReplay(1),
);
myObservable2$.subscribe(m => console.log('Scenario 2: Initial subscription'));
subSource2.next('Hello world');
newObservable2$ = myObservable2$;
newObservable2$.subscribe(m => console.log(`Scenario 2: ${m}`));
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>
Upvotes: 1