fahadash

Reputation: 3281

A bizarre scenario with Observable which leads observers to miss items in OnNext

I have an application that tracks milk price fluctuations by state. When users see the milk price at their local grocery store, they can come to a website and submit it (much like GasBuddy).

When they submit a price, I receive it in my IObservable<MilkPrice>. Here is the MilkPrice implementation:

class MilkPrice {
    public string State { get; set; }
    public decimal Price { get; set; }
}

When I receive a price, I maintain a list of MilkPriceTracker objects, one per state. If the object for a particular state doesn't exist yet, I add it to my list. MilkPriceTracker is a view model that takes an IObservable in its constructor, and this is how I construct the tracker object:

if (!_statesTracker.Any(s => s.State.Equals(receivedInOnNext.State)))
{
    _statesTracker.Add(new MilkPriceTracker(
        mainObservable.Where(p => p.State.Equals(receivedInOnNext.State))));
}

I don't add all 50 states to my list by default because I only want states that have a reported price. Now here is the fun part: say a thousand users from Virginia start reporting prices at the same time...

When the first user reports, his message causes the new tracker object to be constructed, but the tracker never sees that message because it has already been consumed. I could try passing Observable.Concat(Observable.Return(alreadyReceivedObj), mainObservable.Wher.....)) to the constructor.

I tried it, but because of the high frequency of messages, some are still missed between the time mainObservable's subscription OnNext is handled and the time the new tracker is constructed.

How do I "pause" mainObservable and tell it to "DVR" or "buffer" items until I resume, so I don't miss any messages?

If I am not clear enough, feel free to tell me.

Thanks.

Upvotes: 0

Views: 110

Answers (2)

fahadash

Reputation: 3281

I feel like dancing. Thanks to the nicest guy in the world, Lee Campbell.

I wrote the following code to solve my issue.

mainObservable
            .GroupBy(k => k.State)
            .Select(g => new MilkPriceTracker(g.Key, g))
            .ObserveOnDispatcher(DispatcherPriority.DataBind)
            .Subscribe(_statesTrackers.Add);

Upvotes: 0

Lee Campbell

Reputation: 10783

It doesn't sound bizarre, more like a lack of understanding of Reactive Programming. ;-)

Try using GroupBy.
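
To illustrate the idea, here is a minimal sketch, assuming the System.Reactive package and a Subject standing in for mainObservable. The Demo class, the Run helper and the sample prices are hypothetical and only there for the example. GroupBy hands out one inner observable per state, and as long as you subscribe to that inner observable right when it is handed to you, the element that created the group should arrive through it as well, so there is no separate "already consumed" first message to replay.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class MilkPrice
{
    public string State { get; set; } = "";
    public decimal Price { get; set; }
}

public static class Demo
{
    // Hypothetical helper: returns the prices observed by each per-state group.
    public static Dictionary<string, List<decimal>> Run()
    {
        // Stand-in for mainObservable (assumption: a hot source of prices).
        var mainObservable = new Subject<MilkPrice>();
        var seen = new Dictionary<string, List<decimal>>();

        using (mainObservable
            .GroupBy(p => p.State)
            .Subscribe(g =>
            {
                // Called once per new state; subscribe to the group immediately,
                // which is roughly what a tracker constructor would do.
                var prices = new List<decimal>();
                seen[g.Key] = prices;
                g.Subscribe(p => prices.Add(p.Price));
            }))
        {
            mainObservable.OnNext(new MilkPrice { State = "VA", Price = 3.10m });
            mainObservable.OnNext(new MilkPrice { State = "VA", Price = 3.25m });
            mainObservable.OnNext(new MilkPrice { State = "MD", Price = 2.99m });
        }

        return seen;
    }

    public static void Main()
    {
        foreach (var entry in Run())
            Console.WriteLine($"{entry.Key}: {entry.Value.Count} price(s)");
    }
}
```

The point to watch is that the inner subscription happens synchronously inside the outer callback. If you move the group to another thread or scheduler before anything subscribes to it, early items for that state could still slip past.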

Upvotes: 2
