Reputation: 195
I'd like reduce the number of items that go into a log file:
I don't need all the duplicate items, but I want to keep the last item before the change. The items are numbers, and are going to be plotted. I need the last item before the change to keep the shape of the waveform, otherwise it will end up as a triangular wave instead of square wave.
Upvotes: 1
Views: 566
Reputation: 117064
Here's a query that does what you want:
IObservable<string> query =
subject
.Publish(ss =>
Observable
.Concat(
ss.Take(1),
ss.
DistinctUntilChanged()
.Publish(dss => dss.Zip(dss.Skip(1), (m, n) => (m, n)))
.SelectMany(z => new [] { z.m, z.n })));
The Publish
operators ensure that there's only one subscription to the original source.
With this test:
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnCompleted();
I get these values:
A
A
B
B
A
Upvotes: 1
Reputation: 1993
My approach would be similar to @Olivers comment above: 'So whenever current and last differ, emit both'.
private static IObservable<T> NullableDistinctWithChange<T>(IObservable<T> source)
where T : struct
{
return source
.Scan(
(Value: default(T?), Result: Array.Empty<T>()),
(last, current) => last.Value switch
{
null => (Value: current, Result: new T[] { current }),
T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
T x => (Value: current, Result: new[] { x, current })
})
.SelectMany(tuple => tuple.Result);
}
private static IObservable<T> DistinctWithChange<T>(IObservable<T> source)
where T : class
{
return source
.Scan(
(Value: default(T), Result: Array.Empty<T>()),
(last, current) => last.Value switch
{
null => (Value: current, Result: new T[] { current }),
T x when x.Equals(current) => (Value: current, Result: Array.Empty<T>()),
T x => (Value: current, Result: new[] { x, current })
})
.SelectMany(tuple => tuple.Result);
}
private static readonly Recorded<Notification<string>>[] Xs = new Recorded<Notification<string>>[]
{
new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(15).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(65).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(95).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(155).Ticks, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(185).Ticks, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks, Notification.CreateOnNext("C")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(215).Ticks, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(225).Ticks, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(235).Ticks, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(250).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(255).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(260).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(265).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(270).Ticks, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks, Notification.CreateOnCompleted<string>())
};
private static readonly Recorded<Notification<string>>[] Expected = new Recorded<Notification<string>>[]
{
new Recorded<Notification<string>>(TimeSpan.FromSeconds(5).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(125).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(195).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("C")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("C")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(205).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("B")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(245).Ticks + ReactiveTest.Subscribed, Notification.CreateOnNext("A")),
new Recorded<Notification<string>>(TimeSpan.FromSeconds(280).Ticks + ReactiveTest.Subscribed, Notification.CreateOnCompleted<string>())
};
[Test]
public void ShouldPerformDistinctWithChange()
{
var scheduler = new TestScheduler();
var xs = scheduler.CreateColdObservable(Xs);
var observed = scheduler.Start(() => DistinctWithChange(xs), TimeSpan.FromSeconds(300).Ticks);
Assert.That(observed.Messages, Is.EqualTo(Expected));
}
I included a NullableDistinctWithChange version as the OP suggested you wanted this solution to work with numbers.
Both functions could be unified and improved with an Option<T>
type but I didn't want to overcomplicate the answer.
Furthermore it would be trivial to prevent single-item duplicates (by introducing a count value into the Scan tuple), and to use an IEqualityComparer to delegate the predication of change.
Upvotes: 1
Reputation: 195
I've found a solution that meets the (rather limited) requirements:
var subject = new Subject<string>();
var distinct = subject.DistinctUntilChanged();
var combinedLatesDistinct = Observable.CombineLatest(distinct, subject, Selector).DistinctUntilChanged();
Observable.Merge(distinct, combinedLatesDistinct).Subscribe(i => Console.WriteLine($"Result: {i}"));
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("C");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("B");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnNext("A");
subject.OnCompleted();
Console.ReadLine();
The result is:
Result: A
Result: A
Result: B
Result: B
Result: C
Result: C
Result: B
Result: B
Result: A
Result: A
Items occurring only once will be duplicated, but this is OK for my use.
Upvotes: 0