Reputation: 2148
Suppose I have a service:
public interface ICustomersService
{
    IObservable<ICustomer> Customers { get; }
}
The implementation of the Customers property starts by grabbing all existing customers and passing them on to the observer, after which it only passes on customers that are added to the system later. Thus, it never completes.
Now suppose I wanted to grab a snapshot (as a List<ICustomer>) of the current customers, ignoring any that may be added in future. How do I do that? Any invocation of ToList() or its kin will block forever because the sequence never completes.
I figured I could write my own extension, so I tried this:
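public static class RxExtensions
{
    public static List<T> ToSnapshot<T>(this IObservable<T> @this)
    {
        var list = new List<T>();
        // Subscribe and dispose immediately: only items pushed synchronously
        // during Subscribe end up in the list.
        using (@this.Subscribe(x => list.Add(x))) { }
        return list;
    }
}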
This appears to work. For example:
var customers = new ReplaySubject<string>();
// snapshot has nothing in it
var snapshot1 = customers.ToSnapshot();
customers.OnNext("A");
customers.OnNext("B");
// snapshot has just the two customers in it
var snapshot2 = customers.ToSnapshot();
customers.OnNext("C");
// snapshot has three customers in it
var snapshot3 = customers.ToSnapshot();
I realize the current implementation depends on the scheduler being the current thread, otherwise ToSnapshot will likely close its subscription before items are received. However, I suspect I could also include a ToSnapshot overload that takes an IScheduler and ensures any items scheduled there are received prior to ending the snapshot.
I can't find this sort of snapshot functionality built into Rx. Am I missing something?
Upvotes: 4
Views: 662
Reputation: 29776
There are several ways to approach this. I have tried the following with success in commercial projects:
1) A separate method to get an enumerable of current customers as Chris demonstrated.
2) A method to combine a "state of the world" call with a live stream - this was somewhat more involved than Chris's example because in order to ensure no missed data one typically has to start listening to the live stream first, then get the snapshot, then combine the two with de-duping.
I achieved this with a custom Observable.Create implementation that cached the live stream until the history was retrieved and then merged the cache with the history before switching to live.
This returned Customers but wrapped with additional metadata that described the age of the data.
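A minimal sketch of the idea in (2), not the implementation I used: it subscribes to the live stream first, buffers whatever arrives while the history is fetched, then emits history plus buffer with duplicates dropped before passing live items straight through. The names SnapshotThenLive and getHistory are hypothetical, de-duping assumes T has sensible equality, and it needs the System.Reactive package. It does not add the age metadata, does not handle re-entrant emissions, and buffered items are dropped if the live stream ends before the history arrives.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SnapshotThenLive
{
    // getHistory is a hypothetical blocking "state of the world" call.
    public static IObservable<T> Create<T>(
        IObservable<T> live, Func<IEnumerable<T>> getHistory)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var seen = new HashSet<T>();   // everything already emitted (grows unbounded)
            var pending = new List<T>();   // live items that arrived before the history
            var historyDone = false;

            // 1. Listen to the live stream first so nothing is missed.
            var subscription = live.Subscribe(
                item =>
                {
                    lock (gate)
                    {
                        if (!historyDone) { pending.Add(item); return; }
                        if (seen.Add(item)) observer.OnNext(item);
                    }
                },
                observer.OnError,
                observer.OnCompleted);

            try
            {
                // 2. Fetch the history while live items are being buffered.
                var history = getHistory().ToList();

                // 3. Emit history, then the buffer, skipping duplicates.
                lock (gate)
                {
                    foreach (var item in history.Concat(pending))
                    {
                        if (seen.Add(item)) observer.OnNext(item);
                    }
                    pending.Clear();
                    historyDone = true;   // 4. From here on, live items pass through.
                }
            }
            catch (Exception ex)
            {
                subscription.Dispose();
                observer.OnError(ex);
            }

            return subscription;
        });
    }
}
```

For example, SnapshotThenLive.Create(liveCustomers, () => repository.LoadAll()) would give subscribers the loaded customers followed by later additions, where liveCustomers and repository are stand-ins for whatever your service has.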
3) Most recently, it's been more useful to me to return IObservable<IEnumerable<Customer>> where the first event is the entire state of the world. The reason this has been more useful is that many systems I work on get updates in batches, and it's often faster to update a UI with an entire batch than item by item. It is otherwise similar to (2) except you can just use a FirstAsync() to get the snapshot you need.
I propose you consider this approach. You can always use a SelectMany(x => x) to flatten an IObservable<IEnumerable<Customer>> to an IObservable<Customer> if you need to.
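Here is a rough sketch of what (3) can look like, assuming the System.Reactive package. The class and method names are made up for illustration, and string stands in for Customer. The first batch carries the full state, so FirstAsync() completes even though the feed itself never does.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BatchedCustomersSketch
{
    // Hypothetical feed: the first batch is the full current state,
    // later batches are incremental updates.
    public static IObservable<IEnumerable<string>> CreateFeed(
        IEnumerable<string> existing,
        IObservable<IEnumerable<string>> updates)
    {
        return Observable.Return(existing).Concat(updates);
    }

    // FirstAsync() completes after the first batch, so Wait() returns
    // even though the underlying feed never completes.
    public static IList<string> Snapshot(IObservable<IEnumerable<string>> feed)
    {
        return feed.FirstAsync().Wait().ToList();
    }

    // Flatten the batches back into a per-customer stream when needed.
    public static IObservable<string> Flatten(IObservable<IEnumerable<string>> feed)
    {
        return feed.SelectMany(batch => batch);
    }
}
```

In async code you would normally await feed.FirstAsync() rather than block with Wait().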
I'll see if I can dig out an example implementation when I get back to the home office!
Upvotes: 1
Reputation: 32162
You could try cutting the observable off at the current time:
source.Customers.TakeUntil(DateTime.Now).ToEnumerable();
Upvotes: 1
Reputation: 18125
What you've done here is actually pretty nifty. ToSnapshot works because the underlying implementation of your subscribe logic yields all of the customers to the observer before releasing control flow. Basically, Dispose is called only after the control flow is released, and the control flow is only released after you've yielded all pre-existing customers.
While this is cool, it's also misleading. The method you've written, ToSnapshot, should really be named something like TakeSynchronousNotifications. The extension makes heavy assumptions about how the underlying observable works, and isn't really in the spirit of Rx.
To make things easier to understand for the consumer, I would expose additional properties which explicitly state what is being returned.
public interface ICustomersService
{
    IEnumerable<ICustomer> ExistingCustomers { get; }
    IObservable<ICustomer> NewCustomers { get; }
    IObservable<ICustomer> Customers { get; }
}
public class CustomerService : ICustomersService
{
    public IEnumerable<ICustomer> ExistingCustomers { get { ... } }
    public IObservable<ICustomer> NewCustomers { get { ... } }
    public IObservable<ICustomer> Customers
    {
        get
        {
            return this.ExistingCustomers.ToObservable().Concat(this.NewCustomers);
        }
    }
}
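To show how the snapshot becomes trivial with this split, here is a small in-memory sketch. It is not a real implementation: InMemoryCustomersService and its Add method are hypothetical, string stands in for ICustomer, and it assumes the System.Reactive package.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical in-memory service; string stands in for ICustomer.
public sealed class InMemoryCustomersService
{
    private readonly object gate = new object();
    private readonly List<string> existing = new List<string>();
    private readonly Subject<string> added = new Subject<string>();

    // Returns a copy, so the caller holds a true point-in-time snapshot.
    public IEnumerable<string> ExistingCustomers
    {
        get { lock (gate) return existing.ToList(); }
    }

    public IObservable<string> NewCustomers => added.AsObservable();

    // Defer re-reads ExistingCustomers for each subscriber.
    public IObservable<string> Customers =>
        Observable.Defer(() => ExistingCustomers.ToObservable().Concat(NewCustomers));

    public void Add(string customer)
    {
        lock (gate) existing.Add(customer);
        added.OnNext(customer);
    }
}
```

A consumer that wants a snapshot just calls service.ExistingCustomers.ToList() and never touches the non-completing stream. Note that a plain Concat like this can miss a customer added between reading the existing list and subscribing to NewCustomers.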
Edit:
Consider the following problem: given 50 = x + y, solve for and evaluate x.
The math just doesn't work unless you know what y is. In this example, y is the "new customers", x is the "existing customers", and 50 is the combination of the two.
By exposing only a combination of the existing and new customers, and not the existing and new customers themselves, you've lost too much data. You need to expose at least x or y to the consumer, otherwise there's no way to solve for the other.
Upvotes: 0