Reputation: 2431
The post at https://eprystupa.wordpress.com/2009/12/18/detecting-running-highlow-prices-using-reactive-extensions-for-net/ has an interesting code block:
var rnd = new Random();
var feed = Observable.Defer(() =>
    Observable.Return(Math.Round(30.0 + rnd.NextDouble(), 2))
        .Delay(TimeSpan.FromSeconds(1 * rnd.NextDouble())))
    .Repeat();

// Daily low price feed
double min = double.MaxValue;
var feedLo = feed
    .Where(p => p < min)
    .Do(p => min = Math.Min(min, p))
    .Select(p => "New LO: " + p);

// Daily high price feed
double max = double.MinValue;
var feedHi = feed
    .Where(p => p > max)
    .Do(p => max = Math.Max(max, p))
    .Select(p => "New HI: " + p);

// Combine hi and lo in one feed and subscribe to it
feedLo.Merge(feedHi).Subscribe(Console.WriteLine);
The above does the job, but the local variables max and min make the code quite specific. I would like to attach the NewLowHi code/indicator to an existing IObservable<double>, much like https://github.com/fiatsasia/Financier does:
public static IObservable<TSource> SimpleMovingAverage<TSource>(this IObservable<TSource> source, int period)
{
    return source.Buffer(period, 1).Select(e => e.Average());
}
What would be the best practice for creating a self-contained NewLowHi indicator that I could subscribe to without using (or at least while hiding internally) the local variables max and min?
Upvotes: 0
Views: 70
Reputation: 117064
The code that you referred to on the WordPress site has some flaws.
Because of the way feed is created, it is a cold observable: every subscription receives a different set of figures. So the feedLo and feedHi observables will be working from different sets of values.
But it gets worse. If two subscriptions are made to feedLo, for example, then there will be two subscriptions to feed but only one state variable for min, which means that the value coming out will be the minimum across both subscriptions and not the minimum for each.
I'll show how to do this properly, but first your question is about how to encapsulate state. Here's how:
IObservable<T> feed =
    Observable
        .Defer(() =>
        {
            int state = 42;
            return Observable... // define your observable here.
        });
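As a rough illustration of that pattern applied to the question's low-price filter, here is a minimal sketch. The class name RunningLowExtensions and the method name NewLow are hypothetical, and the block assumes the System.Reactive NuGet package is referenced. Because min is declared inside the Defer lambda, each subscription gets its own copy.

```csharp
using System;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package is referenced

public static class RunningLowExtensions // hypothetical class name
{
    // Hypothetical extension method: emits a price only when it is a new running low.
    public static IObservable<double> NewLow(this IObservable<double> source) =>
        Observable.Defer(() =>
        {
            // Declared inside Defer, so every subscription starts with fresh state.
            double min = double.MaxValue;
            return source
                .Where(p => p < min)  // pass only values below the current low
                .Do(p => min = p);    // record the new low for later comparisons
        });
}
```

With this in place, something like feed.NewLow().Subscribe(Console.WriteLine) could be called more than once without the subscriptions interfering with each other's minimum.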
Now, the feed source uses Random for its state. We can go ahead and rewrite feed using the above pattern:
var feed =
    Observable
        .Defer(() =>
        {
            var rnd = new Random();
            return
                Observable
                    .Generate(
                        0, x => true, x => x,
                        x => Math.Round(30.0 + rnd.NextDouble(), 2),
                        x => TimeSpan.FromSeconds(rnd.NextDouble()));
        });
I prefer Observable.Generate to the Defer/Return/Delay/Repeat pattern.
Now for how to get the min and max values out.
I want an IObservable<(State state, double value)> that gives me the high and low values from a single subscription to the source observable. Here's what State looks like:
public enum State
{
    High,
    Low,
}
Here's my observable:
IObservable<(State state, double value)> feedHighLow(IObservable<double> source) =>
    source.Publish(xs => Observable.Merge(
        xs.Scan(Math.Min).DistinctUntilChanged().Select(x => (state: State.Low, value: x)),
        xs.Scan(Math.Max).DistinctUntilChanged().Select(x => (state: State.High, value: x))));
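Since the question asks for something that attaches to an existing IObservable<double>, the same query could also be packaged as an extension method. The following is only a sketch under assumptions: the names HighLowExtensions and NewHighLow are hypothetical, the State enum is repeated so the block compiles on its own, and the System.Reactive NuGet package is assumed to be referenced.

```csharp
using System;
using System.Reactive.Linq; // assumes the System.Reactive NuGet package is referenced

// Repeated here only so this sketch compiles on its own.
public enum State
{
    High,
    Low,
}

public static class HighLowExtensions // hypothetical class name
{
    // Hypothetical extension method: emits a tagged value whenever the source
    // sets a new running low or a new running high.
    public static IObservable<(State state, double value)> NewHighLow(
        this IObservable<double> source) =>
        // Publish shares one subscription to the source between both branches.
        source.Publish(xs => Observable.Merge(
            // Scan keeps the running minimum; DistinctUntilChanged drops repeats.
            xs.Scan(Math.Min).DistinctUntilChanged().Select(x => (state: State.Low, value: x)),
            // Same idea for the running maximum.
            xs.Scan(Math.Max).DistinctUntilChanged().Select(x => (state: State.High, value: x))));
}
```

A caller could then write something like feed.NewHighLow().Subscribe(x => Console.WriteLine($"New {x.state}: {x.value}")). No mutable variables are visible to the caller, because the running minimum and maximum live inside Scan and are created afresh for each subscription.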
Now I can call feedHighLow(feed) and get a stream of the High/Low values from a single subscription to the source feed. The Publish call ensures a single subscription to the source, and the Merge means I can run two distinct observables to get the min and the max respectively.
I get results like this:
Upvotes: 3