Andrew Anderson

Reputation: 3449

Rx: How can I respond immediately, and throttle subsequent requests

I would like to set up an Rx subscription that can respond to an event right away, and then ignore subsequent events that happen within a specified "cooldown" period.

The out-of-the-box Throttle/Buffer methods respond only once the timeout has elapsed, which is not quite what I need.

Here is some code that sets up the scenario, and uses a Throttle (which isn't the solution I want):

class Program
{
    static Stopwatch sw = new Stopwatch();

    static void Main(string[] args)
    {
        var subject = new Subject<int>();
        var timeout = TimeSpan.FromMilliseconds(500);

        subject
            .Throttle(timeout)
            .Subscribe(DoStuff);

        var factory = new TaskFactory();
         
        sw.Start();

        factory.StartNew(() =>
        {
            Console.WriteLine("Batch 1 (no delay)");
            subject.OnNext(1);
        });

        factory.StartNewDelayed(1000, () =>
        {
            Console.WriteLine("Batch 2 (1s delay)");
            subject.OnNext(2);
        });
 
        factory.StartNewDelayed(1300, () =>
        {
            Console.WriteLine("Batch 3 (1.3s delay)");
            subject.OnNext(3);
        });

        factory.StartNewDelayed(1600, () =>
        {
            Console.WriteLine("Batch 4 (1.6s delay)");
            subject.OnNext(4);
        });

        Console.ReadKey();
        sw.Stop();
    }

    private static void DoStuff(int i)
    {
        Console.WriteLine("Handling {0} at {1}ms", i, sw.ElapsedMilliseconds);
    }
}

The output of running this right now is:

Batch 1 (no delay)

Handling 1 at 508ms

Batch 2 (1s delay)

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at 2114ms

Note that batch 2 isn't handled (which is fine!) because we wait for 500ms to elapse between requests due to the nature of Throttle. Batch 3 is also not handled (which is less alright, because it happened more than 500ms from batch 2) due to its proximity to Batch 4.

What I'm looking for is something more like this:

Batch 1 (no delay)

Handling 1 at ~0ms

Batch 2 (1s delay)

Handling 2 at ~1000ms

Batch 3 (1.3s delay)

Batch 4 (1.6s delay)

Handling 4 at ~1600ms

Note that batch 3 wouldn't be handled in this scenario (which is fine!) because it occurs within 500ms of Batch 2.

EDIT:

Here is the implementation for the "StartNewDelayed" extension method that I use:

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <returns>A Task that will be completed after the specified duration.</returns>
public static Task StartNewDelayed(
    this TaskFactory factory, int millisecondsDelay)
{
    return StartNewDelayed(factory, millisecondsDelay, CancellationToken.None);
}

/// <summary>Creates a Task that will complete after the specified delay.</summary>
/// <param name="factory">The TaskFactory.</param>
/// <param name="millisecondsDelay">The delay after which the Task should transition to RanToCompletion.</param>
/// <param name="cancellationToken">The cancellation token that can be used to cancel the timed task.</param>
/// <returns>A Task that will be completed after the specified duration and that's cancelable with the specified token.</returns>
public static Task StartNewDelayed(this TaskFactory factory, int millisecondsDelay, CancellationToken cancellationToken)
{
    // Validate arguments
    if (factory == null) throw new ArgumentNullException("factory");
    if (millisecondsDelay < 0) throw new ArgumentOutOfRangeException("millisecondsDelay");

    // Create the timed task
    var tcs = new TaskCompletionSource<object>(factory.CreationOptions);
    var ctr = default(CancellationTokenRegistration);

    // Create the timer but don't start it yet.  If we start it now,
    // it might fire before ctr has been set to the right registration.
    var timer = new Timer(self =>
    {
        // Clean up both the cancellation token and the timer, and try to transition to completed
        ctr.Dispose();
        ((Timer)self).Dispose();
        tcs.TrySetResult(null);
    });

    // Register with the cancellation token.
    if (cancellationToken.CanBeCanceled)
    {
        // When cancellation occurs, cancel the timer and try to transition to cancelled.
        // There could be a race, but it's benign.
        ctr = cancellationToken.Register(() =>
        {
            timer.Dispose();
            tcs.TrySetCanceled();
        });
    }

    if (millisecondsDelay > 0)
    {
        // Start the timer and hand back the task...
        timer.Change(millisecondsDelay, Timeout.Infinite);
    }
    else
    {
        // Just complete the task, and keep execution on the current thread.
        ctr.Dispose();
        tcs.TrySetResult(null);
        timer.Dispose();
    }

    return tcs.Task;
}

Upvotes: 39

Views: 8238

Answers (9)

Yannick Excoffier

Reputation: 81

It's an old post, but none of the answers really met my needs, so here is my own solution:

public static IObservable<T> ThrottleOrImmediate<T>(this IObservable<T> source, TimeSpan delay, IScheduler scheduler)
{
    return Observable.Create<T>((obs, token) =>
    {
        // The next item cannot be sent before this time
        DateTime nextItemTime = default;

        return Task.FromResult(source.Subscribe(async item =>
        {
            var currentTime = DateTime.Now;
            // If we have already reached the next item time
            if (currentTime - nextItemTime >= TimeSpan.Zero)
            {
                // The following item will be sent only after the set delay
                nextItemTime = currentTime + delay;
                // send current item with scheduler
                scheduler.Schedule(() => obs.OnNext(item));
            }
            // There is still time before we can send an item
            else
            {
                // we schedule the time for the following item
                nextItemTime = currentTime + delay;
                try
                {
                    await Task.Delay(delay, token);
                }
                catch (TaskCanceledException)
                {
                    return;
                }

                // If the next item time was changed by another item, we stop here
                if (nextItemTime > currentTime + delay)
                    return;
                else
                {
                    // Set next possible time for an item and send item with scheduler
                    nextItemTime = currentTime + delay;
                    scheduler.Schedule(() => obs.OnNext(item));
                }
            }
        }));

    });
}

The first item is sent immediately, and the items that follow are throttled. An item that arrives after the delay has elapsed is also sent immediately.

Upvotes: 0

Bart de Boer

Reputation: 421

Use .Scan()! This is what I use for throttling when I need the first hit (after a certain period) immediately, but want to delay (and group/ignore) any subsequent hits. It basically works like Throttle, but fires immediately if the previous OnNext was >= interval ago; otherwise, it schedules the hit at exactly interval from the previous one. And of course, if multiple hits arrive within the 'cooling down' period, the additional ones are ignored, just as Throttle does. The difference from your use case is that if you get events at 0 ms and 100 ms, both will be handled (at 0 ms and 500 ms), which might be what you actually want (otherwise, the accumulator is easy to adapt to ignore ANY hit closer than interval to the previous one).

public static IObservable<T> QuickThrottle<T>(this IObservable<T> src, TimeSpan interval, IScheduler scheduler)
{
  return src
    .Scan(new ValueAndDueTime<T>(), (prev, id) => AccumulateForQuickThrottle(prev, id, interval, scheduler))
    .Where(vd => !vd.Ignore)
    .SelectMany(sc => Observable.Timer(sc.DueTime, scheduler).Select(_ => sc.Value));
}

private static ValueAndDueTime<T> AccumulateForQuickThrottle<T>(ValueAndDueTime<T> prev, T value, TimeSpan interval, IScheduler s)
{
  var now = s.Now;

  // Ignore this completely if there is already a future item scheduled
  //  but do keep the dueTime for accumulation!
  if (prev.DueTime > now) return new ValueAndDueTime<T> { DueTime = prev.DueTime, Ignore = true };

  // Schedule this item at at least interval from the previous
  var min = prev.DueTime + interval;
  var nextTime = (now < min) ? min : now;
  return new ValueAndDueTime<T> { DueTime = nextTime, Value = value };
}

private class ValueAndDueTime<T>
{
  public DateTimeOffset DueTime;
  public T Value;
  public bool Ignore;
}

Upvotes: 1

James World

Reputation: 29776

Here's my approach. It's similar to others that have gone before, but it doesn't suffer the over-zealous window production problem.

The desired function works a lot like Observable.Throttle but emits qualifying events as soon as they arrive rather than delaying for the duration of the throttle or sample period. For a given duration after a qualifying event, subsequent events are suppressed.

Given as a testable extension method:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        return source.Publish(ps => 
            ps.Window(() => ps.Delay(sampleDuration,scheduler))
              .SelectMany(x => x.Take(1)));
    }
}

The idea is to use the overload of Window that creates non-overlapping windows using a windowClosingSelector that uses the source time-shifted back by the sampleDuration. Each window will therefore: (a) be closed by the first element in it and (b) remain open until a new element is permitted. We then simply select the first element from each window.
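Because the operator accepts a scheduler, its behaviour can be checked in virtual time. The following is only a sketch: it assumes the Microsoft.Reactive.Testing package (for TestScheduler and ReactiveTest), the class name SampleFirstCheck and the Run helper are made up for illustration, and the timings are borrowed from the 300/700/1300/1600 ms scenario discussed elsewhere in this thread.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class SampleFirstCheck
{
    // Copy of the operator above so that this sketch compiles on its own.
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source, TimeSpan sampleDuration, IScheduler scheduler)
    {
        return source.Publish(ps =>
            ps.Window(() => ps.Delay(sampleDuration, scheduler))
              .SelectMany(x => x.Take(1)));
    }

    // Hypothetical helper: replays four events in virtual time and returns
    // the values that get through a 500 ms cooldown.
    public static List<int> Run()
    {
        var scheduler = new TestScheduler();

        // Hot source: events fire at fixed virtual times regardless of subscribers.
        var source = scheduler.CreateHotObservable(
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(300).Ticks, 1),
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(700).Ticks, 2),
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(1300).Ticks, 3),
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(1600).Ticks, 4));

        var handled = new List<int>();
        using (source.SampleFirst(TimeSpan.FromMilliseconds(500), scheduler)
                     .Subscribe(v => handled.Add(v)))
        {
            // Run the virtual clock until no scheduled work remains.
            scheduler.Start();
        }
        return handled;
    }
}
```

With these timings, Run() is expected to return 1 and 3: event 2 arrives 400 ms after event 1 and event 4 arrives 300 ms after event 3, so both fall inside a cooldown window.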

Rx 1.x Version

The Publish extension method used above is not available in Rx 1.x. Here is an alternative:

public static class ObservableExtensions
{
    public static IObservable<T> SampleFirst<T>(
        this IObservable<T> source,
        TimeSpan sampleDuration,
        IScheduler scheduler = null)
    {
        scheduler = scheduler ?? Scheduler.Default;
        var sourcePub = source.Publish().RefCount();
        return sourcePub.Window(() => sourcePub.Delay(sampleDuration,scheduler))
                        .SelectMany(x => x.Take(1));
    }
}

Upvotes: 18

Sergey Aldoukhov

Reputation: 22744

I stumbled upon this question while trying to re-implement my own solution to the same or a similar problem using .Window. Take a look; it seems to be the same problem as this one, and it is solved quite elegantly:

https://stackoverflow.com/a/3224723/58463

Upvotes: 0

Christoph

Reputation: 27985

Well, the most obvious approach is to use Repeat() here. However, as far as I know, Repeat() can drop notifications that arrive between the moment the stream completes and the moment we subscribe again. In practice this has never been a problem for me.

subject
    .Take(1)
    .Concat(Observable.Empty<long>().Delay(TimeSpan.FromMilliseconds(500)))
    .Repeat();

Remember to replace long with the actual type of your source.

UPDATE:

Updated query to use Concat instead of Merge

Upvotes: 0

Christoph

Reputation: 27985

I've got another one for you. This one uses neither Repeat() nor Interval(), so it might be what you are after:

subject
    .Window(() => Observable.Timer(TimeSpan.FromMilliseconds(500)))
    .SelectMany(x => x.Take(1));

Upvotes: 0

Andrew Anderson

Reputation: 3449

The initial answer I posted has a flaw: namely that the Window method, when used with an Observable.Interval to denote the end of the window, sets up an infinite series of 500ms windows. What I really need is a window that starts when the first result is pumped into the subject, and ends after the 500ms.

My sample data masked this problem because the data broke down nicely into the windows that were already going to be created. (i.e. 0-500ms, 501-1000ms, 1001-1500ms, etc.)

Consider instead this timing:

factory.StartNewDelayed(300,() =>
{
    Console.WriteLine("Batch 1 (300ms delay)");
    subject.OnNext(1);
});

factory.StartNewDelayed(700, () =>
{
    Console.WriteLine("Batch 2 (700ms delay)");
    subject.OnNext(2);
});

factory.StartNewDelayed(1300, () =>
{
    Console.WriteLine("Batch 3 (1.3s delay)");
    subject.OnNext(3);
});

factory.StartNewDelayed(1600, () =>
{
    Console.WriteLine("Batch 4 (1.6s delay)");
    subject.OnNext(4);
});

What I get is:

Batch 1 (300ms delay)

Handling 1 at 356ms

Batch 2 (700ms delay)

Handling 2 at 750ms

Batch 3 (1.3s delay)

Handling 3 at 1346ms

Batch 4 (1.6s delay)

Handling 4 at 1644ms

This is because the windows begin at 0ms, 500ms, 1000ms, and 1500ms and so each Subject.OnNext fits nicely into its own window.
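The fixed-window effect can be reproduced without real timers. This is a sketch in virtual time, assuming the Microsoft.Reactive.Testing package; FixedWindowCheck and Run are hypothetical names used only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

public static class FixedWindowCheck
{
    // Hypothetical helper: feeds the four batches above through the
    // Window + Interval query and returns the values that were handled.
    public static List<int> Run()
    {
        var scheduler = new TestScheduler();
        var timeout = TimeSpan.FromMilliseconds(500);

        var source = scheduler.CreateHotObservable(
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(300).Ticks, 1),
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(700).Ticks, 2),
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(1300).Ticks, 3),
            ReactiveTest.OnNext(TimeSpan.FromMilliseconds(1600).Ticks, 4));

        var handled = new List<int>();
        using (source
            .Window(() => Observable.Interval(timeout, scheduler))
            .SelectMany(x => x.Take(1))
            .Subscribe(v => handled.Add(v)))
        {
            // A new Interval is started for every window, so the scheduler
            // never runs dry; advance to a fixed point instead of Start().
            scheduler.AdvanceTo(TimeSpan.FromSeconds(2).Ticks);
        }
        return handled;
    }
}
```

Each window is closed by its own 500 ms timer rather than by an incoming event, so every value here lands in a different window and all four are returned.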

What I want is:

Batch 1 (300ms delay)

Handling 1 at ~300ms

Batch 2 (700ms delay)

Batch 3 (1.3s delay)

Handling 3 at ~1300ms

Batch 4 (1.6s delay)

After a lot of struggling and an hour banging on it with a co-worker, we arrived at a better solution using pure Rx and a single local variable:

bool isCoolingDown = false;

subject
    .Where(_ => !isCoolingDown)
    .Subscribe(
    i =>
    {
        DoStuff(i);

        isCoolingDown = true;

        Observable
            .Interval(cooldownInterval)
            .Take(1)
            .Subscribe(_ => isCoolingDown = false);
    });

Our assumption is that calls to the subscription method are synchronized. If they are not, then a simple lock could be introduced.
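If the source can raise events on several threads, one possible way to add that synchronization is sketched below. Note that this is a variation rather than the code above: it swaps the flag and the inner timer for a timestamp comparison, and lets Synchronize() do the locking. The names CooldownExtensions and TakeThenCoolDown are hypothetical and not part of Rx.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class CooldownExtensions
{
    // Hypothetical operator: passes a value through, then drops everything
    // that arrives before the cooldown has elapsed.
    public static IObservable<T> TakeThenCoolDown<T>(
        this IObservable<T> source, TimeSpan cooldown, IScheduler scheduler)
    {
        // Defer gives every subscriber its own nextAllowed state.
        return Observable.Defer(() =>
        {
            var nextAllowed = DateTimeOffset.MinValue;

            return source
                // Serialize notifications so the predicate never runs concurrently.
                .Synchronize()
                .Where(_ =>
                {
                    var now = scheduler.Now;
                    if (now < nextAllowed) return false; // still cooling down
                    nextAllowed = now + cooldown;        // start a new cooldown
                    return true;
                });
        });
    }
}
```

Usage would look like subject.TakeThenCoolDown(TimeSpan.FromMilliseconds(500), Scheduler.Default).Subscribe(DoStuff). Passing the scheduler in also makes it possible to drive the clock from a virtual-time scheduler in tests.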

Upvotes: 2

Ana Betts

Reputation: 74654

Awesome solution Andrew! We can take this a step further though and clean up the inner Subscribe:

subject
    .Window(() => { return Observable.Interval(timeout); })
    .SelectMany(x => x.Take(1))
    .Subscribe(DoStuff);

Upvotes: 4

Andrew Anderson

Reputation: 3449

The solution I found after a lot of trial and error was to replace the throttled subscription with the following:

subject
    .Window(() => { return Observable.Interval(timeout); })
    .SelectMany(x => x.Take(1))
    .Subscribe(i => DoStuff(i));

Edited to incorporate Paul's clean-up.

Upvotes: 8
