Lawrence

Reputation: 3297

Smoothing Rx Observables

This is very similar to the question "Rx IObservable buffering to smooth out bursts of events": I am interested in smoothing out observables whose events may arrive in bursts.

Hopefully the diagram below illustrates what I am aiming for:

Raw:       A--B--CDE-F--------------G-----------------------
Interval:  o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o--o
Output:    A--B--C--D--E--F-----------G---------------------

Given the raw stream, I wish to stretch these events over regular intervals.
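
To make the intended timing concrete, here is a minimal sketch in plain C# (no Rx) of the rule the diagram implies: ticks fall on multiples of the period, and each event is released on the first tick that is not before its arrival and is after the previous release. The names `SmoothingModel` and `Schedule` are made up for illustration, and arrival times are assumed to be non-negative and sorted. It only models the desired output; it is not a solution to the Rx problem itself.

```csharp
using System;

public static class SmoothingModel
{
    // Hypothetical helper: maps arrival times to release times.
    // Assumes arrivals are sorted, non-negative, and period > 0.
    public static long[] Schedule(long[] arrivals, long period)
    {
        var releases = new long[arrivals.Length];
        long nextFree = 0; // earliest tick still available for a release

        for (int i = 0; i < arrivals.Length; i++)
        {
            // Round the arrival time up to the nearest tick.
            long firstTick = ((arrivals[i] + period - 1) / period) * period;

            // Wait longer if earlier events still occupy that tick.
            long release = Math.Max(firstTick, nextFree);

            releases[i] = release;
            nextFree = release + period;
        }

        return releases;
    }
}

public static class Program
{
    public static void Main()
    {
        // Column positions of A, B, C, D, E, F, G in the "Raw" line,
        // with a tick every 3 columns as in the "Interval" line.
        var arrivals = new long[] { 0, 3, 6, 7, 8, 10, 25 };
        var releases = SmoothingModel.Schedule(arrivals, 3);

        Console.WriteLine(string.Join(", ", releases));
        // prints: 0, 3, 6, 9, 12, 15, 27
    }
}
```

The printed positions line up with the letters in the "Output" line of the diagram.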

Throttle does not work, because I then end up losing elements of the raw sequence.

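Zip works well if the raw stream is more frequent than the timer, but fails if there are periods with no raw events: the unmatched timer ticks queue up during the quiet period, so the next burst is paired with them and passes straight through.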

EDIT

In response to Dan's answer, the problem with Buffer is that if a burst of many events arrives within a short time, I receive the events too often. The diagram below shows what could happen with a buffer size of 3 and a timeout set to the required interval:

Raw:       -ABC-DEF-----------G-H-------------------------------
Interval:  o--------o--------o--------o--------o--------o--------
Buffered:  ---A---D-------------------G--------------------------
              B   E                   H
              C   F
Desired:   ---------A--------B--------C--------D--------E ..etc.

Upvotes: 4

Views: 336

Answers (2)

IanR

Reputation: 4773

How about this? (inspired by James' answer mentioned in the comments)...

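public static IObservable<T> Regulate<T>(this IObservable<T> source, TimeSpan period)
{
    // Timer shared by the inner sequences; it runs only while one is subscribed.
    var interval = Observable.Interval(period).Publish().RefCount();

    // Wrap each value in its own sequence that waits for the next tick,
    // then let Concat play those sequences back one at a time, in order.
    return source.Select(x => Observable.Return(x)
                                        .CombineLatest(interval, (v, _) => v)
                                        .Take(1))
                 .Concat();
}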

It turns each value in the raw observable into its own observable. The CombineLatest means it won't produce a value until the interval does. Then we just take one value from each of these observables and concatenate.

The first value in the raw observable gets delayed by one period. I'm not sure if that is an issue for you or not.
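
For anyone who wants to try it, here is a rough, self-contained sketch that pushes a burst of three values through the operator. It assumes the System.Reactive NuGet package is referenced; the `Demo` class, the 500 ms period, and the use of a `Subject<char>` as the raw stream are just illustrative choices. The operator is repeated so the sketch compiles on its own.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class RegulateExtensions
{
    // Same operator as in the answer, copied here unchanged.
    public static IObservable<T> Regulate<T>(this IObservable<T> source, TimeSpan period)
    {
        var interval = Observable.Interval(period).Publish().RefCount();

        return source.Select(x => Observable.Return(x)
                                            .CombineLatest(interval, (v, _) => v)
                                            .Take(1))
                     .Concat();
    }
}

public static class Demo
{
    public static void Main()
    {
        var raw = new Subject<char>();   // stands in for the raw stream
        var start = DateTimeOffset.Now;

        using (raw.Regulate(TimeSpan.FromMilliseconds(500))
                  .Subscribe(c => Console.WriteLine(
                      $"{c} at {(DateTimeOffset.Now - start).TotalMilliseconds:F0} ms")))
        {
            // A burst: three values pushed back to back.
            foreach (var c in "ABC")
            {
                raw.OnNext(c);
            }

            // Keep the process alive long enough for the timer to release them.
            Thread.Sleep(2500);
        }
    }
}
```

The values should come out in their original order, spaced roughly one period apart, rather than all at once.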

Upvotes: 1

Dan Lyons

Reputation: 229

It looks like what you want to use is Buffer. One of the overloads allows you to specify an interval as well as the buffer length. You could conceivably set the length to 1.

Raw.Buffer(interval, 1);

For some more examples of its use, you can refer to the IntroToRX site.

Upvotes: 0
