Reputation: 1439
My goal is to have two subscribers to an observable, but I am only interested in the latest item in the event stream; the others should be discarded. Think of a stock price screen that updates once per second and disregards any intermediate values. Here's my code:
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)) // fast event source
    .Latest().ToObservable().ToEvent();
ob.OnNext += (l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest: " + l);
});
ob.OnNext += (l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest1: " + l);
    // subject.OnNext(l);
});
However, with the code above, even though I attached two handlers (it makes no difference if I use the Subscribe notation instead), only the first subscription is invoked periodically. The second one doesn't run at all. Why is that?
Upvotes: 2
Views: 1077
Reputation: 117027
I don't think you understand what .Latest() does.
public static IEnumerable<TSource> Latest<TSource>(
this IObservable<TSource> source
)
The enumerable sequence that returns the last sampled element upon each iteration and subsequently blocks until the next element in the observable source sequence becomes available.
Note that it blocks waiting for the next element from the observable.
So when you turned the IObservable<> into an IEnumerable<> using .Latest(), you then had to use .ToObservable() to turn it back into an IObservable<> to be able to call .ToEvent(). That's where it fell over.
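The problem is that this code blocks. As far as I can tell, .ToObservable() enumerates on the subscribing thread by default, so the first OnNext += most likely never returns while it waits on the endless .Latest() enumerable, and the second handler is never attached.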
If you just do this, it works:
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100)).ToEvent();
There's no need to call .Latest() as you're always getting the latest value from an observable. You can never get an earlier value. It's an observable, not a time machine.
What I also don't understand is why you are calling .ToEvent() in any case. What's the need?
Just do this:
var ob = Observable.Interval(TimeSpan.FromMilliseconds(100));
ob.Subscribe(l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest: " + l);
});
ob.Subscribe(l =>
{
    Console.WriteLine(Thread.CurrentThread.ManagedThreadId);
    Thread.Sleep(1000); // slow processing of events
    Console.WriteLine("Latest1: " + l);
});
Upvotes: 2
Reputation: 10783
First I think your requirement is one of the following:
The code for 1)
var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
.Publish();
ob.Connect();
The code for 2)
var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
.Replay(1);
ob.Connect();
The code for 3)
var ob = Observable.Interval(TimeSpan.FromMilliseconds(1000)) // fast event source
.Replay(1);
ob.Connect();
var latest = ob.Take(1);
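To make the difference between the Publish and Replay(1) snippets above more concrete, here is a small sketch of how a late subscriber behaves with each. It is only an illustration, not part of the original answer: the ReplaySketch class and its Run method are hypothetical names, and a Subject<long> stands in for the timer so the outcome does not depend on timing.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ReplaySketch
{
    // Returns what a late subscriber has received from each connectable
    // observable immediately after subscribing (null means "nothing yet").
    public static (long? FromPublish, long? FromReplay) Run()
    {
        var source = new Subject<long>();   // assumption: stand-in for Observable.Interval
        var published = source.Publish();   // shares the source, no cache
        var replayed = source.Replay(1);    // shares the source, caches the last value

        using (published.Connect())
        using (replayed.Connect())
        {
            // Values are produced before anyone subscribes.
            source.OnNext(1);
            source.OnNext(2);

            long? fromPublish = null;
            long? fromReplay = null;

            // Late subscribers: Publish has nothing to hand over,
            // Replay(1) hands over the cached value straight away.
            published.Take(1).Subscribe(v => fromPublish = v);
            replayed.Take(1).Subscribe(v => fromReplay = v);

            return (fromPublish, fromReplay);
        }
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("Publish:   " + (result.FromPublish?.ToString() ?? "nothing yet"));
        Console.WriteLine("Replay(1): " + (result.FromReplay?.ToString() ?? "nothing yet"));
    }
}
```

With Publish the late subscriber has to wait for the next value; with Replay(1) it gets the most recent one immediately, which is why Take(1) on the replayed stream acts as a "give me the latest value" query.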
The code for 4) can be this, but there are subtle behaviors around what you consider a window.
var ob = Observable.Interval(TimeSpan.FromMilliseconds(200)) // fast event source
.Replay(1);
//Connect the hot observable
ob.Connect();
var bufferedSource = ob.Buffer(TimeSpan.FromSeconds(1))
.Where(buffer => buffer.Any())
.Select(buffer => buffer.Last());
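The "last value per window" idea in the snippet above can also be sketched with Rx's Sample operator, shared between two subscribers through Publish. This is an illustrative alternative, not the answer's own code. To keep the result deterministic it assumes two hypothetical Subject stand-ins, prices for the fast source and ticks for the refresh timer; with real timers the equivalent would be Sample(TimeSpan.FromSeconds(1)) on the interval source.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SampleSketch
{
    // Returns the values each of the two subscribers received.
    public static (List<long> First, List<long> Second) Run()
    {
        var prices = new Subject<long>();  // assumption: stand-in for the fast event source
        var ticks = new Subject<Unit>();   // assumption: stand-in for the 1-second refresh

        // On each tick, Sample emits the most recent value seen since the
        // previous tick (if any). Publish shares that single sampled
        // stream between all subscribers.
        var latest = prices.Sample(ticks).Publish();

        var first = new List<long>();
        var second = new List<long>();
        latest.Subscribe(v => first.Add(v));
        latest.Subscribe(v => second.Add(v));

        using (latest.Connect())
        {
            prices.OnNext(1);
            prices.OnNext(2);
            prices.OnNext(3);
            ticks.OnNext(Unit.Default);   // both subscribers receive 3

            ticks.OnNext(Unit.Default);   // no new value, so nothing is emitted

            prices.OnNext(4);
            prices.OnNext(5);
            ticks.OnNext(Unit.Default);   // both subscribers receive 5
        }

        return (first, second);
    }

    public static void Main()
    {
        var result = Run();
        Console.WriteLine("First:  " + string.Join(", ", result.First));   // First:  3, 5
        Console.WriteLine("Second: " + string.Join(", ", result.Second));  // Second: 3, 5
    }
}
```

Compared with Buffer followed by Last, Sample skips the intermediate list and emits nothing for an empty window, so the Where(buffer => buffer.Any()) filter is not needed. Whether the window boundaries match what you want is still the subtle part.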
The code for 5) can be found on James World's blog http://www.zerobugbuild.com/?p=192 and is quite common in many banking applications in London.
Upvotes: 4