user007
user007

Reputation: 1192

Reactive Filter, Split and Delay after Buffering

I am trying to change load from observable by Buffering it for 30s then shrinking list by taking only the last version of specific record id. It will still be big object so I would like to spread the load by splitting it into few items spread over a specific interval.

If Buffer time is 30s I would like to Buffer/Shrink load then send 5 small batches each with 5s between them

Simplified flow:

sequence
buffer
filter
split
delay
a-a-b-a-b-b-c-b-c-b-c-b-a-b-a-b-b-a-b-c-b-c-a-b-a-b-b-c-b-f-b-a-b-a-b-a-b-c-c-a-f-a-b-s-c
                          <aababbcbcbcbab>                  <abbabcbcababbcbfb>
                          <abc>                             <abcf>
                          <<ab>,<c>>                        <<ab>,<cf>> 
                          <ab>              <c>             <ab>              <cf>

Real Example:

Record
    int Id
    int Version

notifications  
    .Buffer(TimeSpan.FromSeconds(30), 20000)
    .Select(list => list.GroupBy(x=> x.Id)
                        .Select(group => group.OrderByDescending(x=> x.Version)
                                              .FirstOrDefault()).ToList())

Upvotes: 1

Views: 149

Answers (1)

Theodor Zoulias
Theodor Zoulias

Reputation: 43545

Here is one approach. Each distinct group is subdivided to 5 subgroups using the expression index * 5 / list.Count as the key of a GroupBy, and then a Delay is imposed to each subgroup, with duration relative to its index in the parent distinct group.

IObservable<List<Record>> query = notifications
    .Buffer(TimeSpan.FromSeconds(30))
    .Select(list => list
        .GroupBy(x => x.Id)
        .Select(g => g.OrderByDescending(x => x.Version).FirstOrDefault())
        .ToList()
    )
    .SelectMany(list => list
        .Select((x, i) => (x, i))
        .GroupBy(e => e.i * 5 / list.Count, e => e.x)
        .Select(g => g.ToList())
        .Select((sublist, i) => Observable.Return(sublist)
            .Delay(TimeSpan.FromSeconds(5 * i)))
    )
    .Merge();

Upvotes: 1

Related Questions