Ivan
Ivan

Reputation: 7746

Rx subscribe FromEvents merge all

Say I have a set of stock symbols:

StockTicker stockTicker = 
  new StockTicker("MSFT", "APPL", "YHOO", "GOOG");
stockTicker.OnPriceChanged += (sender, args ) => 
{
  Console.WriteLine(
    "{0}: Price - {1}  Volume - {2}", 
    args.StockSymbol, args.Price, args.Volume);
};

I can subscribe to the events and get it as a hot IObservable:

IObservable<PriceChangedEventArgs> priceChangedObservable = 
  Observable.FromEventPattern<PriceChangedEventArgs>(
    eventHandler => stockTicker.OnPriceChanged += eventHandler,
    eventHandler => stockTicker.OnPriceChanged -= eventHandler )
      .Select( eventPattern => eventPattern.EventArgs );

priceChangedObservable.Subscribe(args => Console.WriteLine( 
  "{0}: Price - {1}  Volume - {2}", 
    args.StockSymbol, args.Price, args.Volume ) );

This prints the quote for each symbol as a new quote comes in: In sequence each event is a single qute YHOO 25.33, MSFT 127, AAPL 175, GOOG 1126 etc.

How do I modify the code above so that each "quote" is a combination of all the current quotes of each individual quote? (YHOO 25.33, MSFT 127, GOOG 1126, AAPL 175). A "quote" is now the state of the last quote seen from all of them, served as one "quote".

I see that Rx has a Zip operator, but the semantics of that appear to need n IObservables to zip. Here there are multiple quotes coming through the same IObservable?

So the Console.WriteLine would print all n-quotes subscribed to as a single point-event (and only print when all of them have a value)

Upvotes: 1

Views: 78

Answers (2)

Theodoros Chatzigiannakis
Theodoros Chatzigiannakis

Reputation: 29233

If these are random events arriving in any order and you are only interested in the latest value at any time, then perhaps you want a dictionary that is kept updated using the Scan operator:

source
    /* keep track of the latest value for each symbol */
    .Scan(new Dictionary<string, decimal>(), (a, b) => a[b.StockSymbol] = b.Price)

    /* example logic to wait until they all have values */
    .Where(dict => stockTicker.StockSymbols.All(dict.ContainsKey))

    .Subscribe(dict => { ... });

If the number of events you want to buffer remains constant and you know the source is cycling between the events, the Buffer operator might work for you:

source
    /* burst values periodically */
    .Buffer(count)

    .Subscribe(list => { ... }) 

Upvotes: 3

Dmitrii Dovgopolyi
Dmitrii Dovgopolyi

Reputation: 6299

It is not very convinient to think about everything in terms of data pipes. There is a great library to DynamicData which intoduce the concept of reactive collections.

I assume PriceChangedEventArgs.StockSymbol is string

var cache = new SourceCache<PriceChangedEventArgs,string>(x => x.StockSymbol);
cache.PopulateFrom(priceChangedObservable);

now you can monitor all the items with

cache.Connect()
  .Select(cache.Items)
  .Subscribe({...});

The simple solution is to use Scan, but with reactive collections you can apply transformations like concat, sort, filter etc

Upvotes: 1

Related Questions