Reputation: 381
I'm trying to block main stream until config is initialized using withLatestFrom
operator and secondary stream (which contains info about config state).
Here is my code:
@Effect()
public navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
.filter((action: RouterNavigationAction<RouterStateProjection>) => {
return action.payload.event.url.includes('/redemption-offers/list/active');
})
.do(() => console.log('before withLatestFrom'))
.withLatestFrom(
this.store.select(selectConfigInitialized)
.do(initialized => console.log('before config filter', initialized))
.filter(initialized => initialized)
.do(initialized => console.log('after config filter', initialized)),
)
.do(() => console.log('after withLatestFrom'))
.switchMap(data => {
const action = data[0] as RouterNavigationAction<RouterStateProjection>;
return [
new MapListingOptionsAction(action.payload.routerState.queryParams),
new LoadActiveOffersAction(),
];
});
Problem is that second do
(after withLatestFrom
) block never gets called.
Log:
Thank you for your suggestions.
Upvotes: 8
Views: 14936
Reputation: 23463
You can make it simpler by inverting the observables.
public navigationToActiveRedemptionOffers =
return this.store.select(selectConfigInitialized)
.filter(initialized => initialized)
.flatMap(x =>
return this.actions$.ofType(ROUTER_NAVIGATION)
.filter((action: RouterNavigationAction<RouterStateProjection>) => {
return action.payload.event.url.includes('/redemption-offers/list/active');
})
.map(action =>
return [
new MapListingOptionsAction(action.payload.routerState.queryParams),
new LoadActiveOffersAction(),
];
)
As this is a common pattern, I use a generic waitFor
method.
export const waitFor$ = function() {
return this.filter(data => !!data ).take(1);
};
Usage,
return this.config$
.waitFor$()
.flatMap(config => {
return observableWithRequiredValue()
}
Fluent syntax provided by
Observable.prototype.waitFor$ = waitFor$;
declare module 'rxjs/Observable' {
interface Observable<T> {
waitFor$: typeof waitFor$;
}
}
Note, this method of operator creation has been superseded by Lettable Operators in rxjs v5.5 (but the above still works).
Upvotes: 0
Reputation: 96891
The withLatestFrom
emits only when its source Observable emits. What happens is that this.store.select
emits before the source to withLatestFrom
. You can see it from the order of emissions:
before config filter false
before withLatestFrom
...
Now it depends on what you want to achieve. You can trigger this.store.select(selectConfigInitialized)
only when the source emits by using concatMap
:
navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
.filter(...)
.concatMap(() => this.store.select(selectConfigInitialized)...)
.switchMap(data => { ... })
...
Or you can pass along results from both Observables:
navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
.filter(...)
.concatMap(nav => this.store.select(selectConfigInitialized)
.map(result => [nav, result])
)
.switchMap(data => { ... })
...
Upvotes: 3
Reputation: 5988
I think what you want is .combineLatest
.
withLatestFrom
only treats what comes before it as the trigger.
combineLatest
treats both the source and the observables provided to it as the trigger.
So your issue is that the source (route change) fires and the observable passed to withLatest
(state initialized) has not emited yet because of its filter. It only emits a value after the source fires. So it is ignored.
Here is a running example of what you are doing:
const first = Rx.Observable.create((o) => {
setTimeout(() => {
o.next();
}, 2000);
});
const second = Rx.Observable.create((o) => {
setTimeout(() => {
o.next(false);
}, 1000);
setTimeout(() => {
o.next(true);
}, 3000);
});
first
.do(() => console.log('before withLatestFrom'))
.withLatestFrom(
second
.do(initialized => console.log('before config filter', initialized))
.filter(initialized => initialized)
.do(initialized => console.log('after config filter', initialized)),
)
.subscribe(() => console.log('after withLatestFrom'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
Here is with combineLatest
:
const first = Rx.Observable.create((o) => {
setTimeout(() => {
o.next();
}, 2000);
});
const second = Rx.Observable.create((o) => {
setTimeout(() => {
o.next(false);
}, 1000);
setTimeout(() => {
o.next(true);
}, 3000);
});
first
.do(() => console.log('before withLatestFrom'))
.combineLatest(
second
.do(initialized => console.log('before config filter', initialized))
.filter(initialized => initialized)
.do(initialized => console.log('after config filter', initialized)),
)
.subscribe(() => console.log('after withLatestFrom'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
Upvotes: 14