Reputation: 569
I have the following theoretical problem to solve. There may be a more general solution to the example I describe, but I specifically want to know how to create a combined observable with these properties.
I have an observable of change events that should trigger save actions (each of which results in an observable of the save result).
Using exhaust or exhaustMap almost does what I want: it ensures that no other save is started while one is in progress. However, a change event that arrives during a save is dropped, so the last change may never be saved. concat or concatMap would ensure that the last event is carried out, but they would also run a lot of unnecessary save actions.
To rephrase: how can I create an observable that behaves like exhaust but still processes the last event, as concat would?
Upvotes: 7
Views: 687
Reputation: 14139
You can use throttle with the config { leading: true, trailing: true } to emit the first event, then no events until a notifier observable emits, and then the last event received during that time. See How does throttleTime operator's config parameter work? (ThrottleConfig)
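The leading-plus-trailing behaviour described above can be illustrated without RxJS. This is only a minimal sketch, assuming a Promise-returning save function. The names createExhaustWithTrailing, demo and save are hypothetical and are not part of RxJS. Unlike an observable pipeline, the sketch swallows save errors to stay short.

```typescript
// Hypothetical helper (not an RxJS API): runs `task` for the first value,
// ignores intermediate values while a task is running, and then runs the
// task once more with the most recent value received in the meantime.
type Task<T> = (value: T) => Promise<void>;

function createExhaustWithTrailing<T>(task: Task<T>): (value: T) => void {
  let running = false;
  let hasPending = false;
  let pending: T | undefined;

  const run = async (value: T): Promise<void> => {
    running = true;
    try {
      await task(value);
    } catch {
      // Simplification: a failed task still releases the gate.
    } finally {
      running = false;
    }
    if (hasPending) {
      const next = pending as T;
      hasPending = false;
      pending = undefined;
      void run(next); // trailing run with the latest value only
    }
  };

  return (value: T): void => {
    if (running) {
      // Overwrite any earlier pending value; only the latest one survives.
      pending = value;
      hasPending = true;
    } else {
      void run(value);
    }
  };
}

// Usage: three changes arrive back to back; only the first and last are saved.
async function demo(): Promise<number[]> {
  const saved: number[] = [];
  const save = (value: number): Promise<void> =>
    new Promise<void>((resolve) =>
      setTimeout(() => {
        saved.push(value);
        resolve();
      }, 5)
    );

  const onChange = createExhaustWithTrailing(save);
  onChange(1); // starts immediately (leading)
  onChange(2); // dropped, replaced by 3
  onChange(3); // runs after the save of 1 finishes (trailing)

  await new Promise<void>((resolve) => setTimeout(resolve, 100));
  return saved; // resolves to [1, 3]
}
```

The throttle-based operator aims for the same pattern, with the release Subject playing the role of the running flag.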
After throttling, map each emitted event to the observable you want to execute (the save action). Use a Subject and finalize to end the throttle interval when the save action completes.
Whether you use mergeMap, exhaustMap, concatMap, etc. to map to your inner observable shouldn't matter, as the throttle operator only emits the next event once your inner observable has completed.
If you create a custom operator function with this logic, you have to wrap the code with defer so that different subscribers don't share the same Subject but instead each get their own new Subject.
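import { defer, from, Observable, ObservableInput, OperatorFunction, Subject } from 'rxjs'
import { exhaustMap, finalize, throttle } from 'rxjs/operators'

export function exhaustMapWithTrailing<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
  // defer gives every subscriber its own release Subject
  return (source): Observable<R> => defer(() => {
    // emits whenever an inner observable finishes, which ends the throttle interval
    const release = new Subject<void>()
    return source.pipe(
      throttle(() => release, { leading: true, trailing: true }),
      exhaustMap((value, index) => from(project(value, index)).pipe(
        finalize(() => release.next())
      ) as Observable<R>)
    )
  })
}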
events$.pipe(
  exhaustMapWithTrailing(event => save(event))
)
https://stackblitz.com/edit/rxjs-5k6egc?file=index.ts
This code was adapted from here https://github.com/ReactiveX/rxjs/issues/5004
Upvotes: 8