Reputation: 735
I am trying to achieve the following behavior:
I have a source observable that emits every second.
I am trying to ignore values for some period(10 seconds) of time if another observable(mySubject
) emitted value.
This is what I came for:
this.source.pipe(
takeUntil(this.mySubject),
repeatWhen((observable) => observable.pipe(delay(10000))),
tap((x) => console.log(x)),
).subscribe();
Now it's stoping the emitting of the source for 10 seconds on every mySubject
emission.
The problem is that I need it if another emission of mySubject
to reset the "count" of the 10 seconds and ignore for another 10 seconds without emitting anything meanwhile.
How can I achieve this?
Upvotes: 3
Views: 2189
Reputation: 9367
This is a little tricky and this isn't (likely) the simplest solution, but you can do what I suggest below (you can also take a look at this Stackblitz demo):
// emits a number starting from 0 in every second
const sourceInterval = interval(1000);
// When semaphore emits, you will wait for two seconds
// The emissions should stop when it emits a true value
// and should resume when it emits a false value (after
// the minimum waiting interval has passed if it was in
// the middle of one
const semaphore = new Subject<boolean>();
// semaphore will emit in 6 seconds
setTimeout(() => semaphore.next(true), 6000);
sourceInterval
.pipe(
// emits an array containing the emission of sourceInterval and semaphore
withLatestFrom(semaphore.pipe(startWith(false))),
// exhaustMap maps the value to a inner observable and only accepts new
// values when it completes.
exhaustMap(([sourceIntervalEmission, semaphoreEmission]) =>
semaphoreEmission
// this observable completes only after 2 seconds, and release the
// semaphore before that
? new Observable<void>(subscriber => {
// release the semaphore
semaphore.next(false);
// complete the observable after 2 seconds
setTimeout(() => subscriber.complete(), 2000);
})
: of(sourceIntervalEmission) // reemits the sourceInterval value
)
).subscribe(console.log);
// expected output: 0 1 2 3 4 5 * * * 8 9 10...
// on the 6 emission of the sourceInterval, the semaphore emits, so exhaustMap
// gives the control to the `new Observable(...)` that will complete
// only after 2 seconds, releasing the semaphore and the subsequent emissions
// from the sourceInterval, starting at the 8th emission.
Upvotes: 1
Reputation: 96899
I'm afraid this requires a little more complicated solution:
const ignore$ = this.mySubject.pipe(
switchMap(() => merge(of(true), of(false).pipe(delay(10 * 1000)))),
);
this.source.pipe(
withLatestFrom(ignore$),
filter(([value, ignore]) => !ignore),
map(([value]) => value),
).subscribe(...);
Upvotes: 7