Athari
Athari

Reputation: 34293

Throttle Rx.Observable without skipping values

Throttle method skips values from an observable sequence if others follow too quickly. But I need a method to just delay them. That is, I need to set a minimum delay between items, without skipping any.

Practical example: there's a web service which can accept requests no faster than once a second; there's a user who can add requests, single or in batches. Without Rx, I'll create a list and a timer. When users adds requests, I'll add them to the list. In the timer event, I'll check wether the list is empty. If it is not, I'll send a request and remove the corresponding item. With locks and all that stuff. Now, with Rx, I can create Subject, add items when users adds requests. But I need a way to make sure the web service is not flooded by applying delays.

I'm new to Rx, so maybe I'm missing something obvious.

Upvotes: 18

Views: 5328

Answers (6)

gakera
gakera

Reputation: 3677

I was playing around with this and found .Zip (as mentioned before) to be the most simple method:

var stream = "ThisFastObservable".ToObservable();
var slowStream = 
    stream.Zip(
        Observable.Interval(TimeSpan.FromSeconds(1)), //Time delay 
        (x, y) => x); // We just care about the original stream value (x), not the interval ticks (y)

slowStream.TimeInterval().Subscribe(x => Console.WriteLine($"{x.Value} arrived after {x.Interval}"));

output:

T arrived after 00:00:01.0393840
h arrived after 00:00:00.9787150
i arrived after 00:00:01.0080400
s arrived after 00:00:00.9963000
F arrived after 00:00:01.0002530
a arrived after 00:00:01.0003770
s arrived after 00:00:00.9963710
t arrived after 00:00:01.0026450
O arrived after 00:00:00.9995360
b arrived after 00:00:01.0014620
s arrived after 00:00:00.9993100
e arrived after 00:00:00.9972710
r arrived after 00:00:01.0001240
v arrived after 00:00:01.0016600
a arrived after 00:00:00.9981140
b arrived after 00:00:01.0033980
l arrived after 00:00:00.9992570
e arrived after 00:00:01.0003520

Upvotes: 1

Varvara Kalinina
Varvara Kalinina

Reputation: 2063

I want to suggest an approach with using Observable.Zip:

// Incoming requests
var requests = new[] {1, 2, 3, 4, 5}.ToObservable();

// defines the frequency of the incoming requests
// This is the way to emulate flood of incoming requests.
// Which, by the way, uses the same approach that will be used in the solution
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1)); 
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;});
incomingRequests.Subscribe((number) =>
{
    Console.WriteLine($"Request received: {number}");
});

// This the minimum interval at which we want to process the incoming requests
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1));

// Zipping incoming requests with the interval
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;});

requestsToProcess.Subscribe((number) =>
{
    Console.WriteLine($"Request processed: {number}");
});

Upvotes: 2

Juri Naidenov
Juri Naidenov

Reputation: 1885

.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any())
.Subscribe(buffer => 
{
     foreach(var item in buffer) Console.WriteLine(item)
});

Upvotes: -1

yamen
yamen

Reputation: 15618

How about a simple extension method:

public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
    return source.Select(x => 
        Observable.Empty<T>()
            .Delay(minDelay)
            .StartWith(x)
    ).Concat();
}

Usage:

var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));

Upvotes: 5

Enigmativity
Enigmativity

Reputation: 117175

There's a fairly easy way to do what you want using an EventLoopScheduler.

I started out with an observable that will randomly produce values once every 0 to 3 seconds.

var rnd = new Random();

var xs =
    Observable
        .Generate(
            0,
            x => x < 20,
            x => x + 1,
            x => x,
            x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));

Now, to make this output values immediately unless the last value was within a second ago I did this:

var ys =
    Observable.Create<int>(o =>
    {
        var els = new EventLoopScheduler();
        return xs
            .ObserveOn(els)
            .Do(x => els.Schedule(() => Thread.Sleep(1000)))
            .Subscribe(o);
    });

This effectively observes the source on the EventLoopScheduler and then puts it to sleep for 1 second after each OnNext so that it can only begin the next OnNext after it wakes up.

I tested that it worked with this code:

ys
    .Timestamp()
    .Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
    .Subscribe(x => Console.WriteLine(x));

I hope this helps.

Upvotes: 7

Chris Shain
Chris Shain

Reputation: 51369

How about using an observable timer to take from a blocking queue? Code below is untested, but should give you an idea of what I mean...

//assuming somewhere there is 
BlockingCollection<MyWebServiceRequestData> workQueue = ...

Observable
  .Timer(new TimeSpan(0,0,1), new EventLoopScheduler())
  .Do(i => myWebService.Send(workQueue.Take()));

// Then just add items to the queue using workQueue.Add(...)

Upvotes: -1

Related Questions