Reputation: 12323
To learn RxJS, I'm playing around with it. My code:
// RxJS v6+
import { withLatestFrom, map } from 'rxjs/operators';
import { interval } from 'rxjs';
const source = interval(1000);
const example = source.pipe(
  map(value => value + 1),
  map(value => {
    if (value === 40) {
      finish(); // placeholder: this function does not exist
    } else if (value % 5 === 0) {
      return 'can divide by 5 we did some magic';
    } else {
      return value;
    }
  })
);
const subscribe = example.subscribe(
  val => console.log(val),
  error => console.log("Error handled: ", error),
  () => console.log('resolved'));
My idea was to run it 40 times and then finish the observable. (It could be another requirement, e.g. checking whether the value is 10 at 10:00; the main goal is to evaluate the value and force a finish.)
I'm looking for an alternative to the placeholder finish(), because finish does not exist. How can I reach the complete callback () => console.log('resolved') of the subscribe method?
I found "How can I complete Observable in RxJS", but the answer is from 2015 and I'm assuming that by now there is an answer for the current RxJS version.
Upvotes: 7
Views: 15867
Reputation: 14740
I know this is an old question, but there is a clear solution that hasn't been mentioned. To complete an observable based on a condition, simply use the takeWhile operator:
import { interval } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';
const source = interval(1000);
const example = source.pipe(
  takeWhile(n => n < 40),
  map(n => n % 5 ? n : 'can divide by 5 we did some magic')
);
All of the take* operators will cause the observable to complete:
takeWhile - complete when the given condition fails
takeUntil - complete when the given observable emits
take - complete when the given number of emissions has occurred
In the above example, since we just want the first 40 values, we could simply use take(40) instead of takeWhile(n => n < 40).
The takeWhile operator can be used when the expression is more complex:
takeWhile(n => n < 40 && !isTimeExpired())
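As a minimal sketch of how such a combined condition reaches the complete callback: the example below swaps interval(1000) for the synchronous range(1, 100) so that it finishes immediately, and isTimeExpired is a hypothetical helper (here a simple deadline check) that is assumed for illustration only. The observer is passed as an object, which works in both RxJS 6 and 7.

```typescript
import { range } from 'rxjs';
import { map, takeWhile } from 'rxjs/operators';

// Hypothetical stand-in for the isTimeExpired() check mentioned above.
const deadline = Date.now() + 60000;
const isTimeExpired = (): boolean => Date.now() > deadline;

const received: Array<number | string> = [];
const status = { completed: false };

// range(1, 100) emits 1..100 synchronously; it replaces interval(1000)
// only so that the example runs to completion right away.
range(1, 100)
  .pipe(
    // Completes the stream as soon as the predicate returns false.
    takeWhile(n => n < 40 && !isTimeExpired()),
    map(n => (n % 5 === 0 ? 'can divide by 5 we did some magic' : n))
  )
  .subscribe({
    next: val => {
      received.push(val);
    },
    error: err => console.log('Error handled: ', err),
    complete: () => {
      status.completed = true;
      console.log('resolved');
    }
  });
```

The value that fails the predicate (40 here) is not emitted. Since RxJS 6.4, takeWhile also accepts a second argument, inclusive, which emits that last value before completing.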
Upvotes: 2
Reputation: 20082
Actually, it is still the same; you only need to apply the operator through the pipe method. You can view an example here:
import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
const source = interval(1000);
const timer$ = timer(5000);
const example = source.pipe(takeUntil(timer$));
const subscribe = example.subscribe(val => console.log(val));
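The notifier passed to takeUntil does not have to be a timer; any observable will do. The following is a rough sketch, assuming a hand-driven Subject named stop$ (a name made up for this illustration) as the "finish now" signal, with a second Subject standing in for interval so the values can be pushed by hand.

```typescript
import { Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// Subjects replace interval/timer here so emissions can be triggered manually.
const source$ = new Subject<number>();
const stop$ = new Subject<void>(); // hypothetical stop signal

const seen: number[] = [];
const state = { completed: false };

source$.pipe(takeUntil(stop$)).subscribe({
  next: val => {
    seen.push(val);
  },
  complete: () => {
    state.completed = true;
    console.log('resolved');
  }
});

source$.next(1);
source$.next(2);
stop$.next();    // the notifier emits, so the piped observable completes
source$.next(3); // ignored: the subscriber has already completed
```

Calling stop$.next() from inside any evaluation logic is one way to force the finish the question asks about, and it does run the complete callback.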
Upvotes: 8
Reputation: 24404
Both answers that mention takeUntil and take are correct, but another way is to use the subscription object to unsubscribe. It is just another option:
const subx = example.subscribe(val => {
  console.log(val);
  if (val === 40) {
    subx.unsubscribe();
  }
});
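One caveat with this option: unsubscribing stops delivery but is not a completion, so the complete callback the question asks about is not invoked. A small sketch of that behaviour, using a hand-driven Subject (an assumption made only so the example runs synchronously) in place of the interval:

```typescript
import { Subject, Subscription } from 'rxjs';

const values$ = new Subject<number>();
const got: number[] = [];
const flags = { completed: false };

const subx: Subscription = values$.subscribe({
  next: val => {
    got.push(val);
    if (val === 2) {
      subx.unsubscribe(); // stops delivery, but does not signal completion
    }
  },
  complete: () => {
    flags.completed = true; // never reached in this example
  }
});

values$.next(1);
values$.next(2);
values$.next(3); // not delivered: the subscription is closed
```

If the 'resolved' handler needs to run, one of the take* operators is the better fit.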
Updated 😎
In case you have many subscribers and you want the completion condition to live on the source observable itself, the take operator can do the job here:
const source = interval(1000).pipe(take(5)); // 👈
source.pipe(map(res => res * 10)).subscribe(val => {
  console.log("🤯", val);
});
source.subscribe(val => {
  console.log(val);
});
Upvotes: 3
Reputation: 9124
"My idea was to run it 40 times"
For that you can add take(40). In general, there are several operators like take that can complete an observable. Check out https://www.learnrxjs.io/operators/filtering/take.html
// RxJS v6+
import { take, map } from 'rxjs/operators';
import { interval } from 'rxjs';
const source = interval(1000);
const example = source.pipe(
  take(40), // completes the observable after 40 emissions
  map(value => value + 1),
  map(value => {
    // the finish() placeholder is no longer needed: take(40) ends the stream
    if (value % 5 === 0) {
      return 'can divide by 5 we did some magic';
    } else {
      return value;
    }
  })
);
const subscribe = example.subscribe(
  val => console.log(val),
  error => console.log("Error handled: ", error),
  () => console.log('resolved'));
Upvotes: 2