morleyc
morleyc

Reputation: 2431

Self contained Reactive Extensions helper methods that require state

Looking at https://eprystupa.wordpress.com/2009/12/18/detecting-running-highlow-prices-using-reactive-extensions-for-net/ it has an interesting code block:

var rnd = new Random();
var feed = Observable.Defer(() =>
    Observable.Return(Math.Round(30.0 + rnd.NextDouble(), 2))
    .Delay(TimeSpan.FromSeconds(1 * rnd.NextDouble())))
    .Repeat();

// Daily low price feed
double min = double.MaxValue;
var feedLo = feed
    .Where(p => p < min)
    .Do(p => min = Math.Min(min, p))
    .Select(p => "New LO: " + p);

// Daily high price feed
double max = double.MinValue;
var feedHi = feed
    .Where(p => p > max)
    .Do(p => max = Math.Max(max, p))
    .Select(p => "New HI: " + p);

// Combine hi and lo in one feed and subscribe to it
feedLo.Merge(feedHi).Subscribe(Console.WriteLine);

The above is OK and does the job but the local variables max and min mean the code is quite specific whereas i would like to attach the NewLowHi code/indicator to an existing IObservable<double> much like https://github.com/fiatsasia/Financier has:

public static IObservable<TSource> SimpleMovingAverage<TSource>(this IObservable<TSource> source, int period)
{
    return source.Buffer(period, 1).Select(e => e.Average());
}

What would be the best practice to create a self contained NewLowHi indicator which i could subscribe to without using (or at least hiding internally) the local variables max and min?

Upvotes: 0

Views: 70

Answers (1)

Enigmativity
Enigmativity

Reputation: 117064

The code that you referred to on the WordPress site has some flaws.

Because of the way that they created the feed it is a hot observable in that every subscription will receive a different set of figures. So the feedLo and the feedHi observables will be working from different sets of variables.

But it gets worse. If two subscriptions are made to feedLo, for example, then there will be two subscriptions to feed but only one state variable for min which means that the value coming out will be the minimum value of both subscriptions and not the minimum for each.

I'll show how to do this properly, but first your question is about how to encapsulate state. Here's how:

IObservable<T> feed =
    Observable
        .Defer(() =>
        {
            int state = 42;
            return Observable... // define your observable here.
        });

Now, the feed source uses Random for its state. We can go ahead and rewrite feed using the above a pattern.

var feed =
    Observable
        .Defer(() =>
        {
            var rnd = new Random();
            return
                Observable
                    .Generate(
                        0, x => true, x => x,
                        x => Math.Round(30.0 + rnd.NextDouble(), 2),
                        x => TimeSpan.FromSeconds(rnd.NextDouble()));
        });

I prefer to use Observable.Generate than the Defer/Return/Delay/Repeat pattern.

Now for how to get the min and max values out.

I want an IObservable<(State state, double value)> that gives me the high and low values from the one single subscription to the source observable. Here's what State looks like:

public enum State
{
    High,
    Low,
}

Here's my observable:

IObservable<(State state, double value)> feedHighLow(IObservable<double> source) =>
    source.Publish(xs => Observable.Merge(
        xs.Scan(Math.Min).DistinctUntilChanged().Select(x => (state: State.Low, value: x)),
        xs.Scan(Math.Max).DistinctUntilChanged().Select(x => (state: State.High, value: x))));

Now I can call feedHighLow(feed) and get a stream of the High/Low values from a single subscription to the source feed. The Publish call ensures a single subscription to the source and the Merge means I can run two distinct observables to get the min and the max respectively.

I get results like this:

output

Upvotes: 3

Related Questions