Vedran

Reputation: 11029

Rxjs nested subscribe with multiple inner subscriptions

Original promise-based code I'm trying to rewrite:

parentPromise
    .then((parentResult) => {
        childPromise1
            .then(child1Result => child1Handler(parentResult, child1Result));
        childPromise2
            .then(child2Result => child2Handler(parentResult, child2Result));
        childPromise3
            .then(child3Result => child3Handler(parentResult, child3Result));
    });

I'm trying to figure out how to avoid the nested-subscriptions anti-pattern in the following scenario:

parent$
    .pipe(takeUntil(onDestroy$))
    .subscribe((parentResult) => {
        child1$
            .pipe(takeUntil(onDestroy$))
            .subscribe(child1Result => child1Handler(parentResult, child1Result));
        child2$
            .pipe(takeUntil(onDestroy$))
            .subscribe(child2Result => child2Handler(parentResult, child2Result));
        child3$
            .pipe(takeUntil(onDestroy$))
            .subscribe(child3Result => child3Handler(parentResult, child3Result));
    });

What would be the correct 'RxJS way' to do this?

Upvotes: 1

Views: 536

Answers (4)

Picci

Reputation: 17762

If you were using Promises then the corresponding Observables emit only once and then complete.

If this is the case, you can use forkJoin to run the child Observables in parallel.

So the code could look like this:

parent$.pipe(
   // wait for parent$ to emit and then move on
   // forkJoin subscribes to the child observables in parallel and emits once all of them complete;
   // the emitted value is an array with the last notification from each of the 3 children
   concatMap(parentResult => forkJoin([child1$, child2$, child3$]).pipe(
      // map returns both the parent and the children notifications
      map(childrenResults => ({parentResult, childrenResults}))
   )),
   // takeUntil goes last so that it also tears down the inner forkJoin
   takeUntil(onDestroy$)
).subscribe(
  ({parentResult, childrenResults}) => {
      child1Handler(parentResult, childrenResults[0]);
      child2Handler(parentResult, childrenResults[1]);
      child3Handler(parentResult, childrenResults[2]);
  }
);
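
One consequence of the forkJoin approach is worth spelling out: the handlers only run once every child has completed, not when each child emits on its own. A minimal sketch, assuming stand-in streams (the emitted values, `child2$` being a Subject, and the `log` array are illustrative and not from the question):

```typescript
import { Subject, forkJoin, of } from 'rxjs';
import { concatMap, map } from 'rxjs/operators';

// Stand-in streams (assumptions for illustration only)
const parent$ = of('parent');
const child1$ = of('c1');
const child2$ = new Subject<string>(); // emits and completes later
const child3$ = of('c3');
const log: string[] = [];

parent$.pipe(
  concatMap(parentResult => forkJoin([child1$, child2$, child3$]).pipe(
    map(childrenResults => ({ parentResult, childrenResults }))
  ))
).subscribe(({ parentResult, childrenResults }) => {
  log.push(`${parentResult}:${childrenResults.join(',')}`);
});

// child1$ and child3$ are already done, but child2$ is still open
const emittedBeforeChild2 = log.length;

child2$.next('c2');
// a value alone is not enough: forkJoin waits for completion
const emittedAfterNextOnly = log.length;

child2$.complete();
// now all three children have completed, so the subscriber runs once
```

If a child stream never completes (for example, a long-lived Subject), forkJoin never emits, so this variant fits the promise-like case described above rather than ongoing streams.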

Upvotes: 0

dmcgrandle

Reputation: 6060

How about using higher order observables? Something like this:

const parentReplay$ = parent$.pipe(shareReplay(1));

of(
  [child1$, child1Handler],
  [child2$, child2Handler],
  [child3$, child3Handler]
).pipe(
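  // for each [child$, handler] pair, wait for the replayed parent value, then subscribe to the child
  mergeMap(([child$, handler]) => parentReplay$.pipe(
    mergeMap(parentResult => child$.pipe(
      tap(childResult => handler(parentResult, childResult))
    ))
  ))
).subscribe();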

Upvotes: 0

mbojko

Reputation: 14679

You can: 1) pass parent$ through share, and 2) use flatMap three times, something like:

const sharedParent$ = parent$.pipe(share());

sharedParent$.pipe(
    // flatMap is an alias of mergeMap
    flatMap(parentResult => forkJoin(of(parentResult), child1$)),
    takeUntil(onDestroy$)
).subscribe((results) => child1Handler(...results)); // repeat for all children

(If there are more than two children, extracting that into a function that takes the child stream and the handler as parameters is a good idea.)

That follows the original behavior of not subscribing to the children until parent$ emits. If you don't need that, you can skip flatMap and just forkJoin sharedParent$ and the children.
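
The helper function suggested above could be sketched roughly like this. The name `connectChild`, its signature, and the stand-in streams are assumptions for illustration; `mergeMap` is used in place of its alias `flatMap`, and forkJoin is given an array:

```typescript
import { Observable, Subject, forkJoin, of } from 'rxjs';
import { mergeMap, share, takeUntil } from 'rxjs/operators';

// Hypothetical helper (not from the answer): wires one child stream to its handler.
function connectChild<P, C>(
  sharedParent$: Observable<P>,
  child$: Observable<C>,
  handler: (parentResult: P, childResult: C) => void,
  onDestroy$: Observable<unknown>
) {
  return sharedParent$
    .pipe(
      // subscribe to the child only after the parent has emitted
      mergeMap(parentResult => forkJoin([of(parentResult), child$])),
      takeUntil(onDestroy$)
    )
    .subscribe(([parentResult, childResult]) => handler(parentResult, childResult));
}

// Stand-in streams and handlers (assumptions for illustration only)
const parent$ = new Subject<string>();
const onDestroy$ = new Subject<void>();
const sharedParent$ = parent$.pipe(share());
const calls: string[] = [];

connectChild(sharedParent$, of(1), (p, c) => calls.push(`child1:${p}:${c}`), onDestroy$);
connectChild(sharedParent$, of(2), (p, c) => calls.push(`child2:${p}:${c}`), onDestroy$);
connectChild(sharedParent$, of(3), (p, c) => calls.push(`child3:${p}:${c}`), onDestroy$);

parent$.next('parent');  // each handler runs once with its own child value
onDestroy$.next();       // tears down all three pipelines
parent$.next('ignored'); // no handler runs after onDestroy$
```

Unlike a single forkJoin over all children, each handler here fires as soon as its own child completes, which is closer to the original promise code.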

Upvotes: 0

Zazaeil

Reputation: 4119

That seems pretty strange to me. You're creating a new subscription for each child every time parentResult arrives. Even though those will eventually be destroyed (assuming the onDestroy$ implementation is correct), it seems wrong.

You probably want withLatestFrom(parent$) and three separate pipes, one for each child.

It might look something like: child1$.pipe(takeUntil(globalDeath$), withLatestFrom(parent$)).subscribe(([childResult, parentResult]) => ...). Not sure if my JS is correct, can't test it at the moment; but the point is: you're getting the latest result from the parent$ every time child1$ fires. Note that you can reverse the direction if necessary (withLatestFrom(child1$)).
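
Written out as a block, the withLatestFrom idea might look like the sketch below. The Subjects and the `seen` array are stand-ins assumed for illustration, `onDestroy$` is used in place of `globalDeath$`, and takeUntil is placed last, which is a common convention:

```typescript
import { Subject } from 'rxjs';
import { takeUntil, withLatestFrom } from 'rxjs/operators';

// Stand-in streams (assumptions for illustration only)
const parent$ = new Subject<string>();
const child1$ = new Subject<number>();
const onDestroy$ = new Subject<void>();
const seen: string[] = [];

child1$
  .pipe(withLatestFrom(parent$), takeUntil(onDestroy$))
  .subscribe(([childResult, parentResult]) => {
    seen.push(`${parentResult}:${childResult}`);
  });

child1$.next(0);     // dropped: parent$ has not emitted yet
parent$.next('p1');  // no handler call; only updates the latest parent value
child1$.next(1);     // paired with 'p1'
parent$.next('p2');
child1$.next(2);     // paired with 'p2'
onDestroy$.next();
child1$.next(3);     // ignored after onDestroy$
```

Note the difference from the nested version: the handler is driven by the child stream, and child values that arrive before the first parent value are silently dropped.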

Upvotes: 1
