Reputation: 7746
Say I have a set of stock symbols:
StockTicker stockTicker =
new StockTicker("MSFT", "APPL", "YHOO", "GOOG");
stockTicker.OnPriceChanged += (sender, args ) =>
{
Console.WriteLine(
"{0}: Price - {1} Volume - {2}",
args.StockSymbol, args.Price, args.Volume);
};
I can subscribe to the events and get it as a hot IObservable
:
IObservable<PriceChangedEventArgs> priceChangedObservable =
Observable.FromEventPattern<PriceChangedEventArgs>(
eventHandler => stockTicker.OnPriceChanged += eventHandler,
eventHandler => stockTicker.OnPriceChanged -= eventHandler )
.Select( eventPattern => eventPattern.EventArgs );
priceChangedObservable.Subscribe(args => Console.WriteLine(
"{0}: Price - {1} Volume - {2}",
args.StockSymbol, args.Price, args.Volume ) );
This prints the quote for each symbol as a new quote comes in: In sequence each event is a single qute YHOO 25.33, MSFT 127, AAPL 175, GOOG 1126
etc.
How do I modify the code above so that each "quote" is a combination of all the current quotes of each individual quote? (YHOO 25.33, MSFT 127, GOOG 1126, AAPL 175)
. A "quote" is now the state of the last quote seen from all of them, served as one "quote".
I see that Rx has a Zip
operator, but the semantics of that appear to need n
IObservables
to zip. Here there are multiple quotes coming through the same IObservable
?
So the Console.WriteLine
would print all n-quotes subscribed to as a single point-event (and only print when all of them have a value)
Upvotes: 1
Views: 78
Reputation: 29233
If these are random events arriving in any order and you are only interested in the latest value at any time, then perhaps you want a dictionary that is kept updated using the Scan
operator:
source
/* keep track of the latest value for each symbol */
.Scan(new Dictionary<string, decimal>(), (a, b) => a[b.StockSymbol] = b.Price)
/* example logic to wait until they all have values */
.Where(dict => stockTicker.StockSymbols.All(dict.ContainsKey))
.Subscribe(dict => { ... });
If the number of events you want to buffer remains constant and you know the source is cycling between the events, the Buffer
operator might work for you:
source
/* burst values periodically */
.Buffer(count)
.Subscribe(list => { ... })
Upvotes: 3
Reputation: 6299
It is not very convinient to think about everything in terms of data pipes. There is a great library to DynamicData which intoduce the concept of reactive collections.
I assume PriceChangedEventArgs.StockSymbol
is string
var cache = new SourceCache<PriceChangedEventArgs,string>(x => x.StockSymbol);
cache.PopulateFrom(priceChangedObservable);
now you can monitor all the items with
cache.Connect()
.Select(cache.Items)
.Subscribe({...});
The simple solution is to use Scan
, but with reactive collections you can apply transformations like concat, sort, filter etc
Upvotes: 1