Reputation: 1365
I am wrapping some API calls to imitate a socket: the call is repeated every few seconds, and the result is emitted only when it changes.
It works well with the DistinctUntilChanged() operator. However, when the result is a list, the operator emits every result, because the default comparer treats each new list instance as distinct.
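To make the cause concrete, here is a minimal sketch that does not use Rx. It only contrasts the default equality comparer with a content-based comparison for two separately created lists. The class and method names are made up for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical demo class, not part of the original code.
public static class ListEqualityDemo
{
    // List<T> does not override Equals, so the default comparer
    // falls back to reference equality.
    public static bool DefaultEquals()
    {
        var a = new List<int> { 1, 2, 3, 4 };
        var b = new List<int> { 1, 2, 3, 4 };
        return EqualityComparer<List<int>>.Default.Equals(a, b);
    }

    // SequenceEqual compares the elements pairwise, in order.
    public static bool ContentEquals()
    {
        var a = new List<int> { 1, 2, 3, 4 };
        var b = new List<int> { 1, 2, 3, 4 };
        return a.SequenceEqual(b);
    }

    public static void Main()
    {
        Console.WriteLine(DefaultEquals());  // False
        Console.WriteLine(ContentEquals());  // True
    }
}
```

Since every API call builds a fresh list, the default comparer never sees two consecutive results as equal.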
This is my custom observable, which repeats a task with a delay regardless of whether the task succeeds or fails.
public class TaskRepeatObservable<T> : IObservable<T>
{
    private readonly Func<Task<T>> _taskFactory;
    private readonly TimeSpan _repeatDelayTimeSpan;
    private readonly ILogger _logger;
    private Func<Exception, bool> _onError;

    public TaskRepeatObservable(Func<Task<T>> taskFactory, TimeSpan repeatDelayTimeSpan = default(TimeSpan), Func<Exception, bool> onError = null)
    {
        _logger = new Log4NetLogger(GetType());
        _logger.IsEnabled = false;
        _taskFactory = taskFactory;
        _repeatDelayTimeSpan = repeatDelayTimeSpan;
        _onError = onError;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var tokenSource = new CancellationTokenSource();
        var cancellationToken = tokenSource.Token;
        Task.Factory.StartNew(async () =>
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var result = await _taskFactory();
                        observer.OnNext(result);
                    }
                    catch (Exception e)
                    {
                        _logger.Error(e, "Observable Error: " + e.Message);
                        if (_onError != null && !_onError.Invoke(e))
                            throw;
                    }
                    finally
                    {
                        try
                        {
                            if (_repeatDelayTimeSpan > TimeSpan.Zero)
                                await Task.Delay(_repeatDelayTimeSpan, cancellationToken);
                        }
                        catch (TaskCanceledException)
                        {
                            // ignored
                        }
                    }
                }
            }
            catch (Exception e)
            {
                observer.OnError(e);
            }
            _logger.Debug("Observable is cancelled.");
        }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);
        return Disposable.Create(() =>
        {
            tokenSource.Cancel();
        });
    }
}
This is the extension method that wraps the API calls.
public static class ObservableBuilder
{
    ///<summary>
    ///<para>Converts a Task to an Observable and emits only changed results; useful for making an API behave like a socket.</para>
    ///</summary>
    public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
        Func<Exception, bool> onError = null)
    {
        return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
    }
}
My question is: how can I make DistinctUntilChanged() work for ANY result type, including List or Enumerable?
Note that I have tried to implement my own comparer, but I still don't know how to check the type of T in order to select the proper comparer for DistinctUntilChanged():
public class IEnumerableComparer<T> : IEqualityComparer<IEnumerable<T>>
{
    public bool Equals(IEnumerable<T> x, IEnumerable<T> y)
    {
        return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
    }

    public int GetHashCode(IEnumerable<T> obj)
    {
        // Will not throw an OverflowException
        unchecked
        {
            return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
        }
    }
}
Here is the simple testing code:
ObservableBuilder.Repeat(async () =>
{
    var i = new List<int>() { 1, 2, 3, 4 };
    return i;
}, TimeSpan.FromSeconds(1)).ToHotObservable().Subscribe(x => Logger.Info($"Result = {x}"));
I expect the result to be emitted only once for the list.
Upvotes: 2
Views: 2376
Reputation: 1365
Thanks to the answer by Evk. Instead of using reflection, I have tried to implement a more general comparer. I don't know whether this comparer has any drawbacks, so I offer it only as an option for anyone facing the same issue.
public class DeepComparer<T> : IEqualityComparer<T>
{
    public bool Equals(T x, T y)
    {
        // Compare the references
        return ReferenceEquals(x, y) ||
               // Use the default comparer to compare the values
               EqualityComparer<T>.Default.Equals(x, y) ||
               // If both are enumerable, compare them with SequenceEqual
               x is IEnumerable enumerableX &&
               y is IEnumerable enumerableY &&
               enumerableX.Cast<object>().SequenceEqual(enumerableY.Cast<object>());
    }

    public int GetHashCode(T obj)
    {
        unchecked
        {
            // Null elements (and a null obj) hash to 0 instead of throwing
            return obj is IEnumerable enumerable
                ? enumerable.Cast<object>()
                    .Select(e => e?.GetHashCode() ?? 0)
                    .Aggregate(17, (a, b) => 23 * a + b)
                : (obj == null ? 0 : obj.GetHashCode());
        }
    }
}
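One possible drawback, shown here as a sketch rather than a full analysis: the comparison goes only one level deep. Elements are compared with their own default equality, so nested collections still fall back to reference equality. The `DeepEquality` helper below is a condensed, hypothetical copy of the `Equals` logic above, included only so the example compiles on its own.

```csharp
using System;
using System.Collections;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper mirroring the Equals logic of the comparer above.
public static class DeepEquality
{
    public static bool AreEqual<T>(T x, T y)
    {
        return ReferenceEquals(x, y)
            || EqualityComparer<T>.Default.Equals(x, y)
            || (x is IEnumerable ex && y is IEnumerable ey
                && ex.Cast<object>().SequenceEqual(ey.Cast<object>()));
    }
}

public static class DeepEqualityDemo
{
    // Flat lists of values: elements are compared by value.
    public static bool FlatLists()
    {
        return DeepEquality.AreEqual(new List<int> { 1, 2 }, new List<int> { 1, 2 });
    }

    // Nested lists: the inner lists are compared by reference,
    // so equal contents are still reported as different.
    public static bool NestedLists()
    {
        return DeepEquality.AreEqual(
            new List<List<int>> { new List<int> { 1 } },
            new List<List<int>> { new List<int> { 1 } });
    }

    public static void Main()
    {
        Console.WriteLine(FlatLists());    // True
        Console.WriteLine(NestedLists());  // False
    }
}
```

If the API can return nested collections, the element comparison would need to recurse; whether that is worth the cost depends on the payloads involved.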
Upvotes: 2
Reputation: 101453
If you want just one Repeat function that covers all cases but works differently for IEnumerables, you have to use some reflection. You need to check whether type T implements IEnumerable<Something> and, if so, use a special comparer for DistinctUntilChanged; otherwise use the default comparer.
First, it won't hurt to modify the signature of your EnumerableComparer a bit, because you will specifically need an IEqualityComparer<T>, not an IEqualityComparer<IEnumerable<T>>:
private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
    public bool Equals(T x, T y) {
        return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
    }

    public int GetHashCode(T obj) {
        // Will not throw an OverflowException
        unchecked {
            return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
        }
    }
}
Now we need to check whether T is an IEnumerable and create an instance of this comparer via reflection:
public static class ObservableBuilder {
    public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
        Func<Exception, bool> onError = null) {
        var ienum = typeof(T).GetInterfaces().FirstOrDefault(c => c.IsGenericType && c.GetGenericTypeDefinition() == typeof(IEnumerable<>));
        if (ienum != null) {
            // implements IEnumerable - create instance of comparer and use
            var comparer = (IEqualityComparer<T>) Activator.CreateInstance(typeof(EnumerableComparer<,>).MakeGenericType(typeof(T), ienum.GenericTypeArguments[0]));
            return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged(comparer);
        }
        // otherwise - don't use
        return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
    }

    private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
        public bool Equals(T x, T y) {
            return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
        }

        public int GetHashCode(T obj) {
            // Will not throw an OverflowException
            unchecked {
                return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
            }
        }
    }
}
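To see what the reflection check above returns for a few types, here is a small standalone sketch. `EnumerableProbe` and `FindElementType` are hypothetical names; the lookup is the same `GetInterfaces()` expression used in `Repeat`.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper wrapping the same interface lookup used in Repeat.
public static class EnumerableProbe {
    // Returns TItem of the first IEnumerable<TItem> found among the
    // interfaces of the given type, or null if there is none.
    public static Type FindElementType(Type type) {
        var ienum = type.GetInterfaces().FirstOrDefault(c => c.IsGenericType && c.GetGenericTypeDefinition() == typeof(IEnumerable<>));
        return ienum?.GenericTypeArguments[0];
    }

    public static void Main() {
        // A concrete collection: IEnumerable<int> is found.
        Console.WriteLine(FindElementType(typeof(List<int>)) == typeof(int));     // True
        // A scalar: nothing is found, so the default comparer would be used.
        Console.WriteLine(FindElementType(typeof(int)) == null);                  // True
        // The interface itself: GetInterfaces() lists only inherited
        // interfaces, so IEnumerable<int> does not find itself.
        Console.WriteLine(FindElementType(typeof(IEnumerable<int>)) == null);     // True
    }
}
```

The last case suggests that if T is declared as IEnumerable<Something> rather than a concrete collection type, the check would need to test typeof(T) itself as well; whether that matters depends on how the task factories are typed.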
Alternatively, you can create an IEqualityComparer<T> that checks on every call whether x and y implement IEnumerable<Something> and compares accordingly, but I'd expect that to be less efficient than checking just once when building the sequence.
Upvotes: 3