Reputation: 393
I have an Observable where each new value should cause an HTTP request. On the client-side I only care about the latest response value; however, I want every request to complete for monitoring/etc. purposes.
What I currently have is something like:
function simulate(x) {
// Simulate an HTTP request.
return of(x).pipe(delay(6));
}
source$.pipe(
someMapFunc(x => simulate(x)),
);
When I use switchMap
for the someMapFunc
, I get the right set of responses (only the latest). However, if the request is taking too long, it will get canceled.
When I use mergeMap
instead, I get the right set of requests (every request completes), but I get the wrong set of responses (every single one).
Is there a way to get the requests of mergeMap
with the responses of switchMap
? I know I can write this as a custom operator, but I'm wondering if I can build this out of existing/standard rxjs operators. To summarize what I'm thinking of:
switchMap
that doesn't unsubscribe when it switches;mergeMap
that only emits values from the latest inner Observable.Edit: Based on the accepted answer, I was able to get the following, which works:
function orderedMergeMap(project) {
return (s) => defer(() => {
let recent = 0;
return s.pipe(
mergeMap((data, idx) => {
recent = idx;
return project(data).pipe(filter(() => idx === recent));
})
);
});
}
Upvotes: 3
Views: 1432
Reputation: 8022
I'm not 100% sure if this is what you're after, and I haven't fully tested this, but I created a custom operator that might do something close to what you're after. Maybe you can tinker with it a bit more.
This is a mergeMap that filters out "old" values. Old values are emissions from sources that happen after a newer source starts to emit.
function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
return s => defer(() => {
let recent = 0;
return s.pipe(
map((v, i) => ({order: i, payload: v})),
mergeMap(({order, payload}) => project(payload).pipe(
map(v => ({order, payload: v}))
)),
tap(({order}) => {
if(order > recent) recent = order;
}),
filter(({order}) => order < recent),
map(({payload}) => payload)
);
});
}
The version OP settled on:
function orderedMergeMap<T, R>(project: (v:T) => Observable<R>): OperatorFunction<T, R> {
return s => defer(() => {
let recent = 0;
return s.pipe(
mergeMap((data, idx) => {
recent = idx;
return project(data).pipe(
filter(() => idx === recent)
);
})
);
});
}
Upvotes: 4
Reputation: 655
I believe that you need a combination of concatMap()
and last()
.
concatMap
does not subscribe to the next observable until the previous completes. Using it you will ensure the order of requests execution. And as it follows from the description it doesn't cancel previous subscriptions and let them finish, unlike switchMap
.
last
emits the last value emitted from the source on completion. Using it you will ensure that only one (last) result will be passed to the result.
Your code will look like that:
source$.pipe(
concatMap(x => simulate(x)),
last()
);
Upvotes: 2