Reputation: 3943
I expected the program below to echo any key presses until you hit z
, but it doesn't terminate when you hit z
, and only echos every other keypress. What have I done wrong?
using System.Reactive;
using System.Reactive.Linq;
public class Printer : IObserver<char>
{
public void OnNext(char x)
{
Console.WriteLine(x);
}
public void OnError(Exception x)
{
}
public void OnCompleted()
{
}
}
class Program
{
static IObservable<char> keys = Observable.Defer(() =>Observable.Start(() =>Console.ReadKey().KeyChar)).Repeat(); //https://stackoverflow.com/questions/10675451/iobservable-of-keys-pressed
public static int Main()
{
IObserver<char> x = new Printer();
keys.Subscribe(x);
keys.Where(b => b == 'z').Wait();
return 0;
}
}
Upvotes: 1
Views: 54
Reputation: 12667
Okay, so two issues, both separate:
What you have here is a cold observable.
It's only when you start observing it that it produces values.
Conversely, each time you subscribe to it, it represents a new stream - similar to how IEnumerable
evaluates every time you try to take items. You can see this clearly if you put a breakpoint inside the Observable.Defer
.
You can have those two streams share just one subscription to the source, i.e, the key-press observable. So we convert the cold observable into a hot one.
The Wait
method is:
Waits for the observable sequence to complete and returns the last element of the sequence. If the sequence terminates with an OnError notification, the exception is thrown.
So it's going to wait till the sequence completes, i.e., an OnCompleted
has been called down the observable chain. So, we use TakeUntil
so that the sequence completes only when a condition is met (pressing 'z').
public static int Main()
{
var keys_stream = keys.Publish().RefCount(); // share
IObserver<char> x = new Printer();
keys_stream.Subscribe(x);
keys_stream.TakeUntil(b => b == 'z').Wait(); //wait until z
return 0;
}
Upvotes: 2