withLatestFrom does not emit value

I'm trying to block the main stream until the config is initialized, using the withLatestFrom operator and a secondary stream (which contains info about the config state).

Here is my code:

@Effect()
public navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
  .filter((action: RouterNavigationAction<RouterStateProjection>) => {
    return action.payload.event.url.includes('/redemption-offers/list/active');
  })
  .do(() => console.log('before withLatestFrom'))
  .withLatestFrom(
    this.store.select(selectConfigInitialized)
      .do(initialized => console.log('before config filter', initialized))
      .filter(initialized => initialized)
      .do(initialized => console.log('after config filter', initialized)),
  )
  .do(() => console.log('after withLatestFrom'))
  .switchMap(data => {
    const action = data[0] as RouterNavigationAction<RouterStateProjection>;

    return [
      new MapListingOptionsAction(action.payload.routerState.queryParams),
      new LoadActiveOffersAction(),
    ];
  });

The problem is that the second do block (after withLatestFrom) never gets called. Log:

Thank you for your suggestions.

Upvotes: 8

Views: 14936

Answers (3)

Richard Matsen

Reputation: 23463

You can make it simpler by inverting the observables.

public navigationToActiveRedemptionOffers = this.store.select(selectConfigInitialized)
  .filter(initialized => initialized)
  // Once the config is initialized, start listening for matching navigation actions.
  .flatMap(() =>
    this.actions$.ofType(ROUTER_NAVIGATION)
      .filter((action: RouterNavigationAction<RouterStateProjection>) => {
        return action.payload.event.url.includes('/redemption-offers/list/active');
      })
  )
  // switchMap flattens the returned array so each action is emitted separately.
  .switchMap((action: RouterNavigationAction<RouterStateProjection>) => [
    new MapListingOptionsAction(action.payload.routerState.queryParams),
    new LoadActiveOffersAction(),
  ]);

As this is a common pattern, I use a generic waitFor method.

export const waitFor$ = function() {
  // Skip falsy values, then complete after the first truthy one.
  return this.filter(data => !!data).take(1);
};

Usage:

return this.config$
  .waitFor$()
  .flatMap(config => {
    return observableWithRequiredValue();
  });

The fluent syntax is provided by:

Observable.prototype.waitFor$ = waitFor$;

declare module 'rxjs/Observable' {
  interface Observable<T> {
    waitFor$: typeof waitFor$;
  }
}

Note: this method of operator creation has been superseded by lettable operators in RxJS v5.5, but the above still works.

Upvotes: 0

martin

Reputation: 96891

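withLatestFrom emits only when its source Observable emits, and only if the Observable passed to it has already produced a value. What happens here is that this.store.select emits false before the source emits, and the filter removes that value, so when the source fires there is nothing to combine it with. You can see it from the order of emissions: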

  • before config filter false

  • before withLatestFrom

  • ...
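To make the ordering issue concrete, here is a small dependency-free sketch that models the behaviour described above. It is a simplified model written for illustration, not RxJS itself: TimedEvent, simulateWithLatestFrom, the 'nav' value and the timestamps are all hypothetical, and the other stream is assumed to be sorted by time.

```typescript
// Simplified model of withLatestFrom semantics (not the RxJS implementation).
type TimedEvent<T> = { time: number; value: T };

function simulateWithLatestFrom<S, O>(
  source: TimedEvent<S>[],
  other: TimedEvent<O>[], // assumed to be sorted by time
): Array<[S, O]> {
  const output: Array<[S, O]> = [];
  for (const s of source) {
    // Values the other stream produced at or before the moment the source fired.
    const seen = other.filter(o => o.time <= s.time);
    if (seen.length === 0) {
      continue; // nothing to pair with yet, so this source value is dropped
    }
    output.push([s.value, seen[seen.length - 1].value]);
  }
  return output;
}

// Hypothetical timeline: config is false at t=1, navigation fires at t=2,
// config becomes true at t=3.
const navigation: TimedEvent<string>[] = [{ time: 2, value: 'nav' }];
const config: TimedEvent<boolean>[] = [
  { time: 1, value: false },
  { time: 3, value: true },
];

// Equivalent of .filter(initialized => initialized) on the config stream.
const initializedOnly = config.filter(e => e.value);

const withFilter = simulateWithLatestFrom(navigation, initializedOnly);
const withoutFilter = simulateWithLatestFrom(navigation, config);

console.log(withFilter);    // [] : the navigation is dropped
console.log(withoutFilter); // [['nav', false]]
```

With the filter in place, the only config value arrives after the navigation, so the navigation has no partner and is lost. The later true never triggers anything by itself, because only the source drives emissions.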

Now it depends on what you want to achieve. You can trigger this.store.select(selectConfigInitialized) only when the source emits by using concatMap:

navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
  .filter(...)
  .concatMap(() => this.store.select(selectConfigInitialized)...)
  .switchMap(data => { ... })
  ...

Or you can pass along results from both Observables:

navigationToActiveRedemptionOffers = this.actions$.ofType(ROUTER_NAVIGATION)
  .filter(...)
  .concatMap(nav => this.store.select(selectConfigInitialized)
    .map(result => [nav, result])
  )
  .switchMap(data => { ... })
  ...

Upvotes: 3

bygrace

Reputation: 5988

I think what you want is .combineLatest.

withLatestFrom only treats what comes before it as the trigger. combineLatest treats both the source and the observables provided to it as the trigger.

So your issue is that the source (route change) fires while the observable passed to withLatestFrom (state initialized) has not emitted yet because of its filter. It only emits a value after the source fires, so that value is ignored.

Here is a running example of what you are doing:

const first = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next();
  }, 2000);
});
const second = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next(false);
  }, 1000);
  setTimeout(() => {
    o.next(true);
  }, 3000);
});

first
  .do(() => console.log('before withLatestFrom'))
  .withLatestFrom(
    second
      .do(initialized => console.log('before config filter', initialized))
      .filter(initialized => initialized)
      .do(initialized => console.log('after config filter', initialized)),
  )
  .subscribe(() => console.log('after withLatestFrom'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>

Here it is with combineLatest:

const first = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next();
  }, 2000);
});
const second = Rx.Observable.create((o) => {
  setTimeout(() => {
    o.next(false);
  }, 1000);
  setTimeout(() => {
    o.next(true);
  }, 3000);
});

first
  .do(() => console.log('before combineLatest'))
  .combineLatest(
    second
      .do(initialized => console.log('before config filter', initialized))
      .filter(initialized => initialized)
      .do(initialized => console.log('after config filter', initialized)),
  )
  .subscribe(() => console.log('after combineLatest'));
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.2/Rx.js"></script>
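If you want to trace by hand why the combineLatest version does emit, the following dependency-free sketch models it on the same timeline as the snippets (1000 ms, 2000 ms, 3000 ms). It is only an illustrative model, not RxJS code: Tagged, simulateCombineLatest and the 'nav' placeholder value are assumptions made for the example.

```typescript
// Simplified model of combineLatest semantics (not the RxJS implementation).
type Tagged =
  | { time: number; from: 'first'; value: string }
  | { time: number; from: 'second'; value: boolean };

function simulateCombineLatest(events: Tagged[]): Array<[string, boolean]> {
  const output: Array<[string, boolean]> = [];
  let latestFirst: string | undefined;
  let latestSecond: boolean | undefined;

  // Process all events in chronological order, whichever stream they come from.
  const ordered = events.slice().sort((a, b) => a.time - b.time);
  for (const e of ordered) {
    if (e.from === 'first') {
      latestFirst = e.value;
    } else {
      latestSecond = e.value;
    }
    // Either stream can trigger an emission once both have a value.
    if (latestFirst !== undefined && latestSecond !== undefined) {
      output.push([latestFirst, latestSecond]);
    }
  }
  return output;
}

const timeline: Tagged[] = [
  { time: 1000, from: 'second', value: false },
  { time: 2000, from: 'first', value: 'nav' }, // placeholder for the route change
  { time: 3000, from: 'second', value: true },
];

// Equivalent of .filter(initialized => initialized) applied to the second stream.
const filtered = timeline.filter(e => e.from === 'first' || e.value === true);

const combined = simulateCombineLatest(filtered);
console.log(combined); // [['nav', true]]
```

The route change at 2000 ms is remembered rather than dropped, and the true at 3000 ms triggers the combined emission.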

Upvotes: 14
