dgzargo

Reputation: 165

How to make an IObservable<string> from console input

I tried to write a console observable, as in the example below, but it doesn't work. There are some issues with subscriptions. How can I solve them?

static class Program
{
    static async Task Main(string[] args)
    {
        // var observable = Observable.Interval(TimeSpan.FromMilliseconds(1000)).Publish().RefCount(); // works
        // var observable = FromConsole().Publish().RefCount(); // doesn't work
        var observable = FromConsole(); // doesn't work
        observable.Subscribe(Console.WriteLine);
        await Task.Delay(1500);
        observable.Subscribe(Console.WriteLine);
        await new TaskCompletionSource().Task;
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}

If I use Observable.Interval, it subscribes twice and I get two outputs for each value. If I use either version of FromConsole, I get only one subscription and a blocked thread.

Upvotes: 1

Views: 806

Answers (3)

Enigmativity

Reputation: 117064

To start with, it is usually best to avoid using Observable.Create to create observables - it's certainly there for that purpose, but it can create observables that don't behave like you think they should because of their blocking nature. As you've discovered!

Instead, when possible, use the built-in operators to create observables. And that can be done in this case.

My version of FromConsole is this:

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();

Observable.Start is effectively Task.Run for observables. It calls Console.ReadLine() for us without blocking the subscribing thread.

The Observable.Defer/Repeat pair repeatedly calls Observable.Start(() => Console.ReadLine()). Without the Defer, Observable.Start would be called only once, and Repeat would return that one string forever.
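To see the difference Defer makes without typing into a console, here is a minimal sketch. It assumes the System.Reactive package is referenced; NextValue is a hypothetical stand-in for Console.ReadLine() that returns a new number on every call, and Take(3) is only there so the demo terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading;

public static class DeferDemo
{
    private static int _calls;

    // Hypothetical stand-in for Console.ReadLine(): every call yields a new value.
    private static int NextValue() => Interlocked.Increment(ref _calls);

    // Start is invoked once here, so Repeat keeps resubscribing to the same cached result.
    public static IList<int> WithoutDefer()
    {
        _calls = 0;
        return Observable.Start(() => NextValue()).Repeat().Take(3).ToList().Wait();
    }

    // Defer runs the factory on every subscription, so each repeat calls NextValue again.
    public static IList<int> WithDefer()
    {
        _calls = 0;
        return Observable.Defer(() => Observable.Start(() => NextValue()))
            .Repeat().Take(3).ToList().Wait();
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", WithoutDefer()));
        Console.WriteLine(string.Join(",", WithDefer()));
    }
}
```

The first line printed should repeat a single value, while the second should show a fresh value for each repetition, which is the behavior FromConsole needs.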

That solves that.

Now, the second issue is that you want to see the value from the Console.ReadLine() output by both subscriptions to the FromConsole() observable.

Due to the way Console.ReadLine works, you are getting values from each subscription, but only one at a time. Try this code:

static async Task Main(string[] args)
{
    var observable = FromConsole();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

When I run that I get this kind of output:

1:ddfd
2:dfff
1:dfsdfs
2:sdffdfd
1:sdfsdfsdf

The reason for this is that each Subscribe call starts a fresh subscription to FromConsole. So you have two pending calls to Console.ReadLine(); they effectively queue, and each one receives only every other line of input. Hence the alternation between 1 and 2.

So, to solve this you simply need the .Publish().RefCount() operator pair.

Try this:

static async Task Main(string[] args)
{
    var observable = FromConsole().Publish().RefCount();
    observable.Select(x => $"1:{x}").Subscribe(Console.WriteLine);
    observable.Select(x => $"2:{x}").Subscribe(Console.WriteLine);
    await new TaskCompletionSource<int>().Task;
}

static IObservable<string> FromConsole() =>
    Observable
        .Defer(() =>
            Observable
                .Start(() => Console.ReadLine()))
        .Repeat();
        

I now get:

1:Hello
2:Hello
1:World
2:World

In a nutshell, it's the combination of the non-blocking FromConsole observable and the use of .Publish().RefCount() that makes this work the way you expect.

Upvotes: 7

Theodor Zoulias

Reputation: 43535

The problem is that Console.ReadLine is a blocking method, so the subscription to the FromConsole sequence blocks indefinitely and the await Task.Delay(1500); line is never reached. You can solve this problem by reading from the console asynchronously, offloading the blocking call to a ThreadPool thread:

static IObservable<string> FromConsole()
{
    return Observable.Create<string>(async observer =>
    {
        while (true)
        {
            observer.OnNext(await Task.Run(() => Console.ReadLine()));
        }
    });
}

You can take a look at this question about why there is no better solution than offloading.
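The loop above never ends, even after the subscriber unsubscribes or the input is closed. Here is a hedged sketch of one way to handle both cases, using the Observable.Create overload that passes a CancellationToken tied to the subscription. The FromReader name and its TextReader parameter are my own assumptions, added so that the sketch can run against any reader; pass Console.In to read from the console.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class ConsoleLines
{
    // Hypothetical helper: reads lines from any TextReader (Console.In for the console).
    public static IObservable<string> FromReader(TextReader reader) =>
        Observable.Create<string>(async (observer, cancellationToken) =>
        {
            // The token is signalled when the subscription is disposed.
            while (!cancellationToken.IsCancellationRequested)
            {
                // Offload the blocking read to a ThreadPool thread.
                var line = await Task.Run(() => reader.ReadLine());
                if (line == null) break; // ReadLine returns null at end of input
                observer.OnNext(line);
            }
            observer.OnCompleted();
        });

    public static void Main()
    {
        // A StringReader keeps the demo non-interactive.
        var lines = FromReader(new StringReader("Hello\nWorld")).ToList().Wait();
        Console.WriteLine(string.Join(",", lines));
    }
}
```

Note that the token is only checked between reads. A Console.ReadLine() call that is already waiting for input still cannot be interrupted, so this does not remove the need for offloading.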

As a side note, subscribing to a sequence without providing an onError handler is not a good idea, unless having the process crash with an unhandled exception is an acceptable behavior for your app. It is especially problematic with sequences produced with Observable.Create<T>(async, because it can lead to weird/buggy behavior like this one: Async Create hanging while publishing observable.
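For illustration, this is roughly what subscribing with an onError handler looks like. It is a minimal sketch assuming the System.Reactive package; the failing source is simulated with Observable.Throw, and the "console closed" message is made up for the demo.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class OnErrorDemo
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Simulated source: one value, then a failure (hypothetical error message).
        var source = Observable.Return("first line")
            .Concat(Observable.Throw<string>(new InvalidOperationException("console closed")));

        // Passing an onError handler keeps the exception from being rethrown as unhandled.
        using (source.Subscribe(
            line => log.Add("next: " + line),
            error => log.Add("error: " + error.Message),
            () => log.Add("completed")))
        {
        }

        return log;
    }

    public static void Main()
    {
        foreach (var entry in Run())
        {
            Console.WriteLine(entry);
        }
    }
}
```

In a real app the handler would typically log the exception or shut down gracefully instead of adding to a list.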

Upvotes: 0

ruben450

Reputation: 140

You need to return an observable without the Publish. You can then subscribe to it and do whatever you need with the values. Here is an example. When I run it, I can read lines multiple times.

public class Program
{

    static void Main(string[] args)
    {
        FromConsole().Subscribe(x =>
        {
            Console.WriteLine(x);
        });
    }

    static IObservable<string> FromConsole()
    {
        return Observable.Create<string>(async observer =>
        {
            while (true)
            {
                observer.OnNext(Console.ReadLine());
            }
        });
    }
}

Upvotes: -1
