Reputation: 18059
In RxJS, I have a stream of observables that I would like to buffer/batch. For this, I am using the lovely bufferCount(5) to slow things down:
bufferCount(5)
I would then like to resolve the async call (a save to the database) that each buffered observable represents:
bufferCount(5),
switchMap(b => forkJoin(b)),
// unbuffer here
I would then like to un-buffer, or accumulate the changes, to get a single list of changes. To do this I used bufferTime(2000) and followed it up with a map to flatten the results. Needless to say, I feel very dirty for doing so.
How do I unbuffer the stream without using a timer?
Upvotes: 0
Views: 180
Reputation: 2775
It sounds like you're asking how to turn a list/observable of objects into an observable of each object's DB-save-result value, while avoiding saving every object at once.
As of this writing, your codesandbox seems to be on the right track with its use of mergeAll with a specified concurrency value.
This would be my recommendation:
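const perform$ = rick$.pipe(
  // Parse the response body as JSON
  switchMap((response) => response.json()),
  // Build one task per result; concatMap emits the array's tasks one by one
  concatMap((x) => {
    return x.results.map((r) =>
      longRunningTask(r, Math.floor(Math.random() * 5000))
    );
  }),
  // Subscribe to at most 5 tasks at a time
  mergeAll(5)
);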
Rather than concerning yourself with buffering and "un-buffering", you can create a stream of tasks (observables that start the save operation when you subscribe) and use mergeAll(N) to run N tasks at a time. Every task result will be included in the resulting observable.
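To make the idea concrete, here is a minimal, self-contained sketch. It assumes RxJS 7.2 or later (operators imported from "rxjs"). saveToDb is a hypothetical stand-in for the real database save, and the running/maxRunning counters exist only to show that no more than three tasks are in flight at once.

```typescript
import { defer, from, lastValueFrom, map, mergeAll, Observable, timer, toArray } from "rxjs";

// Counters used only to observe concurrency in this sketch.
let running = 0;
let maxRunning = 0;

// Hypothetical stand-in for a database save. defer() postpones the work
// until something (here, mergeAll) subscribes to the task.
function saveToDb(id: number, ms: number): Observable<string> {
  return defer(() => {
    running++;
    maxRunning = Math.max(maxRunning, running);
    return timer(ms).pipe(
      map(() => {
        running--; // the task is finished once it emits its result
        return `saved ${id}`;
      })
    );
  });
}

const ids = Array.from({ length: 12 }, (_, i) => i + 1);

const results$ = from(ids).pipe(
  // Turn each id into a task; nothing is saved yet.
  map((id) => saveToDb(id, 10 + (id % 3) * 10)),
  // Subscribe to at most 3 tasks at a time.
  mergeAll(3),
  // Collect every result into one list once the source completes (no timer needed).
  toArray()
);

const done = lastValueFrom(results$);
done.then((results) => console.log(results.length, "saved; peak concurrency:", maxRunning));
```

Note that toArray() only emits when the source completes, so it suits a finite list of saves. For an endless stream you would consume the individual results as they arrive instead.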
I've included an updated codesandbox.
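If you would rather keep the fixed batches from the question, the following sketch shows one way to un-buffer without a timer. It again assumes RxJS 7.2+ and a hypothetical saveToDb. Note that it swaps in concatMap for the question's switchMap, because switchMap unsubscribes from a batch that is still in flight as soon as the next batch arrives.

```typescript
import { bufferCount, concatMap, defer, forkJoin, from, lastValueFrom, map, mergeMap, Observable, timer, toArray } from "rxjs";

// Hypothetical save: emits a result string after a short delay.
function saveToDb(id: number): Observable<string> {
  return defer(() => timer(5).pipe(map(() => `saved ${id}`)));
}

const ids = Array.from({ length: 12 }, (_, i) => i + 1);

const saved$ = from(ids).pipe(
  map((id) => saveToDb(id)),
  // Group the tasks into batches of 5 (the final batch holds the remainder).
  bufferCount(5),
  // Run one batch at a time; forkJoin emits that batch's results as an array.
  concatMap((batch) => forkJoin(batch)),
  // Un-buffer: emit each result of the batch individually.
  mergeMap((results) => results)
);

// Accumulate everything into a single list once all batches are done.
const all = lastValueFrom(saved$.pipe(toArray()));
all.then((results) => console.log(results));
```

The trade-off compared with mergeAll(N) is that each batch waits for its slowest save before the next batch starts.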
Upvotes: 1