Steztric
Steztric

Reputation: 2942

C# Rx Observable producing random results

Consider the following program;

class Program
{
    static IObservable<int> GetNumbers()
    {
        var observable = Observable.Empty<int>();
        foreach (var i in Enumerable.Range(1, 10))
        {
            observable = observable.Concat(Observable.FromAsync(() => Task.Run(() =>
            {
                Console.WriteLine($"Producing {i}");
                Thread.Sleep(1000);
                return i;
            })));
        }

        return observable;
    }

    static async Task LogNumbers(IObservable<int> observable)
    {
        var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
        await observable;
        subscription.Dispose();
    }

    static void Main(string[] args)
    {
        LogNumbers(GetNumbers()).Wait();
        Console.WriteLine("Finished");
        Console.ReadLine();
    }
}

It produces the following output

Producing 1
Producing 1
Producing 2
Consuming 1
Producing 2
Producing 3
Consuming 2
Producing 3
Producing 4
Consuming 3
Producing 4
Producing 5
Consuming 4
Producing 5
Producing 6
Consuming 5
Producing 6
Producing 7
Consuming 6
Producing 7
Producing 8
Consuming 7
Producing 8
Producing 9
Consuming 8
Producing 9
Producing 10
Consuming 9
Producing 10
Finished

It writes out two of every "Producing x" statements and one "Consuming x" statement. Why does it do this? Why does it never write out the expected final "Consuming 10" statement?

Upvotes: 1

Views: 204

Answers (2)

Enigmativity
Enigmativity

Reputation: 117037

Gideon nailed the issue for you, but as I started putting some hints in the comments I thought it might be good to post a complete solution. Try this:

static IObservable<int> GetNumbers() =>
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(i => (int)i + 1)
        .Do(i => Console.WriteLine($"Producing {i}"))
        .Take(10);

static Task LogNumbers(IObservable<int> observable) =>
    observable
        .Do(i => Console.WriteLine($"Consuming {i}"))
        .ToArray()
        .ToTask();

static void Main(string[] args)
{
    LogNumbers(GetNumbers()).Wait();
    Console.WriteLine("Finished");
    Console.ReadLine();
}

Or, even more cleanly:

static IObservable<int> GetNumbers() =>
    Observable
        .Interval(TimeSpan.FromSeconds(1.0))
        .Select(i => (int)i + 1)
        .Do(i => Console.WriteLine($"Producing {i}"))
        .Take(10);

static IObservable<int> LogNumbers(IObservable<int> observable) =>
    observable
        .Do(i => Console.WriteLine($"Consuming {i}"));

static async Task Main(string[] args)
{
    await LogNumbers(GetNumbers());
    Console.WriteLine("Finished");
    Console.ReadLine();
}

You can await observables directly.

Upvotes: 3

Gideon Engelberth
Gideon Engelberth

Reputation: 6155

You are getting two copies of the Producing lines because you are subscribing twice. Most likely, you are not getting the consuming 10 because the first subscription is being cancelled when the second subscription ends. I would not be surprised if you sometimes did get the Consuming 10, just because the Tasks run in a different order that time.

static async Task LogNumbers(IObservable<int> observable)
{
    //This is the first subscription
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));

    //This is the second subscription
    await observable;

    subscription.Dispose();
}

The way your GetNumbers function is written, each subscription to the observable will trigger its own set of 10 tasks to run, and thus its own set of outputs. The first subscription also monitors the produced values and outputs a Consuming line. The second subscription does nothing with the produced values, since you did not use the value of await observable, but does cause a second set of tasks to run.

You could eliminate the second subscription by either using Publish().RefCount() on the parameter to LogNumbers or by instead using a TaskCompletionSource and marking it complete from the OnError and OnComplete functions you currently aren't using in the first subscription. Those would look something like this:

static async Task LogNumbersWithRefCount(IObservable<int> observable)
{
    observable = observable.Publish().RefCount();
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"));
    await observable;
    subscription.Dispose();
}

static async Task LogNumbersTCS(IObservable<int> observable)
{
    var t = new TaskCompletionSource<object>()
    var subscription = observable.Subscribe(i => Console.WriteLine($"Consuming {i}"),
                       ex => t.TrySetException(ex),
                       () => t.TrySetResult(null));
    return t.Task;
}

Upvotes: 5

Related Questions