Reputation: 1016
I'm trying to to group the values from an observable into an array of n size, to be able to batch send these to a service to improve the overall performance. The thing is that I want to make sure that when the items left are less then n, they will be still be passed down the chain after a certain timeout.
I'm trying to rewrite the C# solution from https://stackoverflow.com/a/22873833/2157455 in Javascript.
The main problem is that in Rx.Js lots of methods have been deprecated and it's hard to find the new functions.
var people = new List<(string name, int age)>
{
("Sue", 25 ),
("Joe", 30 ),
("Frank", 25 ),
("Sarah", 35 ),
("John", 37)
}.ToObservable();
var buffers = people
.GroupByUntil(
// yes. yes. all items belong to the same group.
x => true,
g => Observable.Amb(
// close the group after 5 seconds of inactivity
g.Throttle(TimeSpan.FromSeconds(5)),
// close the group after 10 items
g.Skip(1)
))
// Turn those groups into buffers
.SelectMany(x => x.ToArray());
I could get this far, but I can't find the replacement for groupByUntil. And I'm not sure what's the selectMany operator in Rx.Js, probably toArray(). Most examples I find are using deprecated or non-exising functions.
I'm using rxjs 7.8.0
The syntax does not help as well, using the pipe all the time makes the code difficult to read in my opinion.
const people = [
{ name: 'Sue', age: 25 },
{ name: 'Joe', age: 30 },
{ name: 'Frank', age: 25 },
{ name: 'Sarah', age: 35 },
{ name: 'John', age: 37 }
];
const source = from(people);
const example = source.pipe(
groupBy(person => true),
mergeMap(group => group.pipe(
raceWith(
group.pipe(throttle(() => interval(1000))),
group.pipe(skip(2))
),
toArray()
)));
example.forEach(x => console.log(x.length));
I'm getting all 5, instead of two arrays, one with 3 the other with 2. Perhaps there is a better way to write it in js, but I can;t see the replacement for groupByUntil.
Thanks.
Upvotes: 0
Views: 44
Reputation: 54943
bufferTime
is probably what you are looking for
One of its signature is :
bufferTime(bufferTimeSpan: number, bufferCreationInterval: number, maxBufferSize: number, scheduler?: SchedulerLike): OperatorFunction<T, T[]>
so with bufferTime(1000, null, 2)
you get a buffered of length=2
or every 1s
.
Upvotes: 1