Moshe Yamini

Reputation: 735

RXJS: Skipping values from observable triggered by other observable

I am trying to achieve the following behavior: I have a source observable that emits every second. Whenever another observable (mySubject) emits a value, I want to ignore the source's values for a fixed period (10 seconds). This is what I came up with:

this.source.pipe(
      takeUntil(this.mySubject),
      repeatWhen((observable) => observable.pipe(delay(10000))),
      tap((x) => console.log(x)),
).subscribe();

Now it stops the source from emitting for 10 seconds on every mySubject emission.

The problem is that I need another emission of mySubject during that period to reset the 10-second countdown, so that values are ignored for a further 10 seconds with nothing emitted in the meantime.

How can I achieve this?

Upvotes: 3

Views: 2189

Answers (2)

julianobrasil

Reputation: 9367

This is a little tricky, and it's likely not the simplest solution, but you can do what I suggest below (you can also take a look at this Stackblitz demo):

import { interval, Observable, of, Subject } from 'rxjs';
import { exhaustMap, startWith, withLatestFrom } from 'rxjs/operators';

// emits a number every second, starting from 0
const sourceInterval = interval(1000);

// When semaphore emits true, the emissions pause for two seconds.
// The emissions should stop when it emits a true value
// and should resume when it emits a false value (after
// the minimum waiting interval has passed, if it was in
// the middle of one).
const semaphore = new Subject<boolean>();

// semaphore will emit in 6 seconds
setTimeout(() => semaphore.next(true), 6000);

sourceInterval
  .pipe(
    // emits an array containing the emission of sourceInterval and semaphore
    withLatestFrom(semaphore.pipe(startWith(false))),

    // exhaustMap maps the value to an inner observable and only accepts new
    //   values once that inner observable completes.
    exhaustMap(([sourceIntervalEmission, semaphoreEmission]) =>
      semaphoreEmission
          // this observable completes only after 2 seconds, and releases the
          // semaphore before that
        ? new Observable<void>(subscriber => {
            // release the semaphore
            semaphore.next(false);
            // complete the observable after 2 seconds
            setTimeout(() => subscriber.complete(), 2000);
          })
        : of(sourceIntervalEmission) // reemits the sourceInterval value
    )
  ).subscribe(console.log);

// expected output: 0 1 2 3 4 5 * * * 8 9 10...
// On the 6th emission of sourceInterval, the semaphore emits, so exhaustMap
// hands control to the `new Observable(...)`, which completes
// only after 2 seconds, releasing the semaphore and the subsequent emissions
// from sourceInterval, starting at the 8th emission.

Upvotes: 1

martin

Reputation: 96899

I'm afraid this requires a slightly more complicated solution:

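// Emits true on every mySubject emission, then false 10 seconds later.
// switchMap cancels the pending false when mySubject emits again,
// which restarts the 10-second window.
const ignore$ = this.mySubject.pipe(
  switchMap(() => merge(of(true), of(false).pipe(delay(10 * 1000)))),
  // withLatestFrom stays silent until ignore$ has emitted at least once,
  // so seed it with false to let values through before the first trigger.
  startWith(false),
);

this.source.pipe(
  withLatestFrom(ignore$),
  filter(([value, ignore]) => !ignore),
  map(([value]) => value),
).subscribe(...);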
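
One way to sanity-check the reset behavior of ignore$ without waiting on real timers is to model the rule as a pure function over timestamps. The sketch below is library-free and is not part of RxJS. The name passedValues, its parameters, and the sample timeline are hypothetical and chosen only for illustration. It also assumes that a value arriving exactly when the window ends is let through, which the RxJS version does not guarantee.

```typescript
// Hypothetical model of the gating rule: a source value at time t is dropped
// when the most recent trigger fired less than windowMs before t.
function passedValues(
  sourceTimes: number[],
  triggerTimes: number[],
  windowMs: number,
): number[] {
  return sourceTimes.filter((t) => {
    // Only the latest trigger at or before t matters; a newer trigger
    // supersedes an older one, which is what restarts the window.
    const last = Math.max(...triggerTimes.filter((g) => g <= t), -Infinity);
    return t - last >= windowMs;
  });
}

// Sample timeline (assumed values): the source ticks every second for 30 s,
// and the trigger fires at 5.5 s and again at 12.5 s.
const sourceTimes = Array.from({ length: 30 }, (_, i) => (i + 1) * 1000);
const triggerTimes = [5500, 12500];

const passed = passedValues(sourceTimes, triggerTimes, 10000);
console.log(passed.map((t) => t / 1000));
```

With these inputs the second trigger lands inside the first window and pushes its end from 15.5 s to 22.5 s, so the model lets through the values at 1 to 5 s and resumes at 23 s.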

Upvotes: 7
