HenningNT

Reputation: 195

DistinctUntilChanged, but keep the last item before the change

I'd like to reduce the number of items that go into a log file.

I don't need all the duplicate items, but I want to keep the last item before each change. The items are numbers that are going to be plotted. I need the last item before the change to preserve the shape of the waveform; otherwise the plot will end up as a triangular wave instead of a square wave.

Upvotes: 1

Views: 566

Answers (3)

Enigmativity

Reputation: 117064

Here's a query that does what you want:

IObservable<string> query =
    subject
        .Publish(ss =>
            Observable
                .Concat(
                    ss.Take(1),
                    ss
                        .DistinctUntilChanged()
                        .Publish(dss => dss.Zip(dss.Skip(1), (m, n) => (m, n)))
                        .SelectMany(z => new[] { z.m, z.n })));

The Publish operators ensure that there's only one subscription to the original source.

With this test:

subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnCompleted();

I get these values:

A 
A 
B 
B 
A 
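
As a rough illustration of the point about Publish, the sketch below counts how often a cold source is subscribed to when Zip references it twice, first directly and then inside Publish. The names PublishDemo and CountSubscriptions and the Observable.Defer counting source are invented for this illustration; the code assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Linq;

public static class PublishDemo
{
    // Hypothetical helper: returns how many times the source was subscribed to
    // without Publish and with Publish.
    public static (int Unpublished, int Published) CountSubscriptions()
    {
        var subscriptions = 0;

        // Cold source that bumps the counter each time something subscribes to it.
        var source = Observable.Defer(() =>
        {
            subscriptions++;
            return Observable.Range(1, 3);
        });

        // Zip over the raw source: one subscription per reference to `source`.
        source
            .Zip(source.Skip(1), (m, n) => (m, n))
            .Subscribe(_ => { });
        var unpublished = subscriptions;

        subscriptions = 0;

        // Zip inside Publish: both branches share a single subscription.
        source
            .Publish(s => s.Zip(s.Skip(1), (m, n) => (m, n)))
            .Subscribe(_ => { });
        var published = subscriptions;

        return (unpublished, published);
    }
}
```

Calling PublishDemo.CountSubscriptions() should return (2, 1). With a cold or side-effecting source, each extra subscription would repeat the work, which is what Publish avoids.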

Upvotes: 1

ibebbs

Reputation: 1993

My approach would be similar to @Oliver's comment above: 'So whenever current and last differ, emit both'.

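using System;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
using NUnit.Framework;

private static IObservable<T> NullableDistinctWithChange<T>(IObservable<T> source)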
    where T : struct
{
    return source
        .Scan(
            (Value: default(T?), Result: Array.Empty<T>()),
            (last, current) => last.Value switch
            {
                null => (Value: current, Result: new T[] { current }),
                T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
                T x => (Value: current, Result: new[] { x, current })
            })
        .SelectMany(tuple => tuple.Result);
}

private static IObservable<T> DistinctWithChange<T>(IObservable<T> source)
    where T : class
{
    return source
        .Scan(
            (Value: default(T), Result: Array.Empty<T>()),
            (last, current) => last.Value switch
            {
                null => (Value: current, Result: new T[] { current }),
                T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
                T x => (Value: current, Result: new[] { x, current })
            })
        .SelectMany(tuple => tuple.Result);
}


private static readonly Recorded<Notification<string>>[] Xs = new Recorded<Notification<string>>[]
{
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(15).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(65).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(95).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(155).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(185).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks, Notification.CreateOnNext("C")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(215).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(225).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(235).Ticks, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(250).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(255).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(260).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(265).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(270).Ticks, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks, Notification.CreateOnCompleted<string>())
};

private static readonly Recorded<Notification<string>>[] Expected = new Recorded<Notification<string>>[]
{
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("C")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("C")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
    new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks + ReactiveTest.Subscribed, Notification.CreateOnCompleted<string>())
};

[Test]
public void ShouldPerformDistinctWithChange()
{
    var scheduler = new TestScheduler();

    var xs = scheduler.CreateColdObservable(Xs);

    var observed = scheduler.Start(() => DistinctWithChange(xs), TimeSpan.FromSeconds(300).Ticks);

    Assert.That(observed.Messages, Is.EqualTo(Expected));
}

I included a NullableDistinctWithChange version because the OP said the items are numbers, so the solution needs to work with value types as well.

Both functions could be unified and improved with an Option<T> type but I didn't want to overcomplicate the answer.

Furthermore, it would be trivial to avoid duplicating single-item runs (by introducing a count value into the Scan tuple) and to use an IEqualityComparer to decide what counts as a change.
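
A minimal sketch of those two refinements, under some assumptions: the name DistinctWithLastBeforeChange and the HasValue/Count tuple fields are invented here, a HasValue flag stands in for an Option<T> type so that one method covers both structs and classes, and the System.Reactive package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class DistinctWithChangeExtensions
{
    // Hypothetical helper (not part of Rx): uses the default equality comparer.
    public static IObservable<T> DistinctWithLastBeforeChange<T>(this IObservable<T> source)
        => source.DistinctWithLastBeforeChange(EqualityComparer<T>.Default);

    // Emits the first item of the stream and, on every change, the last item of
    // the previous run (only if that run had more than one item) followed by
    // the new item.
    public static IObservable<T> DistinctWithLastBeforeChange<T>(
        this IObservable<T> source,
        IEqualityComparer<T> comparer)
    {
        return source
            .Scan(
                (HasValue: false, Value: default(T), Count: 0, Result: Array.Empty<T>()),
                (state, current) =>
                {
                    // Very first item: always emit it.
                    if (!state.HasValue)
                        return (HasValue: true, Value: current, Count: 1, Result: new[] { current });

                    // Same run: remember the latest item, emit nothing.
                    if (comparer.Equals(state.Value, current))
                        return (HasValue: true, Value: current, Count: state.Count + 1, Result: Array.Empty<T>());

                    // Change: repeat the previous item only if its run was longer than one.
                    return state.Count > 1
                        ? (HasValue: true, Value: current, Count: 1, Result: new[] { state.Value, current })
                        : (HasValue: true, Value: current, Count: 1, Result: new[] { current });
                })
            .SelectMany(state => state.Result);
    }
}
```

For the input A, A, A, B, B, B, C, B, B, A this sketch should yield A, A, B, B, C, B, B, A, so the lone C is emitted only once. Like the two functions above, it does not emit the final item of the last run when the source completes.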

Upvotes: 1

HenningNT

Reputation: 195

I've found a solution that meets the (rather limited) requirements:

    var subject = new Subject<string>();
    var distinct = subject.DistinctUntilChanged();

    var combinedLatestDistinct = Observable.CombineLatest(distinct, subject, Selector).DistinctUntilChanged();

    Observable.Merge(distinct, combinedLatestDistinct).Subscribe(i => Console.WriteLine($"Result: {i}"));

    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("C");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("B");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnNext("A");
    subject.OnCompleted();

    Console.ReadLine();

The result is:

Result: A
Result: A
Result: B
Result: B
Result: C
Result: C
Result: B
Result: B
Result: A
Result: A

Items occurring only once will be duplicated, but this is OK for my use.

Upvotes: 0
