Reputation: 5702
Small question - is this code idiomatic Rx or not - and if not, what should change about it? Note - the code was cooked up in LINQPad for testing purposes and possible later use as "boilerplate":
IObservable<byte[]> GenerateRandomDataChunks(IScheduler scheduler, int chunkSize = 10)
{
return
Observable.Create<byte[]>(
observer => {
var cancel = new CancellationDisposable();
scheduler.Schedule(() => {
try // outer capture of exceptions => OnError
{
var rnd = new Random();
while (!cancel.Token.IsCancellationRequested) // => cancellation
{
scheduler.Yield();
var ms = rnd.Next(100, 500);
Thread.Sleep(ms); // introduce artificial lag for testing purposes
//scheduler.Sleep(TimeSpan.FromMilliseconds(ms)); // test simulation doesn't work - why not?
var data = new byte[chunkSize];
rnd.NextBytes(data);
//var r = rnd.Next();
//if (r > 450 && r < 460)
// throw new Exception("foobar");
observer.OnNext(data); // give back next computed value => OnNext
}
observer.OnCompleted(); // terminated naturally => OnCompleted
}
catch (Exception ex)
{
observer.OnError(ex); // handle exception => OnError
}
finally
{
// TODO dispose any resources we might have
}
});
return cancel;
}
);
}
Cancellation appears to work nicely. For testing purposes I tried to insert lag time but it didn't work using the IScheduler.Sleep (example ThreadPoolScheduler).
Upvotes: 0
Views: 123
Reputation: 29786
You can use Observable.Generate
like this. Note in this variation and the next, the provision of a scheduler if one is not supplied and the consequential (and more idiomatic) change in order of parameters to put the scheduler last:
IObservable<byte[]> GenerateRandomDataChunks(
int chunkSize = 10, IScheduler scheduler = null
{
var rnd = new Random();
return Observable.Generate<object,byte[]>(
null, _ => true, _ => _, _ => {
var data = new byte[chunkSize];
rnd.NextBytes(data);
return data;
},
// delete next line entirely to remove lag
_ => TimeSpan.FromMilliseconds(rnd.Next(100, 500)),
scheduler ?? Scheduler.Default);
}
The approach you followed can work too but needs a little tidying. You don't need to OnComplete or OnError. Disposal of the subscription only signifies OnNext calls should cease. The async/await syntax also makes things a little cleaner and gives you a cancellation token tied to subscription lifetime. This is handled for you by Observable.Create. You definitely don't want to be calling Thread.Sleep
- although I appreciate this was for testing - I show correct usage of Scheduler.Sleep
below.
IObservable<byte[]> GenerateRandomDataChunks(
int chunkSize = 10, IScheduler scheduler = null)
{
var rnd = new Random();
scheduler = scheduler ?? Scheduler.Default;
return Observable.Create<byte[]>(async (o, ct) => {
while(!ct.IsCancellationRequested)
{
var ms = rnd.Next(100, 500);
await scheduler.Sleep(TimeSpan.FromMilliseconds(ms), ct);
// replace the above with this to yield instead of lag
// await scheduler.Yield(ct);
ct.ThrowIfCancellationRequested();
var data = new byte[chunkSize];
rnd.NextBytes(data);
o.OnNext(data);
}
});
}
Upvotes: 2