Ruslan Tereshchenko

Reputation: 3

Why does an observable source not emit values when used in race (or merge), but emit when I subscribe to it manually?

I have three observable sources in my code that emit values of the same type.

    const setTitle$ = params$.do(
        params => this.titleService.setTitle( `${params[1].appname} - ${this.pagename}` )
    ).switchMap(
        () => Observable.of(true)
    );

    const openDocument$ = params$.switchMap(
        params => this.openDocument(params[0].id)
    );

    const saveDocument$ = params$.switchMap(
        params => this.saveDocument(params[0].id)
    );

When I use them in race like this:

    setTitle$.race(
        openDocument$,
        saveDocument$
    ).subscribe();

only setTitle$ works. When I subscribe manually to the other two sources, like this:

    const openDocument$ = params$.switchMap(
        params => this.openDocument(params[0].id)
    ).subscribe();

    const saveDocument$ = params$.switchMap(
        params => this.saveDocument(params[0].id)
    ).subscribe();

then they work too. Please help me understand why this happens and how to make all sources work in race, merge, etc.

Upvotes: 0

Views: 404

Answers (2)

Ruslan Tereshchenko

Reputation: 3

The problem was with the shared params source.

    const params$ = this.route.params.map(
        routeParams => {
            return {
                id: <string>routeParams['id']
            };
        }
    ).combineLatest(
        this.config.getConfig()
    ).share();

I had shared it with the share operator. But in the article linked in the first comment on my question, I found this:

When using multiple async pipes on streams with default values, the .share() operator might cause problems:

The share() will publish the first value of the stream on the first subscription. The first async pipe will trigger that subscription and get that initial value. The second async pipe however will subscribe after that value has already been emitted and therefore miss that value.

The solution for this problem is the .shareReplay(1) operator, which will keep track of the previous value of the stream. That way all the async pipes will get the last value.

I replaced share() with shareReplay(1) and all sources began emitting values.

    const params$ = this.route.params.map(
        routeParams => {
            return {
                id: <string>routeParams['id']
            };
        }
    ).combineLatest(
        this.config.getConfig()
    ).shareReplay(1);
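To illustrate why a late subscriber misses the value, here is a minimal sketch of the difference between the two operators. It is a simplified, hand-rolled model and not the RxJS implementation: `MulticastSource`, `replayLast` and `lateSubscriberSees` are hypothetical names, and the manual `next` call stands in for the value that the real source emits as soon as the first subscriber connects.

```typescript
// Simplified model for illustration only; this is not RxJS code.
type Listener<T> = (value: T) => void;

class MulticastSource<T> {
  private listeners: Listener<T>[] = [];
  private last: { value: T } | undefined = undefined;
  private readonly replayLast: boolean;

  // replayLast = false behaves like share(); true behaves like shareReplay(1).
  constructor(replayLast: boolean) {
    this.replayLast = replayLast;
  }

  subscribe(listener: Listener<T>): void {
    // With replay enabled, a late subscriber immediately gets the cached value.
    if (this.replayLast && this.last !== undefined) {
      listener(this.last.value);
    }
    this.listeners.push(listener);
  }

  next(value: T): void {
    this.last = { value };
    for (const listener of this.listeners) {
      listener(value);
    }
  }
}

function lateSubscriberSees(replayLast: boolean): string[] {
  const source = new MulticastSource<string>(replayLast);
  const received: string[] = [];
  source.subscribe(v => received.push(`first:${v}`));
  source.next("params"); // emitted before the second subscriber exists
  source.subscribe(v => received.push(`second:${v}`));
  return received;
}

console.log(lateSubscriberSees(false)); // [ 'first:params' ]
console.log(lateSubscriberSees(true));  // [ 'first:params', 'second:params' ]
```

In the share-like case the second subscriber never sees "params", which matches openDocument$ and saveDocument$ staying silent when they subscribe to params$ after setTitle$ has already received the value.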

Thanks to everyone for help!

Upvotes: 0

CozyAzure

Reputation: 8468

From the documentation, the .race() operator does this:

The observable to emit first is used.

That is why you only get emissions from ONE source: race mirrors whichever of the three observables emits first and ignores the other two.
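To make this concrete, the sketch below models race next to merge (the other operator mentioned in the question) as plain functions over a list of timestamped emissions. It is a simplified model, not RxJS code: the `Emission` shape, the `raceModel` and `mergeModel` helpers, and the emission times are all assumptions made up for illustration.

```typescript
// Simplified model for illustration only; this is not RxJS code.
interface Emission {
  source: string; // which observable produced the value
  time: number;   // hypothetical emission time in milliseconds
  value: string;
}

function byTime(events: Emission[]): Emission[] {
  return [...events].sort((a, b) => a.time - b.time);
}

// race: forward only the source whose first emission arrives earliest.
function raceModel(events: Emission[]): Emission[] {
  const sorted = byTime(events);
  const first = sorted[0];
  if (first === undefined) {
    return [];
  }
  return sorted.filter(e => e.source === first.source);
}

// merge: forward every emission from every source, in time order.
function mergeModel(events: Emission[]): Emission[] {
  return byTime(events);
}

const timeline: Emission[] = [
  { source: "openDocument$", time: 5, value: "doc" },
  { source: "setTitle$", time: 0, value: "true" },
  { source: "saveDocument$", time: 9, value: "saved" },
  { source: "setTitle$", time: 12, value: "true" },
];

console.log(raceModel(timeline).map(e => e.source));
// [ 'setTitle$', 'setTitle$' ]
console.log(mergeModel(timeline).map(e => e.source));
// [ 'setTitle$', 'openDocument$', 'saveDocument$', 'setTitle$' ]
```

Under these assumed timings, race forwards only the setTitle$ values, while merge forwards values from all three sources.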

What you are looking for is either .forkJoin() or .combineLatest().

If you want all the observables to execute in parallel and wait for ALL of them to complete before getting their results as one emission, use .forkJoin():

    Observable
        .forkJoin([setTitle$, openDocument$, saveDocument$])
        .subscribe(([setTitle, openDocument, saveDocument]) => {
            // Do something with your results.
            // forkJoin emits once, with the last value of each observable,
            // and only after all three have completed; it waits for the slowest one.
        });

If, however, you want to listen to every emission of all the observables, regardless of when they are emitted, use .combineLatest():

    Observable
        .combineLatest(setTitle$, openDocument$, saveDocument$)
        .subscribe(([setTitle, openDocument, saveDocument]) => {
            // Do something with your results.
            // Once every observable has emitted at least once, this runs
            // each time any of them emits, with the latest value of each.
        });
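The difference between the two operators can be sketched with a simplified model that replays a list of emissions. This is not RxJS code: `IndexedEmission`, `combineLatestModel` and `forkJoinModel` are hypothetical helpers, the sample values are invented, and the forkJoin model assumes every source completes after its last listed emission.

```typescript
// Simplified model for illustration only; this is not RxJS code.
interface IndexedEmission {
  source: number; // index of the observable that emitted
  value: string;
}

// combineLatest: once every source has emitted, output the latest value
// of each source every time any one of them emits.
function combineLatestModel(sourceCount: number, events: IndexedEmission[]): string[][] {
  const latest: (string | undefined)[] = new Array<string | undefined>(sourceCount).fill(undefined);
  const out: string[][] = [];
  for (const e of events) {
    latest[e.source] = e.value;
    if (latest.every((v): v is string => v !== undefined)) {
      out.push([...latest]);
    }
  }
  return out;
}

// forkJoin: output once, with the last value of each source, after all
// sources have completed. A source that never emitted yields no output.
function forkJoinModel(sourceCount: number, events: IndexedEmission[]): string[] | undefined {
  const last: (string | undefined)[] = new Array<string | undefined>(sourceCount).fill(undefined);
  for (const e of events) {
    last[e.source] = e.value;
  }
  return last.every((v): v is string => v !== undefined) ? [...last] : undefined;
}

const events: IndexedEmission[] = [
  { source: 0, value: "title" },
  { source: 1, value: "doc-v1" },
  { source: 2, value: "saved" },
  { source: 1, value: "doc-v2" },
];

console.log(combineLatestModel(3, events));
// [ [ 'title', 'doc-v1', 'saved' ], [ 'title', 'doc-v2', 'saved' ] ]
console.log(forkJoinModel(3, events));
// [ 'title', 'doc-v2', 'saved' ]
```

In this model combineLatest produces two outputs, one for each emission that arrives after all three sources have a value, while forkJoin produces a single output at the end.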

Upvotes: 1
