Benjol
Benjol

Reputation: 66531

'WaitFor' an observable

I'm in a situation where I have a list of Tasks that I'm working through (enable drive, change position, wait for stop, disable).

The 'wait for' monitors an IObservable<Status>, which I want to wait on (so I can thread it through ContinueWith and the other tasks).

I started out with the following tasks inside the OnNext handling of the subscriber, but that was just ugly. What I've now come up with is this extension method:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    var tcs = new TaskCompletionSource<T>();
    source
        .Where(pred)
        .DistinctUntilChanged()
        .Take(1)  //OnCompletes the observable, subscription will self-dispose
        .Subscribe(val => tcs.TrySetResult(val),
                    ex => tcs.TrySetException(ex),
                    () => tcs.TrySetCanceled());

    return tcs.Task;
}

(UPDATED with svick's suggestion of handling OnCompleted and OnError)

Questions:

Upvotes: 11

Views: 5054

Answers (2)

James Manning
James Manning

Reputation: 13579

Not 100% sure about this, but from reading the Rx 2.0 beta blog post, I would think that if you can use async/await, you could "return await source.FirstAsync(pred)" or without async, "return source.FirstAsync(pred).ToTask()"

http://blogs.msdn.com/b/rxteam/archive/2012/03/12/reactive-extensions-v2-0-beta-available-now.aspx

sshot of linqpad using rx 2.0 and firstasync

Upvotes: 3

Enigmativity
Enigmativity

Reputation: 117027

At the very least I would change this extension method to be this:

public static Task<T> WaitFor<T>(this IObservable<T> source, Func<T, bool> pred)
{
    return
        source
            .Where(pred)
            .DistinctUntilChanged()
            .Take(1)
            .ToTask();
}

Using .ToTask() is much better than to introduce TaskCompletionSource. You require a reference to the System.Reactive.Threading.Tasks namespace to get the .ToTask() extension method.

Also, DistinctUntilChanged is redundant in this code. You only ever get one value so it must be distinct by default.

Now, my next suggestion may be a little controversial. This extension is a bad idea because it hides the true semantics of what is going on.

If I had these two snippits of code:

var t = xs.WaitFor(x => x > 10);

Or:

var t = xs.Where(x => x > 10).Take(1).ToTask();

I would generally prefer the second snippit as it clearly shows me what is going on - I don't need to remember the semantics of WaitFor.

Unless you made the name of WaitFor more descriptive - perhaps TakeOneAsTaskWhere - then you are taking the clarity of using the operators out of the code that uses it and making the code harder to manage.

Doesn't the following make it easier to remember the semantics?

var t = xs.TakeOneAsTaskWhere(x => x > 10);

The bottom-line for me is that Rx operators are meant to be composed, not encapsulated, but if you're going to encapsulate them then their meaning must be clear.

I hope this helps.

Upvotes: 13

Related Questions