Reputation: 331
My system has a lot of status objects - connections status, cpu load, logged users and so on. All of such events are merged into a single observable stream.
I want to make a admin utility to show actual status of the system and to show all of that counters.
How can I create an observable that would have a list of last changed values of all of the counters?
Here is a marble diagram I want to have:
s1 (cpu): -s1_v1----s1_v1---s1_v2
s2 (users count): --s2_v1--s2_v1---------s2_v2
s3 (some cat purr/sec) ----s3_v1----s3_v1----s3_v1
flatten sequence: s1_v1-s2_v1-s3_v1-s2_v1-s1_v1-s3_v1-s1_v2-s3_v1-s2_v2
desired output:
s1_v1|s1_v1|s1_v1|s1_v2|s1_v2
s2_v1|s2_v1|s2_v1|s2_v2
s3_v1|s3_v1|s3_v1
So far I can to this implementation:
public class StatusImplementation
{
public static IObservable<IDictionary<TKey, TValue>> Status<TKey, TValue>(
params IObservable<KeyValuePair<TKey, TValue>>[] observables)
{
var uniqueObservables = observables
.Select(x => x.Publish().RefCount().DistinctUntilChanged());
return Observable.Create<IDictionary<TKey, TValue>>(o =>
{
var compositeDisposable = new CompositeDisposable();
var dictionary = new Dictionary<TKey, TValue>();
foreach (var uniqueObservable in uniqueObservables)
{
var disposable = uniqueObservable.Subscribe(x =>
{
if (dictionary.ContainsKey(x.Key) && !dictionary[x.Key].Equals(x.Value))
{
var newDictionary = new Dictionary<TKey, TValue>(dictionary);
newDictionary[x.Key] = x.Value;
dictionary = newDictionary;
}
else
{
dictionary.Add(x.Key, x.Value);
}
o.OnNext(dictionary);
});
compositeDisposable.Add(disposable);
}
return compositeDisposable;
});
}
}
And here is a usage example:
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(x => new KeyValuePair<string, long>("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
.Select(x => new KeyValuePair<string, long>("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
.Select(x => new KeyValuePair<string, long>("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
StatusImplementation.Status(f1, f2, f3)
.Select(x => string.Join(", ", x.ToList()))
.Dump("\tstatus");
combined.Dump("normal");
And the Dump function (from great book by Lee Campbell):
public static void Dump<T>(this IObservable<T> source, string name)
{
source.Subscribe(
i => Console.WriteLine("{0}-->{1}", name, i),
ex => Console.WriteLine("{0} failed-->{1}", name, ex.Message),
() => Console.WriteLine("{0} completed", name));
}
So the question is: are there any better way to implement this functionality? Probably not using Dictionary inside the observable?
Thank you.
Upvotes: 3
Views: 680
Reputation: 117064
So if you start with your combined
observable - which can be produced from any number of source observables - then you can do this:
var query =
combined
.Scan(
new Dictionary<string, long>() as IDictionary<string, long>,
(d, kvp) =>
{
var d2 = new Dictionary<string, long>(d) as IDictionary<string, long>;
d2[kvp.Key] = kvp.Value;
return d2;
});
This will return a series of dictionary objects for each value produced by the combined
observable. Each dictionary object will be a distinct instance - if the same instance was returned you'd have ever changing values which may cause threading issues.
Upvotes: 1
Reputation: 13495
You can use Observable.CombineLatest
, which will emit the latest values from all observables each time a new value arrives. Then you don't need to use the dictionary.
var f1 = Observable.Interval(TimeSpan.FromMilliseconds(1000))
.Select(x = > new KeyValuePair < string, long > ("event 1", x));
var f2 = Observable.Interval(TimeSpan.FromMilliseconds(1200))
.Select(x = > new KeyValuePair < string, long > ("event 2", x));
var f3 = Observable.Interval(TimeSpan.FromMilliseconds(1250))
.Select(x = > new KeyValuePair < string, long > ("event 3", x));
var combined = f1.Merge(f2).Merge(f3);
Observable.CombineLatest(f1, f2, f3)
.Select(x = > string.Join(", ", x.ToList()))
.Dump("\tstatus");
combined.Dump("normal");
Upvotes: 1