david_adler

Reputation: 10972

Implement debounced chunked async queue with streams

I am new to reactive programming and curious whether there might be a more elegant way of implementing a debounced chunked async queue. What is a debounced chunked async queue, you ask? Well, maybe there is a better name for it, but the idea is that an async function may be called many times over some period of time. We want to first debounce the calls to the function and batch all of those arguments up into a single array argument. Secondly, all executions of that async function should be serialized in a FIFO manner.

Here is my implementation without streams, observables or reactive programming, as I see it:

// Minimal sleep helper used below (resolves after `ms` milliseconds)
const sleep = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

const debouncedChunkedQueue = <T>(
  fn: (items: T[]) => Promise<void> | void,
  delay = 1000
) => {
  let items: T[] = [];
  let started = false;
  const start = async () => {
    started = true;
    // Drain the queue until no items remain, one debounced chunk at a time
    while (items.length) {
      await sleep(delay);
      const chunk = items.splice(0, items.length);
      await fn(chunk);
    }
    started = false;
  };
  const push = (item: T) => {
    items.push(item);
    if (!started) start();
  };
  return { push };
};
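
To illustrate the intended behaviour, here is a quick usage sketch (the handler is just a placeholder for a real async call):

const queue = debouncedChunkedQueue<number>(async items => {
  // Placeholder handler; in practice this might be one batched API call
  console.log("processing chunk", items);
});

queue.push(1);
queue.push(2);
queue.push(3);
// About one second later: "processing chunk [ 1, 2, 3 ]" (all three pushes batched into one call)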

https://codesandbox.io/s/priceless-sanne-dkrkw?file=/src/index.ts:87-550

Upvotes: 3

Views: 573

Answers (3)

Mrk Sef

Reputation: 8062

So, I made an observable monster.

It's much more complex than your implementation, but in its defence, it's also considerably more powerful.

It's a custom RxJS operator that should work seamlessly with the rest of the RxJS library. If you're running an HTTP call, for example, and get an error, you can use retry, retryWhen, catchError, etc. to handle errors and respond per call or for the entire operator.

You can chain other operators (and therefore other observables) before and after this operator to manipulate data before it is used or after it is returned.

If you decide to cancel your function call early, you can just error the source or unsubscribe from the observable (just like any other operator). It will clean up after itself and cancel any mid-flight operations if possible (vanilla Promises can't be cancelled).
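
For instance, a rough sketch of how error handling could compose with the operator defined below (source$, the URL, and the retry count are placeholders):

import { EMPTY } from "rxjs";
import { ajax } from "rxjs/ajax";
import { retry, catchError } from "rxjs/operators";

source$.pipe(
  bufferedExhaustMap(items => ajax.post("/api/items", items).pipe(
    retry(2),                // retry the failed HTTP call for this buffer
    catchError(() => EMPTY)  // or drop this buffer's error so the queue keeps running
  ))
).subscribe();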

Anyway, yeah, it's a behemoth and perhaps overkill if you're just buffering values for an async function that returns void.

Here it is:

The bufferedExhaustMap operator

This operator buffers values based on a minimum buffer length (time in milliseconds) and a minimum buffer count. It always lets the current projected observable complete before projecting the next buffer, regardless of whether the minimum time and count have been met.

This works very much like exhaustMap, except it buffers values instead of discarding values while waiting for the current inner (projected) observable to complete.
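
In other words, a rough comparison (source$ and save are placeholders; save returns a Promise or Observable):

source$.pipe(
  exhaustMap(v => save([v]))         // drops values that arrive while save() is still running
);

source$.pipe(
  bufferedExhaustMap(vs => save(vs)) // buffers those values and hands them to the next call
);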

import {
  BehaviorSubject, Observable, ObservableInput, OperatorFunction,
  Subject, combineLatest, from, timer
} from "rxjs";
import {
  filter, finalize, map, mapTo, mergeMap, scan, startWith, switchMap, tap
} from "rxjs/operators";

/***
 * ObservableInput works with an Array, an array-like
 * object, a Promise, an iterable object, or an Observable-like object.
 */
function bufferedExhaustMap<T,R>(
  project: (v:T[]) => ObservableInput<R>,
  minBufferLength = 0,
  minBufferCount = 1
): OperatorFunction<T,R> {
  return s => new Observable(observer => {

    const idle = new BehaviorSubject<boolean>(true);
    const setIdle = (b) => () => idle.next(b);

    const buffer = (() => {
      const bufS = new Subject<(v:T[]) => T[]>();
      return {
        output: bufS.pipe(
          scan((acc, curr) => curr(acc), [])
        ),
        nextVal: (v:T) => bufS.next(curr => [...curr, v]),
        clear: () => bufS.next(_ => [])
      }
    })();

    const subProject = combineLatest(
      idle,
      idle.pipe(
        filter(v => !v),
        startWith(true),
        switchMap(_ => timer(minBufferLength).pipe(
          mapTo(true),
          startWith(false)
        ))
      ),
      buffer.output
    ).pipe(
      filter(([idle, bufferedByTime, buffer]) => 
        idle && bufferedByTime && buffer.length >= minBufferCount
      ),
      tap(setIdle(false)),
      tap(buffer.clear),
      map(x => x[2]),
      map(project),
      mergeMap(projected => from(projected).pipe(
        finalize(setIdle(true))
      ))
    ).subscribe(observer);

    const subSource = s.subscribe({
      next: buffer.nextVal,
      complete: observer.complete.bind(observer),
      error: e => {
        subProject.unsubscribe();
        observer.error(e);
      }
    });

    return {
      unsubscribe: () => {
        subProject.unsubscribe();
        subSource.unsubscribe();
      }
    }
  });
}

Back to debouncedChunkedQueue

Here is how I would implement your function with the operator I built.

function debouncedChunkedQueue<T>(
  fn: (items: T[]) => Promise<void>,
  delay = 1000
){
  const processSubject = new Subject<T>();
  processSubject.pipe(
    bufferedExhaustMap(fn, delay)
  ).subscribe();

  return {
    push : processSubject.next.bind(processSubject)
  }
}

This does abstract RxJS away, though. Instead, I would lean into the reactive style harder and just use the operator as-is. Whatever your promise is doing to the state of your program (setting values, transforming objects, etc.), have it return that information instead and react accordingly.
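
For instance, a sketch of that style (source$, saveItems and updateUi are placeholder names):

const result$ = source$.pipe(
  bufferedExhaustMap(items => saveItems(items), 1000)
);

// React to whatever the call returned, downstream, instead of mutating state inside it
result$.subscribe(result => updateUi(result));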

If that sounds miserable to you, then you're likely better off not using this approach. That's fine too! To each their own. :)

Update #1: A More Condensed bufferedExhaustMap

Inspired by another answer here (by kvetis), I re-implemented this operator using bufferWhen and extended it to include a minimum buffer size.

This also unsubscribes from the source rather than ignoring the source via exhaustMap while buffering. This should (in theory) be faster, though I'm not sure it would ever matter in practice.

// Additional imports used by this version
import { defer, forkJoin } from "rxjs";
import { bufferWhen, first, share, take } from "rxjs/operators";

/***
 * Buffers, then projects buffered source values to an Observable which is merged into
 * the output Observable only if the previous projected Observable has completed.
 ***/
function bufferedExhaustMap<T,R>(
  project: (v:T[]) => ObservableInput<R>,
  minBufferLength = 0,
  minBufferCount = 1
): OperatorFunction<T,R> {
  return s => defer(() => {
    const idle = new BehaviorSubject(true);
    const setIdle = (b) => () => idle.next(b);
    const shared = s.pipe(share());

    const nextBufferTime = () => forkJoin(
      shared.pipe(take(minBufferCount)),
      timer(minBufferLength),
      idle.pipe(first(v => v))
    ).pipe(
      tap(setIdle(false))
    );

    return shared.pipe(
      bufferWhen(nextBufferTime),
      map(project),
      mergeMap(projected => from(projected).pipe(
        finalize(setIdle(true))
      ))
    );
  });
}

Perhaps the most complex bit is the nextBufferTime factory function. It creates an observable that emits when the buffer should be cleared, then completes.

It does this by combining 3 observables with forkJoin, which only emits (and completes) once all three observables have completed. In this way, an inner observable completing counts as a condition being met.

  • First condition: The source has emitted minBufferCount values.
  • Second condition: A timer of minBufferLength has elapsed.
  • Third condition: No projected observable is currently active (idle == true).

const nextBufferTime = () => forkJoin(
  // Take minBufferCount from source then complete
  shared.pipe(take(minBufferCount)),
  // Wait minBufferLength milliseconds and then complete
  timer(minBufferLength),
  // Wait until idle emits true, then complete
  idle.pipe(first(v => v))
)

Upvotes: 2

kvetis

Reputation: 7351

I followed your current implementation 1:1, which resulted in more complicated code than you would achieve with a simple bufferTime.

You can test the following code in RxViz.

const { BehaviorSubject, fromEvent, of } = Rx;
const { buffer, delay, first, tap, mergeMap, exhaustMap } = RxOperators;

// Source of the queue - click to emit event
const items = fromEvent(document, 'click');

// a "pushback" source
const processing = new BehaviorSubject(false);

items.pipe(
  // we resubscribe to items to start only when an item is there
  buffer(
    // we take items again so we only
    // start if there are any items
    // to be processed and then we wait
    // 1 second
    items.pipe(
      // we do not start processing until
      // pending async call completes
      exhaustMap(() =>
        processing.pipe(
          // wait for processing end
          first(val => !val),
          // wait for 1 second
          delay(1000),
        ),
      ),
    ),
  ),
  tap(() => processing.next(true)),
  // basic mergeMap is okay, since we control that the next value
  // will come no sooner than this is completed
  mergeMap(items =>
    // async fn simulation
    of(items.length).pipe(delay(300 + Math.random() * 500)),
  ),
  tap(() => processing.next(false)),
);

It's a bummer that we have to keep state outside the observable, though.

A debouncedChunkedQueue

You can test the following code in StackBlitz.

import { BehaviorSubject, Subject, of } from "rxjs";
import {
  buffer,
  delay,
  first,
  tap,
  mergeMap,
  exhaustMap
} from "rxjs/operators";

const debouncedChunkedQueue = <T>(
  fn: (items: T[]) => Promise<void> | void,
  delayMs = 1000
) => {
  // Source of the queue - items arrive via push()
  const items = new Subject<T>();

  // a "pushback" source
  const processing = new BehaviorSubject(false);

  items
    .pipe(
      // we resubscribe to items to start only when an item is there
      buffer(
        // we take items again so we only
        // start if there are any items
        // to be processed and then we wait
        // 1 second
        items.pipe(
          // we do not start processing until
          // pending async call completes
          exhaustMap(() =>
            processing.pipe(
              // wait for processing end
              first(val => !val),
              // wait for 1 second
              delay(delayMs)
            )
          )
        )
      ),
      tap(() => processing.next(true)),
      // basic mergeMap is okay, since we control that the next value
      // will come no sooner than this is completed
      mergeMap(
        items =>
          // TODO: Make sure to catch errors from the fn if you want the queue to recover
          // async fn simulation
          fn(items) || of(null)
      ),
      tap(() => processing.next(false))
    )
    .subscribe();
  return { push: (item: T) => items.next(item) };
};
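
For example, usage then looks the same as in the question (the handler is just a placeholder):

const queue = debouncedChunkedQueue<number>(async items => {
  console.log("processing chunk", items); // placeholder async handler
});

queue.push(1);
queue.push(2);
// About one second later both values arrive in a single chunk: [1, 2]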

Upvotes: 2

maxime1992

Reputation: 23813

We want to first debounce the calls to the function and batch all of those arguments up into a single array argument. Secondly, all executions of that async function should be serialized in a FIFO manner.

I think you've given a really good description of how to write this using streams.

Based on that sentence, I believe you could write a custom operator like the following:

import { EMPTY, Observable } from "rxjs";
import { bufferTime, concatAll, map } from "rxjs/operators";

export function debouncedChunkedQueue<T>(
  fn: (items: T[]) => Observable<void>,
  delay = 1000
) {
  return (obs$: Observable<T>) =>
    obs$.pipe(
      bufferTime(delay),
      map(items => fn(items) || EMPTY),
      concatAll()
    );
}

To demonstrate how this should work, I've done the following:

import { interval, of } from "rxjs";
import { delay } from "rxjs/operators";

function mockApiCall<T>(items: T[]): Observable<void> {
  console.log(`mock API call for the items`, items);
  return of(undefined).pipe(delay(500));
}

interval(100)
  .pipe(debouncedChunkedQueue(mockApiCall))
  .subscribe(console.log);

Which outputs the following (for the first 3 results here):

mock API call for the items
[0, 1, 2, 3, 4, 5, 6, 7, 8]

mock API call for the items
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18]

mock API call for the items
[19, 20, 21, 22, 23, 24, 25, 26, 27, 28]

Here's a live demo:

https://stackblitz.com/edit/rxjs-qweh1g?devtoolsheight=60

By the way, to explain a little bit:

  • bufferTime lets you accumulate data into an array for a specified time
  • with map we create a cold observable which, once we subscribe to it, will call the callback. So basically we end up with an observable of observables
  • concatAll lets us unwrap the observable of observables by subscribing to one observable at a time to ensure they run in order

Upvotes: 1
