We have a class Thing that implements IObservable<Thing>. In another class, there is a collection of Things, and that class needs to react to updates from all those observables in a unified manner. The obvious way to do that is Observable.Merge(), and that generally works; however, when the collection changes, we also need to subscribe to any new Things in our merged subscription (and in theory unsubscribe from all the removed ones, but that seems less problematic, since they just won't produce any updates anymore).
We currently achieve that by recreating the subscription on every change of the collection, but that seems suboptimal, both in processing overhead and because updates from any of the Things are missed in the brief time between discarding the old subscription and creating the new one. This has proven to be an issue in practice, especially as we also need to Buffer() the subscription for a short amount of time, and the buffered items are lost when the subscription is disposed.
What is the proper way of merging a changing collection of observables like this?
If you have an IObservable<IObservable<T>>, then calling Merge on it subscribes to each inner observable as it arrives, so it will include children of new parents, if you catch my drift. The trick is converting the ObservableCollection<IObservable<Thing>> to an IObservable<IObservable<Thing>>.
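The behaviour of Merge on a stream of streams can be seen in isolation with a minimal sketch like the one below. It assumes System.Reactive is referenced; MergeDemo and the Subject named sources are hypothetical stand-ins for the changing collection, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeDemo
{
    public static List<int> Run()
    {
        var received = new List<int>();

        // Hypothetical stand-in for the changing collection: a stream of streams.
        var sources = new Subject<IObservable<int>>();

        // One subscription that lives as long as the collection does.
        IDisposable subscription = sources.Merge().Subscribe(x => received.Add(x));

        var first = new Subject<int>();
        sources.OnNext(first);
        first.OnNext(1);

        // A source that shows up later is picked up without resubscribing.
        var second = new Subject<int>();
        sources.OnNext(second);
        second.OnNext(2);
        first.OnNext(3);

        subscription.Dispose();
        return received; // contains 1, 2, 3 in that order
    }
}
```

Because the merged subscription is never torn down, nothing is lost while a new source is being added.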
If you have ReactiveUI available in your project, and are OK with using it, then you could convert the ObservableCollection<IObservable<Thing>> to a ReactiveCollection<IObservable<Thing>>. ReactiveCollection inherits from ObservableCollection, and also implements IObservable.
If ReactiveUI is out of the question (which I'm guessing it is because you're already using a Caliburn Micro collection), then you can convert using ObservableCollection's events:
// Namespaces used: System.Collections.ObjectModel, System.Collections.Specialized,
// System.Linq, System.Reactive.Linq
var observableCollection = new ObservableCollection<IObservable<Thing>>();
IObservable<IObservable<Thing>> oCollectionObservable = Observable
    .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
        h => observableCollection.CollectionChanged += h,
        h => observableCollection.CollectionChanged -= h)
    // NewItems is null for Remove and Reset events, so fall back to an empty sequence.
    .SelectMany(ep => ep.EventArgs.NewItems?.Cast<IObservable<Thing>>()
        ?? Enumerable.Empty<IObservable<Thing>>());
Here's some sample code demonstrating use:
oCollectionObservable
    .Merge()
    .Subscribe(t => Console.WriteLine($"Received Thing {{Id = {t.Id}}}"));

var firstObservable = Observable.Range(1, 5)
    .Select(i => new Thing { Id = i })
    .Concat(
        Observable.Range(8, 5)
            .Select(i => new Thing { Id = i })
            .Delay(TimeSpan.FromSeconds(2))
    );
observableCollection.Add(firstObservable);

var subject = new Subject<Thing>();
observableCollection.Add(subject);
subject.OnNext(new Thing { Id = 6 });
subject.OnNext(new Thing { Id = 7 });
Using the following class:
public class Thing
{
    public int Id { get; set; }
}
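The question also mentions unsubscribing from removed items. One possible way to handle that, sketched here under assumptions rather than taken from the code above, is to end each inner subscription with TakeUntil when the same instance shows up in OldItems. The names DynamicMerge and MergeAll are hypothetical. Items already in the collection before subscribing are not picked up, and a Reset (for example Clear()) carries no OldItems, so it would not end existing inner subscriptions.

```csharp
using System;
using System.Collections.ObjectModel;
using System.Collections.Specialized;
using System.Linq;
using System.Reactive.Linq;

public static class DynamicMerge
{
    // Hypothetical helper: merges every source added to the collection and
    // stops listening to a source once it is removed.
    public static IObservable<T> MergeAll<T>(ObservableCollection<IObservable<T>> collection)
    {
        // Share a single CollectionChanged handler between all consumers below.
        var changes = Observable
            .FromEventPattern<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
                h => collection.CollectionChanged += h,
                h => collection.CollectionChanged -= h)
            .Select(ep => ep.EventArgs)
            .Publish()
            .RefCount();

        // NewItems and OldItems are null for some actions, hence the fallbacks.
        var added = changes.SelectMany(e =>
            e.NewItems?.Cast<IObservable<T>>() ?? Enumerable.Empty<IObservable<T>>());
        var removed = changes.SelectMany(e =>
            e.OldItems?.Cast<IObservable<T>>() ?? Enumerable.Empty<IObservable<T>>());

        return added
            // End the inner subscription when this exact instance is removed.
            .Select(source => source.TakeUntil(
                removed.Where(r => object.ReferenceEquals(r, source))))
            .Merge();
    }
}
```

Since the merged subscription now lives across collection changes, a Buffer applied once to the result, for example MergeAll(observableCollection).Buffer(TimeSpan.FromMilliseconds(100)), should no longer drop buffered items when the collection changes.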