Kieran

Reputation: 18059

How to accumulate changes after buffering in RxJS

In RxJS, I have a stream of observables that I would like to buffer into batches. For this, I am using bufferCount(5) to slow things down:

bufferCount(5)

I would then like to resolve the async call (a save to the database) that each buffered observable represents:

bufferCount(5),
switchMap(b => forkJoin(b)),
// unbuffer here

I would then like to un-buffer, or accumulate, the changes to get a single list of changes. To do this I used bufferTime(2000) followed by a map to flatten the results. Needless to say, I feel very dirty for doing so.

How do I unbuffer the stream without using a timer?

CodeSandbox: naughty-river-p0v02p

Upvotes: 0

Views: 180

Answers (1)

backtick

Reputation: 2775

It sounds like you're asking how to turn a list/observable of objects into an observable of each object's DB-save-result value, while avoiding saving every object at once.

As of this writing, your codesandbox seems to be on the right track with its use of mergeAll with a specified concurrency value.

This would be my recommendation:

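import { concatMap, mergeAll, switchMap } from "rxjs/operators";

// rick$ and longRunningTask are defined elsewhere (see the sandbox).
const perform$ = rick$.pipe(
  // Parse the response body as JSON.
  switchMap((response) => response.json()),
  // Turn each result into a task observable. concatMap emits the
  // array's elements one by one, so this yields a stream of tasks.
  concatMap((x) => {
    return x.results.map((r) =>
      longRunningTask(r, Math.floor(Math.random() * 5000))
    );
  }),
  // Subscribe to at most 5 tasks at a time.
  mergeAll(5)
);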

Rather than concerning yourself with buffering and "un-buffering", you can create a stream of tasks (observables that start the save operation when you subscribe), and use mergeAll(N) to run N tasks at a time. Every task result will be included in this observable.
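To make that concrete, here is a minimal, self-contained sketch of the same idea. It is not the sandbox code: saveToDb is a hypothetical stand-in for the real save, the 100 ms delay and the limit of 3 are arbitrary, and the VirtualTimeScheduler is only there so the example runs instantly and deterministically (real saves would not need it).

```typescript
import { Observable, VirtualTimeScheduler, defer, from, timer } from "rxjs";
import { map, mergeAll, tap, toArray } from "rxjs/operators";

// Virtual clock: lets the fake 100 ms saves run without real waiting.
const clock = new VirtualTimeScheduler();

let inFlight = 0; // saves currently running
let maxInFlight = 0; // highest number of simultaneous saves observed

// Hypothetical stand-in for a database save. defer() keeps the task
// cold: the "save" only starts when mergeAll subscribes to it.
function saveToDb(id: number): Observable<string> {
  return defer(() => {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    return timer(100, clock).pipe(
      tap(() => inFlight--), // the save has finished
      map(() => `saved ${id}`)
    );
  });
}

const ids = [1, 2, 3, 4, 5, 6, 7];
let results: string[] = [];

from(ids.map(saveToDb)) // a stream of tasks, none started yet
  .pipe(
    mergeAll(3), // run at most 3 saves at a time
    toArray() // optional: collect every result into a single list
  )
  .subscribe((all) => (results = all));

clock.flush(); // advance virtual time until every save has completed

console.log(results, maxInFlight);
```

The toArray() at the end is one way to get the "single list of changes" from the question without a timer: it emits once, when every task has completed. Drop it if you would rather handle each save result as it arrives.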

I've included an updated codesandbox.

CodeSandbox: silly-herschel-sox41h

Upvotes: 1
