Sven van den Boogaart
Sven van den Boogaart

Reputation: 12323

rxjs how to complete observable?

To learn rxjs im playing around with it. My code:

// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';

const source = interval(1000);

const example = source.pipe(
  map(value => value +1),
  map(value => {
    if(value === 40) {
      finish();
    }
    else if (value % 5 === 0){
      return 'can devide by 5 we did some magic';
  }else{
     return value;
  } })
);
const subscribe = example.subscribe(
  val => console.log(val), 
  error => console.log("Error handled: " , error), 
  () => console.log('resolved'));

My idea was to run it 40 time and than finish the observable (it could be another requirement e.g. see if the value is 10 at 10:00 (main goal is to do an evaluation with value and force a finish)). Im looking for an alternative to the placeholder finish() because finish does not exist. How can I get to the resolve function () => console.log('resolved') of the subscribe method?

I found How can I complete Observable in RxJS but the answer is from 2015 and im assuming by now there is an answer for the current rxjs version.

Upvotes: 7

Views: 15867

Answers (4)

BizzyBob
BizzyBob

Reputation: 14740

I know this is an old question, but there is a clear solution that hasn't been mentioned. To complete an observable based on a condition, simply use the takeWhile operator:

const source = interval(1000);

const example = source.pipe(
  takeWhile(n => n < 40),
  map(n => n % 5 ? n : 'can divide by 5 we did some magic')
);

All of the take* operators will cause the observable to complete:

  • takeWhile - complete when the given condition fails
  • takeUntil - complete when the given observable emits
  • take - complete when the given number of emissions has occurred

In the above example, since we just want the first 40 values, we could simply use take(40) instead of takeWhile(n => n < 40).

The takeWhile operator can be used when the expression is more complex:

takeWhile(n => n < 40 && !isTimeExpired())

Upvotes: 2

Tony
Tony

Reputation: 20082

Actually is still the same you only need to use pipe operator. You can view example here

import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

const source = interval(1000);
const timer$ = timer(5000);
const example = source.pipe(takeUntil(timer$));
const subscribe = example.subscribe(val => console.log(val));

Upvotes: 8

Muhammed Albarmavi
Muhammed Albarmavi

Reputation: 24404

both answers mention takeuntil and take are correct but another way is to use the subscription object to unsubscribe it just another option

const subx= example.subscribe(val =>  { 
   console.log(val); 
   if (val == 40) {
    subx.unsubscribe() 
   }
 });

demo 🚀

Updated 😎

in case you have a many subscribers and you want to put the condtion that complate a source observable take operator can do the job here

const source = interval(1000).pipe(take(5)); // 👈

source.pipe(map(res => res * 10)).subscribe(val => {
  console.log("🤯", val);
});

source.subscribe(val => {
  console.log(val);
});

demo 👓

Upvotes: 3

MoxxiManagarm
MoxxiManagarm

Reputation: 9124

My idea was to run it 40 time`

For that you can add take(40). In general there are several operators like take that can complete an observable. Check out https://www.learnrxjs.io/operators/filtering/take.html

// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';

const source = interval(1000);

const example = source.pipe(
  take(40),
  map(value => value +1),
  map(value => {
    if(value === 40) {
      finish();
    }
    else if (value % 5 === 0){
      return 'can devide by 5 we did some magic';
  }else{
     return value;
  } })
);
const subscribe = example.subscribe(
  val => console.log(val), 
  error => console.log("Error handled: " , error), 
  () => console.log('resolved'));

Upvotes: 2

Related Questions