Reputation: 440
I have two separate streams that come together in a combineLatest in something like the following:
const programState$ = Rx.Observable.combineLatest(
high$, low$,
(high, low) => {
return program(high, low);
});
This works fine, but I also want to be able to reset both high$ and low$ to their initial state and fire the program only once. The two streams look roughly like this:
const high$ = initialDataBranchOne$.merge(interactiveHigh$);
const low$ = initialDataBranchTwo$.merge(interactiveLow$);
Both initial-data branches come from a single initialData stream created with fromEvent. During normal operation the combineLatest works well. How can I get the program to run only once when the initialData event fires? Right now the program runs twice, because high$ and low$ each emit and combineLatest fires for both.
Upvotes: 1
Views: 236
Reputation: 3104
We can store the high and low properties in the same object. We can then perform a scan as various events come in to update this state:
// Define your default high/low values
const defaultHighLow = /** **/;
// Different types of updates/actions
const highUpdate$ = high$.map(high => ({ high, type: 'UPDATE HIGH' }));
const lowUpdate$ = low$.map(low => ({ low, type: 'UPDATE LOW' }));
const resetUpdate$ = reset$.map(() => ({ type: 'RESET' }));
// Merge all the different types of actions to single stream
const update$ = Rx.Observable.merge(highUpdate$, lowUpdate$, resetUpdate$);
// Scan over these updates to modify the high/low values
const highLowState$ = update$.scan((state, update) => {
if (update.type === 'UPDATE HIGH') {
return { ...state, high: update.high };
}
if (update.type === 'UPDATE LOW') {
return { ...state, low: update.low };
}
// Return defaultHighLow if reset update is triggered
if (update.type === 'RESET') {
return defaultHighLow;
}
// Return state by default
return state;
}, defaultHighLow).startWith(defaultHighLow);
And finally we can derive the program state as before:
const programState$ = highLowState$.map(hl => program(hl.high, hl.low));
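To see why a reset now produces a single emission, the reducer can be pulled out as a pure function and run without RxJS. This is only a sketch: the default value { high: 0, low: 0 } and the names reduceHighLow and scanStates are made up for illustration (the real default is left blank above), and the loop just imitates what scan does with each incoming update.

```javascript
// Hypothetical default; the answer above leaves the real value elided.
const defaultHighLow = { high: 0, low: 0 };

// Same logic as the scan callback, extracted as a pure function.
function reduceHighLow(state, update) {
  switch (update.type) {
    case 'UPDATE HIGH':
      return { ...state, high: update.high };
    case 'UPDATE LOW':
      return { ...state, low: update.low };
    case 'RESET':
      return defaultHighLow;
    default:
      return state;
  }
}

// Imitates scan: records one accumulated state per incoming update.
function scanStates(updates) {
  const states = [];
  let state = defaultHighLow;
  for (const update of updates) {
    state = reduceHighLow(state, update);
    states.push(state);
  }
  return states;
}

const states = scanStates([
  { type: 'UPDATE HIGH', high: 10 },
  { type: 'UPDATE LOW', low: 2 },
  { type: 'RESET' },
]);

console.log(states.length); // 3: one state per update, so the reset emits once
console.log(states[2]);     // both high and low are back to the default
```

Because high and low live in one object, the single RESET update replaces both values in one step, so the downstream map calls program only once for it.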
Upvotes: 2