Reputation: 635
Take the following as an example:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
What I'm trying to achieve here is to obtain the value of the latest item in the sequence at any given time, synchronously, which means extensions like FirstAsync won't do the job for me.
The StartWith and Replay bits ensure that there will always be a value, and the RefCount bit is necessary in my actual code to detect when I can do some disposal actions.
So to simulate this "any given time" part, let's try getting the latest value after 5 seconds:
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
});
So with a 5 second delay, I need to get the value 5 out of the sequence, and these are what I have tried so far with no success:
ob.First() - returns 500
ob.Latest().Take(1) - same as above
ob.MostRecent(-1).First() - same as above
ob.MostRecent(-1) - gives me an IEnumerable<long> full of "500"
ob.Last() - never returns, because it's waiting for the sequence to complete, which it never will
ob.Latest().Last() - same as above
ob.ToTask().Result - same as above
ob.ToEnumerable() - same as above
ob.MostRecent().Last() - same as above
There don't seem to be many resources showing how to actually do this. The closest I can find is this: "Rx: operator for getting first and most recent value from an Observable stream", but it is not a synchronous call after all (it still uses a subscription), so it doesn't work for me.
Does anybody know if this is actually doable?
Upvotes: 3
Views: 3021
Reputation: 635
Just to clarify this a bit, with thanks to @LeeCampbell for the answer.
What was not working:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
ob.First().Dump();
// This gives you 500.
// Because this is the first time anyone subscribes to the observable,
// so it starts right here and gives you the initial value.
});
What would actually work:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
ob.Subscribe(); // Subscribe to start the above hot observable immediately.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
ob.First().Dump();
// This would give you either 3 or 4, depending on the speed and timing of your computer.
});
Upvotes: 0
Reputation: 10783
To point out why your code probably isn't working as you expect it to:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500).Replay(1).RefCount();
//Note at this point `ob` has never been subscribed to,
// so the reference count is 0, i.e. it has not been connected.
Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
//Here we make our first subscription to the `ob` sequence.
// This will connect the sequence (invoke subscribe)
// which will
// 1) invoke StartWith
// 2) invoke onNext(500)
// 3) invoke First()
// 4) First() will then unsubscribe() as it has the single value it needs
// 5) The refCount will now return to 0
// 6) The sequence will be unsubscribed to.
ob.First().Dump();
//Any future calls like `ob.First()` will thus always get the value 500.
});
Potentially what you want is
var ob = Observable.Interval(TimeSpan.FromSeconds(1))
.Publish(500);
var connection = ob.Connect();
//Note that `ob` is now connected, so it produces values even though nothing has subscribed to it yet.
var subscription = Observable.Timer(TimeSpan.FromSeconds(5)).Subscribe(x =>
{
// Try to get latest value from "ob" here.
ob.First().Dump();
});
//Sometime later
subscription.Dispose();
connection.Dispose();
HOWEVER, you really don't want to be mixing synchronous calls with Rx. You also generally don't want to be subscribing within a subscription (and .First() is a subscription). What you probably mean to be doing is getting the latest value and stashing it somewhere. Using .First() is just a slippery slope. You would probably be better off writing something like
var subscription = Observable.Timer(TimeSpan.FromSeconds(5))
.SelectMany(_=>ob.Take(1))
.Subscribe(x =>
{
//Do something with X here.
x.Dump();
});
Upvotes: 3
Reputation: 117175
You need to do something like this:
var ob = Observable.Interval(TimeSpan.FromSeconds(1)).StartWith(500);
var latestAndThenTheRest =
Observable
.Create<long>(o =>
{
var bs = new BehaviorSubject<long>(1);
var s1 = ob.Subscribe(bs);
var s2 = bs.Subscribe(o);
return new CompositeDisposable(s1, s2);
});
The only thing that you need to consider here is that ob must be a hot observable for this to even make sense. If it were cold then every subscriber would get a brand new subscription to the start of the ob sequence.
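To make the hot versus cold distinction concrete, here is a minimal sketch. It is not taken from the question's code: the class name and the printed labels are made up for illustration, and it assumes the System.Reactive package is referenced. Observable.Range stands in for any cold sequence and a plain Subject<int> stands in for any hot one.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

class HotVersusColdSketch // hypothetical name, for illustration only
{
    static void Main()
    {
        // Cold: each Subscribe call runs the sequence again from its start,
        // so both subscribers receive 1, 2 and 3.
        var cold = Observable.Range(1, 3);
        cold.Subscribe(x => Console.WriteLine("cold A: " + x));
        cold.Subscribe(x => Console.WriteLine("cold B: " + x));

        // Hot: values are pushed whether or not anyone is listening.
        var hot = new Subject<int>();
        hot.OnNext(1); // nobody is subscribed yet, so this value is lost
        hot.Subscribe(x => Console.WriteLine("hot C: " + x));
        hot.OnNext(2); // the late subscriber only sees values from here on
    }
}
```

With a cold source, "the latest value" has no shared meaning, because every subscriber has its own private run of the sequence.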
Upvotes: 0
Reputation: 587
I'm not sure if this answer helps you, but have you looked into BehaviorSubject? It's an IObservable that remembers its latest value. It's a bit like a combination of a regular variable and an observable in one.
Otherwise, why don't you subscribe to 'ob' and store the latest value in a variable yourself?
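As a rough sketch of the BehaviorSubject idea, assuming the System.Reactive package is referenced: the subject is seeded with 500 (mirroring the StartWith(500) in the question), subscribed to the interval straight away, and then read synchronously through its Value property. The class name and the 5.5 second sleep are only there to simulate "some time later" and are not part of anyone's original code.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

class LatestValueSketch // hypothetical name, for illustration only
{
    static void Main()
    {
        // Seeded with 500 so that a value is available before the first tick.
        var latest = new BehaviorSubject<long>(500);

        // Subscribing immediately starts the interval, so the subject
        // keeps tracking the most recent tick from now on.
        using (Observable.Interval(TimeSpan.FromSeconds(1)).Subscribe(latest))
        {
            // Stand-in for "any given time" later in the program.
            Thread.Sleep(TimeSpan.FromSeconds(5.5));

            // Synchronous read of the last value seen; no new subscription is made.
            Console.WriteLine(latest.Value);
        }
    }
}
```

The trade-off is that the subscription feeding the subject has to be kept alive and disposed explicitly, which is roughly the job RefCount was doing in the question.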
Upvotes: -1