Reputation: 428
I have a use case where I need an Observable to skip its next emission whenever another notifier Observable emits.
source: |---X---X---X---X---X---X---X---X---X---X--|>
notifier: |-------------X---------X----------X-------|>
result: |---X---X---X-------X---X-------X-------X--|>
Basically, I want an operator called skipNextWhen
that takes in the notifier observable and skips the next emission from the source.
I tried using an implementation that uses the pausable
operator (re-implemented using switchMap
), but couldn't get it to work.
pausable.ts
import { Observable } from 'rxjs/Observable';
import { Subject } from 'rxjs/Subject';
import 'rxjs/add/observable/never';
import 'rxjs/add/operator/startWith';
declare module 'rxjs/Observable' {
interface Observable<T> {
pausable: typeof pausable;
}
}
function pausable<T>(notifier: Observable<boolean>): Observable<T> {
return notifier.startWith(false).switchMap((paused) => {
if (paused) {
return Observable.never();
} else {
const source = new Subject();
this.subscribe(source);
return source;
}
});
}
Observable.prototype.pausable = pausable;
skipNextWhen.ts
import { Observable } from 'rxjs/Observable';
import './pausable';
declare module 'rxjs/Observable' {
interface Observable<T> {
skipNextWhen: typeof skipNextWhen;
}
}
function skipNextWhen<T, R>(other: Observable<T>): Observable<R> {
const notifier = Observable.merge(this.map(() => false),
other.map(() => true));
return this.pausable(notifier);
}
Observable.prototype.skipNextWhen = skipNextWhen;
Is there a more suitable operator that I should consider using instead? The behavior I'm seeing with my current implementation is that the result Observable emits once, and then never again - even if the notifier Observable never emits.
Upvotes: 11
Views: 2732
Reputation: 6029
I've started a (very) small library of some rxjs utils I've wanted. It happens to have a function to do exactly what you ask: skipAfter
. From the docs:
source: -1-----2-----3-----4-----5-|
skip$: ----0----------0-0----------
result: -1-----------3-----------5-|
The library is here: https://github.com/simontonsoftware/s-rxjs-utils
Upvotes: 5
Reputation: 96889
I can think of two solutions to this:
Using .filter()
, .do()
and a few side-effects.
This is mayne easier to understand solution even though it's not that "Rx" way:
function skipNextWhen(other) {
let skipNext = false;
return this.merge(other.do(() => skipNext = true).filter(() => false))
.filter(val => {
const doSkip = skipNext;
skipNext = false;
return !doSkip;
});
}
I'm using merge()
just to update skipNext
, other
's value is always ignored.
Using .scan()
:
This solution is without any state variables and side-effects.
function skipNextWhen(other) {
const SKIP = 'skip';
return this.merge(other.mapTo(SKIP))
.scan((acc, val) => {
if (acc === SKIP) {
return null;
} else if (val === SKIP) {
return SKIP;
} else {
return val;
}
}, [])
.filter(val => Boolean(val) && val !== SKIP);
}
Basically, when SKIP
arrives I return it right away because it's going to be passed again in acc
parameter by the scan()
operator and later ignored by filter()
.
If I receive a normal value but the previous value was SKIP
I ignore it and return just null
which is later filter away.
Both solutions give the same result:
Observable.prototype.skipNextWhen = skipNextWhen;
const source = Observable.range(1, 10)
.concatMap(val => Observable.of(val).delay(100));
source
.skipNextWhen(Observable.interval(350))
.subscribe(console.log);
This prints the following:
1
2
3
5
6
8
9
10
Just be aware that you're not in fact creating new operator. You just have a shortcut for an operator chain. This for example doesn't let you unsubscribe from other
when the source completes.
Upvotes: 6