jhp

Reputation: 569

How to create a joined observable which exhausts, but always fires the very last event after the current one has finished?

I have the following theoretical problem. There may be a more general solution to the example I describe, but I specifically want to know how to create a joined observable with these properties.

I have an observable of change events that should trigger save actions (each of which results in an observable of the save success).

  1. I need to make sure that in the end the very last save event will definitely be executed.
  2. Saving itself is a complex process that takes some time, and during a save no additional save actions should be executed.

Using exhaust or exhaustMap almost does what I want: it ensures that while a save is in progress no other event is fired. concat or concatMap, on the other hand, would ensure that the last event is carried out, but I would perform a lot of unnecessary save actions.

To rephrase: how do I create an observable that exhausts but still concats the last event?
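The trade-off described above can be sketched with a small timeline simulation. This is plain TypeScript rather than RxJS, and the names `ChangeEvent`, `Save`, `simulateExhaust` and `simulateConcat` are hypothetical. It assumes events are sorted by arrival time and that every save takes the same fixed duration.

```typescript
// Hypothetical model: a change event that arrives at time `at` (ms).
interface ChangeEvent { at: number; id: string }
interface Save { id: string; startedAt: number }

// exhaust-style policy: an event starts a save only if no save is running.
function simulateExhaust(events: ChangeEvent[], saveMs: number): Save[] {
  const saves: Save[] = [];
  let busyUntil = -Infinity;
  for (const e of events) {
    if (e.at >= busyUntil) {
      saves.push({ id: e.id, startedAt: e.at });
      busyUntil = e.at + saveMs;
    }
    // Events arriving while a save is running are silently dropped.
  }
  return saves;
}

// concat-style policy: every event is saved, one after another.
function simulateConcat(events: ChangeEvent[], saveMs: number): Save[] {
  const saves: Save[] = [];
  let busyUntil = -Infinity;
  for (const e of events) {
    const startedAt = Math.max(e.at, busyUntil);
    saves.push({ id: e.id, startedAt });
    busyUntil = startedAt + saveMs;
  }
  return saves;
}

// A burst of four changes within 30 ms; each save takes 100 ms.
const burst: ChangeEvent[] = [
  { at: 0, id: "a" }, { at: 10, id: "b" }, { at: 20, id: "c" }, { at: 30, id: "d" },
];
const exhaustIds = simulateExhaust(burst, 100).map(s => s.id);
const concatIds = simulateConcat(burst, 100).map(s => s.id);
console.log(exhaustIds); // only "a" is saved, so the final state "d" is lost
console.log(concatIds);  // all four are saved, "b" and "c" unnecessarily
```

In this model the exhaust policy violates requirement 1, while the concat policy satisfies it at the cost of two redundant saves.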

Upvotes: 7

Views: 687

Answers (1)

frido

Reputation: 14139

You can use throttle with the config { leading: true, trailing: true } to emit the first event, then no events until a notifier observable emits, and then the last event received during that time. See "How does throttleTime operator's config parameter work? (ThrottleConfig)".

Map to the observable you want to execute (the save action) afterwards. Use a Subject and finalize to end the throttle interval when the save action completes.

Whether you use mergeMap, exhaustMap, concatMap etc. to map to your inner observable shouldn't matter, as the throttle operator only emits the next event once your inner observable has completed.
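The resulting behaviour (leading event saved at once, latest held-back event saved as soon as the running save ends) can be sketched as a timeline simulation. This is plain TypeScript, not RxJS: `ChangeEvent`, `Save` and `simulateExhaustWithTrailing` are hypothetical names, and the sketch assumes events sorted by arrival time and a fixed save duration.

```typescript
// Hypothetical model: a change event that arrives at time `at` (ms).
interface ChangeEvent { at: number; id: string }
interface Save { id: string; startedAt: number }

function simulateExhaustWithTrailing(events: ChangeEvent[], saveMs: number): Save[] {
  const saves: Save[] = [];
  let busyUntil = -Infinity;              // time at which the running save ends
  let pending: ChangeEvent | undefined;   // latest event held back during a save

  const start = (id: string, startedAt: number): void => {
    saves.push({ id, startedAt });
    busyUntil = startedAt + saveMs;
  };

  for (const e of events) {
    // If the running save ended before `e` arrived, the held-back event
    // was released at that moment (the "trailing" emission).
    if (pending !== undefined && busyUntil <= e.at) {
      const released = pending;
      pending = undefined;
      start(released.id, busyUntil);
    }
    if (e.at >= busyUntil) {
      start(e.id, e.at);                  // idle: "leading" emission
    } else {
      pending = e;                        // busy: keep only the latest event
    }
  }
  // After the last event, a held-back event is still saved once.
  if (pending !== undefined) start(pending.id, busyUntil);
  return saves;
}

// A burst of four changes within 30 ms; each save takes 100 ms.
const burst: ChangeEvent[] = [
  { at: 0, id: "a" }, { at: 10, id: "b" }, { at: 20, id: "c" }, { at: 30, id: "d" },
];
const saves = simulateExhaustWithTrailing(burst, 100);
console.log(saves); // "a" starts at 0, "d" starts at 100; "b" and "c" are skipped
```

Under these assumptions the burst triggers two saves instead of four, and the last change is always among them.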

If you create a custom operator function with this logic, you have to wrap the code in defer so that different subscribers don't share the same Subject; each subscriber gets its own new Subject instead.

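import { defer, from, Observable, ObservableInput, OperatorFunction, Subject } from 'rxjs'
import { exhaustMap, finalize, throttle } from 'rxjs/operators'

export function exhaustMapWithTrailing<T, R>(
  project: (value: T, index: number) => ObservableInput<R>
): OperatorFunction<T, R> {
  // defer creates a fresh release Subject for every subscriber
  return (source): Observable<R> => defer(() => {
    const release = new Subject<void>()

    return source.pipe(
      // Emit the first event, then hold events back until release emits;
      // at that point only the latest held-back event is passed on
      throttle(() => release, { leading: true, trailing: true }),
      exhaustMap((value, index) => from(project(value, index)).pipe(
        // End the throttle interval when the inner observable terminates
        finalize(() => release.next())
      ) as Observable<R> )
    )
  })
}

// Usage: events$ is the stream of change events, save returns the save observable
events$.pipe(
  exhaustMapWithTrailing(event => save(event))
)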
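Why the state has to be created per subscription can be illustrated with a plain-TypeScript analogy. This is not RxJS's defer itself: `Source`, `countShared`, `countDeferred` and `collect` are hypothetical names, and a counter stands in for the release Subject.

```typescript
// Hypothetical stand-in for a cold observable: the function body runs
// once for every subscriber.
type Source<T> = (observer: (value: T) => void) => void;

// State created when the operator is applied: shared by all subscribers.
function countShared<T>(source: Source<T>): Source<number> {
  let count = 0; // created once, like a Subject created outside defer
  return observer => source(() => observer(++count));
}

// State created inside the subscribe function: fresh for each subscriber.
function countDeferred<T>(source: Source<T>): Source<number> {
  return observer => {
    let count = 0; // created per subscription, like a Subject inside defer
    source(() => observer(++count));
  };
}

// Subscribes once and collects everything that is emitted synchronously.
function collect(source: Source<number>): number[] {
  const out: number[] = [];
  source(v => { out.push(v); });
  return out;
}

const twoEvents: Source<string> = observer => { observer("x"); observer("y"); };

const shared = countShared(twoEvents);
const deferred = countDeferred(twoEvents);

// Two independent subscriptions to each variant.
const sharedRuns = [collect(shared), collect(shared)];
const deferredRuns = [collect(deferred), collect(deferred)];
console.log(sharedRuns);   // [[1, 2], [3, 4]]: the second subscriber sees leftover state
console.log(deferredRuns); // [[1, 2], [1, 2]]: each subscriber starts clean
```

With a shared Subject the analogous problem would be that one subscriber's finished save ends the throttle interval of another subscriber.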

https://stackblitz.com/edit/rxjs-5k6egc?file=index.ts

This code was adapted from https://github.com/ReactiveX/rxjs/issues/5004

Upvotes: 8
