mnn

Reputation: 2010

Throttling IObservable causes observer not being invoked

I have this simple code, which:

Here's the code:

private void DisplayPoints()
{
    var x = 0;
    var ob = this.GeneratePoints();
    ob
      .Sample(TimeSpan.FromMilliseconds(500))
      .SubscribeOn(ThreadPoolScheduler.Instance)
      .ObserveOn(SynchronizationContext.Current)
      .Subscribe(d => Console.WriteLine(d));
}

private IObservable<double> GeneratePoints()
{
    return Observable.Create<double>(o => this.GeneratePoints(o));
}

private IDisposable GeneratePoints(IObserver<double> observer)
{
    var i = 0;
    while (true)
    {
        var value = random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1))));

        observer.OnNext(value);

        i++;
    }

    return Disposable.Empty;
}

However, nothing is ever written to the console (i.e. the anonymous observer is never called). If I remove the Sample operator, the observer is called, although that behavior is clearly not what I want (the UI thread will get bombarded).

I am clearly missing something here. My intention is to generate data, push it via IObserver, and display some of it via UI.

Edit: since some people have misunderstood my intentions (even though they're clearly stated above), I should reiterate what I'm trying to do:

Using IObservable and Reactive Extensions seemed like a good solution for my problem.

Just to repeat: I won't return random numbers in real code - this was just a placeholder to get my intended behavior working.

Upvotes: 3

Views: 202

Answers (3)

bradgonesurfing

Reputation: 32192

You probably don't want to generate random numbers in a tight loop. It is better to use a time interval. The code below generates your random numbers every 200 milliseconds.

IObservable<double> observable =
     Observable.Interval(TimeSpan.FromMilliseconds(200))
          // i is the zero-based index of the tick; t (the tick value) is unused
          .Select((t, i) => random.Next(0, 100)
                      * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))));

The code that Enigmativity wrote for you is also effectively a tight loop. The point he makes about your error, pushing out values during the subscribe process, is also correct. The minimal changes you would have to make to your code to get it working would be:

    private static Task GeneratePoints(IObserver<double> observer, CancellationToken token)
    {
        return Task.Run(() =>
        {
            var i = 0;
            var random = new Random();
            while ( true )
            {
                token.ThrowIfCancellationRequested();

                var value = random.Next(0, 100) * ( 1 / ( double ) random.Next(1, Math.Min(50, Math.Max(i, 1))) );

                observer.OnNext(value);

                i++;
            }
        });
    }

and then, somewhere later:

    Observable.Create<double>((observer, token) => GeneratePoints(observer, token));

Note the cancellation token being passed. When the subscriber to the sequence unsubscribes, this token is cancelled and the loop terminates.

However, this is a lot of work, and Enigmativity's answer is simpler and abstracts the above code away for you. It is still useful to know how to do this manually for more complex cases.
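To see the token-based version end to end, here is a minimal console sketch. It assumes the System.Reactive package; the class name `CancellableProducerDemo`, the helper `CountSamples`, and the simplified value formula are made up for illustration. It runs the producer on the thread pool, samples it every 500 ms, and then disposes the subscription, which cancels the token and stops the loop.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableProducerDemo
{
    // Same shape as the producer above: the loop runs on a thread-pool
    // thread and exits once the token is cancelled.
    static Task GeneratePoints(IObserver<double> observer, CancellationToken token)
    {
        return Task.Run(() =>
        {
            var random = new Random();
            while (true)
            {
                token.ThrowIfCancellationRequested();
                observer.OnNext(random.Next(0, 100)); // simplified placeholder value
            }
        });
    }

    // Hypothetical helper: subscribes for the given duration and returns
    // how many sampled values reached the observer.
    public static int CountSamples(TimeSpan runFor)
    {
        var count = 0;
        var points = Observable.Create<double>(
            (observer, token) => GeneratePoints(observer, token));

        IDisposable subscription = points
            .Sample(TimeSpan.FromMilliseconds(500))
            .Subscribe(d =>
            {
                Interlocked.Increment(ref count);
                Console.WriteLine(d);
            });

        Thread.Sleep(runFor);
        subscription.Dispose(); // cancels the token passed to GeneratePoints
        return Volatile.Read(ref count);
    }

    public static void Main()
    {
        var seen = CountSamples(TimeSpan.FromSeconds(2));
        Console.WriteLine("Unsubscribed after " + seen + " sampled values");
    }
}
```

Because Subscribe returns immediately here, the calling thread stays free. In a UI application you would presumably add ObserveOn before Subscribe, as in the question.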

Upvotes: 1

Enigmativity

Reputation: 117084

I suspect that your issue is to do with the fact that Throttle introduces concurrency through the DefaultScheduler.Instance internally and that your implementation of IDisposable GeneratePoints(IObserver<double> observer) is non-standard.

Try reimplementing IObservable<double> GeneratePoints() like this:

private IObservable<double> GeneratePoints()
{
    return Observable.Generate<int, double>(
        0,
        i => true,
        i => i + 1,
        i => random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))));
}

That might help.

The issue comes from your observable directly pushing out values during the subscribe process. You should always try to use the standard, built-in operators when creating your observables. The above code uses the built-in Generate operator, so it should play nicer with your code.
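To illustrate what "pushing out values during the subscribe process" means, here is a small sketch, assuming the System.Reactive package (the class name `SubscribeBlockingDemo` and the `Run` helper are made up for illustration). It uses a finite loop instead of `while (true)` so that it terminates. Every value is delivered before `Subscribe` returns; with an infinite loop, `Subscribe` would never return at all, so anything that is supposed to happen after the source has been subscribed to would never get a chance to run.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SubscribeBlockingDemo
{
    // Returns a log of the order in which things happened.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Pushes values synchronously inside the subscribe callback,
        // like the question's GeneratePoints, but stops after three values.
        var source = Observable.Create<int>(observer =>
        {
            for (var i = 0; i < 3; i++)
            {
                observer.OnNext(i);
            }
            observer.OnCompleted();
            return Disposable.Empty;
        });

        source.Subscribe(
            v => log.Add("value " + v),
            () => log.Add("completed"));

        // Only reached once the loop inside Create has finished.
        log.Add("Subscribe returned");
        return log;
    }

    public static void Main()
    {
        // prints: value 0, value 1, value 2, completed, Subscribe returned
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```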

Upvotes: 1

Matthew Finlay

Reputation: 3464

Throttle will only let a value through when there has been a gap of at least 500 milliseconds (in your case) since the last value. As GeneratePoints is pushing values far faster than this, nothing will ever get through. Sample may be the operator you want: in this case it will produce one value every 500 ms.

Source:       1111111111111111111----------111---111111
Throttle (5): -----------------------1-----------------
Sample (5):   ----1----1----1----1---1------------1----1
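The difference in the diagram can be tried out with a rough sketch like the one below, assuming the System.Reactive package (the class and method names are made up for illustration). The source emits every 10 ms for about a second, so it never pauses for 500 ms. With this setup I would expect Throttle to let through only the final value, once the source completes, while Sample yields a handful of values. Exact results depend on timer resolution.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ThrottleVsSampleDemo
{
    // A fast, finite source: values 0..99, one roughly every 10 ms.
    static IObservable<long> FastSource() =>
        Observable.Interval(TimeSpan.FromMilliseconds(10)).Take(100);

    // Collects everything Throttle lets through; blocks until completion.
    public static IList<long> Throttled() =>
        FastSource().Throttle(TimeSpan.FromMilliseconds(500)).ToList().Wait();

    // Collects everything Sample lets through; blocks until completion.
    public static IList<long> Sampled() =>
        FastSource().Sample(TimeSpan.FromMilliseconds(500)).ToList().Wait();

    public static void Main()
    {
        Console.WriteLine("Throttle: " + string.Join(", ", Throttled()));
        Console.WriteLine("Sample:   " + string.Join(", ", Sampled()));
    }
}
```

With an infinite fast source, as in the question, the Throttle line would never print anything, because there is never a quiet period and the source never completes.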

Upvotes: 0
