okrumnow
okrumnow

Reputation: 2416

Create Extension method for IObservable

I need to bring some data into an IObservable stream. I want to use an extension method on IObservable to do this, but i am not sure how to do it.

The data is produced by a simple class that has this signature:

interface Process<TIn, TOut> {
    void Do(TIn data);
    event Action<TOut> Result;
}

i.e. to start the process I have to call Do(...) and the result is sent to the event Result(...). This can't be changed!

I want to bring this data into an Rx process, that handles user input in this way:

subject.AsObservable<string>   // this produces the observable of user inputs
       .Throttle(TimeSpan.FromMilliseconds(typingDelay))
       .DistinctUntilChanged()

       .Process(myProcess)     // this is what I need help for

       .Switch()
       .Subscribe(myConsumer)

This is from adopted from the standard example of delayed user input triggering a web service (or something else that is long-running and has to be async) in the Hands-on Labs.

Whenever the user continues to enter, all observables that are still "underway" must be cancelled (therefore the Switch()). So my Process() has to return an IObservable<IObservable<TOut>> to make the switch work correctly.

I'm really stuck here. Does anyone has a hint for me how to write this Process(...) extension method?

Upvotes: 1

Views: 523

Answers (2)

Gideon Engelberth
Gideon Engelberth

Reputation: 6155

Basically, your Process extension method needs to subscribe to the source observable and output another. Something like:

public static IObservable<TOut> Process<TIn, TOut>(this IObservable<TIn> source, IProcess<TIn, TOut> processor)
{
    return Observable.Create((IObserver<TOut> obs) =>
        {
            var sourceSub = source.Subscribe(processor.Do);
            var outSub = Observable.FromEvent<TOut>(h => processor.Result += h,
                                                    h => processor.Result -= h)
                                   .Subscribe(obs);
            return new System.Reactive.Disposables.CompositeDisposable(sourceSub, outSub);
        });
}

I don't think that Switch will do what you are hoping it will. All Switch will do is end one subscription when another observable becomes available. However, ending the subscription cannot cancel any running calls to the interface method Process.Do. The cancellation you are talking about will need to be handled by the interface implementation itself.

Upvotes: 1

Enigmativity
Enigmativity

Reputation: 117027

I've modified your Process interface to use generics properly and provided a dummy implementation of the interface like so:

interface IProcess<T, R>
{
    void Do(T data);
    event Action<IEnumerable<R>> Result;
}

public class ProcessImpl<T, R> : IProcess<T, R>
{
    public void Do(T data) { }
    public event Action<IEnumerable<R>> Result;
}

Now you can write the extension method like this:

public static IObservable<IObservable<R>> Process<T, R>(this T @this)
{
    return Observable.Create<IObservable<R>>(o =>
    {
        var p = new ProcessImpl<T, R>();
        var subscription =
            Observable
                .FromEvent<IEnumerable<R>>(
                    h => p.Result += h,
                    h => p.Result -= h)
                .Take(1)
                .Select(x => x.ToObservable())
                .Subscribe(o);
        p.Do(@this);
        return subscription;
    });
}

Does this work for you?

Upvotes: 1

Related Questions