Reputation: 905
Can someone help me make a synchronous subscription to an IObservable, so that the calling method blocks until the subscription is complete? For example:
Publisher
public static class Publisher
{
    public static IObservable<string> NonBlocking()
    {
        return Observable.Create<string>(
            observable =>
            {
                Task.Run(() =>
                {
                    observable.OnNext("a");
                    Thread.Sleep(1000);
                    observable.OnNext("b");
                    Thread.Sleep(1000);
                    observable.OnCompleted();
                    Thread.Sleep(1000);
                });
                return Disposable.Create(() => Console.WriteLine("Observer has unsubscribed"));
            });
    }
}
Subscriber
public static class Subscriber
{
    public static bool Subscribe()
    {
        Publisher.NonBlocking().Subscribe((s) =>
        {
            Debug.WriteLine(s);
        }, () =>
        {
            Debug.WriteLine("Complete");
        });
        // This currently returns true before the subscription is complete.
        // I want to block and not return until the subscriber is complete.
        return true;
    }
}
Upvotes: 4
Views: 2477
Reputation: 18125
You'll need to use the System.Reactive.Threading.Tasks namespace for this:
Turn your observable into a task...
var task = Publisher.NonBlocking()
    .Do(
        (s) => Debug.WriteLine(s),
        () => Debug.WriteLine("Completed")
    )
    // LastOrDefaultAsync() returns an observable; plain LastOrDefault() blocks and returns a value.
    .LastOrDefaultAsync()
    .ToTask();
Do(...).Subscribe() is just like Subscribe(...), so Do just adds some side-effects.
LastOrDefaultAsync (the non-blocking form of LastOrDefault) is in there because the Task created by ToTask completes with the last item from the source Observable, and if the source completes without yielding any item the Task fails (throws). With LastOrDefaultAsync in place, the Task simply waits until the source has completed, no matter what it yields.
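To make the empty-source case concrete, here is a minimal sketch, assuming the System.Reactive NuGet package is referenced. The class and method names (EmptySourceDemo and friends) are made up for illustration and are not part of Rx.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical demo class, not part of Rx.
public static class EmptySourceDemo
{
    // True if ToTask() on an empty source produces a task that faults.
    public static bool PlainToTaskFaults()
    {
        Task<string> task = Observable.Empty<string>().ToTask();
        try
        {
            task.Wait();
            return false;
        }
        catch (AggregateException ex)
        {
            // Wait() wraps the underlying "no elements" failure.
            return ex.InnerException is InvalidOperationException;
        }
    }

    // With LastOrDefaultAsync() the empty source yields default(string).
    public static string GuardedResult()
    {
        return Observable.Empty<string>().LastOrDefaultAsync().ToTask().Result;
    }

    // For a non-empty source the task carries the last item.
    public static string LastOfMany()
    {
        return new[] { "a", "b" }.ToObservable().ToTask().Result;
    }
}
```

PlainToTaskFaults() should report a fault, GuardedResult() should come back as null, and LastOfMany() should give the final item of the sequence.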
So after we have a task, just wait on it:
task.Wait(); // blocking
Or use async / await:
await task; // non-blocking
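Putting this together for the question's scenario, a blocking subscriber might look roughly like the sketch below. It assumes the System.Reactive NuGet package; the publisher uses shorter delays than the original, and BlockingSubscriber.SubscribeAndWait is a hypothetical helper that records what it saw instead of writing to Debug.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class Publisher
{
    // Same shape as the question's publisher, with shorter delays.
    public static IObservable<string> NonBlocking()
    {
        return Observable.Create<string>(observer =>
        {
            Task.Run(() =>
            {
                observer.OnNext("a");
                Thread.Sleep(100);
                observer.OnNext("b");
                Thread.Sleep(100);
                observer.OnCompleted();
            });
            return Disposable.Empty;
        });
    }
}

// Hypothetical helper class for illustration.
public static class BlockingSubscriber
{
    public static List<string> SubscribeAndWait()
    {
        var seen = new List<string>();
        Publisher.NonBlocking()
            .Do(s => seen.Add(s), () => seen.Add("Complete"))
            .LastOrDefaultAsync()
            .ToTask()
            .Wait(); // blocks until the source completes (or rethrows its error)
        return seen;
    }
}
```

Because Wait() only returns once the source has completed, the caller can rely on every item and the completion callback having been observed by the time SubscribeAndWait returns.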
Cory Nelson has made an excellent point:
In the latest version of C# and Visual Studio, you can actually await an IObservable<T>. This is a very cool feature, but it works in a slightly different manner than awaiting a Task.
A Task represents a single run of some work, so when you await it you are only waiting for that run's result. If a single instance of a task is awaited multiple times, the work is still executed only once. Observables are slightly different. You can think of an observable as an asynchronous function with multiple return values: each time you subscribe to an observable, the observable/function executes again. Therefore these two pieces of code have different meanings:
Awaiting an Observable:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source; // Subscribe
await source; // Subscribe
Awaiting an Observable via a Task:
// Console.WriteLine will be invoked once.
var source = Observable.Return(0).Do(Console.WriteLine);
var task = source.ToTask(); // Subscribe (ToTask subscribes right away)
await task; // Wait for the task's result.
await task; // Just yield the task's result.
So, in essence, awaiting an Observable works like this:
// Console.WriteLine will be invoked twice.
var source = Observable.Return(0).Do(Console.WriteLine);
await source.ToTask(); // Subscribe
await source.ToTask(); // Subscribe
However, the await observable syntax does not work in Xamarin Studio (as of the time of this writing). If you are using Xamarin Studio, I would highly suggest that you use ToTask at the very last possible moment, to simulate the behavior of Visual Studio's await observable syntax.
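As a rough way to check the difference yourself, the sketch below counts side-effects instead of printing them. It assumes the System.Reactive NuGet package; AwaitCountDemo and its method names are hypothetical.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical demo class for illustration.
public static class AwaitCountDemo
{
    // Calls ToTask() at the last moment, once per await:
    // each call subscribes again, so the side-effect runs each time.
    public static async Task<int> FreshTaskPerAwait()
    {
        int sideEffects = 0;
        var source = Observable.Return(0).Do(_ => sideEffects++);
        await source.ToTask();
        await source.ToTask();
        return sideEffects;
    }

    // Creates one task up front and awaits it twice:
    // there is a single subscription, so the side-effect runs once.
    public static async Task<int> SharedTask()
    {
        int sideEffects = 0;
        var source = Observable.Return(0).Do(_ => sideEffects++);
        var task = source.ToTask();
        await task;
        await task;
        return sideEffects;
    }
}
```

FreshTaskPerAwait() should count two side-effects and SharedTask() one, which mirrors the "invoked twice" and "invoked once" comments above.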
Upvotes: 6