Reputation: 165
I have tried to write a console observable, as in the example below, but it doesn't work. There are some issues with subscriptions. How can I solve them?
static class Program
{
    static async Task Main(string[] args)
    {
        // var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
        // var observable = FromConsole().Publish().RefCount(); // doesn't work
        var observable = FromConsole(); // doesn't work
        observable.Subscribe(Console.WriteLine);
        await Task.Delay(1500);
        observable.Subscribe(Console.WriteLine);
        await new TaskCompletionSource().Task;
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}
If I use Observable.Interval, it subscribes two times and I get two outputs for one input. If I use any version of FromConsole, I get one subscription and a blocked thread.
Upvotes: 1
Views: 806
Reputation: 117064
To start with, it is usually best to avoid using Observable.Create to create observables - it's certainly there for that purpose, but it can create observables that don't behave the way you think they should because of their blocking nature. As you've discovered!
Instead, when possible, use the built-in operators to create observables. And that can be done in this case.
My version of FromConsole is this:
static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
Observable.Start is effectively Task.Run for observables. It calls Console.ReadLine() for us on a background thread, so subscribing doesn't block.
The Observable.Defer/Repeat pair repeatedly calls Observable.Start(() => Console.ReadLine()). Without the Defer it would call Observable.Start only once and repeatedly return that one string forever.
That solves that.
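The difference Defer makes can be seen without the console. The following is a minimal sketch, assuming the System.Reactive package and C# top-level statements; the counter `calls` is a hypothetical stand-in for Console.ReadLine() so that each invocation of the delegate is visible in the output.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

int calls = 0; // hypothetical stand-in for "lines read so far"

// Without Defer: Start runs the delegate once, and every re-subscription
// made by Repeat replays that single cached result.
IList<int> withoutDefer = Observable
    .Start(() => Interlocked.Increment(ref calls))
    .Repeat()
    .Take(3)
    .ToList()
    .Wait();

calls = 0;

// With Defer: each re-subscription builds a fresh Start, so the delegate runs again.
IList<int> withDefer = Observable
    .Defer(() => Observable.Start(() => Interlocked.Increment(ref calls)))
    .Repeat()
    .Take(3)
    .ToList()
    .Wait();

Console.WriteLine(string.Join(",", withoutDefer)); // 1,1,1
Console.WriteLine(string.Join(",", withDefer));    // 1,2,3
```

Take(3) is only there to stop the otherwise endless Repeat so the sketch terminates.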
Now, the second issue is that you want to see the value from the Console.ReadLine() output by both subscriptions to the FromConsole() observable.
Due to the way Console.ReadLine works, you are getting values from each subscription, but only one at a time. Try this code:
static async Task Main(string[] args)
{
    var observable = FromConsole();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
When I run that I get this kind of output:
1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf
The reason for this is that each subscription starts up a fresh subscription to FromConsole. So you have two pending calls to Console.ReadLine(); they effectively queue, and each one gets only every other input. Hence the alternation between 1 & 2.
So, to solve this you simply need the .Publish().RefCount() operator pair.
Try this:
static async Task Main(string[] args)
{
    var observable = FromConsole().Publish().RefCount();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
I now get:
1:Hello
2:Hello
1:World
2:World
In a nutshell, it's the combination of the non-blocking FromConsole observable and the use of .Publish().RefCount() that makes this work the way you expect.
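To see what .Publish().RefCount() changes without typing into a console, here is a small sketch, assuming the System.Reactive package and C# top-level statements. The Subject named `input` and the counter `sourceSubscriptions` are hypothetical stand-ins: the subject plays the role of console input, and the counter records how many times the underlying source is subscribed to.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var input = new Subject<string>(); // hypothetical stand-in for console input
int sourceSubscriptions = 0;

// Cold wrapper that counts each subscription to the underlying source.
IObservable<string> cold = Observable.Defer<string>(() =>
{
    sourceSubscriptions++;
    return input;
});

// Publish().RefCount() shares one subscription among all current subscribers.
IObservable<string> shared = cold.Publish().RefCount();

var first = new List<string>();
var second = new List<string>();
IDisposable s1 = shared.Subscribe(x => first.Add($"1:{x}"));
IDisposable s2 = shared.Subscribe(x => second.Add($"2:{x}"));

input.OnNext("Hello");
input.OnNext("World");

Console.WriteLine(sourceSubscriptions);      // 1
Console.WriteLine(string.Join(" ", first));  // 1:Hello 1:World
Console.WriteLine(string.Join(" ", second)); // 2:Hello 2:World

s1.Dispose();
s2.Dispose(); // the last unsubscribe also releases the shared source subscription
```

Subscribing to `cold` directly instead of `shared` would raise the counter to 2, which corresponds to the two competing Console.ReadLine() calls described above.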
Upvotes: 7
Reputation: 43535
The problem is that Console.ReadLine is a blocking method, so the subscription to the FromConsole sequence blocks indefinitely and the await Task.Delay(1500); line is never reached. You can solve this problem by reading from the console asynchronously, offloading the blocking call to a ThreadPool thread:
static IObservable<string> FromConsole()
{
    return Observable.Create<string>(async observer =>
    {
        while (true)
        {
            observer.OnNext(await Task.Run(() => Console.ReadLine()));
        }
    });
}
You can take a look at this question about why there is no better solution than offloading.
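One possible refinement, not part of the code above, is to let the loop stop when the subscriber unsubscribes or the input ends. The sketch below assumes the System.Reactive package and C# top-level statements, and uses the Observable.Create overload that passes a CancellationToken. The helper name `FromReader` and its TextReader parameter are hypothetical; passing Console.In would read from the console, while a StringReader makes the behavior easy to check.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical helper: emits each line read from the given reader.
static IObservable<string> FromReader(TextReader reader) =>
    Observable.Create<string>(async (observer, cancellationToken) =>
    {
        // The token is signaled when the subscription is disposed.
        while (!cancellationToken.IsCancellationRequested)
        {
            // Offload the blocking read so that Subscribe returns immediately.
            var line = await Task.Run(() => reader.ReadLine());
            if (line == null) break; // end of input
            observer.OnNext(line);
        }
        // When the returned task finishes, the sequence completes.
    });

IList<string> lines = FromReader(new StringReader("Hello\nWorld")).ToList().Wait();
Console.WriteLine(string.Join(",", lines)); // Hello,World
```

A read that is already in progress cannot be interrupted this way; the token is only checked between lines.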
As a side note, subscribing to a sequence without providing an onError handler is not a good idea, unless having the process crash with an unhandled exception is acceptable behavior for your app. It is especially problematic with sequences produced with Observable.Create<T>(async, because it can lead to weird/buggy behavior like this one: Async Create hanging while publishing observable.
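As an illustration of the onError advice, here is a minimal sketch, assuming the System.Reactive package and C# top-level statements. The failing sequence and its "console closed" message are made up for the example; Observable.Throw simply produces a sequence that fails at once.

```csharp
using System;
using System.Reactive.Linq;

// Hypothetical failing source, standing in for a console sequence that errors.
IObservable<string> failing =
    Observable.Throw<string>(new InvalidOperationException("console closed"));

string observedError = null;

// The second argument is the onError handler; with it, the exception is
// delivered here instead of being rethrown as an unhandled exception.
IDisposable subscription = failing.Subscribe(
    line => Console.WriteLine(line),
    ex =>
    {
        observedError = ex.Message;
        Console.Error.WriteLine($"Error: {ex.Message}");
    });

subscription.Dispose();
```

The same two-argument Subscribe call can be used with the FromConsole sequences in this thread.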
Upvotes: 0
Reputation: 140
You need to return an observable without the Publish. You can then subscribe to it and do your thing further. Here is an example. When I run it, I can read a line multiple times.
public class Program
{
    static void Main(string[] args)
    {
        FromConsole().Subscribe(x =>
        {
            Console.WriteLine(x);
        });
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}
Upvotes: -1