Anne

Reputation: 249

Filter and debounce an observable using values from another observable

I have 2 observables :

What I'm trying to do is :

I have seen some similar problems resolved with withLatestFrom(inspector).filter(...) but it does not work for me as I need all values emitted from the inspector observable during the debounce time.

I also tried a 'merge' operator, but I only care about the source: if the inspector emits values but the source doesn't, the observable I'm trying to build shouldn't emit either.

Is there a way to achieve this using only observables?

Upvotes: 1

Views: 641

Answers (2)

Daniel Gimenez

Reputation: 20494

It is helpful to break this problem down. I don't quite understand what you're asking for, so here's my best impression:

  • Source emits a value.
  • After that initial emission we start listening to the inspector for true values.
  • Once the debounce time has passed, we emit the source value if the inspector emitted only true values.
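
To pin that reading down, here is a minimal sketch of the intended behaviour as a plain function over timestamped events, with no RxJS involved. The name simulate, the { t, value } event shape, and the windowMs parameter are hypothetical and exist only for illustration. The half-open window [t, t + windowMs) is an assumption about how boundary cases should be treated.

```javascript
// Pure model of the three steps above (illustrative only, not RxJS).
// sourceEvents:    [{ t: number, value: any }]      sorted by t
// inspectorEvents: [{ t: number, value: boolean }]  sorted by t
// Returns the emitted values, each stamped with its emission time.
function simulate(sourceEvents, inspectorEvents, windowMs) {
  const emitted = [];
  sourceEvents.forEach((src, i) => {
    const deadline = src.t + windowMs;
    const next = sourceEvents[i + 1];
    // A newer source value inside the window restarts the wait (debounce).
    if (next !== undefined && next.t < deadline) return;
    // Any false from the inspector inside the window vetoes the emission.
    const vetoed = inspectorEvents.some(
      (e) => e.t >= src.t && e.t < deadline && e.value === false
    );
    if (!vetoed) emitted.push({ t: deadline, value: src.value });
  });
  return emitted;
}

// 'a' is superseded by 'b' after 100 ms; the inspector only says true.
const result = simulate(
  [{ t: 0, value: 'a' }, { t: 100, value: 'b' }],
  [{ t: 150, value: true }],
  1000
);
console.log(result); // [ { t: 1100, value: 'b' } ]
```

If this is not the behaviour you want (for example, if a false value should only delay the emission rather than drop it), the operators below would need to change accordingly.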

The first observation I'd like to make (pardon the pun) is that you don't have to use debounceTime to get a debounce-like effect. A switchMap with a timer inside produces the same result: switchMap cancels the previous inner observable whenever the source emits again, as debounce would, and the timer delays the emission.

My proposal is that you use switchMap from your source and then create an observable from the combination of a timer and your inspector. Add a filter operator so that the source value is emitted only if the inspector emitted no false value while the timer was running.

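import { combineLatest, timer } from 'rxjs';
import { filter, mapTo, startWith, switchMap } from 'rxjs/operators';

this.source.pipe(
  switchMap(x => // switchMap drops the pending inner observable if the source emits again before the timer fires.
    // combineLatest first emits when the timer fires; any later false values are filtered out below.
    combineLatest([
        timer(60000),
        // keep only false results; start with true so there is always a latest value.
        this.inspector.pipe(filter(ok => !ok), startWith(true))
    ]).pipe(
      filter(([_, shouldEmit]) => shouldEmit),
      mapTo(x) // emit source's value
    )
  )
)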
  • Note: startWith is used in the inspector's pipe so that at least one result is emitted. This guarantees that there will be a single emission once timer emits. The filter is on the inspector since all you care about is if a false result prevents emission.
  • If you don't want to force the user to wait a minute, you could just use race instead of combineLatest. It will emit the result from the first observable that emits. So you can have the timer emit true after a minute, and the inspector emit any false result.
switchMap(x =>
  race(
    timer(60000).pipe(mapTo(true)), // emit true after a minute.
    this.inspector.pipe(filter(x => !x)) // only emit false
  ).pipe(
    take(1), // this might not be necessary.
    filter((shouldEmit) => shouldEmit), 
    mapTo(x) // emit source's value
  )
)

Upvotes: 2

Lubomir Jurecka

Reputation: 336

This can be solved with the buffer operator: the buffer is flushed only after the required interval has elapsed, unless the blocker stream cancels that interval first. When the interval is cancelled, the buffered values are carried over to the next flush.

source$.pipe(
  buffer(source$.pipe(
    exhaustMap(() => timer(10).pipe(
      takeUntil(blocker$)
    ))
  ))
);

const {timer} = rxjs;
const {buffer, exhaustMap, takeUntil} = rxjs.operators;
const {TestScheduler} = rxjs.testing;
const {expect} = chai;

const test = (testName, testFn) => {
  try {
    testFn();
    console.log(`Test PASS "${testName}"`);
  } catch (error) {
    console.error(`Test FAIL "${testName}"`, error.message);
  }
}

const createTestScheduler = () => new TestScheduler((actual, expected) => {
  expect(actual).deep.equal(expected);
});

const createTestStream = (source$, blocker$) => {
  return source$.pipe(
    buffer(source$.pipe(
      exhaustMap(() => timer(10).pipe(
        takeUntil(blocker$)
      ))
    ))
  );
}

const testStream = ({ marbles, values}) => {
  const testScheduler = createTestScheduler();
  testScheduler.run((helpers) => {
    const { hot, expectObservable } = helpers;
    const source$ = hot(marbles.source);
    const blocker$ = hot(marbles.blocker);
    const result$ = createTestStream(source$, blocker$)
    expectObservable(result$).toBe(marbles.result, values.result);
  });
}

test('should buffer changes with 10ms delay', () => {
  testStream({
    marbles: {
      source: ' ^-a-b 7ms ---c 9ms -----|   ',
      blocker: '^-   10ms --- 10ms -----|   ',
      result: ' --   10ms i-- 10ms j----(k|)',
    },
    values: {
      result: {
        i: ['a', 'b'],
        j: ['c'],
        k: [],
      },
    }
  });
});

test('should block buffer in progress and move values to next one', () => {
  testStream({
    marbles: {
      source: ' ^-a-b 7ms ---c 9ms -----|   ',
      blocker: '^-  8ms e---- 10ms -----|   ',
      result: ' --   10ms --- 10ms j----(k|)',
    },
    values: {
      result: {
        j: ['a', 'b', 'c'],
        k: [],
      },
    }
  });
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/chai/4.1.2/chai.js"></script>
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>

Upvotes: 0
