galenus

Reputation: 2127

Rx for .Net: how to combine Scan with Throttle

My problem is this: for a given sequence of events I would like to cache their values until there is a pause in the stream. Then, I'm going to process all the cached data in a batch and clear the cache state.

A naive approach would be the following (this is not working code and may contain errors):

struct FlaggedData
{
    public EventData Data { get; set; }
    public bool Reset { get; set; }
}

...

IObservable<EventData> eventsStream = GetStream();
var resetSignal = new Subject<FlaggedData>();

var subscription = eventsStream
    .Select(data => new FlaggedData { Data = data })
    .Merge(resetSignal)
    .Scan(
        new List<EventData>(),
        (cache, flaggedData) =>
        {
            if (!flaggedData.Reset)
            {
                cache.Add(flaggedData.Data);
                return cache;
            }

            return new List<EventData>();
        })
    .Throttle(SomePeriodOfTime)
    .Subscribe(batch => 
        {
            resetSignal.OnNext(new FlaggedData { Reset = true });
            ProcessBatch(batch);
        });

So here, after receiving a batch for processing, I request a reset of the cache. The problem is that, because of Throttle, there could still be some data in the cache (or so I believe), and that data would be lost when the reset arrives.

What I would like is some operation like:

ScanWithThrottling<TAccumulate, TSource>(
    IObservable<TSource> source,
    Func<TAccumulate, TSource, TAccumulate> aggregate,
    TimeSpan throttlingSpan)

which returns an observable that will reset the accumulated value each time it calls OnNext of its subscriber.

Of course, I could write an extension of my own, but the question is whether I could achieve the same effect using standard Rx operators.

Upvotes: 4

Views: 2044

Answers (2)

James World

Reputation: 29786

I think there is a simple way to go here. Use Buffer() to buffer elements based on a throttle like this:

var buffered = source.Publish(ps =>        
    ps.Buffer(() => ps.Throttle(SomePeriodOfTime)));

This will buffer elements until there has been a gap of SomePeriodOfTime and then present them as a list. No need to worry about the "reset" aspect, and you won't lose elements.

The use of Publish ensures there's a single shared subscription to the source events that can be used by Buffer and each Throttle. The throttle is the buffer closing function, providing a signal that indicates a new buffer should be started.

Here's a testable version - I'm just dumping out the length of each buffer here and using Timestamp to add the timing info, but it's an IList<T> that you get on the raw buffered stream. Note how the scheduler is provided as an argument to time based operations to enable testing.

Note: you will need the NuGet package rx-testing to run this sample. It pulls in the Rx test framework and provides the TestScheduler and ReactiveTest types:

void Main()
{
    var scenarios = new Scenarios();
    scenarios.Scenario1();
}

public class Scenarios : ReactiveTest
{
    public void Scenario1()
    {
        var scheduler = new TestScheduler();
        var source = scheduler.CreateHotObservable(
            OnNext(100, 1),
            OnNext(200, 2),
            OnNext(300, 3),
            OnNext(800, 4),
            OnNext(900, 5),
            OnNext(1400, 6),
            OnNext(1600, 7),
            OnNext(1700, 8),
            OnNext(1800, 9));

        var duration = TimeSpan.FromTicks(300);

        var buffered = source.Publish(ps =>
            ps.Buffer(() => ps.Throttle(duration, scheduler)));

        buffered.Timestamp(scheduler).Subscribe(
            x => Console.WriteLine("Timestamp: {0} Value: {1}",
                 x.Timestamp.Ticks, x.Value.Count()));

        scheduler.Start();

    }
}
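
For completeness, here is a rough sketch of how the same Buffer/Throttle combination might be wrapped into the operator shape the question asks for. It is untested and rests on a few assumptions: the System.Reactive package is referenced, and the class name ObservableBatchExtensions plus the seedFactory and scheduler parameters are hypothetical additions (the signature in the question has neither).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ObservableBatchExtensions
{
    // Hypothetical operator modeled on the signature in the question.
    // seedFactory and scheduler are assumptions added for this sketch.
    public static IObservable<TAccumulate> ScanWithThrottling<TSource, TAccumulate>(
        this IObservable<TSource> source,
        Func<TAccumulate> seedFactory,
        Func<TAccumulate, TSource, TAccumulate> aggregate,
        TimeSpan throttlingSpan,
        IScheduler scheduler)
    {
        // Publish shares one subscription between Buffer and every Throttle.
        return source.Publish(ps =>
            ps.Buffer(() => ps.Throttle(throttlingSpan, scheduler))
              // Fold each closed batch starting from a fresh seed,
              // so no accumulated state carries over to the next batch.
              .Select(batch => batch.Aggregate(seedFactory(), aggregate)));
    }
}
```

With the types from the question, a call could look like events.ScanWithThrottling(() => new List<EventData>(), (cache, e) => { cache.Add(e); return cache; }, SomePeriodOfTime, Scheduler.Default). An empty batch may come through in some cases (for example when the source completes), so a Where(batch => batch.Count > 0) before the Select might be worth adding if that matters.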

Upvotes: 6

Dave Sexton

Reputation: 2652

Essentially, it seems that you want a Throttle that buffers rather than drops.

To solve this problem, let's think about how Throttle works:

  1. When a value arrives, cache it (replacing any previous value) and start a timer.
  2. If another value arrives before the timer elapses, cancel the timer and goto #1.
  3. Else when the timer elapses, push the value from the cache.

Your spec:

  1. When a value arrives, append it to a buffer and start a timer.
  2. If another value arrives before the timer elapses, cancel the timer and goto #1.
  3. Else when the timer elapses, push the buffer and create a new buffer.

Thus:

var throttled = source.Publish(p => 
  from value in p
  // TODO: Add to buffer (side effect)
  from _ in Observable.Timer(period, scheduler).TakeUntil(p)
  select buffer);  // TODO: Create new buffer (side effect + contention)

Notice the race conditions and the possible contention for the hypothetical buffer if the specified scheduler introduces concurrency.

If the Timer elapses when p observes a new value, then in a naïve implementation it's possible for the latest value to leak into the buffer before the buffer is pushed and a new buffer is created. This might be acceptable, though synchronization is necessary no matter how you cut it. And it seems impossible to me to get the exact semantics that you're looking for unless you either extend notifications with timestamps or guarantee thread-affinity for all operations.

Perhaps it makes sense as a good generalized operator anyway. It seems orthogonal enough to me to be included as an Rx primitive, unless I'm just mistaken in my analysis. Consider adding a work item.

I also distinctly remember discussing a similar operator in the Rx MSDN Forum, probably more than once. Might be worth searching for those discussions.

Edit: I've changed my mind about using synchronization in this example since the race condition is unavoidable here anyway, assuming scheduler introduces concurrency.

Untested Example:

public static IObservable<IList<TSource>> ThrottleBuffered<TSource>(
  this IObservable<TSource> source, 
  TimeSpan period,
  IScheduler scheduler)
{
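  return source.Publish(p =>
  {
    var buffer = new List<TSource>();

    // A lambda (not the method group buffer.Add) so each value goes to the current buffer.
    return p.Do(value => buffer.Add(value))
            // Each value starts a timer; the next value cancels it via TakeUntil(p).
            .SelectMany(_ => Observable.Timer(period, scheduler).TakeUntil(p))
            .Select(_ =>
            {
              // Push the filled buffer and start a new one (not synchronized).
              IList<TSource> filled = buffer;
              buffer = new List<TSource>();
              return filled;
            });
  });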
}

Upvotes: 3
