Lukie

Reputation: 905

Reactive Extensions Synchronous Subscription

Can someone help me subscribe synchronously to an IObservable, so that the calling method blocks until the sequence has completed? For example:

Publisher

public static class Publisher
{
    public static IObservable<string> NonBlocking()
    {
        return Observable.Create<string>(
            observer =>
            {
                // Produce the values on a thread-pool thread so Subscribe returns immediately.
                Task.Run(() =>
                {
                    observer.OnNext("a");
                    Thread.Sleep(1000);
                    observer.OnNext("b");
                    Thread.Sleep(1000);
                    observer.OnCompleted();
                    Thread.Sleep(1000);
                });

                return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
            });
    }
}

Subscriber

public static class Subscriber
{
    public static bool Subscribe()
    {
        Publisher.NonBlocking().Subscribe((s) =>
        {
            Debug.WriteLine(s);
        }, () =>
        {
            Debug.WriteLine("Complete");
        });
        // This will currently return true before the subscription is complete
        // I want to block and not Return until the Subscriber is Complete
        return true;
    }
}

Upvotes: 4

Views: 2477

Answers (1)

cwharris

Reputation: 18125

You'll need the ToTask extension method from the System.Reactive.Threading.Tasks namespace for this:

Turn your observable into a task...

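var task = Publisher.NonBlocking()
    .Do(
        (s) => Debug.WriteLine(s),
        ()  => Debug.WriteLine("Completed")
    )
    .LastOrDefaultAsync()
    .ToTask();

Do(...).Subscribe() behaves just like Subscribe(...): Do only attaches side effects to the sequence and does not subscribe by itself.

LastOrDefaultAsync is in there because the Task created by ToTask completes with the last item of the source Observable, and if the source completes without yielding any item the Task will fail (throw an InvalidOperationException). With LastOrDefaultAsync in place, an empty source produces default(T) instead, so the Task simply waits until the source has completed, no matter what it yields. Note that in Rx 2.0 and later the plain LastOrDefault operator blocks and returns the value directly, so ToTask cannot be chained onto it; LastOrDefaultAsync is the non-blocking form.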
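The empty-sequence case is easy to check in isolation. The following is a minimal sketch, not part of the original answer: the class name EmptySequenceDemo and its Run method are hypothetical, and it uses LastOrDefaultAsync, the non-blocking counterpart of LastOrDefault in current Rx releases.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

// Hypothetical demo class: compares ToTask with and without LastOrDefaultAsync
// on a sequence that completes without yielding any item.
public static class EmptySequenceDemo
{
    public static (string value, bool bareToTaskFaulted) Run()
    {
        IObservable<string> empty = Observable.Empty<string>();

        // With LastOrDefaultAsync the task completes with default(string), i.e. null.
        string value = empty.LastOrDefaultAsync().ToTask().Result;

        // Without it, the task faults because there is no element to return.
        bool faulted = false;
        try
        {
            empty.ToTask().Wait();
        }
        catch (AggregateException ex) when (ex.InnerException is InvalidOperationException)
        {
            faulted = true;
        }

        return (value, faulted);
    }
}
```

Calling EmptySequenceDemo.Run() should give a null value for the first element of the tuple and true for the second.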

So after we have a task, just wait on it:

task.Wait(); // blocking

Or use async / await:

await task; // non-blocking
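Putting the pieces together, the Subscribe method from the question could be made blocking roughly as follows. This is a sketch under assumptions: the class name BlockingSubscriber is hypothetical, and the source is passed in as a parameter so the method can be tried with any IObservable<string>, including Publisher.NonBlocking().

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;

// Hypothetical helper: a blocking variant of the question's Subscriber.Subscribe.
public static class BlockingSubscriber
{
    public static bool Subscribe(IObservable<string> source)
    {
        var task = source
            .Do(
                s  => Debug.WriteLine(s),
                () => Debug.WriteLine("Complete"))
            .LastOrDefaultAsync() // tolerate a source that yields nothing
            .ToTask();            // subscribes to the source here

        // Blocks the calling thread until the source completes.
        // If the source calls OnError, Wait throws an AggregateException.
        task.Wait();
        return true;
    }
}
```

With the publisher from the question, BlockingSubscriber.Subscribe(Publisher.NonBlocking()) should return only after "Complete" has been written. Blocking like this is usually best kept off UI threads; where the caller can be async, awaiting the task is the gentler option.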

Edit:

Cory Nelson has made an excellent point:

In the latest version of C# and Visual Studio, you can actually await an IObservable<T>. This is a very cool feature, but it works in a slightly different manner than awaiting a Task.

A Task represents a single operation that is already under way: awaiting it does not start it, and if a single instance of a task is awaited multiple times, the underlying work still executes only once. Observables are different. You can think of an observable as an asynchronous function with multiple return values: each time you subscribe to an observable, the observable/function executes again. Therefore these two pieces of code have different meanings:

Awaiting an Observable:

// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source; // Subscribe
await source; // Subscribe

Awaiting an Observable via a Task:

// Console.WriteLine will be invoked once.
var source = Observable.Return(0).Do(Console.WriteLine);
var task = source.ToTask(); // Subscribe (ToTask subscribes immediately)
await task; // Wait for the task's result.
await task; // Just yield the task's result again.

So, in essence, awaiting an Observable works like this:

// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source.ToTask(); // Subscribe
await source.ToTask(); // Subscribe

However, the await observable syntax does not work in Xamarin Studio (as of the time of this writing). If you are using Xamarin Studio, I would highly suggest that you call ToTask at the very last possible moment, to simulate the behavior of Visual Studio's await observable syntax.
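The difference between awaiting an observable directly and awaiting a task made from it can be observed by counting subscriptions. The sketch below is illustrative only: SubscriptionCountDemo and RunAsync are hypothetical names, and Observable.Defer is used purely as a way to run code on every subscription.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical demo class: counts how often the source is subscribed to.
public static class SubscriptionCountDemo
{
    public static async Task<(int direct, int viaTask)> RunAsync()
    {
        int subscriptions = 0;

        // The Defer factory runs once per subscription.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Return(0);
        });

        await source; // subscribes
        await source; // subscribes again
        int direct = subscriptions;

        subscriptions = 0;
        var task = source.ToTask(); // subscribes once, right here
        await task;                 // no new subscription
        await task;                 // no new subscription
        int viaTask = subscriptions;

        return (direct, viaTask);
    }
}
```

Here direct should come out as 2 and viaTask as 1, which matches the comments in the snippets above.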

Upvotes: 6
