Reputation: 249
I have 2 observables :
What I'm trying to do is :
I have seen some similar problems resolved with withLatestFrom(inspector).filter(...)
but it does not work for me as I need all values emitted from the inspector observable during the debounce time.
I also tried a 'merge' operator but I only care about the source : if the inspector emits values but the source doesn't, the observable i'm trying to build shouldn't either.
Is there a way to achieve this using only observables ?
Upvotes: 1
Views: 641
Reputation: 20494
It is helpful to break this problem down. I don't quite understand what you're asking for, so here's my best impression:
The first observation I'd like to make (pardon the pun), is that you don't have to use debounceTime to have a debounce-like effect. I find that switchMap with a timer on the inside can produce the same results: SwitchMap will cancel previous emissions like debounce, and the timer can act to delay the emission.
My proposal is that you use switchMap from your source and then create an observable from the combination of a timer and your inspector. Use a filter operator so you only emit from the source if the inspector's last emitted result duration the duration of the timer was true.
this.source.pipe(
switchMap(x => // switchMap will cancel any emissions if the timer hasn't emitted yet.
// this combineLatest will only emit once - when the timer emits.
combineLatest([
timer(60000),
this.inspector.pipe(filter(x => !x), startWith(true))
]).pipe(
filter(([_, shouldEmit]) => shouldEmit),
mapTo(x) // emit source's value
)
)
)
switchMap(x =>
race(
timer(6000).pipe(mapTo(true)), // emit true after a minute.
this.inspector.pipe(filter(x => !x)) // only emit false
).pipe(
take(1), // this might not be necessary.
filter((shouldEmit) => shouldEmit),
mapTo(x) // emit source's value
)
)
Upvotes: 2
Reputation: 336
It could be solved by using the buffer
operator that raises notification only after required interval elapsed in case the blocker stream does not cancel it in advance.
source$.pipe(
buffer(source$.pipe(
exhaustMap(() => timer(10).pipe(
takeUntil(blocker$)
))
))
);
const {timer} = rxjs;
const {buffer, exhaustMap, takeUntil} = rxjs.operators;
const {TestScheduler} = rxjs.testing;
const {expect} = chai;
const test = (testName, testFn) => {
try {
testFn();
console.log(`Test PASS "${testName}"`);
} catch (error) {
console.error(`Test FAIL "${testName}"`, error.message);
}
}
const createTestScheduler = () => new TestScheduler((actual, expected) => {
expect(actual).deep.equal(expected);
});
const createTestStream = (source$, blocker$) => {
return source$.pipe(
buffer(source$.pipe(
exhaustMap(() => timer(10).pipe(
takeUntil(blocker$)
))
))
);
}
const testStream = ({ marbles, values}) => {
const testScheduler = createTestScheduler();
testScheduler.run((helpers) => {
const { cold, hot, expectObservable } = helpers;
const source$ = hot(marbles.source);
const blocker$ = hot(marbles.blocker);
const result$ = createTestStream(source$, blocker$)
expectObservable(result$).toBe(marbles.result, values.result);
});
}
test('should buffer changes with 10ms delay', () => {
testStream({
marbles: {
source: ' ^-a-b 7ms ---c 9ms -----| ',
blocker: '^- 10ms --- 10ms -----| ',
result: ' -- 10ms i-- 10ms j----(k|)',
},
values: {
result: {
i: ['a', 'b'],
j: ['c'],
k: [],
},
}
});
});
test('should block buffer in progress and move values to next one', () => {
testStream({
marbles: {
source: ' ^-a-b 7ms ---c 9ms -----| ',
blocker: '^- 8ms e---- 10ms -----| ',
result: ' -- 10ms --- 10ms j----(k|)',
},
values: {
result: {
j: ['a', 'b', 'c'],
k: [],
},
}
});
});
<script src="https://cdnjs.cloudflare.com/ajax/libs/chai/4.1.2/chai.js"></script>
<script src="https://unpkg.com/rxjs@^7/dist/bundles/rxjs.umd.min.js"></script>
Upvotes: 0