me--

Reputation: 2148

Taking a snapshot of an IObservable<T>

Suppose I have a service:

public interface ICustomersService
{
    IObservable<ICustomer> Customers
    {
        get;
    }
}

The implementation of the Customers property starts by grabbing all existing customers and passing them on to the observer, after which it only passes on customers that are added to the system later. Thus, it never completes.

Now suppose I wanted to grab a snapshot (as a List<ICustomer>) of the current customers, ignoring any that may be added in future. How do I do that? Any invocation of ToList() or its kin will block forever because the sequence never completes.

I figured I could write my own extension, so I tried this:

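using System;
using System.Collections.Generic;

public static class RxExtensions
{
    public static List<T> ToSnapshot<T>(this IObservable<T> @this)
    {
        var list = new List<T>();

        // Subscribe, then dispose immediately: only items pushed
        // synchronously during Subscribe end up in the list.
        using (@this.Subscribe(x => list.Add(x))) { }

        return list;
    }
}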

This appears to work. For example:

var customers = new ReplaySubject<string>();

// snapshot has nothing in it
var snapshot1 = customers.ToSnapshot();

customers.OnNext("A");
customers.OnNext("B");

// snapshot has just the two customers in it
var snapshot2 = customers.ToSnapshot();

customers.OnNext("C");

// snapshot has three customers in it
var snapshot3 = customers.ToSnapshot();

I realize the current implementation depends on the scheduler being the current thread; otherwise ToSnapshot will likely close its subscription before any items are received. However, I suspect I could also add a ToSnapshot overload that takes an IScheduler and ensures any items scheduled there are received before the snapshot ends.

I can't find this sort of snapshot functionality built into Rx. Am I missing something?

Upvotes: 4

Views: 662

Answers (3)

James World

Reputation: 29776

There are several ways to approach this. I have tried the following with success in commercial projects:

1) A separate method to get an enumerable of current customers as Chris demonstrated.

2) A method to combine a "state of the world" call with a live stream - this was somewhat more involved than Chris's example because in order to ensure no missed data one typically has to start listening to the live stream first, then get the snapshot, then combine the two with de-duping.

I achieved this with a custom Observable.Create implementation that cached the live stream until the history was retrieved and then merged the cache with the history before switching to live.

This returned Customers but wrapped with additional metadata that described the age of the data.

3) Most recently, it's been more useful to me to return IObservable<IEnumerable<Customer>> where the first event is the entire state of the world. The reason this has been more useful is that many systems I work on get updates in batches, and it's often faster to update a UI with an entire batch than item by item. It is otherwise similar to (2) except you can just use a FirstAsync() to get the snapshot you need.

I propose you consider this approach. You can always use SelectMany(x => x) to flatten an IObservable<IEnumerable<Customer>> into an IObservable<Customer> if you need to.
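A minimal sketch of approach (3), assuming the service exposes batches and that the first batch is the full state of the world. The names BatchedCustomersSketch, FirstBatch, Flatten and Demo are mine for illustration, and string stands in for Customer; none of this is the actual implementation mentioned above.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BatchedCustomersSketch
{
    // Snapshot: FirstAsync completes after one element, so Wait() returns
    // even though the underlying sequence never completes.
    public static List<T> FirstBatch<T>(this IObservable<IEnumerable<T>> batches)
    {
        return batches.FirstAsync().Wait().ToList();
    }

    // Flatten batches back into a stream of individual items.
    public static IObservable<T> Flatten<T>(this IObservable<IEnumerable<T>> batches)
    {
        return batches.SelectMany(batch => batch);
    }

    public static void Demo()
    {
        // Hypothetical stand-in for the service's batched stream.
        var batches = new ReplaySubject<IEnumerable<string>>();
        batches.OnNext(new[] { "A", "B" }); // initial state of the world
        batches.OnNext(new[] { "C" });      // a later update batch

        // Only the first batch is taken as the snapshot.
        Console.WriteLine(string.Join(",", batches.FirstBatch()));

        // The flattened stream yields every item from every batch.
        using (batches.Flatten().Subscribe(Console.WriteLine)) { }
    }
}
```

Note that Wait() blocks the calling thread until the first batch arrives, so in UI code you would more likely await FirstAsync() instead.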

I'll see if I can dig out an example implementation when I get back to the home office!
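In the meantime, here is a rough sketch of the buffering idea from (2), written from the description above rather than from the real code. SnapshotThenLive, getHistory and keySelector are hypothetical names, and the sketch makes simplifying assumptions: it remembers every key it has seen (so a later update with a known key is dropped), it carries no age metadata, and it does not handle the live stream completing or re-entrantly emitting while the history is still loading.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class SnapshotThenLiveSketch
{
    // Hypothetical helper, not part of Rx: subscribe to live first, buffer it
    // while the history loads, emit history + buffer de-duplicated by key,
    // then pass further live items straight through.
    public static IObservable<T> SnapshotThenLive<T, TKey>(
        Func<IEnumerable<T>> getHistory,
        IObservable<T> live,
        Func<T, TKey> keySelector)
    {
        return Observable.Create<T>(observer =>
        {
            var gate = new object();
            var buffer = new List<T>();     // live items seen before catch-up
            var seen = new HashSet<TKey>(); // keys already emitted
            var caughtUp = false;

            // 1. Listen to the live stream first so nothing is missed.
            var subscription = live.Subscribe(
                item =>
                {
                    lock (gate)
                    {
                        if (!caughtUp) buffer.Add(item);
                        else if (seen.Add(keySelector(item))) observer.OnNext(item);
                    }
                },
                observer.OnError,
                observer.OnCompleted);

            try
            {
                // 2. Fetch the history while live items pile up in the buffer.
                var history = getHistory().ToList();

                // 3. Emit history, then buffered items, skipping duplicates.
                lock (gate)
                {
                    foreach (var item in history.Concat(buffer))
                    {
                        if (seen.Add(keySelector(item))) observer.OnNext(item);
                    }
                    buffer.Clear();
                    caughtUp = true;
                }
            }
            catch (Exception ex)
            {
                observer.OnError(ex);
            }

            return subscription;
        });
    }
}
```

Holding the lock while calling OnNext keeps history and live items in order at the cost of blocking the live stream during catch-up; a production version would likely need something more careful.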

Upvotes: 1

bradgonesurfing

Reputation: 32162

You could try cutting the observable off at the current time (Customers is a property, so no parentheses):

source.Customers.TakeUntil(DateTime.Now).ToEnumerable();

Upvotes: 1

cwharris

Reputation: 18125

What you've done here is actually pretty nifty. ToSnapshot works because the underlying implementation of your subscribe logic yields all of the customers to the observer before releasing control flow. Basically, Dispose is called only after the control flow is released, and the control flow is only released after you've yielded all pre-existing customers.

While this is cool, it's also misleading. The method you've written, ToSnapshot, should really be named something like TakeSynchronousNotifications. The extension makes heavy assumptions about how the underlying observable works, and isn't really in the spirit of Rx.
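To illustrate the assumption, here is a small sketch. ToSnapshot has the same shape as in the question; the class name, Demo and the use of Delay to simulate asynchronous delivery are my own choices for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SynchronousAssumptionSketch
{
    // Same shape as ToSnapshot from the question.
    public static List<T> ToSnapshot<T>(this IObservable<T> source)
    {
        var list = new List<T>();
        using (source.Subscribe(x => list.Add(x))) { }
        return list;
    }

    public static void Demo()
    {
        var customers = new ReplaySubject<string>();
        customers.OnNext("A");
        customers.OnNext("B");

        // Replayed synchronously inside Subscribe, so both items are captured.
        Console.WriteLine(customers.ToSnapshot().Count);

        // Same data delivered 50 ms later on a timer: the subscription is
        // disposed before anything arrives, so the "snapshot" is empty.
        var delayed = customers.Delay(TimeSpan.FromMilliseconds(50));
        Console.WriteLine(delayed.ToSnapshot().Count);
    }
}
```

The consumer cannot tell these two sources apart from their type alone, which is why the name ToSnapshot promises more than the method can deliver.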

To make things easier to understand for the consumer, I would expose additional properties which explicitly state what is being returned.

public interface ICustomersService
{
    IEnumerable<ICustomer> ExistingCustomers { get; }
    IObservable<ICustomer> NewCustomers { get; }
    IObservable<ICustomer> Customers { get; }
}

public class CustomerService : ICustomersService
{
    public IEnumerable<ICustomer> ExistingCustomers { get { ... } }
    public IObservable<ICustomer> NewCustomers { get { ... } }
    public IObservable<ICustomer> Customers
    {
        get
        {
            return this.ExistingCustomers.ToObservable().Concat(this.NewCustomers);
        }
    }
}
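For something you can actually run, here is a minimal in-memory sketch of that shape. InMemoryCustomersService, its Add method and the use of string in place of ICustomer are assumptions of mine, standing in for whatever the elided getters above would really do.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical in-memory service; string stands in for ICustomer.
public class InMemoryCustomersService
{
    private readonly object gate = new object();
    private readonly List<string> existing = new List<string>();
    private readonly Subject<string> added = new Subject<string>();

    // Returns a copy, so the caller holds a stable snapshot.
    public IEnumerable<string> ExistingCustomers
    {
        get { lock (gate) { return existing.ToList(); } }
    }

    // Only customers added after the observer subscribes.
    public IObservable<string> NewCustomers
    {
        get { return added.AsObservable(); }
    }

    // Defer reads the existing customers at subscription time rather than
    // when the property is accessed.
    public IObservable<string> Customers
    {
        get
        {
            return Observable
                .Defer(() => ExistingCustomers.ToObservable())
                .Concat(NewCustomers);
        }
    }

    public void Add(string customer)
    {
        lock (gate) { existing.Add(customer); }
        added.OnNext(customer);
    }
}
```

With this shape the snapshot is just service.ExistingCustomers.ToList(), with no subscription and no reliance on synchronous delivery. One caveat: a customer added between the end of the existing list and the subscription to NewCustomers can be missed by Customers in this simple Concat form.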

Edit:

Consider the following problem...

50 = x + y. Solve for and evaluate x.

The math just doesn't work unless you know what y is. In this example, y is the "new customers", x is the "existing customers", and 50 is the combination of the two.

By exposing only a combination of the existing and new customers, and not the existing and new customers themselves, you've lost too much data. You need to expose at least x or y to the consumer, otherwise there's no way to solve for the other.

Upvotes: 0
