Reputation: 2010
I have this simple code, which:
Here's the code:
private void DisplayPoints()
{
    var x = 0;
    var ob = this.GeneratePoints();
    ob
        .Sample(TimeSpan.FromMilliseconds(500))
        .SubscribeOn(ThreadPoolScheduler.Instance)
        .ObserveOn(SynchronizationContext.Current)
        .Subscribe(d => Console.WriteLine(d));
}
private IObservable<double> GeneratePoints()
{
    return Observable.Create<double>(o => this.GeneratePoints(o));
}
private IDisposable GeneratePoints(IObserver<double> observer)
{
    var i = 0;
    while (true)
    {
        var value = random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1))));
        observer.OnNext(value);
        i++;
    }
    return Disposable.Empty;
}
However, nothing is ever output on the console (i.e. the anonymous observer is never called). If I remove the Sample operator, the observer is called, although that behavior is clearly not intended (the UI thread will get bombarded).
I am clearly missing something here. My intention is to generate data, push it via IObserver, and display some of it via UI.
Edit: since some people have misunderstood my intentions (even though they're clearly stated above), I should reiterate what I'm trying to do:
double values seems sufficient for my problem)
Using IObservable and Reactive Extensions seemed like a good solution for my problem.
Just to repeat: I won't return random numbers in real code - this was just a placeholder to get my intended behavior working.
Upvotes: 3
Views: 202
Reputation: 32192
You probably don't want to generate random numbers in a tight loop. It is better to use a time interval. The code below generates your random numbers every 200 milliseconds.
IObservable<double> observable =
    Observable.Interval(TimeSpan.FromMilliseconds(200))
        .Select((t, i) => random.Next(0, 100)
            * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))));
The code that Enigmativity wrote for you is also effectively a tight loop. The point he makes about your error, pushing out values during the subscribe process, is also correct. The minimal change you would have to make to your code to get it working would be:
private static Task GeneratePoints(IObserver<double> observer, CancellationToken token)
{
    return Task.Run(() =>
    {
        var i = 0;
        var random = new Random();
        while (true)
        {
            token.ThrowIfCancellationRequested();
            var value = random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1))));
            observer.OnNext(value);
            i++;
        }
    });
}
and then, some time later:
Observable.Create<double>((observer, token) => GeneratePoints(observer, token));
Note the cancellation token being passed. When the subscriber to the sequence unsubscribes, this token will be set and the loop will terminate.
However, this is a lot of work, and Enigmativity's answer is simpler and abstracts the above code away for you. It is still useful to know how to do this manually for more complex cases.
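To see the cancellation token in action, here is a minimal, hedged sketch of subscribing and later unsubscribing. It assumes the System.Reactive NuGet package is referenced. The Program class, the Points helper, the simplified loop body, and the two-second wait are illustrative choices of mine, not part of the original code.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class Program
{
    // Same shape as GeneratePoints above; the value formula is simplified for brevity.
    private static Task GeneratePoints(IObserver<double> observer, CancellationToken token)
    {
        return Task.Run(() =>
        {
            var random = new Random();
            while (true)
            {
                // Leaves the loop once the subscription has been disposed.
                token.ThrowIfCancellationRequested();
                observer.OnNext(random.NextDouble() * 100);
            }
        });
    }

    // Hypothetical helper: wraps the producer in an observable via the async Create overload.
    public static IObservable<double> Points()
    {
        return Observable.Create<double>((observer, token) => GeneratePoints(observer, token));
    }

    public static void Main()
    {
        IDisposable subscription = Points()
            .Sample(TimeSpan.FromMilliseconds(500))
            .Subscribe(d => Console.WriteLine(d));

        // Let a few samples through, then unsubscribe; disposing signals the token.
        Thread.Sleep(TimeSpan.FromSeconds(2));
        subscription.Dispose();
    }
}
```

Because the producer runs on its own task, Subscribe returns immediately and hands back the IDisposable, which is what makes unsubscribing possible at all.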
Upvotes: 1
Reputation: 117084
I suspect that your issue is to do with the fact that Throttle introduces concurrency through the DefaultScheduler.Instance internally and that your implementation of IDisposable GeneratePoints(IObserver<double> observer) is non-standard.
Try reimplementing IObservable<double> GeneratePoints() like this:
private IObservable<double> GeneratePoints()
{
    return Observable.Generate<int, double>(
        0,
        i => true,
        i => i + 1,
        i => random.Next(0, 100) * (1 / (double)random.Next(1, Math.Min(50, Math.Max(i, 1)))));
}
That might help.
The issue comes from your observable directly pushing out values during the subscribe process. You should always try to use the standard, built-in operators when creating your observables. The above code uses the built-in Generate operator, so it should play nicer with your code.
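For readers unfamiliar with Generate, the four arguments are easier to follow on a small finite sequence. The sketch below is only an illustration: the Squares helper and the Program class are hypothetical names, and it assumes the System.Reactive NuGet package is referenced.

```csharp
using System;
using System.Reactive.Linq;

public static class Program
{
    // Hypothetical helper: emits the squares of 0 .. count-1, then completes.
    public static IObservable<int> Squares(int count)
    {
        return Observable.Generate(
            0,               // initial state
            i => i < count,  // condition: keep producing while this holds
            i => i + 1,      // iterate: compute the next state
            i => i * i);     // result selector: map the state to the emitted value
    }

    public static void Main()
    {
        // ToEnumerable blocks until the finite sequence completes.
        foreach (var value in Squares(5).ToEnumerable())
        {
            Console.WriteLine(value); // prints 0, 1, 4, 9, 16 on separate lines
        }
    }
}
```

The GeneratePoints version above has the same structure, except that its condition is always true, so the sequence never completes on its own.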
Upvotes: 1
Reputation: 3464
Throttle will only let a value through when there has been a gap of at least 500 milliseconds (in your case). As GeneratePoints is pushing values far faster than this, nothing will happen. Sample may be the operator you want; in this case it will produce one value every 500 ms.
Source:       1111111111111111111----------111---111111
Throttle (5): -----------------------1-----------------
Sample (5):   ----1----1----1----1---1------------1----1
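The difference can also be observed by counting how many values each operator lets through from a fast source. This is a rough sketch, assuming the System.Reactive NuGet package is referenced; the CountFor helper, the 10 ms source interval, the 200 ms window, and the one-second run are all illustrative values chosen for the demonstration.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class Program
{
    // Hypothetical helper: subscribes, counts the values that arrive
    // within the given duration, then unsubscribes.
    public static int CountFor(IObservable<long> stream, TimeSpan duration)
    {
        int count = 0;
        using (stream.Subscribe(_ => Interlocked.Increment(ref count)))
        {
            Thread.Sleep(duration);
        }
        return count;
    }

    public static void Main()
    {
        // A source that ticks far more often than the 200 ms window below.
        var source = Observable.Interval(TimeSpan.FromMilliseconds(10));
        var window = TimeSpan.FromMilliseconds(200);
        var runFor = TimeSpan.FromSeconds(1);

        Console.WriteLine($"Throttle: {CountFor(source.Throttle(window), runFor)} values");
        Console.WriteLine($"Sample:   {CountFor(source.Sample(window), runFor)} values");
    }
}
```

As long as the source never pauses for 200 ms, Throttle should report no values at all, while Sample should report a handful. The exact counts depend on timer resolution and machine load.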
Upvotes: 0