Onur Gumus
Onur Gumus

Reputation: 1439

Why subscribing an observable in RX.NET by Latest only accepts 1 subscriber?

My goal is to have two subscribers from an observable but I am only interested in the Latest item in the event stream. I want others to be discarded. Consider this like a stock price screen that updates every 1 second and disregarding any intermediate values. Here's my code:

    var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)) // fast event source
        .Latest().ToObservable().ToEvent();

    ob.OnNext += (l =>
                      {
                          Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
                          Thread.Sleep(1000); // slow processing of events
                          Console.WriteLine("Latest: " + l);
                                                    });

    ob.OnNext += (l =>
        {
            Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
            Thread.Sleep(1000); // slow processing of events
            Console.WriteLine("Latest1: " + l);
            //  subject.OnNext(l);
        });

However as a result of above code, despite I attached two events (it doesn't matter even if you use the subscribe notation either) Only the first subscription is invoked periodically. Second one doesn't run at all. Why is that so ?

Upvotes: 2

Views: 1077

Answers (2)

Enigmativity
Enigmativity

Reputation: 117027

I don't think you understand what .Latest() does.

public static IEnumerable<TSource> Latest<TSource>(
    this IObservable<TSource> source
)

The enumerable sequence that returns the last sampled element upon each iteration and subsequently blocks until the next element in the observable source sequence becomes available.

Note that it blocks waiting for the next element from the observable.

So when you turned the IObservable<> into an IEnumerable<> using .Latest() you then had to use .ToObservable() to turn it back into an IObservable<> to be able to call .ToEvent(). That's where it fell over.

The problem with this code is that you're creating code that blocks.

If you just do it this, it works:

var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)).ToEvent();

There's no need to call .Latest() as you're always getting the latest value from an observable. You can never get an earlier value. It's an observable, not a time machine.

What I also don't understand is why you are calling .ToEvent() in any case. what's the need?

Just do this:

var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));

ob.Subscribe(l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest: " + l);
});

ob.Subscribe(l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest1: " + l);
});

Upvotes: 2

Lee Campbell
Lee Campbell

Reputation: 10783

First I think your requirement is one of the following:

  1. You want to only get future values
  2. or you want to get the most recent value (if any) and any future values
  3. or you only want the most recent value (if any)
  4. or you only want to sample the ticks and get the value at each second
  5. or you have a slow consumer and you need perform load shedding (like in a GUI)

The code for 1)

var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
    .Publish();
ob.Connect();

The code for 2)

var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
    .Replay(1);
ob.Connect();    

The code for 3)

var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
    .Replay(1);
ob.Connect();  
var latest = ob.Take(1); 

The code for 4) can be this, but there are subtle behaviors around what you consider a window.

var ob = Observable.Interval(TimeSpan.FromMilliseconds(200)) // fast event source
    .Replay(1);
//Connect the hot observable
ob.Connect();

var bufferedSource = ob.Buffer(TimeSpan.FromSeconds(1))
    .Where(buffer => buffer.Any())
    .Select(buffer => buffer.Last());  

The code for 5) can be found on James World's blog http://www.zerobugbuild.com/?p=192 and is quite common in many banking applications in London.

Upvotes: 4

Related Questions