Jake12342134

Reputation: 1767

Merge operator only emits the results of the first observable in list

I'm trying to use the merge operator to combine four different observables that all emit the same type T. I want to merge the four observables so that I have a single observable that emits T[] and then I can map the resultant list.

However I've noticed that when the merged observable is being subscribed to, only the results of the first observable in the list are being emitted. Why is this?

I thought it might have something to do with the map that's being used, but if the result of the merged observable is being emitted as a single stream this shouldn't be an issue, surely?

this._signalRService.categoryGroupChangeSignal.pipe(
    takeUntil(this.destroyObservables),
    switchMap(changedCategoryGroupId => {
        return this.getAllCategoryGroups$().pipe(
            map(groups => [changedCategoryGroupId, groups])
        );
    })
).subscribe(([changedCategoryGroupId, groups]: [string, CategoryGroup[]]) => {
    // do stuff with merged groups list
});

getAllCategoryGroups$ = (): Observable<CategoryGroup[]> => {
    return this.tenantListService.tenantList.pipe(switchMap((tenantList: Tenant[]) => {
        return merge(
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[0].id),
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[1].id),
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[2].id)
        );
    }));
};

Upvotes: 0

Views: 1310

Answers (2)

Derrick Awuah

Reputation: 399

If you want a single observable that emits T[], I would recommend using combineLatest. combineLatest waits until every source has emitted at least one value, then emits an array containing the latest value from each source. After that, every time any source emits a new value, it emits a fresh array of the most recent values from all of them. merge, by contrast, forwards values one at a time: each emission contains only the value from whichever source just emitted.

It is important to note that combineLatest will not emit anything until EVERY observable you passed to it (in your case, the 3 different ones) has emitted at least one value.
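
To make that waiting rule concrete, here is a minimal sketch that models it in plain TypeScript. It is not RxJS and not how RxJS implements the operator; `SourceEvent` and `modelCombineLatest` are hypothetical names I made up for illustration, and the timeline of events is invented.

```typescript
// Plain-TypeScript model of combineLatest's emission rule (NOT the RxJS implementation).
// SourceEvent and modelCombineLatest are illustrative names, not RxJS APIs.
interface SourceEvent<T> {
  source: number; // index of the source that emitted
  value: T;       // the value it emitted
}

function modelCombineLatest<T>(sourceCount: number, events: SourceEvent<T>[]): T[][] {
  const latest: (T | undefined)[] = [];
  const hasEmitted: boolean[] = [];
  for (let i = 0; i < sourceCount; i++) {
    latest.push(undefined);
    hasEmitted.push(false);
  }

  const output: T[][] = [];
  for (const event of events) {
    // Remember the most recent value of the source that just emitted.
    latest[event.source] = event.value;
    hasEmitted[event.source] = true;

    // Emit only once every source has produced at least one value.
    if (hasEmitted.every(flag => flag)) {
      output.push(latest.slice() as T[]);
    }
  }
  return output;
}

// Made-up timeline: source 2 is the last one to produce its first value.
const combinedEmissions = modelCombineLatest<string>(3, [
  { source: 0, value: "a1" },
  { source: 1, value: "b1" },
  { source: 0, value: "a2" },
  { source: 2, value: "c1" },
  { source: 1, value: "b2" },
]);

console.log(combinedEmissions);
```

In this model nothing is emitted for the first three events, because source 2 has not produced a value yet. The first emitted array therefore contains "a2" rather than "a1".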

The call syntax for combineLatest is the same, so you only need to add combineLatest to your rxjs imports and change merge to combineLatest in your getAllCategoryGroups$. Note that the result is now an array of lists, so the return type becomes Observable<CategoryGroup[][]>.

// combineLatest emits one array per update, holding one CategoryGroup[] per tenant
getAllCategoryGroups$ = (): Observable<CategoryGroup[][]> => {
    return this.tenantListService.tenantList.pipe(switchMap((tenantList: Tenant[]) => {
        return combineLatest(
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[0].id),
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[1].id),
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[2].id)
        );
    }));
};

Then, when you subscribe to it, you will receive an array with the same length as the number of observables passed to combineLatest. Each element is the latest value from the corresponding observable.

To flatten the array of arrays, you can use the map operator, which lets you alter the value that is passed down the observable chain. Inside map there are various ways to flatten the array; I usually do something like this:

getAllCategoryGroups$ = (): Observable<CategoryGroup[]> => {
    return this.tenantListService.tenantList.pipe(switchMap((tenantList: Tenant[]) => {
        return combineLatest(
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[0].id),
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[1].id),
            this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[2].id)
        ).pipe(
            // lists holds one CategoryGroup[] per tenant; flatten them into a single list
            map(lists => {
                const arrayToReturn: CategoryGroup[] = [];
                for (const list of lists) {
                    arrayToReturn.push(...list);
                }

                return arrayToReturn;
            })
        );
    }));
};
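
As one of the other ways to flatten, the loop can be pulled out into a small generic helper. This is only a sketch: `flattenLists` is a hypothetical name, and the sample strings stand in for CategoryGroup objects.

```typescript
// Hypothetical helper (not part of RxJS): flattens one level of nesting.
function flattenLists<T>(lists: T[][]): T[] {
  // concat accepts each inner array as a separate argument and appends its items.
  return ([] as T[]).concat(...lists);
}

// Made-up sample data standing in for the per-tenant CategoryGroup lists.
const flattenedGroups = flattenLists<string>([
  ["groupA1", "groupA2"],
  ["groupB1"],
  [],
]);

console.log(flattenedGroups);
```

Inside the pipe this would be used as map(lists => flattenLists(lists)), which keeps the flattening logic out of the observable chain and makes it easy to test on its own.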

Upvotes: 0

Max K

Reputation: 1121

The merge operator does not output an array with the values of all your observables. It emits a single value each time any of your source observables emits a new value.
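
As a rough illustration of that behaviour, here is a plain-TypeScript model of what a subscriber would see. It is not RxJS itself; `Emission` and `modelMerge` are hypothetical names, and the group names are made up.

```typescript
// Plain-TypeScript model of merge's emission rule (NOT the RxJS implementation).
// Emission and modelMerge are illustrative names, not RxJS APIs.
interface Emission<T> {
  source: number; // index of the source that emitted
  value: T;       // the value it emitted
}

function modelMerge<T>(events: Emission<T>[]): T[] {
  // merge forwards every value unchanged, in arrival order, one emission per value.
  return events.map(event => event.value);
}

// Three sources, each emitting one list (stand-ins for CategoryGroup[]).
const mergedEmissions = modelMerge<string[]>([
  { source: 0, value: ["groupA1", "groupA2"] },
  { source: 1, value: ["groupB1"] },
  { source: 2, value: ["groupC1"] },
]);

// The subscriber is called three times, once per list, never with a combined list.
console.log(mergedEmissions.length);
console.log(mergedEmissions);
```

So with three sources that each emit one list, the subscriber receives three separate lists rather than one list containing everything.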

What you want to achieve can be done with combineLatest:

return this.tenantListService.tenantList.pipe(switchMap((tenantList: Tenant[]) => {
    return combineLatest(
        this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[0].id),
        this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[1].id),
        this._categoryGroupService.getCustomTenantCategoryGroups(tenantList[2].id)
    );
}));

This will output an array with the latest values of your source observables. But it only starts emitting once every one of your source observables has emitted at least one value!

Upvotes: 0
