Reputation: 37238
When my epic responds at the ofType point, I then need to wait till state$.value.foo becomes true. Once it is true, I want it to get to the from, which does a fetch and important stuff. I am doing it like this:
    action$.pipe(
      ofType(START_CONTINUE_SESSION),
      concat(
        iif(
          () => state$.value.foo === true,
          EMPTY,
          action$.pipe(
            filter(() => state$.value.foo === true),
          )
        ),
        from(fetch(...)).pipe(
          // ... do important stuff here BUT only after state$.value.foo has become true
        )
      )
    )
What happens is that a very large number of actions gets emitted and it never gets to the from(fetch(...)).
Upvotes: 1
Views: 1306
Reputation: 1459
First, note that the concat operator is deprecated in RxJS 6 (that would be the one imported from rxjs/operators). However, the function concat, used to create an observable, is not deprecated (that would be the one imported from rxjs). I would recommend using operators that are not deprecated.
Second, there are several issues with your current approach.
    action$.pipe(
      ofType(START_CONTINUE_SESSION),
      concat(
        ...
The above filters for actions of type "START_CONTINUE_SESSION" and lets them pass straight back to the Redux store. This is because the concat operator lets the source events pass through, and it waits for the previous observable to complete before subscribing to the next one. However, because the redux-observable action stream never completes, the concat will never begin the next observables! The marble diagram for concat in the old RxJS docs shows this: the source events pass through. With redux-observable, this means your "START_CONTINUE_SESSION" action would be stuck in a never-ending, repeating loop.
Even if the action stream ended and the concat were to begin the next observables, there are additional issues:
    ...
        iif(
          () => state$.value.foo === true,
          EMPTY, // I've assumed that this is equivalent to `empty()`
          action$.pipe(
            filter(() => state$.value.foo === true),
          ),
        ),
    ...
You're first checking the current value of foo in the store. If its value is true, nothing more is emitted (in just this particular step) and the from will begin next. If its value is false, a new action stream subscription is created. For every future action dispatched, this checks the current value of foo in the store. When its value is finally made true, the action that flowed in (which could be any action!) is allowed to pass back out to the Redux store. But note, this subscription never ends! You would, again, have a never-ending loop of actions coming in and going right back out to the Redux store, so long as foo remains true.
Instead of subscribing to action$ and checking the state, you should subscribe to state$. The following example is a little bit different, but I think it shows a way of accomplishing your goal. This waits for the initial action (START_CONTINUE_SESSION), then waits for state to have foo === true, then sends both the action and state to a mergeMap where you can process it like normal (fetch, dispatch other actions, etc.). If you don't need a copy of state, then that could just be disregarded.
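    import { empty, from, of } from 'rxjs';
    import { exhaustMap, first, mergeMap, withLatestFrom } from 'rxjs/operators';
    import { ofType } from 'redux-observable';

    export const epic = (action$, state$) =>
      action$.pipe(
        ofType(START_CONTINUE_SESSION),
        withLatestFrom(state$),
        exhaustMap(([action, state]) =>
          state.foo === true
            ? of([action, state])
            : state$.pipe(
                // ignore state updates until foo is true, then take the first match
                mergeMap(state =>
                  state.foo === true
                    ? of([action, state])
                    : empty()
                ),
                first(),
              )
        ),
        mergeMap(([action, state]) =>
          // here we have triggering action and state with foo === true
          from(fetch(...)).pipe(
            // ... do important stuff here
          )
        ),
      )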
As far as responding to the initial action (START_CONTINUE_SESSION), I chose exhaustMap in the above example. Possible alternatives include concatMap, mergeMap, and switchMap. You should pick the operator that best fits your use case:
concatMap - Listen for all actions and run multiple workflows sequentially.
exhaustMap - Listen for the first action and wait until you complete the workflow before accepting another action.
mergeMap - Listen for all actions and run multiple workflows in parallel.
switchMap - Listen for all actions but only run one at a time. Cancels any previous workflow when receiving a new action.
Upvotes: 1