Reputation: 10972
I am new to reactive programming and curious whether there is a more elegant way of implementing a debounced chunked async queue. What is a debounced chunked async queue, you ask? Well, maybe there is a better name for it, but the idea is that an async function may be called many times over some period of time. We want to first debounce the calls to the function and batch up all those arguments into one argument. Secondly, all executions of that async function should be serialized in a FIFO manner.
Here is my implementation, as I see it, without streams, observables, or reactive programming:
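// helper assumed by the snippet (not shown in the question)
const sleep = (ms: number) => new Promise<void>(resolve => setTimeout(resolve, ms));
const debouncedChunkedQueue = <T>(
  fn: (items: T[]) => Promise<void> | void,
  delay = 1000
) => {
  let items: T[] = [];
  let started = false;
  const start = async () => {
    started = true;
    // keep draining until nothing was pushed while fn was running
    while (items.length) {
      // wait, then take everything queued so far as one chunk
      await sleep(delay);
      const chunk = items.splice(0, items.length);
      // note: if fn throws, started stays true and the queue stops processing
      await fn(chunk);
    }
    started = false;
  };
  const push = (item: T) => {
    items.push(item);
    if (!started) start();
  };
  return { push };
};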
https://codesandbox.io/s/priceless-sanne-dkrkw?file=/src/index.ts:87-550
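One observation on the loop above: it waits a fixed delay after the first push (anything pushed during that wait joins the same chunk), so the timer is never restarted. That behaves more like bufferTime or throttling than a strict debounce. For comparison, here is a minimal sketch of a strictly debounced variant that still serializes calls through a promise chain. debouncedSerialQueue and drain are hypothetical names, and logging errors instead of rethrowing them is an assumption made so that one failing call cannot stall the queue.

```typescript
// Hypothetical helper (not from the question): a strict debounce, where the
// timer restarts on every push, combined with a promise chain that runs fn
// calls one after another in FIFO order.
function debouncedSerialQueue<T>(
  fn: (items: T[]) => Promise<void> | void,
  delay = 1000
) {
  let items: T[] = [];
  let timer: ReturnType<typeof setTimeout> | undefined;
  // Tail of the FIFO chain; every flushed chunk is appended to it.
  let tail: Promise<void> = Promise.resolve();

  const flush = () => {
    timer = undefined;
    const chunk = items;
    items = [];
    // Runs only after every previously queued call has settled. Errors are
    // logged (an assumption) so later chunks still get processed.
    tail = tail
      .then(() => fn(chunk))
      .catch(err => console.error("debouncedSerialQueue: fn failed", err));
  };

  const push = (item: T) => {
    items.push(item);
    // Restart the quiet-period timer on every push: this is the debounce.
    if (timer !== undefined) clearTimeout(timer);
    timer = setTimeout(flush, delay);
  };

  // Resolves once every chunk flushed so far has been processed.
  const drain = () => tail;

  return { push, drain };
}
```

The trade-off: a steady stream of pushes spaced closer together than delay keeps restarting the timer, so nothing is flushed until the stream pauses. If that starvation is a concern, the fixed-delay behaviour of the original loop may actually be the better fit.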
Upvotes: 3
Views: 573
Reputation: 8062
So, I made an observable monster.
It's much more complex than your implementation, but in its defence, it's also considerably more powerful.
It's a custom RxJS operator that should work seamlessly with the rest of the RxJS library. If you're running an HTTP call, for example, and get an error, you can use retry, retryWhen, catchError, etc. to handle errors and respond per call or for the entire operator.
You can chain other operators (and therefore other observables) before and after this operator to manipulate data before it is used or after it is returned.
If you decide to cancel your function call early, you can just error the source or unsubscribe from the observable (just like with any other operator). It will clean up after itself and cancel any mid-flight operations if possible (vanilla Promises can't be cancelled).
Anyway, yeah, it's a behemoth and perhaps overkill if you're just buffering values for an async function that returns void.
Here it is:
bufferedExhaustMap operator
This operator buffers values based on a minimum buffer length (time in milliseconds) and a minimum buffer size. It always exhausts the current projected observable, regardless of whether the minimum time and buffer size have been met.
This works very much like exhaustMap, except it buffers values instead of discarding values while waiting for the current inner (projected) observable to complete.
import { BehaviorSubject, Observable, ObservableInput, OperatorFunction, Subject, combineLatest, from, timer } from "rxjs";
import { filter, finalize, map, mapTo, mergeMap, scan, startWith, switchMap, tap } from "rxjs/operators";
/***
 * ObservableInput works with an Array, an array-like
 * object, a Promise, an iterable object, or an Observable-like object.
 */
function bufferedExhaustMap<T, R>(
  project: (v: T[]) => ObservableInput<R>,
  minBufferLength = 0,
  minBufferCount = 1
): OperatorFunction<T, R> {
  return s => new Observable<R>(observer => {
    // true while no projected observable is running
    const idle = new BehaviorSubject<boolean>(true);
    const setIdle = (b: boolean) => () => idle.next(b);
    // the buffer is a stream of "update" functions folded together with scan
    const buffer = (() => {
      const bufS = new Subject<(v: T[]) => T[]>();
      return {
        output: bufS.pipe(
          scan((acc: T[], curr) => curr(acc), [] as T[])
        ),
        nextVal: (v: T) => bufS.next(curr => [...curr, v]),
        clear: () => bufS.next(_ => [])
      };
    })();
    const subProject = combineLatest(
      idle,
      // false, then true once minBufferLength has passed since the
      // last projection started (or since subscription)
      idle.pipe(
        filter(v => !v),
        startWith(true),
        switchMap(_ => timer(minBufferLength).pipe(
          mapTo(true),
          startWith(false)
        ))
      ),
      buffer.output
    ).pipe(
      // project only when idle, the minimum time has passed and enough values are buffered
      filter(([idle, bufferedByTime, buffer]) =>
        idle && bufferedByTime && buffer.length >= minBufferCount
      ),
      tap(setIdle(false)),
      tap(buffer.clear),
      map(x => x[2]),
      map(project),
      mergeMap(projected => from(projected).pipe(
        finalize(setIdle(true))
      ))
    ).subscribe(observer);
    const subSource = s.subscribe({
      next: buffer.nextVal,
      complete: observer.complete.bind(observer),
      error: e => {
        subProject.unsubscribe();
        observer.error(e);
      }
    });
    return {
      unsubscribe: () => {
        subProject.unsubscribe();
        subSource.unsubscribe();
      }
    };
  });
}
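To make the difference from exhaustMap concrete, here is a plain-Promise sketch (no RxJS) of what happens to values that arrive while a job is still running. makeRunner is a hypothetical helper; it roughly corresponds to minBufferLength = 0 and minBufferCount = 1 and does not model the timer at all.

```typescript
// Hypothetical illustration: exhaustMap-style vs buffered-exhaust-style
// handling of values that arrive while a job is in flight.
type Mode = "exhaust" | "buffered";

function makeRunner<T>(mode: Mode, work: (batch: T[]) => Promise<void>) {
  let busy = false;
  let pending: T[] = [];

  const run = async (batch: T[]): Promise<void> => {
    busy = true;
    try {
      await work(batch);
    } catch (err) {
      console.error("makeRunner: work failed", err);
    } finally {
      busy = false;
    }
    // Buffered mode: whatever piled up while busy becomes the next batch.
    // run() sets busy again synchronously, so no concurrent run can start.
    if (pending.length > 0) {
      const next = pending;
      pending = [];
      await run(next);
    }
  };

  return (value: T): void => {
    if (!busy) {
      void run([value]);
    } else if (mode === "buffered") {
      pending.push(value); // kept for the next run
    }
    // mode === "exhaust" while busy: the value is silently dropped
  };
}
```

With three synchronous pushes and a slow job, the exhaust runner only ever processes [1], while the buffered runner processes [1] and then [2, 3].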
debouncedChunkedQueue
Here is how I would implement your function with the operator I built.
function debouncedChunkedQueue<T>(
  fn: (items: T[]) => Promise<void>,
  delay = 1000
) {
  const processSubject = new Subject<T>();
  processSubject.pipe(
    bufferedExhaustMap(fn, delay)
  ).subscribe();
  return {
    push: processSubject.next.bind(processSubject)
  };
}
This does abstract away RxJS, though. Instead, I would lean into the reactive style harder and just use the operator as-is. Whatever your promise is doing to the state of your program (setting values, transforming objects, etc.), have it return that information instead and react accordingly.
If that sounds miserable to you, then you're likely better off not using this approach. That's fine too! To each their own. :)
bufferedExhaustMap
Inspired by another answer here (by kvetis), I re-implemented this operator using bufferWhen and extended it to include a minimum buffer size.
This also unsubscribes from the source rather than ignoring the source via exhaustMap while buffering. This should (in theory) be faster, though I'm not sure it would ever matter in practice.
import { BehaviorSubject, ObservableInput, OperatorFunction, defer, forkJoin, from, timer } from "rxjs";
import { bufferWhen, finalize, first, map, mergeMap, share, take, tap } from "rxjs/operators";
/***
 * Buffers, then projects buffered source values to an Observable which is merged in
 * the output Observable only if the previous projected Observable has completed.
 ***/
function bufferedExhaustMap<T, R>(
  project: (v: T[]) => ObservableInput<R>,
  minBufferLength = 0,
  minBufferCount = 1
): OperatorFunction<T, R> {
  return s => defer(() => {
    const idle = new BehaviorSubject(true);
    const setIdle = (b: boolean) => () => idle.next(b);
    const shared = s.pipe(share());
    const nextBufferTime = () => forkJoin(
      shared.pipe(take(minBufferCount)),
      timer(minBufferLength),
      idle.pipe(first(v => v))
    ).pipe(
      tap(setIdle(false))
    );
    return shared.pipe(
      bufferWhen(nextBufferTime),
      map(project),
      mergeMap(projected => from(projected).pipe(
        finalize(setIdle(true))
      ))
    );
  });
}
Perhaps the most complex bit is the nextBufferTime factory function. It creates an observable that emits when the buffer should be cleared, then completes.
It does this by combining 3 observables with forkJoin, which only emits once all three observables have completed. In this way, each inner observable completing counts as one condition being met:
- The source has emitted minBufferCount values.
- minBufferLength milliseconds have elapsed.
- The previous projected observable has completed (idle == true).
const nextBufferTime = () => forkJoin(
  // Take minBufferCount from source then complete
  shared.pipe(take(minBufferCount)),
  // Wait minBufferLength milliseconds and then complete
  timer(minBufferLength),
  // Wait until idle emits true, then complete
  idle.pipe(first(v => v))
)
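The forkJoin trick in nextBufferTime has a direct Promise counterpart: Promise.all, like forkJoin, only settles after every input has finished, so each input can stand for one condition. A minimal sketch under that analogy; countReached, nextFlush and the idle promise are hypothetical names for illustration, not part of the operator.

```typescript
// Plain-Promise analog of nextBufferTime (illustration only).
const sleep = (ms: number) =>
  new Promise<void>(resolve => setTimeout(resolve, ms));

// Condition 1 helper: resolves once `add` has been called `count` times,
// much like shared.pipe(take(minBufferCount)) completing.
function countReached(count: number) {
  let seen = 0;
  let markDone: () => void = () => {};
  const done = new Promise<void>(resolve => {
    markDone = () => resolve();
  });
  if (count <= 0) markDone();
  const add = () => {
    seen += 1;
    if (seen >= count) markDone();
  };
  return { done, add };
}

// Like forkJoin, Promise.all settles only after all inputs have finished,
// so each input acts as one condition that must hold before a flush.
function nextFlush(
  counted: Promise<void>, // the source has produced minBufferCount values
  minBufferLength: number, // this many ms have elapsed since nextFlush was called
  idle: Promise<void> // the previous job has finished
): Promise<void> {
  return Promise.all([counted, sleep(minBufferLength), idle]).then(() => undefined);
}
```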
Upvotes: 2
Reputation: 7351
I followed your current implementation 1:1, which resulted in more complicated code than you would get with a simple bufferTime.
You can test the following code in RxViz.
const { BehaviorSubject, fromEvent, of } = Rx;
const { buffer, delay, first, tap, mergeMap, exhaustMap } = RxOperators;
// Source of the queue - click to emit event
const items = fromEvent(document, 'click');
// a "pushback" source
const processing = new BehaviorSubject(false);
items.pipe(
  // we resubscribe to items to start only when an item is there
  buffer(
    // we take items again so we only
    // start if there are any items
    // to be processed and then we wait
    // 1 second
    items.pipe(
      // we do not start processing until
      // pending async call completes
      exhaustMap(() =>
        processing.pipe(
          // wait for processing end
          first(val => !val),
          // wait for 1 second
          delay(1000),
        ),
      ),
    ),
  ),
  tap(() => processing.next(true)),
  // basic mergeMap is okay, since we control that the next value
  // will come no sooner than this is completed
  mergeMap(items =>
    // async fn simulation
    of(items.length).pipe(delay(300 + Math.random() * 500)),
  ),
  tap(() => processing.next(false)),
);
It's a bummer that we have to keep state outside the observable, though.
You can test the following code in StackBlitz:
import { BehaviorSubject, Subject } from "rxjs";
import { buffer, delay, exhaustMap, first, mergeMap, tap } from "rxjs/operators";
const debouncedChunkedQueue = <T>(
  fn: (items: T[]) => Promise<void> | void,
  delayMs = 1000
) => {
  // Source of the queue - push() emits into it
  const items = new Subject<T>();
  // a "pushback" source
  const processing = new BehaviorSubject(false);
  items
    .pipe(
      // we resubscribe to items to start only when an item is there
      buffer(
        // we take items again so we only
        // start if there are any items
        // to be processed and then we wait
        // delayMs
        items.pipe(
          // we do not start processing until
          // pending async call completes
          exhaustMap(() =>
            processing.pipe(
              // wait for processing end
              first(val => !val),
              // wait for delayMs
              delay(delayMs)
            )
          )
        )
      ),
      tap(() => processing.next(true)),
      // basic mergeMap is okay, since we control that the next value
      // will come no sooner than this is completed
      mergeMap(async chunk => {
        // TODO: Make sure to catch errors from the fn if you want the queue to recover
        // (an async wrapper avoids testing fn's possibly-void result for truthiness)
        await fn(chunk);
      }),
      tap(() => processing.next(false))
    )
    .subscribe();
  return { push: (item: T) => items.next(item) };
};
Upvotes: 2
Reputation: 23813
We want to first debounce the calls to the function and batch up all those arguments into one argument. Secondly all executions of that async function should be serialized in a FIFO manner.
I think you've given a really good description of how to write this using streams.
Based on that description, I believe you could write a custom operator like the following:
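import { EMPTY, Observable } from "rxjs";
import { bufferTime, concatAll, map } from "rxjs/operators";
export function debouncedChunkedQueue<T>(
  fn: (items: T[]) => Observable<void>,
  delay = 1000
) {
  return (obs$: Observable<T>) =>
    obs$.pipe(
      // collect everything emitted during each `delay` window into an array
      bufferTime(delay),
      // turn each batch into the observable returned by fn
      map(items => fn(items) || EMPTY),
      // subscribe to those observables one at a time, in order
      concatAll()
    );
}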
To demonstrate how this should work, I've done the following:
import { interval, of } from "rxjs";
import { delay } from "rxjs/operators";
function mockApiCall<T>(items: T[]): Observable<void> {
  console.log(`mock API call for the items`, items);
  return of(undefined).pipe(delay(500));
}
interval(100)
  .pipe(debouncedChunkedQueue(mockApiCall))
  .subscribe(console.log);
This outputs the following (first 3 results shown):
mock API call for the items
[0, 1, 2, 3, 4, 5, 6, 7, 8]
mock API call for the items
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18]
mock API call for the items
[19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
Here's a live demo:
https://stackblitz.com/edit/rxjs-qweh1g?devtoolsheight=60
By the way, to explain a little bit:
- bufferTime lets you accumulate data into an array for a specified time.
- map turns each array into the observable returned by fn, so we end up with an observable of observables. If fn returns a cold observable, the work it describes only starts once something subscribes to it.
- concatAll unwraps the observable of observables by subscribing to one inner observable at a time, which ensures they run in order.
Upvotes: 1
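One detail worth knowing about the bufferTime + concatAll operator: bufferTime emits an array for every window, including empty ones, so fn is also called with [] once per delay whenever nothing arrived; adding filter(items => items.length > 0) right after bufferTime avoids that. For comparison, here is a rough plain-Promise analog of bufferTime plus concatAll, with a skipEmpty flag standing in for that filter. bufferTimeQueue, skipEmpty and stop are hypothetical names.

```typescript
// Hypothetical plain-Promise analog of bufferTime(delay) + concatAll():
// every `delay` ms the buffer is handed off, and calls are chained so they
// run one at a time, in order.
function bufferTimeQueue<T>(
  fn: (items: T[]) => Promise<void> | void,
  delay = 1000,
  skipEmpty = true // bufferTime itself would emit [] for quiet windows
) {
  let items: T[] = [];
  let tail: Promise<void> = Promise.resolve();

  const timer = setInterval(() => {
    const chunk = items;
    items = [];
    if (skipEmpty && chunk.length === 0) return;
    // Like concatAll: the next call starts only after the previous one settled.
    tail = tail
      .then(() => fn(chunk))
      .catch(err => console.error("bufferTimeQueue: fn failed", err));
  }, delay);

  return {
    push: (item: T) => {
      items.push(item);
    },
    // Stops the interval and resolves once already-queued calls have finished.
    stop: () => {
      clearInterval(timer);
      return tail;
    },
  };
}
```

Because the windows are fixed rather than reset on each push, a continuous stream still gets flushed every delay ms, unlike a strict debounce.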