shtse8

Reputation: 1365

C# Rx DistinctUntilChanged() for ANY result including List

I am wrapping some API calls to simulate a socket-style feed: the call is repeated every few seconds, and the result is emitted only when it has changed.

This works well with the DistinctUntilChanged() operator. However, when the result is a list, the operator emits on every call, because the default comparer compares lists by reference and therefore treats each newly created list as distinct.
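To make the cause concrete, here is a minimal sketch of the difference between default equality and element-wise equality for lists. The class name ListEqualityDemo and its two methods are hypothetical names used only for illustration.

```csharp
using System.Collections.Generic;
using System.Linq;

public static class ListEqualityDemo
{
    // List<T> does not override Equals, so the default comparer
    // falls back to reference equality.
    public static bool DefaultEquals(List<int> a, List<int> b) =>
        EqualityComparer<List<int>>.Default.Equals(a, b);

    // SequenceEqual compares the elements pairwise, in order.
    public static bool ContentEquals(List<int> a, List<int> b) =>
        a.SequenceEqual(b);
}
```

For two separately created lists holding 1, 2, 3, DefaultEquals returns false while ContentEquals returns true, which is why a freshly built list always passes through DistinctUntilChanged().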

This is my custom observable, which repeats a task after a delay regardless of whether the task succeeded or failed.

public class TaskRepeatObservable<T> : IObservable<T>
{
    private readonly Func<Task<T>> _taskFactory;
    private readonly TimeSpan _repeatDelayTimeSpan;
    private readonly ILogger _logger;
    private Func<Exception, bool> _onError;
    public TaskRepeatObservable(Func<Task<T>> taskFactory, TimeSpan repeatDelayTimeSpan = default(TimeSpan), Func<Exception, bool> onError = null)
    {
        _logger = new Log4NetLogger(GetType());
        _logger.IsEnabled = false;

        _taskFactory = taskFactory;
        _repeatDelayTimeSpan = repeatDelayTimeSpan;
        _onError = onError;
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        var tokenSource = new CancellationTokenSource();
        var cancellationToken = tokenSource.Token;
        Task.Factory.StartNew(async () =>
        {
            try
            {
                while (!cancellationToken.IsCancellationRequested)
                {
                    try
                    {
                        var result = await _taskFactory();
                        observer.OnNext(result);
                    }
                    catch (Exception e)
                    {
                        _logger.Error(e, "Observable Error: " + e.Message);
                        if (_onError != null && !_onError.Invoke(e))
                            throw;
                    }
                    finally
                    {
                        try
                        {
                            if (_repeatDelayTimeSpan > TimeSpan.Zero)
                                await Task.Delay(_repeatDelayTimeSpan, cancellationToken);
                        }
                        catch (TaskCanceledException)
                        {
                            // ignored
                        }
                    }
                }
            }
            catch (Exception e)
            {
                observer.OnError(e);
            }

            _logger.Debug("Observable is cancelled.");
        }, cancellationToken, TaskCreationOptions.LongRunning, TaskScheduler.Default);

        return Disposable.Create(() =>
        {
            tokenSource.Cancel();
        });
    }
}

This is the extension method that wraps the API calls.

public static class ObservableBuilder
{
    ///<summary>
    ///<para>Convert Task to Observable and emit only changed result, it's useful to wrap the api as socket-like.</para>
    ///</summary>
    public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
        Func<Exception, bool> onError = null)
    {
        return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
    }
}

My question is: how can I make DistinctUntilChanged() work for ANY result type, including List or IEnumerable?

Note that I have tried implementing my own comparer, but I still don't know how to check the type of T in order to select the proper comparer for DistinctUntilChanged().

public class IEnumerableComparer<T> : IEqualityComparer<IEnumerable<T>>
{
    public bool Equals(IEnumerable<T> x, IEnumerable<T> y)
    {
        return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
    }

    public int GetHashCode(IEnumerable<T> obj)
    {
        // Will not throw an OverflowException
        unchecked
        {
            return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
        }
    }
}

Here is some simple test code:

        ObservableBuilder.Repeat(async () =>
        {
            var i = new List<int>() { 1, 2, 3, 4 };
            return i;
        }, TimeSpan.FromSeconds(1)).ToHotObservable().Subscribe(x => Logger.Info($"Result = {x}"));

I expect the list to be emitted only once.

Upvotes: 2

Answers (2)

shtse8

Reputation: 1365

Thanks to Evk for the answer. Instead of using reflection, I tried to implement a more general comparer. I don't know whether this comparer has any drawbacks, so I am just offering it as an option for anyone facing the same issue.

public class DeepComparer<T> : IEqualityComparer<T>
{
    public bool Equals(T x, T y)
    {
        // Compare the references first
        return ReferenceEquals(x, y) ||
               // Use the default comparer to compare the values
               EqualityComparer<T>.Default.Equals(x, y) ||
               // If both are sequences, compare them using SequenceEqual
               x is IEnumerable enumerableX &&
               y is IEnumerable enumerableY &&
               enumerableX.Cast<object>().SequenceEqual(enumerableY.Cast<object>());
    }

    public int GetHashCode(T obj)
    {
        // A null value gets a fixed hash instead of throwing
        if (obj == null)
            return 0;

        // Will not throw an OverflowException
        unchecked
        {
            return obj is IEnumerable enumerable
                ? enumerable.Cast<object>()
                    // Null elements contribute 0 instead of throwing
                    .Select(e => e == null ? 0 : e.GetHashCode())
                    .Aggregate(17, (a, b) => 23 * a + b)
                : obj.GetHashCode();
        }
    }
}
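One possible drawback, offered as an observation rather than a verdict: SequenceEqual compares elements with their default equality, so the comparison is only one level deep. Two List<List<int>> values with identical contents would still be reported as different, because the inner lists are compared by reference. A minimal sketch of a recursive variant follows; DeepEquality and DeepEquals are hypothetical names, and the sketch assumes that element order matters and that strings should be compared as plain values.

```csharp
using System.Collections;

public static class DeepEquality
{
    // Hypothetical helper: compares two values and recurses into nested
    // sequences. Order-sensitive; strings are treated as plain values.
    public static bool DeepEquals(object x, object y)
    {
        if (ReferenceEquals(x, y)) return true;
        if (x == null || y == null) return false;
        if (x is string || y is string) return x.Equals(y);

        if (x is IEnumerable ex && y is IEnumerable ey)
        {
            IEnumerator ix = ex.GetEnumerator();
            IEnumerator iy = ey.GetEnumerator();
            while (true)
            {
                bool hasX = ix.MoveNext();
                bool hasY = iy.MoveNext();
                if (hasX != hasY) return false; // different lengths
                if (!hasX) return true;         // both ended together
                if (!DeepEquals(ix.Current, iy.Current)) return false;
            }
        }

        // Not a sequence: fall back to the type's own Equals
        return x.Equals(y);
    }
}
```

A matching GetHashCode would need to recurse in the same way, so that values considered equal also produce equal hashes.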

Upvotes: 2

Evk

Reputation: 101453

If you want a single Repeat function that covers all cases but behaves differently for IEnumerables, you have to use some reflection. You need to check whether type T implements IEnumerable<Something> and, if so, use a special comparer for DistinctUntilChanged; otherwise, use the default comparer.

First, it won't hurt to modify the signature of your comparer a bit, because you specifically need an IEqualityComparer<T>, not an IEqualityComparer<IEnumerable<T>>:

private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
    public bool Equals(T x, T y) {
        return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
    }

    public int GetHashCode(T obj) {
        // Will not throw an OverflowException
        unchecked {
            return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
        }
    }
}

Now we need to check whether T implements IEnumerable and, if it does, create an instance of this comparer via reflection:

public static class ObservableBuilder {
    public static IObservable<T> Repeat<T>(this Func<Task<T>> taskFactory, TimeSpan delay = default(TimeSpan),
        Func<Exception, bool> onError = null) {
        var ienum = typeof(T).GetInterfaces().FirstOrDefault(c => c.IsGenericType && c.GetGenericTypeDefinition() == typeof(IEnumerable<>));
        if (ienum != null) {
            // implements IEnumerable - create instance of comparer and use
            var comparer = (IEqualityComparer<T>) Activator.CreateInstance(typeof(EnumerableComparer<,>).MakeGenericType(typeof(T), ienum.GenericTypeArguments[0]));
            return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged(comparer);
        }
        // otherwise - don't use
        return new TaskRepeatObservable<T>(taskFactory, delay, onError).DistinctUntilChanged();
    }

    private class EnumerableComparer<T, TItem> : IEqualityComparer<T> where T : IEnumerable<TItem> {
        public bool Equals(T x, T y) {
            return ReferenceEquals(x, y) || x != null && y != null && x.SequenceEqual(y);
        }

        public int GetHashCode(T obj) {
            // Will not throw an OverflowException
            unchecked {
                return obj.Where(e => e != null).Select(e => e.GetHashCode()).Aggregate(17, (a, b) => 23 * a + b);
            }
        }
    }
}  

Alternatively, you can create an IEqualityComparer<T> that checks on every call whether x and y implement IEnumerable<Something> and compares accordingly, but I'd expect doing that on every comparison to be less efficient than doing it just once when building the sequence.
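As a hedged sketch of the "do it once" idea, the reflection lookup can be moved into a static generic class, so that it runs once per closed type T rather than on every Repeat call. The names SequenceAwareComparer and SequenceComparer are hypothetical. The sketch also makes two assumptions that go beyond the code above: it leaves string on the default comparer, and it handles the case where T is itself IEnumerable<Something>, since GetInterfaces() does not include the type it is called on.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class SequenceAwareComparer<T>
{
    // Built once per closed type T by the static field initializer.
    public static readonly IEqualityComparer<T> Instance = Create();

    private static IEqualityComparer<T> Create()
    {
        var type = typeof(T);

        // Assumption: strings already have value equality, so keep the default.
        if (type == typeof(string)) return EqualityComparer<T>.Default;

        // GetInterfaces() omits the type itself, so include it when T is an interface.
        IEnumerable<Type> candidates = type.IsInterface
            ? new[] { type }.Concat(type.GetInterfaces())
            : type.GetInterfaces();

        var ienum = candidates.FirstOrDefault(c =>
            c.IsGenericType && c.GetGenericTypeDefinition() == typeof(IEnumerable<>));
        if (ienum == null) return EqualityComparer<T>.Default;

        var comparerType = typeof(SequenceComparer<,>)
            .MakeGenericType(type, ienum.GenericTypeArguments[0]);
        return (IEqualityComparer<T>)Activator.CreateInstance(comparerType);
    }
}

public sealed class SequenceComparer<T, TItem> : IEqualityComparer<T>
    where T : IEnumerable<TItem>
{
    public bool Equals(T x, T y) =>
        ReferenceEquals(x, y) || (x != null && y != null && x.SequenceEqual(y));

    public int GetHashCode(T obj)
    {
        if (obj == null) return 0;
        unchecked // overflow wraps around instead of throwing
        {
            return obj.Aggregate(17, (h, e) => 23 * h + (e == null ? 0 : e.GetHashCode()));
        }
    }
}
```

With this in place, Repeat could pass SequenceAwareComparer<T>.Instance to the DistinctUntilChanged overload that accepts an IEqualityComparer<T>, without branching on the type at the call site.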

Upvotes: 3
