jeanpaul62

Reputation: 10591

Observables: Chaining promises & firing events on each promise

I just read the SO question "RxJS Promise Composition (passing data)" and have a follow-up question.

The setting is the same as the aforementioned question: I have 3 promises.

 const p1 = () => Promise.resolve(1);
 const p2 = x => { const val = x + 1; return Promise.resolve(val); };
 const p3 = x => {
      const isEven = x => x % 2 === 0;
      return Promise.resolve(isEven(x));
 };

And I'm chaining them, as per the answer, like this:

 var chainedPromises$ = 
     Rx.Observable.just()
             .flatMap(p1)
             .flatMap(p2)
             .flatMap(p3);

However, chainedPromises$ emits only once, at the end of the whole promise chain, with the resolved value true. Namely:

chainedPromises$.subscribe(function(x){console.log(x)}); // Logs `true`

How can I change chainedPromises$ so that it emits once after each promise resolves? I.e.:

chainedPromises$.subscribe(function(x){console.log(x)});
// Logs `1`
// Logs `2`
// Logs `true`

Upvotes: 3

Views: 658

Answers (3)

frontsideair

Reputation: 528

Looks like you need to emit the generated value and use it in the next step of the chain. I don't know if there's sugar for this, but I got it working by making this distinction explicit.

Observable
  .fromPromise(p1())
  .flatMap(val => Observable.merge(
    Observable.fromPromise(p2(val))
      .flatMap(val => Observable.merge(
        Observable.fromPromise(p3(val)),
        Observable.of(val)
      )),
    Observable.of(val)
  ))
  .subscribe(console.log);


Upvotes: 2

Keith

Reputation: 24241

To me, you are wasting the whole point of an observable here.

If you have a set of promises that you want a subscriber to receive as a stream, simply pass each resolved value on to next. You then have a stream you can subscribe to.

Below is an example:

const p1 = () => Promise.resolve(1);
const p2 = x => { const val = x + 1; return  Promise.resolve(val); };
const p3 = x => {
  const isEven = x => x % 2 === 0;
  return Promise.resolve(isEven(x));
};

const chainedProm = (observer) => {
  let params;
  return async (p) => 
    observer.next(params = await p(params));
};

var observable = Rx.Observable.create(async function (observer) {
  const doProm = chainedProm(observer);
  await doProm(p1);
  await doProm(p2);
  await doProm(p3);
  observer.complete();
});


observable.subscribe(function(x){console.log(x)});
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.9/Rx.min.js"></script>
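
The pattern above extends to any number of steps. The following is a minimal, library-free sketch rather than part of the original answer: `runChain`, `seen`, and `done` are hypothetical names, and a plain object with `next`/`error`/`complete` methods stands in for the RxJS observer.

```javascript
// Same promise factories as in the question.
const p1 = () => Promise.resolve(1);
const p2 = x => Promise.resolve(x + 1);
const p3 = x => Promise.resolve(x % 2 === 0);

// Hypothetical helper: runs the steps in order, feeds each resolved value
// into the next step, and reports every intermediate value via observer.next.
async function runChain(steps, observer) {
  let value;
  try {
    for (const step of steps) {
      value = await step(value);
      observer.next(value);
    }
    observer.complete();
  } catch (err) {
    // A rejected promise ends the chain and is reported as an error.
    observer.error(err);
  }
}

// A plain object with the observer shape is used here instead of RxJS.
const seen = [];
const done = runChain([p1, p2, p3], {
  next: x => { seen.push(x); console.log(x); },
  error: err => console.error(err),
  complete: () => console.log('complete'),
});
```

Inside `Rx.Observable.create`, the real `observer` should be usable in place of the plain object, since it exposes the same three methods.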

Upvotes: 1

martin

Reputation: 96979

Just don't use mergeMap, which is made to "map" one value to another. Use concat instead, which subscribes to an Observable only after the previous Observable has completed. This way you still process the Observables one at a time, but each emission is propagated:

Observable.concat(p1, p2, p3)
  .subscribe(console.log);

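Edit: concat does not pass each result on to the next promise, so here is a version using expand, which feeds every emitted value back into the chain: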

Observable.of(null)
  .expand((value, index) => {
    switch (index) {
      case 0:
        return p1();
      case 1:
        return p2(value);
      case 2:
        return p3(value);
      default:
        return Observable.empty();
    }
  })
  .filter((value, index) => index > 0) // Ignore the `null` value that was necessary to start the chain
  .subscribe(console.log);

This prints:

1
2
true

See live demo (open console): https://stackblitz.com/edit/rxjs5-fucqhq?file=index.ts
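
To make the index-based recursion easier to follow, here is a simplified, library-free sketch of the idea. It is not how `expand` is implemented: `expandPromises`, `steps`, `emitted`, and `finished` are hypothetical names, and the helper assumes each step produces exactly one promise, whereas the real operator works with Observables that may emit many values.

```javascript
// Same promise factories as in the question.
const p1 = () => Promise.resolve(1);
const p2 = x => Promise.resolve(x + 1);
const p3 = x => Promise.resolve(x % 2 === 0);

// Hypothetical stand-in for expand, limited to one promise per step.
// project(value, index) returns the next promise, or undefined to stop
// (undefined plays the role of Observable.empty() in the answer above).
async function expandPromises(seed, project, emit) {
  let value = seed;
  for (let index = 0; ; index++) {
    const next = project(value, index);
    if (next === undefined) return;
    value = await next;
    emit(value); // every resolved value is emitted, not only the last one
  }
}

const steps = [p1, p2, p3];
const emitted = [];
const finished = expandPromises(
  null,
  (value, index) => (index < steps.length ? steps[index](value) : undefined),
  x => { emitted.push(x); console.log(x); }
);
```

Unlike the `expand` version, this sketch never emits the seed itself, so no `filter` step is needed to drop the initial `null`.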

Upvotes: 1
