Veksi
Veksi

Reputation: 3758

Translating a piece of asynchronous C# code to F# (with Reactive Extensions and FSharpx)

After a while again I was leafing through some Rx code here in SO and came across to a question How to implement polling using Observables?.

Intriquing from the F# perspective is the use of (self-defined) Either type, or Choice in F# parlor. Alas, I get the translation wrong on many levels, beginning already from the types.

But rather than just quit attempting to translate, perhaps this could be of use as a broader education tool. Could someone lend a functioning F# brain and help translate the following piece(s) of C# code?

I have used F# TaskBuilder, but noticed just recently that it doesn't implemented the TryWith part of a computation builder in TaskBuilder. So, it may become difficult to translate the first piece of C# code (or perhaps one should go the async route), there's another version provided in the source SO link that doesn't use an asynchornous structure. Luck regarding this question, or unluck, I'm not able to translate that either.

The erros I get are of the sort:

Type mismatch. Expecting a IScheduler -> CancellationToken -> Task but given a 'a * CancellationToken -> 'b The type 'IScheduler' does not match the type ''a * CancellationToken'

Type mismatch. Expecting a IObservable<Choice<'TResult,exn>> but given a IObservable<'TResult> The resulting type would be infinite when unifying ''TResult' and 'Choice<'TResult,exn>'

and so forth.

-

    [<Extension>]
    type ObservableExtensions() = 
        static member inline poll<'TResult, 'TArg>(asyncFunction: 'TArg -> IObservable<'TResult>, parameterFactory: 'TArg, interval:TimeSpan, scheduler: IScheduler): IObservable<Choice<'TResult, exn>> =

            Observable.Create<'TResult>(fun(observer:IObserver<'TResult>) ->
                let task = new TaskBuilder()                                     
                let t(ctrl:IScheduler, ct:CancellationToken) = task {
                    while not <| ct.IsCancellationRequested do
                        try
                            let! result = asyncFunction(parameterFactory)
                            observer.OnNext(Choice1Of2(result))
                        with ex ->
                            observer.OnNext(Choice2Of2(ex))
                        do! ctrl.Sleep(interval, ct)
                }

                scheduler.ScheduleAsync(Func<IScheduler, CancellationToken, Task>(t)))

And the corresponding C# code (1):

public IObservable<Either<Exception, TResult>> Poll<TResult, TArg>(
Func<TArg, IObservable<TResult>> asyncFunction,
Func<TArg> parameterFactory,
TimeSpan interval,
IScheduler scheduler)
{
return Observable.Create<Either<Exception, TResult>>(observer =>
{
    return scheduler.ScheduleAsync(async (ctrl, ct) => {
        while(!ct.IsCancellationRequested)
        {
            try
            {
                var result = await asyncFunction(parameterFactory());
                observer.OnNext(Either.Right<Exception,TResult>(result));
            }
            catch(Exception ex)
            {
                observer.OnNext(Either.Left<Exception, TResult>(ex));
            }
            await ctrl.Sleep(interval, ct);
        }
    });        
});    
}

Alternative C# code

public IObservable<Either<Exception, TResult>> Poll2<TResult, TArg>(
    Func<TArg, IObservable<TResult>> asyncFunction,
    Func<TArg> parameterFactory,
    TimeSpan interval,
    IScheduler scheduler)
{
    return Observable.Create<Either<Exception, TResult>>(
        observer =>
            Observable.Defer(() => asyncFunction(parameterFactory()))
                      .Select(Either.Right<Exception, TResult>)
                      .Catch<Either<Exception, TResult>, Exception>(
                        ex => Observable.Return(Either.Left<Exception, TResult>(ex)))
                      .Concat(Observable.Defer(
                        () => Observable.Empty<Either<Exception, TResult>>()
                                        .Delay(interval, scheduler)))
                      .Repeat().Subscribe(observer));
}

Just a note that this bears some similarity to How to write a generic, recursive extension method in F#? and consequently to Write an Rx “RetryAfter” extension method extension method

<edit: To complement theexcellent answer of MisterMetaphor, I'll add a version without interval here too.

type Observable with
    static member Poll2(f: unit -> IObservable<_>, interval: TimeSpan, sched: IScheduler) : IObservable<_> =
        Observable.Create<_>(fun observer ->
            Observable.Defer(f)
                .Select(Choice1Of2)
                .Catch(Choice2Of2 >> Observable.Return)
                .Concat(Observable.Defer(fun _ -> Observable.Empty().Delay(interval, sched)))
                .Repeat()
                .Subscribe(observer))

    static member Poll2(f: 'a -> IObservable<_>, argFactory: unit -> 'a, interval: TimeSpan, sched: IScheduler) =
        Observable.Poll2(argFactory >> f, interval, sched)

I'm not sure if this is needed, but a due note on a subtle bug on .Subscribein F# (at least of Visual F# 3.1.1.0) as discovered in SO here, more here).

Upvotes: 2

Views: 251

Answers (1)

MisterMetaphor
MisterMetaphor

Reputation: 6008

If you don't absolutely have to use Observable.Create, you can achieve similar results with Observable.Interval:

type Observable with
    static member Poll(f : unit -> IObservable<_>, interval : TimeSpan, sched : IScheduler) : IObservable<_> =
        Observable.Interval(interval, sched)
            .SelectMany(fun _ ->
                Observable.Defer(f)
                    .Select(Choice1Of2)
                    .Catch(Choice2Of2 >> Observable.Return))

    // An overload that matches your original function
    static member Poll(f : 'a -> IObservable<_>, argFactory : unit -> 'a, interval : TimeSpan, sched : IScheduler) =
        Observable.Poll(argFactory >> f, interval, sched)

What I like about this implementation is that you don't have to go down to the level of directly using schedulers and Observable.Create. I think you should always use existing combinators/operators unless you absolutely have to do otherwise.

Also, Observable.Interval uses SchedulePeriodic (evident here) which is probably more efficient and correct than your Task-based implementation.

Upvotes: 2

Related Questions