fahadash
fahadash

Reputation: 3281

Subscribers are missing messages; Is this a bug with Rx or its me doing it wrong?

I have a single Window WPF application with the following constructor

        numbers = Observable.Generate(DateTime.Now,
                                         time => true,
                                         time => DateTime.Now,
                                         time => { return new     Random(DateTime.Now.Millisecond).NextDouble() * 99 + 2; },
                                         time => TimeSpan.FromSeconds(1.0));


        numbers.ObserveOnDispatcher()
            .Subscribe(s => list1.Items.Add(s.ToString("##.00")));

        numbers.Where(n => n < 10).ObserveOnDispatcher().
            Subscribe(s => list2.Items.Add(s.ToString("##.00")));

Now here is the screenshot of the lists - Notice 3.76 is missing from the left list... This behavior is intermittent.

Image

Upvotes: 1

Views: 232

Answers (1)

Enigmativity
Enigmativity

Reputation: 117027

The short answer is that you are doing it wrong. Rx is working perfectly.

When you create an observable you are creating the definition of a sequence of values over time, not the actual sequence of values over time. This means that whenever you have a subscriber to the observable you are creating new instances of the observable for each subscriber.

So, in your case you have two instances of this sequence operating:

var numbers =
    Observable
        .Generate(
            DateTime.Now,
            time => true,
            time => DateTime.Now,
            time => new Random(DateTime.Now.Millisecond)
                .NextDouble() * 99 + 2,
            time => TimeSpan.FromSeconds(1.0));

Now, since you are subscribing to this observable twice in immediate succession the two instances of this observable would be trying to generate values at almost the same time. So the value DateTime.Now.Millisecond would be the same most of the time, but now always. The value returned from new Random(x).NextDouble() is the same for the same x, so hence why most of the time you get the same value from the two instances of the observable. It's just when DateTime.Now.Millisecond is different that you get two different values and it appears that the subscribers are missing values.


Here's an alternative version that should work as you expected initially:

var rnd = new Random((int)DateTime.Now.Ticks);

var numbers =
    Observable
        .Generate(0, n => true, n => 0,
            n => rnd.NextDouble() * 99 + 2,
            n => TimeSpan.FromSeconds(1.0));

var publishedNumbers = numbers.Publish();

publishedNumbers
    .ObserveOnDispatcher()
    .Subscribe(s => list1.Items.Add(s.ToString("##.00")));

publishedNumbers
    .Where(n => n < 10)
    .ObserveOnDispatcher()
    .Subscribe(s => list2.Items.Add(s.ToString("##.00")));

publishedNumbers.Connect();

Upvotes: 7

Related Questions