Reputation: 2365
I would like to use Rx to compute statistics on two streams of events.
Input streams
// stream1 --A---B----A-B-----A-----B----A--B|
// stream2 ----X---X-----------X--X---XX---X--X|
Intermediate result
Window durations, where windows open on A and close on B, along with the count of stream2 events raised inside each window.
// result  ------1------0-----------2-------1| <-- count of stream2 events in [A-B] window
//               4      2           6       3  <-- paired with the [A-B] window duration
Final result
Group the intermediate result by count of stream2 events and return window duration statistics for each group, such as the average, min and max window duration.
// output  -----------------------------------0    1    2| <-- count of stream2 events in [A-B] window
//                                            2    3.5  6  <-- average [A-B] window duration for that count of stream2 events
Rx queries
public enum EventKind
{
    START,
    STOP,
    OTHER
};
public struct Event1
{
    public EventKind Kind;
    public DateTime OccurenceTime;
};
var merge = stream1.Merge(stream2.Select(x => new Event1
    {
        Kind = EventKind.OTHER,
        OccurenceTime = x
    }))
    .RemoveDisorder(x => x.OccurenceTime, new TimeSpan(0,0,10));
var shared = merge.Publish().RefCount();
// Windows open on START and close on STOP
var windows = shared.Window(
    shared.Where(x => x.Kind == EventKind.START),
    opening => shared.Where(x => x.Kind == EventKind.STOP));
// For each window we're interested in the duration of the window along with
// the count of OTHER events that were raised inside the window
//
var pairs = windows.Select(window => new
    {
        Duration = window
            .Where(x=>x.Kind != EventKind.OTHER) // we only want START & STOP events, not OTHER events
            .Buffer(2,1) // could use buffer(2) but this is more reliable if stream1 sometimes has multiple consecutive START events.
            .Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
            .Select(x => x[1].OccurenceTime - x[0].OccurenceTime), // compute the latency
        EventCount = window.Where(x=>x.Kind == EventKind.OTHER).Count() // count the number of OTHER events in the window
    }
);
I would like to simplify the observable type from
IObservable<{IObservable<int>, IObservable<TimeSpan>}>
to
IObservable<{int, TimeSpan}>
This should be possible, since each window has exactly one duration and one count of OTHER events.
At that point it should not be too difficult to define the output query, which groups the windows by EventCount and selects statistics on the window durations, such as Min, Max and Avg, for each group.
var result = pairs
    .GroupBy(pair => pair.EventCount)
    .Select(g => new
    {
        EventCount = g.Key,
        Min = g.Min(x => x.Duration),
        Avg = g.Average(x => x.Duration),
        Max = g.Max(x => x.Duration)
    });
RemoveDisorder is an extension method I use to sort the merged observable on OccurenceTime. I need it because my input streams are not live events (as in this example) but are read from logs via Tx, and the output of merging two sorted streams is itself no longer sorted.
Upvotes: 4
Views: 752
Reputation: 3361
After using Rx for a while, a common scenario you will encounter is handling start and stop events. There are several ways to handle it properly; which one to pick depends on your requirements.
If your problem is just with the data projection, check @Brandon's solution: the key is to compose in a different way, for example using SelectMany. If you want to keep the Select operator, the projection would need to return an IObservable<T>.
Anyway, I think you have a problem with the composition in general; I will try to illustrate it below.
When you use the Window operator as you did, multiple consecutive events in the start stream create more than one group. In your code this could be a problem, because the main event stream will be processed several times when the next event arrives.
The example is just to show you the creation of many groups:
var subject = new Subject<Event1>();
var shared = subject.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
var values = shared.Where(a => a.Kind == EventKind.OTHER);
values.Window(start, a => stop).Subscribe(inner =>
{
    Console.WriteLine("New Group Started");
    inner.Subscribe(next =>
    {
        Console.WriteLine("Next = "+ next.Kind + " | " + next.OccurenceTime.ToLongTimeString());
    }, () => Console.WriteLine("Group Completed"));
});
subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now });
subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = DateTime.Now.AddSeconds(1) });
subject.OnNext(new Event1 { Kind = EventKind.OTHER, OccurenceTime = DateTime.Now.AddSeconds(2) });
subject.OnNext(new Event1 { Kind = EventKind.STOP, OccurenceTime = DateTime.Now.AddSeconds(3) });
The result:
New Group Started
New Group Started
Next = OTHER | 4:55:46 PM
Next = OTHER | 4:55:46 PM
Group Completed
Group Completed
Maybe this behavior is desired; otherwise a different composition is necessary. To "tame" the event stream I see three different approaches:
Switch operator).
In general there are many different ways to achieve any one of these options. If I understand your question, you are looking for option ONE. Now the answers:
Window, too much code:
IObservable<Event1> sx = GetEventStream();
var shared = sx.Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
var stop = shared.Where(a => a.Kind == EventKind.STOP);
shared.Window(start, a => stop)
    .Select(window => // named "window" so it does not clash with the local "sx" above
        window.Publish(b =>
            b.Take(1)
                .Select(c =>
                {
                    var final = b.LastOrDefaultAsync().Select(a => a.OccurenceTime);
                    var comp = b.Where(d => d.Kind == EventKind.OTHER).Count();
                    return final.Zip(comp, (d,e) => new { Count = e, Time = d - c.OccurenceTime });
                })
                .Switch() // any flattening operator works here,
        )                 // because there is just one inner sequence
    )
    .Concat()
    .Subscribe(next =>
    {
        Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
    });
Using GroupByUntil, a kind of "hack", but it is my preference:
IObservable<Event1> sx = GetEventStream();
var shared = sx.Publish().RefCount();
var stop = shared.Where(a => a.Kind == EventKind.STOP).Publish().RefCount();
var start = shared.Where(a => a.Kind == EventKind.START);
start.GroupByUntil(a => Unit.Default, a => stop)
    .Select(newGroup =>
    {
        var creation = newGroup.Take(1);
        var rightStream = shared.Where(a => a.Kind == EventKind.OTHER)
            .TakeUntil(newGroup.LastOrDefaultAsync())
            .Count();
        var finalStream = stop.Take(1);
        return creation.Zip(rightStream, finalStream, (a,b,c) => new { Count = b, Time = c.OccurenceTime - a.OccurenceTime });
    })
    .Concat()
    .Subscribe(next =>
    {
        Console.WriteLine("Count = "+ next.Count + " | " + next.Time);
    });
Without using Group/Window: use Take(1) and add the Repeat operator at the end of the composition. This could cause undesired behavior because of the re-subscription (it depends on whether the observable is cold or hot, and on the Scheduler used).
Creating a custom implementation by declaring your own extension method: this is not as hard as it seems and will probably be the best option, but it takes a while to implement.
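As a rough idea of what such a custom extension method could look like, here is a minimal sketch. The name CountBetweenStartAndStop, the tuple result type and the policy of ignoring a repeated START while a window is already open are all assumptions made for illustration, not something taken from the question. It assumes the System.Reactive package and reuses the Event1 and EventKind types from the question.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public enum EventKind { START, STOP, OTHER }

public struct Event1
{
    public EventKind Kind;
    public DateTime OccurenceTime;
}

public static class StartStopExtensions
{
    // Hypothetical helper: emits one (Count, Duration) pair per START..STOP window.
    public static IObservable<(int Count, TimeSpan Duration)> CountBetweenStartAndStop(
        this IObservable<Event1> source)
    {
        return Observable.Create<(int Count, TimeSpan Duration)>(observer =>
        {
            // State lives inside Create, so every subscription gets its own copy.
            DateTime? startedAt = null;
            var count = 0;

            return source.Subscribe(e =>
            {
                switch (e.Kind)
                {
                    case EventKind.START:
                        // Assumption: a START while a window is open is ignored.
                        if (startedAt == null) { startedAt = e.OccurenceTime; count = 0; }
                        break;
                    case EventKind.OTHER:
                        if (startedAt != null) count++;
                        break;
                    case EventKind.STOP:
                        if (startedAt != null)
                        {
                            observer.OnNext((count, e.OccurenceTime - startedAt.Value));
                            startedAt = null;
                        }
                        break;
                }
            }, observer.OnError, observer.OnCompleted);
        });
    }
}

public static class Demo
{
    public static void Main()
    {
        var t0 = new DateTime(2015, 1, 1);
        var subject = new Subject<Event1>();

        subject.CountBetweenStartAndStop()
            .Subscribe(r => Console.WriteLine("Count = " + r.Count + " | " + r.Duration));

        // Same event sequence as the Window demonstration: two STARTs, one OTHER, one STOP.
        subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = t0 });
        subject.OnNext(new Event1 { Kind = EventKind.START, OccurenceTime = t0.AddSeconds(1) });
        subject.OnNext(new Event1 { Kind = EventKind.OTHER, OccurenceTime = t0.AddSeconds(2) });
        subject.OnNext(new Event1 { Kind = EventKind.STOP, OccurenceTime = t0.AddSeconds(3) });
        subject.OnCompleted();
    }
}
```

With this policy the two consecutive START events produce a single result measured from the first START, instead of two overlapping groups. If you would rather measure from the latest START, reset startedAt and count on every START instead.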
The other problem with your composition is that it will be impossible to get statistical data, because you don't have a way to complete each new group in the GroupBy operator.
I would suggest rethinking your approach; the solution will probably involve combining time in some way. For more information about statistics and Rx, see: http://www.codeproject.com/Tips/853256/Real-time-statistics-with-Rx-Statistical-Demo-App
Upvotes: 2
Reputation: 39222
To simplify the observable type (which I guess is your question?) you can do something like this (note the change to SelectMany):
var pairs = windows.SelectMany(window =>
    {
        var duration = window
            .Where(x=>x.Kind != EventKind.OTHER) // we only want START & STOP events, not OTHER events
            .Buffer(2,1) // could use buffer(2) but this is more reliable if stream1 sometimes has multiple consecutive START events.
            .Where(x => x.Count == 2 && x[1].Kind == EventKind.STOP && x[0].Kind == EventKind.START)
            .Select(x => x[1].OccurenceTime - x[0].OccurenceTime); // compute the latency
        var eventCount = window.Where(x=>x.Kind == EventKind.OTHER).Count(); // count the number of OTHER events in the window
        return duration.Zip(eventCount, (d, e) => new { EventCount = e, Duration = d });
    }
);
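Once the pairs are flattened to plain values, the per-group statistics can be sketched as follows. This is only an illustration under some assumptions: it uses the System.Reactive package, it feeds in the four (count, duration) pairs from the question's marble diagram as seconds, and it relies on the source completing (as it would when replaying logs), since each group's aggregate is only emitted on completion. Aggregate is used rather than Average because, as far as I know, Rx has no Average overload for TimeSpan.

```csharp
using System;
using System.Reactive.Linq;

public static class StatsDemo
{
    public static void Main()
    {
        // Sample pairs taken from the question's marble diagram (durations in seconds).
        var pairs = new[]
        {
            new { EventCount = 1, Duration = TimeSpan.FromSeconds(4) },
            new { EventCount = 0, Duration = TimeSpan.FromSeconds(2) },
            new { EventCount = 2, Duration = TimeSpan.FromSeconds(6) },
            new { EventCount = 1, Duration = TimeSpan.FromSeconds(3) },
        }.ToObservable();

        var stats = pairs
            .GroupBy(p => p.EventCount)
            .SelectMany(g => g.Aggregate(
                // Running state for one group; a group always holds at least one pair.
                new { N = 0, Min = TimeSpan.MaxValue, Max = TimeSpan.MinValue, Total = TimeSpan.Zero },
                (acc, p) => new
                {
                    N = acc.N + 1,
                    Min = p.Duration < acc.Min ? p.Duration : acc.Min,
                    Max = p.Duration > acc.Max ? p.Duration : acc.Max,
                    Total = acc.Total + p.Duration
                },
                // Emitted once, when the group completes.
                acc => new
                {
                    EventCount = g.Key,
                    acc.Min,
                    Avg = TimeSpan.FromTicks(acc.Total.Ticks / acc.N),
                    acc.Max
                }));

        stats.Subscribe(s => Console.WriteLine(
            "EventCount = " + s.EventCount + " | Min = " + s.Min + " | Avg = " + s.Avg + " | Max = " + s.Max));
    }
}
```

For this sample the averages work out to 2 s, 3.5 s and 6 s for counts 0, 1 and 2, matching the expected output in the question. The order in which the groups are printed is not something I would rely on.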
Upvotes: 1