Reputation: 3758
So, I asked how to change Throttle
Timespan in middle of a running query here, to which James then replied that there's an overload and actually provided an example too (all good and well, I learned some techniques from there too).
During the previous weekend I produced a piece of code where the Throttle
interval would be defined by the incoming stream itself. As a practical example, the stream could be a series of structs defined as follows
struct SomeEvent
{
public int Id;
public DateTimeOffset TimeStamp;
}
And then the accepting stream would check the TimeStamp
fields and calculate the absence intervals based on them. Altering a bit James' linked example, the stream could be produced like
Func<SomeEvent, IObservable<long>> throttleFactory = e => Observable.Timer(TimeSpan.FromTicks(throttleDuration.Ticks - (DateTimeOffset.Now.Ticks - e.TimeStamp.Ticks)));
var sequence = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => new SomeEvent { Id = 0, TimeStamp = DateTimeOffset.Now.AddTicks(-1) }).Throttle(throttleFactory);
var subscription = sequence.Subscribe(e => Console.WriteLine(e.TimeStamp));
The time shift, a few ticks, is just for illustrational purposes
Then I had a more elaborate example here, again, helped a lot by James. In short, the idea here was that there could be "a tower of alert lights" per ID (akin to traffic lights), having colours like yellow and red, which are lit each on their turn defined by how long there has been absence of events. Then when an event arrives, all the lights are switched off and "absence timers" start from zero again.
The snag I've hit is that I seem to be unable to alter this particular example so that it would use this idea to produce the Throttle
value. Particularly I can't seem to get the grouping play out nicely on line grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
in James' code here. Maybe I'm just too exhausted on debugging and all, but I'd sure appreciate if someone could provide a nudge to the right direction!
What's the big idea? Well, the events could be timestamped at the source, but the transmission could add a delay that needs to be accounted for. Judging from the F# Users Group discussion that's gearing with distributed computing (and being somewhat familiar with integration issues myself), a scenario in which the events are timestamped somewhere and then relayed through different queuing systems create two kinds of cases:
<edit: Brandon makes a valid point in regard to my example given in 2.. How should one actually interpret the absence of "business timeouts"? In case events haven't arrived, the only valid timeout event to produce is the "technical" one in 1. If they do arrive in a burst, is the receiver interested on the time difference between the events and wants to raise color events accordingly? Or should the timer just be reset according to the timestamps in business events and then when a burst arrives, take the timestamp of the last one (which, again, could be longer than the allowed timeout period). It gets complicated and messy, better drop this one as an example.
That being written, I'd still be interested to know how to perform the join in grp => grp.Throttle(thresholdSelector(grp.Key, level), scheduler))
. I'm inclined to mark Brandon's post as an answer too if this gets complicated (as I feel it might get, that grouping is fairly complex, I feel).
Upvotes: 1
Views: 785
Reputation: 39182
It sounds like throttling is no longer what you want. Is this what you are trying to do?
var alarms = events
.GroupBy(e => e.Id)
.SelectMany(grp =>
{
// Determine light color based on delay between events
// go black if event arrives that is not stale
var black = grp
.Where(ev => (Date.Now - ev.TimeStamp) < TimeSpan.FromSeconds(2))
.Select(ev => "black");
// go yellow if no events after 1 second
var yellow = black
.Select(b => Observable.Timer(TimeSpan.FromSeconds(1)))
.SwitchLatest()
.Select(t => "yellow");
// go red if no events after 2 seconds
var red = black
.Select(b => Observable.Timer(TimeSpan.FromSeconds(2)))
.SwitchLatest()
.Select(t => "red");
return Observable
.Merge(black, yellow, red)
.Select(color => new { Id = grp.Key, Color = color });
});
Upvotes: 2