Marco
Marco

Reputation: 1016

How to implement buffering with timeout in RX.JS

I'm trying to to group the values from an observable into an array of n size, to be able to batch send these to a service to improve the overall performance. The thing is that I want to make sure that when the items left are less then n, they will be still be passed down the chain after a certain timeout.

I'm trying to rewrite the C# solution from https://stackoverflow.com/a/22873833/2157455 in Javascript.

The main problem is that in Rx.Js lots of methods have been deprecated and it's hard to find the new functions.

    var people = new List<(string name, int age)>
{
    ("Sue", 25 ),
    ("Joe", 30 ),
    ("Frank", 25 ),
    ("Sarah", 35 ),
    ("John", 37)
}.ToObservable();

var buffers = people
    .GroupByUntil(
        // yes. yes. all items belong to the same group.
        x => true,
        g => Observable.Amb(
               // close the group after 5 seconds of inactivity
               g.Throttle(TimeSpan.FromSeconds(5)),
               // close the group after 10 items
               g.Skip(1)
             ))
    // Turn those groups into buffers
    .SelectMany(x => x.ToArray());

I could get this far, but I can't find the replacement for groupByUntil. And I'm not sure what's the selectMany operator in Rx.Js, probably toArray(). Most examples I find are using deprecated or non-exising functions.

I'm using rxjs 7.8.0

The syntax does not help as well, using the pipe all the time makes the code difficult to read in my opinion.

const people = [
    { name: 'Sue', age: 25 },
    { name: 'Joe', age: 30 },
    { name: 'Frank', age: 25 },
    { name: 'Sarah', age: 35 },
    { name: 'John', age: 37 }
  ];

  const source = from(people);

  const example = source.pipe(
        groupBy(person => true),
        mergeMap(group => group.pipe(
                raceWith(
                    group.pipe(throttle(() => interval(1000))),
                    group.pipe(skip(2))
                ),
                toArray()
            )));
    
 

    example.forEach(x => console.log(x.length));

I'm getting all 5, instead of two arrays, one with 3 the other with 2. Perhaps there is a better way to write it in js, but I can;t see the replacement for groupByUntil.

Thanks.

Upvotes: 0

Views: 44

Answers (1)

Matthieu Riegler
Matthieu Riegler

Reputation: 54943

bufferTime is probably what you are looking for

One of its signature is :

bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>

so with bufferTime(1000, null, 2) you get a buffered of length=2 or every 1s.

Upvotes: 1

Related Questions