Reputation: 137
A colleague and I have added a redux-observable-like flow to our Android application, and we can't figure out how to create an epic that stops receiving actions until the first call completes.
Originally I thought we could use skipUntil(), but then realized the result of skipUntil() is swallowed and never passed along to the rest of the chain.
Below is some rough JS code showing what we're hoping to accomplish.
const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .skipUntil( /* first request completes */ )
    .mergeMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(response => fetchUserFulfilled(response))
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    );
It's almost like I need a skipMap() operator that acts like switchMap() but only honors one request at a time, ignoring all items while an observable is ongoing.
Thanks for any suggestions.
Upvotes: 2
Views: 731
Reputation: 18663
You don't need to do anything fancy; there is already an operator for this called exhaustMap. exhaustMap is similar to concatMap, except that it silently drops requests that come in while the previous one is still processing.
const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .exhaustMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(response => fetchUserFulfilled(response))
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    );
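To make the "drop while busy" behaviour concrete, here is a minimal sketch in plain JavaScript with no RxJS involved. createExhaustGate is a hypothetical helper invented for illustration, not a library API; it only mimics the semantics described above, assuming the request signals completion through a callback.

```javascript
// Hypothetical helper (not an RxJS API): wraps a request starter so that
// calls made while a request is still in flight are silently dropped.
function createExhaustGate(startRequest) {
  let busy = false;
  return function dispatch(action) {
    if (busy) return false; // a request is in flight: drop this action
    busy = true;
    // The request calls done() when it finishes, which reopens the gate.
    startRequest(action, function done() { busy = false; });
    return true; // this action was accepted
  };
}

// Usage with a fake request that we complete by hand.
const inFlight = [];
const dispatch = createExhaustGate((action, done) => inFlight.push({ action, done }));

console.log(dispatch('user-1')); // true  (accepted)
console.log(dispatch('user-2')); // false (dropped, user-1 still pending)
inFlight[0].done();              // user-1 completes
console.log(dispatch('user-3')); // true  (accepted again)
```

In the epic, exhaustMap plays the role of the gate and completion of the inner ajax observable plays the role of done().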
Upvotes: 3
Reputation: 2962
Assuming you're pretty sure that's what you need (it usually isn't: dropping input while a previous query is executing means the result is outdated, so switchMap and/or debounce are usually preferred), take(1) + repeat should work, as the action observable is hot.
const fetchUserEpic = action$ =>
  action$.ofType(FETCH_USER)
    .take(1)
    .concatMap(action =>
      ajax.getJSON(`/api/users/${action.payload}`)
        .map(response => fetchUserFulfilled(response))
        .takeUntil(action$.ofType(FETCH_USER_CANCELLED))
    )
    .repeat();
Here's a snippet simulating the logic.
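// simulate action$.ofType(FETCH_USER) every second
// publish() + connect() makes the interval hot, like the action stream
const obs1 = Rx.Observable.interval(1000).publish()
obs1.connect()
// simulate a request that takes i seconds and then emits i
function getStuff(i) {
  return Rx.Observable.timer(i*1000).mapTo(i)
}
// values emitted while getStuff is still running are dropped
obs1.take(1).concatMap(getStuff).repeat().subscribe(console.log)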
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.2/Rx.js"></script>
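For comparing the trade-off mentioned above, here is a rough, library-free sketch that replays a fixed timeline under three policies. simulate and the sample requests are hypothetical and exist only for illustration; the policies approximate exhaustMap (or take(1) + repeat on a hot source), concatMap, and switchMap, and the sketch assumes no request arrives at the exact instant another finishes.

```javascript
// Hypothetical helper: requests are { id, at, duration } in ms, sorted by
// arrival time. Returns the ids whose results are emitted under each policy.
function simulate(requests, policy) {
  const emitted = [];
  let busyUntil = -Infinity; // time at which the current work finishes
  requests.forEach((req, i) => {
    if (policy === 'exhaust') {
      // Ignore anything that arrives while a request is in flight.
      if (req.at >= busyUntil) {
        busyUntil = req.at + req.duration;
        emitted.push(req.id);
      }
    } else if (policy === 'concat') {
      // Queue everything; each request starts once the previous one is done.
      busyUntil = Math.max(busyUntil, req.at) + req.duration;
      emitted.push(req.id);
    } else if (policy === 'switch') {
      // A newer arrival cancels the in-flight request before it can emit.
      const next = requests[i + 1];
      if (!next || next.at >= req.at + req.duration) emitted.push(req.id);
    }
  });
  return emitted;
}

const requests = [
  { id: 'A', at: 0,    duration: 2500 }, // slow first request
  { id: 'B', at: 1000, duration: 500 },
  { id: 'C', at: 2000, duration: 500 },
  { id: 'D', at: 4000, duration: 500 },
];

console.log(simulate(requests, 'exhaust')); // [ 'A', 'D' ]
console.log(simulate(requests, 'concat'));  // [ 'A', 'B', 'C', 'D' ]
console.log(simulate(requests, 'switch'));  // [ 'B', 'C', 'D' ]
```

With the drop-while-busy policy, B and C never produce a result, so the value shown after A is for the oldest request. That is the staleness concern raised above.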
Upvotes: 4