Reputation: 34293
Throttle
method skips values from an observable sequence if others follow too quickly. But I need a method to just delay them. That is, I need to set a minimum delay between items, without skipping any.
Practical example: there's a web service which can accept requests no faster than once a second; there's a user who can add requests, single or in batches. Without Rx, I'll create a list and a timer. When users adds requests, I'll add them to the list. In the timer event, I'll check wether the list is empty. If it is not, I'll send a request and remove the corresponding item. With locks and all that stuff. Now, with Rx, I can create Subject
, add items when users adds requests. But I need a way to make sure the web service is not flooded by applying delays.
I'm new to Rx, so maybe I'm missing something obvious.
Upvotes: 18
Views: 5328
Reputation: 3677
I was playing around with this and found .Zip (as mentioned before) to be the most simple method:
var stream = "ThisFastObservable".ToObservable();
var slowStream =
stream.Zip(
Observable.Interval(TimeSpan.FromSeconds(1)), //Time delay
(x, y) => x); // We just care about the original stream value (x), not the interval ticks (y)
slowStream.TimeInterval().Subscribe(x => Console.WriteLine($"{x.Value} arrived after {x.Interval}"));
output:
T arrived after 00:00:01.0393840
h arrived after 00:00:00.9787150
i arrived after 00:00:01.0080400
s arrived after 00:00:00.9963000
F arrived after 00:00:01.0002530
a arrived after 00:00:01.0003770
s arrived after 00:00:00.9963710
t arrived after 00:00:01.0026450
O arrived after 00:00:00.9995360
b arrived after 00:00:01.0014620
s arrived after 00:00:00.9993100
e arrived after 00:00:00.9972710
r arrived after 00:00:01.0001240
v arrived after 00:00:01.0016600
a arrived after 00:00:00.9981140
b arrived after 00:00:01.0033980
l arrived after 00:00:00.9992570
e arrived after 00:00:01.0003520
Upvotes: 1
Reputation: 2063
I want to suggest an approach with using Observable.Zip
:
// Incoming requests
var requests = new[] {1, 2, 3, 4, 5}.ToObservable();
// defines the frequency of the incoming requests
// This is the way to emulate flood of incoming requests.
// Which, by the way, uses the same approach that will be used in the solution
var requestsTimer = Observable.Interval(TimeSpan.FromSeconds(0.1));
var incomingRequests = Observable.Zip(requests, requestsTimer, (number, time) => {return number;});
incomingRequests.Subscribe((number) =>
{
Console.WriteLine($"Request received: {number}");
});
// This the minimum interval at which we want to process the incoming requests
var processingTimeInterval = Observable.Interval(TimeSpan.FromSeconds(1));
// Zipping incoming requests with the interval
var requestsToProcess = Observable.Zip(incomingRequests, processingTimeInterval, (data, time) => {return data;});
requestsToProcess.Subscribe((number) =>
{
Console.WriteLine($"Request processed: {number}");
});
Upvotes: 2
Reputation: 1885
.Buffer(TimeSpan.FromSeconds(0.2)).Where(i => i.Any())
.Subscribe(buffer =>
{
foreach(var item in buffer) Console.WriteLine(item)
});
Upvotes: -1
Reputation: 15618
How about a simple extension method:
public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
return source.Select(x =>
Observable.Empty<T>()
.Delay(minDelay)
.StartWith(x)
).Concat();
}
Usage:
var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(1));
Upvotes: 5
Reputation: 117175
There's a fairly easy way to do what you want using an EventLoopScheduler
.
I started out with an observable that will randomly produce values once every 0 to 3 seconds.
var rnd = new Random();
var xs =
Observable
.Generate(
0,
x => x < 20,
x => x + 1,
x => x,
x => TimeSpan.FromSeconds(rnd.NextDouble() * 3.0));
Now, to make this output values immediately unless the last value was within a second ago I did this:
var ys =
Observable.Create<int>(o =>
{
var els = new EventLoopScheduler();
return xs
.ObserveOn(els)
.Do(x => els.Schedule(() => Thread.Sleep(1000)))
.Subscribe(o);
});
This effectively observes the source on the EventLoopScheduler
and then puts it to sleep for 1 second after each OnNext
so that it can only begin the next OnNext
after it wakes up.
I tested that it worked with this code:
ys
.Timestamp()
.Select(x => x.Timestamp.Second + (double)x.Timestamp.Millisecond/1000.0)
.Subscribe(x => Console.WriteLine(x));
I hope this helps.
Upvotes: 7
Reputation: 51369
How about using an observable timer to take from a blocking queue? Code below is untested, but should give you an idea of what I mean...
//assuming somewhere there is
BlockingCollection<MyWebServiceRequestData> workQueue = ...
Observable
.Timer(new TimeSpan(0,0,1), new EventLoopScheduler())
.Do(i => myWebService.Send(workQueue.Take()));
// Then just add items to the queue using workQueue.Add(...)
Upvotes: -1