Leon Zhou

Reputation: 635

Getting the latest item in an observable sequence using RX in C#

Take the following as an example:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();

What I'm trying to achieve here is to obtain the value of the latest item in the sequence at any given time, synchronously. That means asynchronous extensions like FirstAsync won't do the job for me.

The StartWith and Replay bits ensure that there will always be a value, and the RefCount bit is necessary in my actual code to detect when I can perform some disposal actions.

So to simulate this "any given time" part, let's try getting the latest value after 5 seconds:

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.
});

So with a 5-second delay, I need to get the most recent interval value out of the sequence (about 4, since Interval starts counting at 0). This is what I have tried so far, with no success:

  1. ob.First() - returns 500
  2. ob.Latest().Take(1) - same as above
  3. ob.MostRecent(-1).First() - same as above
  4. ob.MostRecent(-1) - gives me an IEnumerable<long> full of "500"
  5. ob.Last() - never returns because it's waiting for the sequence to complete which it never will
  6. ob.Latest().Last() - same as above
  7. ob.ToTask().Result - same as above
  8. ob.ToEnumerable() - same as above
  9. ob.MostRecent().Last() - same as above

There don't seem to be many resources showing how to actually do this. The closest I can find is this: "Rx: operator for getting first and most recent value from an Observable stream", but it is not a synchronous call (it still uses a subscription), so it doesn't work for me.

Does anybody know if this is actually doable?

Upvotes: 3

Views: 3021

Answers (4)

Leon Zhou

Reputation: 635

Just to clarify this a bit, with thanks to @LeeCampbell for his answer.

What was not working:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    ob.First().Dump();
    // This gives you 500.
    // This is the first time anyone subscribes to the observable,
    // so the sequence starts right here and gives you the initial value.
});

What would actually work:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
ob.Subscribe(); // Subscribe to start the above hot observable immediately.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    ob.First().Dump(); 
    // This would give you either 3 or 4, depending on the speed and timing of your computer.
});

Upvotes: 0

Lee Campbell

Reputation: 10783

To point out why your code probably isn't working as you expect it to:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
//Note: at this point `ob` has never been subscribed to,
// so the reference count is 0, i.e. it has not been connected.

Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.

    //Here we make our first subscription to the `ob` sequence.
    //  This will connect the sequence (invoke subscribe)
    //   which will
    //      1) invoke StartWith
    //      2) invoke onNext(500)
    //      3) invoke First()
    //      4) First() will then unsubscribe() as it has the single value it needs
    //      5) The refCount will now return to 0
    //      6) The sequence will be unsubscribed from.
    ob.First().Dump();  

    //Any future calls like `ob.First()` will thus always get the value 500.
});

Potentially what you want is

var ob = Observable.Interval(TimeSpan.FromSeconds(1))
    .Publish(500);
var connection = ob.Connect();
//Note: at this point `ob` is already connected (Connect() was called above), even though nothing has subscribed to it yet.

var subscription = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
    // Try to get latest value from "ob" here.
    ob.First().Dump();
});

//Sometime later
subscription.Dispose();
connection.Dispose();

However, you really don't want to be mixing synchronous calls with Rx. You also generally don't want to subscribe within a subscription (and .First() is a subscription). What you probably mean to do is get the latest value and stash it somewhere. Using .First() is a slippery slope. You would probably be better off writing something like:

var subscription = Observable.Timer(TimeSpan.FromSeconds(5))
    .SelectMany(_=>ob.Take(1))
    .Subscribe(x =>
    {
        //Do something with X here.
        x.Dump();
    }); 

Upvotes: 3

Enigmativity

Reputation: 117175

You need to do something like this:

var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500);

var latestAndThenTheRest =
    Observable
        .Create<long>(o =>
        {
            var bs = new BehaviorSubject<long>(1);
            var s1 = ob.Subscribe(bs);
            var s2 = bs.Subscribe(o);
            return new CompositeDisposable(s1, s2);
        });

The only thing that you need to consider here is that ob must be a hot observable for this to even make sense. If it were cold then every subscriber would get a brand new subscription to the start of the ob sequence.
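
To make the cold-versus-shared distinction concrete, here is a minimal sketch. The class name ColdVersusSharedDemo and the Defer-based source are hypothetical stand-ins invented for illustration (they are not the ob from the question), and FirstAsync().Wait() is used in place of the blocking First() only to keep the example short. It counts how many times the underlying source is subscribed to.

```csharp
using System;
using System.Reactive.Linq;

public static class ColdVersusSharedDemo
{
    // Returns { subscriptions made by two cold reads, subscriptions made by two shared reads }.
    public static int[] Run()
    {
        int subscriptions = 0;

        // Hypothetical cold source: the factory runs once for every subscriber.
        IObservable<long> cold = Observable.Defer<long>(() =>
        {
            subscriptions++;
            return Observable.Return(42L);
        });

        // Each blocking read subscribes afresh, so the factory runs twice.
        cold.FirstAsync().Wait();
        cold.FirstAsync().Wait();
        int coldCount = subscriptions;

        subscriptions = 0;

        // Replay(1) plus Connect() subscribes to the source once and shares the result.
        var shared = cold.Replay(1);
        using (shared.Connect())
        {
            shared.FirstAsync().Wait();
            shared.FirstAsync().Wait();
        }
        int sharedCount = subscriptions;

        return new[] { coldCount, sharedCount };
    }
}
```

Under these assumptions the cold source is subscribed to twice and the shared one only once, which is why the technique above only makes sense when ob is hot.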

Upvotes: 0

KeyboardDrummer

Reputation: 587

I'm not sure if this answer helps you, but have you looked into BehaviorSubject? It's an IObservable that remembers its latest value. It's a bit like a combination of a regular variable and an observable in one.

Otherwise, why don't you subscribe to 'ob' and store the latest value in a variable yourself?
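
A minimal sketch of that idea, assuming System.Reactive is referenced: a BehaviorSubject is subscribed to the source and its Value property is read synchronously. The Subject named source is a hypothetical stand-in for the Interval-based 'ob', used so the timing is deterministic, and LatestValueDemo is an illustrative name only.

```csharp
using System;
using System.Reactive.Subjects;

public static class LatestValueDemo
{
    // Returns { value read before any item arrives, value read after two items }.
    public static long[] Run()
    {
        // Hypothetical stand-in for the hot source 'ob' in the question.
        var source = new Subject<long>();

        // 500 plays the role of StartWith(500): there is always a value to read.
        var latest = new BehaviorSubject<long>(500);

        // Keep the subscription alive for as long as the latest value is needed.
        using (source.Subscribe(latest))
        {
            long before = latest.Value; // synchronous read, no subscription involved

            source.OnNext(0);
            source.OnNext(1);

            long after = latest.Value;  // reflects the most recent item pushed

            return new[] { before, after };
        }
    }
}
```

With the real 'ob', the same shape would be ob.Subscribe(latest), with the returned IDisposable kept until the value is no longer needed. Note that this holds one subscription open, which affects a RefCount-based source.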

Upvotes: -1
