Bent Rasmussen
Bent Rasmussen

Reputation: 5702

Idiomatic Rx or not?

Small question - is this code idiomatic Rx or not - and if not, what should change about it? Note - the code was cooked up in LINQPad for testing purposes and possible later use as "boilerplate":

IObservable<byte[]> GenerateRandomDataChunks(IScheduler scheduler, int chunkSize = 10)
{
    return
        Observable.Create<byte[]>(
            observer => {
                var cancel = new CancellationDisposable();
                scheduler.Schedule(() => {
                    try // outer capture of exceptions => OnError
                    {
                        var rnd = new Random();
                        while (!cancel.Token.IsCancellationRequested) // => cancellation
                        {
                            scheduler.Yield();

                            var ms = rnd.Next(100, 500);
                            Thread.Sleep(ms); // introduce artificial lag for testing purposes
                            //scheduler.Sleep(TimeSpan.FromMilliseconds(ms)); // test simulation doesn't work - why not?

                            var data = new byte[chunkSize];
                            rnd.NextBytes(data);

                            //var r = rnd.Next();
                            //if (r > 450 && r < 460)
                            //  throw new Exception("foobar");

                            observer.OnNext(data); // give back next computed value => OnNext
                        }
                        observer.OnCompleted(); // terminated naturally => OnCompleted
                    }
                    catch (Exception ex)
                    {
                        observer.OnError(ex); // handle exception => OnError
                    }
                    finally
                    {
                        // TODO dispose any resources we might have
                    }
                });
                return cancel;
            }
        );
}

Cancellation appears to work nicely. For testing purposes I tried to insert lag time but it didn't work using the IScheduler.Sleep (example ThreadPoolScheduler).

Upvotes: 0

Views: 123

Answers (1)

James World
James World

Reputation: 29786

You can use Observable.Generate like this. Note in this variation and the next, the provision of a scheduler if one is not supplied and the consequential (and more idiomatic) change in order of parameters to put the scheduler last:

IObservable<byte[]> GenerateRandomDataChunks(
    int chunkSize = 10, IScheduler scheduler = null
{
    var rnd = new Random();
    return Observable.Generate<object,byte[]>(
        null, _ => true, _ => _, _ => {
            var data = new byte[chunkSize];
            rnd.NextBytes(data);
            return data;
        },
        // delete next line entirely to remove lag
        _ => TimeSpan.FromMilliseconds(rnd.Next(100, 500)), 
        scheduler ?? Scheduler.Default);
}

The approach you followed can work too but needs a little tidying. You don't need to OnComplete or OnError. Disposal of the subscription only signifies OnNext calls should cease. The async/await syntax also makes things a little cleaner and gives you a cancellation token tied to subscription lifetime. This is handled for you by Observable.Create. You definitely don't want to be calling Thread.Sleep - although I appreciate this was for testing - I show correct usage of Scheduler.Sleep below.

IObservable<byte[]> GenerateRandomDataChunks(
    int chunkSize = 10, IScheduler scheduler = null)
{
    var rnd = new Random();
    scheduler = scheduler ?? Scheduler.Default;

    return Observable.Create<byte[]>(async (o, ct) => {
        while(!ct.IsCancellationRequested)
        {
            var ms = rnd.Next(100, 500);
            await scheduler.Sleep(TimeSpan.FromMilliseconds(ms), ct);

            // replace the above with this to yield instead of lag
            // await scheduler.Yield(ct);

            ct.ThrowIfCancellationRequested();

            var data = new byte[chunkSize];
            rnd.NextBytes(data);
            o.OnNext(data);
        }
    });
}   

Upvotes: 2

Related Questions