Will Hardwick-Smith

Reputation: 1049

How to preserve a 'complete' event across two RxJS observables?

I have an observable, const numbers = from([1,2,3]), which emits 1, 2, 3 and then completes.

I need to map this to another observable, e.g. like this:

const mapped = numbers.pipe(
   concatMap(number => Observable.create(observer => {
     observer.next(number);
   }))
 );

But now the resulting observable mapped emits 1, 2, 3 but not the complete event.

How can I preserve the complete event in mapped?

Upvotes: 2

Views: 150

Answers (1)

ariels - IGNORE AI

Reputation: 551

Your code gives me just "1" (with RxJS 6); are you sure you see 3 values?

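const Rx = require('rxjs');
const op = require('rxjs/operators');

// Prints only 1; 'done' is never logged because the inner Observable never completes.
Rx.from([1,2,3]).pipe(
   op.concatMap(number => Rx.Observable.create(observer => {
     observer.next(number);
   }))
 ).forEach(x => console.log(x)).then(() => console.log('done'))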

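You're never completing the created Observable: it emits one value but never calls observer.complete(). concatMap subscribes to the next inner Observable only after the current one completes, so the chain stalls after the first value and the outer complete is never forwarded. This works: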

Rx.from([1,2,3]).pipe(
   op.concatMap(number => Rx.Observable.create(observer => {
     observer.next(number);
     observer.complete(); // lets concatMap move on to the next inner Observable
   }))
 ).forEach(x => console.log(x)).then(() => console.log('done'))
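
To make the difference between the two versions concrete, here is a rough toy model in plain JavaScript. It is not how RxJS is implemented: fromArray, concatMap, run, neverCompletes and completes are all hypothetical helpers written for this sketch, and an "observable" is just a function that takes an observer. It only assumes the rule that concatMap handles one inner observable at a time and waits for its complete.

```javascript
// Toy model only, NOT RxJS: an "observable" is a function taking {next, complete}.
const fromArray = (items) => (observer) => {
  items.forEach((item) => observer.next(item));
  observer.complete();
};

// Simplified concatMap: one inner observable at a time; the next one starts
// (and the outer complete is forwarded) only after the current inner completes.
const concatMap = (project) => (source) => (observer) => {
  const queue = [];
  let innerActive = false;
  let outerDone = false;

  const subscribeNext = () => {
    if (queue.length === 0) {
      innerActive = false;
      if (outerDone) observer.complete();
      return;
    }
    innerActive = true;
    const inner = project(queue.shift());
    inner({
      next: (value) => observer.next(value),
      complete: subscribeNext,
    });
  };

  source({
    next: (value) => {
      queue.push(value);
      if (!innerActive) subscribeNext();
    },
    complete: () => {
      outerDone = true;
      if (!innerActive) observer.complete();
    },
  });
};

// Collects emitted values and records whether complete was ever signalled.
const run = (observable) => {
  const values = [];
  let completed = false;
  observable({
    next: (value) => values.push(value),
    complete: () => { completed = true; },
  });
  return { values, completed };
};

// Mirrors the question: emits once, never completes.
const neverCompletes = (n) => (observer) => { observer.next(n); };
// Mirrors the fix: emits once, then completes.
const completes = (n) => (observer) => { observer.next(n); observer.complete(); };

const stalled = run(concatMap(neverCompletes)(fromArray([1, 2, 3])));
const finished = run(concatMap(completes)(fromArray([1, 2, 3])));

console.log(stalled);  // values: [1], completed: false
console.log(finished); // values: [1, 2, 3], completed: true
```

In the stalled run, 2 and 3 sit in the queue forever because the first inner observable never signals complete, which matches the single "1" seen with the original code.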

This all shows how hard it is to use Rx.Observable.create() correctly. The point of using Rx is to write your code using higher-level abstractions, and a large part of that is preferring operators and creation functions over hand-written observers. E.g. in your case (which is admittedly simple):

Rx.from([1,2,3])
  .pipe(op.concatMap(number => Rx.of(number)))
  .forEach(x => console.log(x)).then(() => console.log('done'))

Upvotes: 2
