Reputation: 1068
I have a bunch of events coming in and I have to execute ALL of them without a loss, but I want to make sure that they are buffered and consumed at the appropriate time slots. Anyone have a solution?
I can't find any operators in Rx that can do that without the loss of the events (Throttle - looses events). I've also considered Buffered, Delay, etc... Can't find a good solution.
I've tried to put a timer in the middle, but somehow it doesn't work at all:
GetInitSequence()
.IntervalThrottle(TimeSpan.FromSeconds(5))
.Subscribe(
item =>
{
Console.WriteLine(DateTime.Now);
// Process item
}
);
public static IObservable<T> IntervalThrottle<T>(this IObservable<T> source, TimeSpan dueTime)
{
return Observable.Create<T>(o =>
{
return source.Subscribe(x =>
{
new Timer(state =>
o.OnNext((T)state), x, dueTime, TimeSpan.FromMilliseconds(-1));
}, o.OnError, o.OnCompleted);
});
}
Upvotes: 6
Views: 3602
Reputation: 10783
Along the lines of Enigmativity's answer, if all you want to do is just Delay all of the values by a TimeSpan, I cant see why Delay
is not the operator you want
GetInitSequence()
.Delay(TimeSpan.FromSeconds(5)) //ideally pass an IScheduler here
.Subscribe(
item =>
{
Console.WriteLine(DateTime.Now);
// Process item
}
);
Upvotes: 1
Reputation: 117047
I know this could just be too simple, but would this work?
var intervaled = source.Do(x => { Thread.Sleep(100); });
Basically this just puts a minimum delay between values. Too simplistic?
Upvotes: 1
Reputation: 15618
The question is not 100% clear so I'm making some presumptions.
Observable.Delay
is not what you want because that will create a delay from when each event arrives, rather than creating even time intervals for processing.
Observable.Buffer
is not what you want because that will cause all events in each given interval to be passed to you, rather than one at a time.
So I believe you're looking for a solution that creates some sort of metronome that ticks away, and gives you an event per tick. This can be naively constructed using Observable.Interval
for the metronome and Zip
for connecting it to your source:
var source = GetInitSequence();
var trigger = Observable.Interval(TimeSpan.FromSeconds(5));
var triggeredSource = source.Zip(trigger, (s,_) => s);
triggeredSource.Subscribe(item => Console.WriteLine(DateTime.Now));
This will trigger every 5 seconds (in the example above), and give you the original items in sequence.
The only problem with this solution is that if you don't have any more source elements for (say) 10 seconds, when the source elements arrive they will be immediately sent out since some of the 'trigger' events are sitting there waiting for them. Marble diagram for that scenario:
source: -a-b-c----------------------d-e-f-g
trigger: ----o----o----o----o----o----o----o
result: ----a----b----c-------------d-e-f-g
This is a very reasonable issue. There are two questions here already that tackle it:
Rx IObservable buffering to smooth out bursts of events
A way to push buffered events in even intervals
The solution provided is a main Drain
extension method and secondary Buffered
extension. I've modified these to be far simpler (no need for Drain
, just use Concat
). Usage is:
var bufferedSource = source.StepInterval(TimeSpan.FromSeconds(5));
The extension method StepInterval
:
public static IObservable<T> StepInterval<T>(this IObservable<T> source, TimeSpan minDelay)
{
return source.Select(x =>
Observable.Empty<T>()
.Delay(minDelay)
.StartWith(x)
).Concat();
}
Upvotes: 13
Reputation: 19063
How about Observable.Buffer? This should return all the events in the 1s window as a single event.
var xs = Observable.Interval(TimeSpan.FromMilliseconds(100));
var bufferdStream = xs.Buffer(TimeSpan.FromSeconds(5));
bufferdStream.Subscribe(item => { Console.WriteLine("Number of events in window: {0}", item.Count); });
It might be what you're asking isnt that clear. What is your code supposed to do? It looks like you're just delaying by creating a timer for each event. It also breaks the semantics of the observable as the next and complete could occur before the next.
Note this is also only as accurate at the timer used. Typically the timers are accurate to at most 16ms.
Edit:
your example becomes, and item contains all the events in the window:
GetInitSequence()
.Buffer(TimeSpan.FromSeconds(5))
.Subscribe(
item =>
{
Console.WriteLine(DateTime.Now);
// Process item
}
);
Upvotes: 0