Vincent

Reputation: 6188

Fixing a race condition on an RxJS subscription with shareReplay

I'm trying to subscribe to an observable and log its value.

Flow:

this.myObservable$ = new Subject<string>();
this.myObservable$.next('hello world') //in (angular) service A 

wait a few seconds

let newObservable$ = this.myObservable$.asObservable().subscribe(message => console.log(message)); //subscribe in service B, nothing happens

This didn't work because I subscribed too late. But it doesn't work even with shareReplay:

let newObservable$ = this.myObservable$.asObservable()
      .pipe(
        shareReplay(1),
      )
      .subscribe(message => console.log(message)); //also nothing happens

Can anyone help me understand why I can't 'shareReplay' this observable and get the last emitted value?

Upvotes: 5

Views: 1620

Answers (2)

BizzyBob

Reputation: 14750

shareReplay isn't working for you because you apply it to the derived observable, after the value has already been emitted, rather than to the source.

You should apply it to the source:

this.mySubject$ = new Subject<string>();
this.myObservable$ = this.mySubject$.asObservable().pipe(shareReplay(1));

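Now a later subscriber gets the replayed value, provided something had already subscribed to myObservable$ when the value was emitted (shareReplay only connects to the source on its first subscription):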

let newObservable$ = this.myObservable$.subscribe(
   message => console.log(message)
);

In your case, you applied shareReplay in the definition of newObservable$, after the value had already been emitted, so there was nothing to replay.

As others have mentioned, since you already have a subject as a source, you could simply use a ReplaySubject:

this.mySubject$ = new ReplaySubject<string>(1);
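To illustrate the ReplaySubject suggestion, here is a minimal sketch outside of Angular. The names `subject$` and `received` are only for illustration; the point is that a ReplaySubject with a buffer size of 1 hands its latest value to a subscriber that arrives after the emission.

```typescript
import { ReplaySubject } from 'rxjs';

// Collects what the late subscriber sees, so the behaviour is easy to inspect.
const received: string[] = [];

// Buffer only the most recent value.
const subject$ = new ReplaySubject<string>(1);

// Emitted before anyone has subscribed.
subject$.next('hello world');

// Subscribing afterwards: the buffered value is delivered on subscription.
subject$.subscribe(message => received.push(message));

console.log(received); // [ 'hello world' ]
```

Unlike shareReplay, the buffer here does not depend on an earlier subscriber having been present when the value was emitted.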

Normally, in an Angular service, you would keep the Subject private to the service and expose a readonly observable to consumers. Something like this:

Service:

private subject$ = new Subject<string>();
public readonly value$ = this.subject$.pipe(shareReplay(1)); // if you use .pipe(), you don't really need .asObservable()

Component:

data$ = this.service.value$;

Since shareReplay is defined on the source observable (value$), a component that subscribes later still gets the replayed emission, as long as value$ already had a subscriber when the value was emitted.
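The private-subject pattern can be sketched as a plain class. This is only an illustration under a few assumptions: `MessageService` and `publish` are hypothetical names, Angular's `@Injectable()` decorator is left out so the snippet runs on its own, and it uses the ReplaySubject variant so that values emitted before the first subscription are kept.

```typescript
import { Observable, ReplaySubject } from 'rxjs';

// Hypothetical service; in Angular this class would carry @Injectable().
class MessageService {
  // Private, so only the service itself can push values.
  private readonly subject$ = new ReplaySubject<string>(1);

  // Read-only view for consumers; asObservable() hides next().
  public readonly value$: Observable<string> = this.subject$.asObservable();

  publish(message: string): void {
    this.subject$.next(message);
  }
}

const service = new MessageService();
service.publish('hello world'); // emitted before any consumer exists

// A consumer (e.g. a component) that subscribes late.
const seen: string[] = [];
service.value$.subscribe(message => seen.push(message));

console.log(seen); // [ 'hello world' ]
```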

Upvotes: 3

Barremian

Reputation: 31145

I see multiple issues here:

  1. You could use the asObservable method of a multicast source like Subject to get the observable. See here for usage of asObservable from RxJS' core dev.

  2. In the assignment newObservable$ = this.myObservable$.subscribe(...) you're actually assigning the subscription rather than the observable.

  3. The issue at hand: shareReplay doesn't appear to work because it isn't piped to the source from the start, only after the value was emitted. It will replay to subsequent subscribers any value emitted after its first subscription.
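Point 2 can be seen in a short sketch. The variable names here are illustrative and not taken from the question: `subscribe()` returns a Subscription, which is a handle for stopping delivery, while the Observable is the thing you can pipe and subscribe to again.

```typescript
import { Observable, Subject, Subscription } from 'rxjs';

const source$ = new Subject<string>();
const logged: string[] = [];

// An Observable: can be piped and subscribed to any number of times.
const messages$: Observable<string> = source$.asObservable();

// subscribe() returns a Subscription, not an Observable.
const subscription: Subscription = messages$.subscribe(m => logged.push(m));

source$.next('first');      // delivered to the subscriber
subscription.unsubscribe(); // stop listening
source$.next('second');     // not delivered

console.log(logged); // [ 'first' ]
```

Naming the result of `subscribe()` with a trailing `$` suggests an Observable, which is why the assignment in the question is misleading.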

Check the following illustration:

var { Subject } = rxjs;
var { shareReplay } = rxjs.operators;

// Scenario 1 (analogous to OP's)

var subSource1 = new Subject();
var myObservable1$ = subSource1.asObservable();
myObservable1$.subscribe(m => console.log('Scenario 1: Initial subscription'));
subSource1.next('Hello world');
var newObservable1$ = myObservable1$.pipe(
  shareReplay(1),
);
// Subscribes after the emission: nothing was buffered, so nothing is logged
newObservable1$.subscribe(m => console.log(`Scenario 1: ${m}`));

// Scenario 2

var subSource2 = new Subject();
var myObservable2$ = subSource2.asObservable().pipe(
  shareReplay(1),
);
myObservable2$.subscribe(m => console.log('Scenario 2: Initial subscription'));
subSource2.next('Hello world');
var newObservable2$ = myObservable2$;
// Logs "Scenario 2: Hello world", replayed from the shareReplay buffer
newObservable2$.subscribe(m => console.log(`Scenario 2: ${m}`));
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>

Upvotes: 1
