Reputation: 12533
Example code :
public static IObservable<Order> ObserveOrders(this IProxy proxy,IEqualityComparer<Order> distinctPerDayComparer )
{
return Observable.FromEvent<Order>(ev => proxy.OrderUpdated += ev,ev => proxy.OrderUpdated -= ev)
.Distinct(distinctPerDayComparer);
}
public class DistinctPerDayComparer : IEqualityComparer<Order>
{
public bool Equals(Order o1, Order o2)
{
if(o1.Id != o2.Id)
return false;
bool isSameDay = o1.Date.Day == o2.Date.Day;
return isSameDay;
}
public int GetHashCode(Order o)
{
return o.Id.GetHashCode();
}
}
public class Order
{
public int Id { get; set; }
public DateTime Date { get; set; }
}
Scenario :
Sequence :
{id:1,D:'5/25/2016'}-{id:1,D:'5/25/2016'}-{id:2,D:'5/25/2016'}-{id:1 ,D:'5/26/2016'}
Distinct Sequence :
{id:1,D:'5/25/2016'}-{id:2,D:'5/25/2016'}-{id:1,D:'5/26/2016'}
Now assume that this sequence is long running, in fact onComplete is never called.
How does Rx manage it so it does not hold all the distinct elements in memory to compare with ?
I'm guessing it holds some back storage for elements in it's pipeline. but i always figured that after onNext is called with the next item that item is simply disposed.
Still if it's disposed what elements does Rx use for the EqualityComparer when calling the Distinct operator ?
Upvotes: 2
Views: 360
Reputation: 3259
If you look at the Rx source code you will find that distinct is using a hashset and storing the values in there. Your assumption that item is simply disposed isn't correct.
If your order objects are heavy you can use the keyselector and RX will just store that value in the hashset.
.Distinct(o => Tuple.Create(o.id, o.Date), distinctPerDayComparer);
then distinctPerDayComparer will need to be changed
public class DistinctPerDayComparer : IEqualityComparer<Tuple<int, DateTime>>
{
public bool Equals(Tuple<int, DateTime> o1, Tuple<int, DateTime> o2)
{
if(o1.Item1 != o2.Item1)
return false;
bool isSameDay = o1.Item2.Day == o2.Item2.Day;
return isSameDay;
}
public int GetHashCode(Tuple<int, DateTime> o)
{
return o.Item1.GetHashCode();
}
}
didn't test the code but should be a starting place. Will now store Tuples until the sequence is complete instead of your Order objects.
Otherwise you could use the Window function to group them and clean them up on a schedule but then it's not truly distinct for the entire observable sequence.
Upvotes: 2