Reputation: 57
I need a good idea on how to distribute updates for aggregation...
Let's say I have an IObservable of an Id and a value, producing a never-ending stream of messages (5-10,000/sec). Now I want to calculate a lot of aggregates (for example, a sum) for distribution to other systems at a regular interval, let's say every 10 seconds for each aggregate. The aggregates are based on the Id of the tuple (the string), but an Id could potentially fall into more than one aggregate (an aggregate defines which Ids should be included, so the definitions will overlap).
There will be a few thousand aggregate definitions, so does anyone have any ideas on how to solve this?
Conceptually:
public struct Update
{
    public string Id { get; }
    public int Value { get; }
}

public class Aggregate
{
    // Latest update seen for each Id; a newer update replaces the older one.
    Dictionary<string, Update> latestValues = new Dictionary<string, Update>();

    public void AddUpdate(Update update)
    {
        latestValues[update.Id] = update;
    }

    public int CalculateSum()
    {
        return latestValues.Values.Select(v => v.Value).Sum();
    }
}
UPDATE:
The intention of the question was to simplify the real problem. Maybe I did not do that good a job; sorry for that. Let's say I have multiple IoT devices that each produce a temperature and regularly report it (the stream of updates). Different users can then choose to see an aggregate (e.g. the average) of a subset of the devices. So one customer might want to see the average of devices 1, 2 and 3, while another customer might want to see the average of devices 2, 3 and 4, etc. (the aggregate definitions).
Upvotes: 1
Views: 266
Reputation: 10783
I think what you are asking is how you can create live read models* with Rx.
Given what I can guess from your question, I think you want to be able to update some current state with each update message. In the case of your CalculateSum method, you can't just sum every message's Value property, as some messages are intended to update/override an existing value.
So given this assumption, it looks like GroupBy will be your friend. If you first split the observable sequence of values into sub-sequences, you can divide and conquer the problem.
input.GroupBy(i=>i.Id)
If we consider just a single stream of values belonging to the same Id, what should the sum be for each value?
-1--1--2-
In this simple case, the answer is always just the value passed straight through, i.e.
input  -1--1--2-
result -1--1--2-
However, when we look at two sequences producing values, it becomes slightly harder to calculate:
inputA -1-1-2--------
inputB --1-2-2-3-5-2-
result -122344-5-7-4-
Here we need to see what the delta was for each value in a sequence and push that delta to the result. That can be visualized as follows:
inputA -1-1-2--------
delta  -1-0-1--------
inputB --1-2-2-3-5-2-
delta  --1-1-0-1-2-(-3)-
result -122344-5-7-4-
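The arithmetic in the diagram can be checked without Rx. The following is a minimal sketch in plain C#, assuming the updates arrive interleaved in the order shown above; the `DeltaDemo` class and `RunningSum` method are hypothetical names used only for illustration.

```csharp
using System;
using System.Collections.Generic;

public static class DeltaDemo
{
    // Replays interleaved (source, value) updates in arrival order and
    // returns the running sum of the latest value per source.
    public static List<int> RunningSum(IEnumerable<(string Source, int Value)> updates)
    {
        var latest = new Dictionary<string, int>(); // last value seen per source
        var sum = 0;
        var results = new List<int>();
        foreach (var (source, value) in updates)
        {
            latest.TryGetValue(source, out var previous); // previous stays 0 for a new source
            sum += value - previous;                      // apply the delta, not the raw value
            latest[source] = value;
            results.Add(sum);
        }
        return results;
    }

    public static void Main()
    {
        // Same order as the marble diagram: A and B interleaved.
        var updates = new[]
        {
            ("A", 1), ("B", 1), ("A", 1), ("B", 2), ("A", 2),
            ("B", 2), ("B", 3), ("B", 5), ("B", 2),
        };
        Console.WriteLine(string.Join(" ", RunningSum(updates)));
        // prints: 1 2 2 3 4 4 5 7 4
    }
}
```

The printed values match the result line of the diagram, including the drop from 7 to 4 when inputB goes from 5 to 2.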
To create this kind of delta projection, you could write something like this (here input is a sequence of plain int values):
input.Scan(new { CurrentValue = 0, Delta = 0 }, (acc, cur) => new { CurrentValue = cur, Delta = cur - acc.CurrentValue })
     .Select(acc => acc.Delta);
Putting that together the code could look like this:
// Written as a LINQPad-style script. It needs the System.Reactive and
// Microsoft.Reactive.Testing NuGet packages, with the System.Reactive.Linq
// and Microsoft.Reactive.Testing namespaces imported.
void Main()
{
    var testScheduler = new TestScheduler();
    // The trailing comment on each line is the expected running sum.
    var input = testScheduler.CreateColdObservable<Update>(
        ReactiveTest.OnNext(010, new Update("a", 1)), // 1
        ReactiveTest.OnNext(020, new Update("b", 1)), // 2
        ReactiveTest.OnNext(030, new Update("c", 3)), // 5
        ReactiveTest.OnNext(040, new Update("a", 1)), // 5
        ReactiveTest.OnNext(050, new Update("b", 2)), // 6
        ReactiveTest.OnNext(060, new Update("a", 2)), // 7
        ReactiveTest.OnNext(070, new Update("b", 2)), // 7
        ReactiveTest.OnNext(080, new Update("b", 3)), // 8
        ReactiveTest.OnNext(090, new Update("b", 5)), // 10
        ReactiveTest.OnNext(100, new Update("b", 2))  // 7
    );

    // Per Id: turn each value into its delta from the previous value.
    // Across all Ids: accumulate the deltas into the current sum.
    var currentSum = input.GroupBy(i => i.Id)
        .SelectMany(grp => grp.Scan(
            new { CurrentValue = 0, Delta = 0 },
            (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
        .Select(acc => acc.Delta)
        .Scan((acc, cur) => acc + cur);

    var observer = testScheduler.CreateObserver<int>();
    var subscription = currentSum.Subscribe(observer);
    testScheduler.Start();
    subscription.Dispose();

    ReactiveAssert.AreElementsEqual(
        new[]
        {
            ReactiveTest.OnNext(010, 1),
            ReactiveTest.OnNext(020, 2),
            ReactiveTest.OnNext(030, 5),
            ReactiveTest.OnNext(040, 5),
            ReactiveTest.OnNext(050, 6),
            ReactiveTest.OnNext(060, 7),
            ReactiveTest.OnNext(070, 7),
            ReactiveTest.OnNext(080, 8),
            ReactiveTest.OnNext(090, 10),
            ReactiveTest.OnNext(100, 7)
        },
        observer.Messages);
}

// Define other methods and classes here
public struct Update
{
    public Update(string id, int value)
    {
        Id = id;
        Value = value;
    }

    public string Id { get; }
    public int Value { get; }
}
If you want to create multiple aggregates, then each new aggregate is just a query like the one above. You could aim to optimize by sharing/publishing the sequence after it has been grouped, but I would first profile to confirm that this is actually required.
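As a rough sketch of what "one query per aggregate" could look like, the code below restricts the same delta-based query to the Ids of a single aggregate definition and then samples the result on an interval. The `AggregateQueries` class, the `CurrentSum` and `Sampled` methods, and the `ids` parameter are hypothetical names, and modelling an aggregate definition as a set of Ids is an assumption based on the question rather than something the answer prescribes.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

// Same shape as the Update struct used in the test above.
public struct Update
{
    public Update(string id, int value)
    {
        Id = id;
        Value = value;
    }

    public string Id { get; }
    public int Value { get; }
}

public static class AggregateQueries
{
    // Running sum of the latest value per Id, limited to the Ids that
    // belong to one aggregate definition (assumed here to be a set of Ids).
    public static IObservable<int> CurrentSum(IObservable<Update> input, ISet<string> ids)
    {
        return input
            .Where(u => ids.Contains(u.Id))
            .GroupBy(u => u.Id)
            .SelectMany(grp => grp.Scan(
                new { CurrentValue = 0, Delta = 0 },
                (acc, cur) => new { CurrentValue = cur.Value, Delta = cur.Value - acc.CurrentValue }))
            .Select(acc => acc.Delta)
            .Scan((acc, cur) => acc + cur);
    }

    // Emits the most recent sum once per period. Sample only emits
    // when a new value arrived during that period.
    public static IObservable<int> Sampled(IObservable<int> sums, TimeSpan period)
    {
        return sums.Sample(period);
    }
}
```

For the 10-second interval from the question, a call might look like `AggregateQueries.Sampled(AggregateQueries.CurrentSum(input, ids), TimeSpan.FromSeconds(10))`. With this shape, every aggregate filters the full input stream itself, which is the kind of cost that profiling would reveal before deciding whether to share the grouped sequence.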
*Read models, in CQRS/ES terminology.
Upvotes: 2