fsl
fsl

Reputation: 831

combining one observable with latest from another observable

I'm trying to combine two observables whose values share some key.

I want to produce a new value whenever the first observable produces a new value, combined with the latest value from a second observable which selection depends on the latest value from the first observable.

pseudo code example:

var obs1 = Observable.Interval(TimeSpan.FromSeconds(1)).Select(x => Tuple.create(SomeKeyThatVaries, x)

var obs2 = Observable.Interval(TimeSpan.FromMilliSeconds(1)).Select(x => Tuple.create(SomeKeyThatVaries, x)

from x in obs1
  let latestFromObs2WhereKeyMatches = …
  select Tuple.create(x, latestFromObs2WhereKeyMatches)

Any suggestions?

Clearly this could be implemented by subcribing to the second observable and creating a dictionary with the latest values indexable by the key. But I'm looking for a different approach..

Usage scenario: one minute price bars computed from a stream of stock quotes. In this case the key is the ticker and the dictionary contains latest ask and bid prices for concrete tickers, which are then used in the computation.

(By the way, thank you Dave and James this has been a very fruitful discussion)

(sorry about the formatting, hard to get right on an iPad..)

Upvotes: 2

Views: 570

Answers (2)

Dave Sexton
Dave Sexton

Reputation: 2662

I'd like to know the purpose of a such a query. Would you mind describing the usage scenario a bit?

Nevertheless, it seems like the following query may solve your problem. The initial projections aren't necessary if you already have some way of identifying the origin of each value, but I've included them for the sake of generalization, to be consistent with your extremely abstract mode of questioning. ;-)

Note: I'm assuming that someKeyThatVaries is not shared data as you've shown it, which is why I've also included the term anotherKeyThatVaries; otherwise, the entire query really makes no sense to me.

var obs1 = Observable.Interval(TimeSpan.FromSeconds(1))
                     .Select(x => Tuple.Create(someKeyThatVaries, x));
var obs2 = Observable.Interval(TimeSpan.FromSeconds(.25))
                     .Select(x => Tuple.Create(anotherKeyThatVaries, x));

var results = obs1.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 1 })
                  .Merge(
              obs2.Select(t => new { Key = t.Item1, Value = t.Item2, Kind = 2 }))
                  .GroupBy(t => t.Key, t => new { t.Value, t.Kind })
                  .SelectMany(g =>
                    g.Scan(
                      new { X = -1L, Y = -1L, Yield = false },
                      (acc, cur) => cur.Kind == 1
                                  ? new { X = cur.Value, Y = acc.Y, Yield = true }
                                  : new { X = acc.X, Y = cur.Value, Yield = false })
                      .Where(s => s.Yield)
                      .Select(s => Tuple.Create(s.X, s.Y)));

Upvotes: 2

James World
James World

Reputation: 29806

...why are you looking for a different approach? Sounds like you are on the right lines to me. It's short, simple code... roughly speaking it will be something like:

var cache = new ConcurrentDictionary<long, long>();    
obs2.Subscribe(x => cache[x.Item1] = x.Item2);    
var results = obs1.Select(x => new {
    obs1 = x.Item2,
    cache.ContainsKey(x.Item1) ? cache[x.Item1] : 0
});

At the end of the day, C# is an OO language and the heavy lifting of the thread-safe mutable collections is already all done for you.

There may be fancy Rx approach (feels like joins might be involved)... but how maintainable will it be? And how will it perform?

$0.02

Upvotes: 2

Related Questions