Reputation: 1192
I am trying to smooth out the load coming from an observable by buffering it for 30s and then shrinking each buffered list so that it keeps only the latest version of each record Id. The result will still be a big object, so I would like to spread the load by splitting it into a few smaller batches emitted over a specific interval.
For example, if the buffer time is 30s, I would like to buffer and shrink the load, then send 5 small batches with 5s between them.
Simplified flow:
sequence: a-a-b-a-b-b-c-b-c-b-c-b-a-b-a-b-b-a-b-c-b-c-a-b-a-b-b-c-b-f-b-a-b-a-b-a-b-c-c-a-f-a-b-s-c
buffer:   <aababbcbcbcbab> <abbabcbcababbcbfb>
filter:   <abc> <abcf>
split:    <<ab>,<c>> <<ab>,<cf>>
delay:    <ab> <c> <ab> <cf>
Real Example:
class Record
{
    public int Id { get; set; }
    public int Version { get; set; }
}
notifications
    .Buffer(TimeSpan.FromSeconds(30), 20000)
    .Select(list => list
        .GroupBy(x => x.Id)
        .Select(group => group.OrderByDescending(x => x.Version).FirstOrDefault())
        .ToList())
Upvotes: 1
Views: 149
Reputation: 43545
Here is one approach. Each deduplicated list is subdivided into up to 5 sublists by using the expression index * 5 / list.Count as the key of a GroupBy, and then a Delay is imposed on each sublist, with a duration proportional to its index within the parent list.
IObservable<List<Record>> query = notifications
    .Buffer(TimeSpan.FromSeconds(30))
    .Select(list => list
        .GroupBy(x => x.Id)
        .Select(g => g.OrderByDescending(x => x.Version).FirstOrDefault())
        .ToList()
    )
    .SelectMany(list => list
        .Select((x, i) => (x, i))
        .GroupBy(e => e.i * 5 / list.Count, e => e.x)
        .Select(g => g.ToList())
        .Select((sublist, i) => Observable.Return(sublist)
            .Delay(TimeSpan.FromSeconds(5 * i)))
    )
    .Merge();
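To see how the integer-division key distributes items, the splitting step can be tried on its own with plain LINQ and no Rx. This is only a minimal sketch: the ChunkDemo class, the Split helper and the sample letters are hypothetical names made up for illustration, not part of the query above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class ChunkDemo
{
    // Hypothetical helper: splits a list into at most `parts` contiguous
    // sublists using the same key as the query: index * parts / list.Count.
    public static List<List<T>> Split<T>(IReadOnlyList<T> list, int parts)
    {
        return list
            .Select((x, i) => (x, i))                          // pair each item with its index
            .GroupBy(e => e.i * parts / list.Count, e => e.x)  // integer division gives keys 0..parts-1
            .Select(g => g.ToList())
            .ToList();
    }

    public static void Main()
    {
        var items = new List<string> { "a", "b", "c", "d", "e", "f", "g" };
        var chunks = Split(items, 5);
        for (int i = 0; i < chunks.Count; i++)
        {
            // The delay is derived from the position of the sublist, as in the query.
            Console.WriteLine($"chunk {i} (delay {5 * i}s): {string.Join(",", chunks[i])}");
        }
        // Prints:
        // chunk 0 (delay 0s): a,b
        // chunk 1 (delay 5s): c
        // chunk 2 (delay 10s): d,e
        // chunk 3 (delay 15s): f
        // chunk 4 (delay 20s): g
    }
}
```

Note that a list with fewer than 5 items produces fewer than 5 sublists, and because the delay depends on the position of the sublist rather than on its key, those sublists are still emitted 5s apart starting at 0s.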
Upvotes: 1