Reputation: 2307
I am fairly new to the concept of reactive programming. I am using Bonsai, which exposes some, but not all, .NET Rx operators through C#.
I am trying to get a behavior like this marble diagram:
input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e
Basically, input 2 generates waves of events that should be stored in a queue. Input 1 acts as a trigger to emit single items from this queue.
When the queue is empty, the last item of the queue should be emitted. I tried various combinations of zip and combineLatest but I cannot get the desired behavior.
I also tried an implementation of WithLatestFrom based on this post, but I realize in retrospect that this is also not going to produce the desired behavior.
/// <summary>
/// Pairs each element of <paramref name="source"/> with the most recent element
/// of <paramref name="other"/>.
/// </summary>
/// <param name="source">Sequence whose elements drive the output.</param>
/// <param name="other">Sequence supplying the value attached to each source element.</param>
/// <returns>
/// Tuples of (source element, latest other element). Nothing is produced until
/// <paramref name="other"/> has emitted at least once, because the inner projection
/// only exists from that point on.
/// </returns>
public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
    IObservable<TSource> source,
    IObservable<TOther> other)
{
    // Hand-rolled stand-in for:
    //   source.WithLatestFrom(other, (xs, ys) => Tuple.Create(xs, ys))
    return source.Publish(shared =>
    {
        // One inner sequence per value of `other`; Switch keeps only the newest one.
        var pairedWithLatest = other.Select(latest =>
            shared.Select(item => Tuple.Create(item, latest)));

        return pairedWithLatest.Switch();
    });
}
Are there any operators or combinations of operators that will produce this behavior? I can do the implementation to Bonsai once I understand which operators to use.
UPDATE 1: 2018/05/18
Based on Sentinel's post, I wrote a new class, DiscriminatedUnion, inside the Bonsai namespace. I didn't manage to specify the appropriate types, though. The compiler states 'type arguments for Merge cannot be inferred' (in `.Merge(input1.Select...)`).
Where do I add the correct type specification?
using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;
namespace Bonsai.Reactive
{
// Attempt to port the tuple-based Scan pipeline into a generic Bonsai combinator.
// NOTE(review): this version is the one that fails to compile with
// "type arguments for Merge cannot be inferred"; the comments below point at
// the type mismatches visible in the code.
[Combinator]
// [XmlType(Namespace = Constants.XmlNamespace)]
[Description("Implementation of Discriminated Union")]
public class DiscriminatedUnion
{
// input1 is tagged 1 and acts as the trigger; input2 is tagged 2 and feeds the queue.
// NOTE(review): the return type is fixed to int? although the queued items are TInput2.
public IObservable<int?> Process<TInput1, TInput2>(
IObservable<TInput1> input1,
IObservable<TInput2> input2)
{
var merged =
// Produces Tuple<int, TInput2> ...
input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
// ... while this produces Tuple<int, TInput1>. The two element types differ,
// so Merge has no single element type to infer.
.Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
// State: (last value handed out, pending items, tag of the message just processed).
// NOTE(review): the queue is declared as Queue<int>, but val.Item2 is enqueued below.
.Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
{
// Default to repeating the previously handed-out value.
int? next = state.Item1;
if (val.Item1 == 1)
{
// Trigger: take the next pending item if there is one.
if (state.Item2.Count > 0)
{
next = state.Item2.Dequeue();
}
}
else
{
// Data item: store it for a later trigger.
state.Item2.Enqueue(val.Item2);
}
return Tuple.Create(next, state.Item2, val.Item1);
})
// Only trigger messages produce output, and only once a value exists.
.Where(x => (x.Item1 != null && x.Item3 == 1))
.Select(x => x.Item1);
return merged;
}
}
}
Upvotes: 2
Views: 993
Reputation: 3697
I love these Rx puzzles. Can't believe one gets paid to do this. So I came up with a slightly different approach. I think there are some weaknesses with race conditions here, but I would be curious what you think and how these can be eliminated.
The basic idea is to think of the queue as a recursive buffer-until over source1, where the buffer is replayed into the queue sans first element.
Based on shlomo's observation that publish().refcount() is needed, I updated the code and turned the solution into an extension "RegulatedQueue". Please see the below code. Input2 is the source to regulate via a queue, Input1 is the regulating signal.
/// <summary>
/// Helper operators for regulating one observable sequence by another.
/// </summary>
public static class RxHelpers
{
    /// <summary>
    /// Releases elements of <paramref name="input2"/> one at a time, paced by
    /// <paramref name="input1"/>, using a subject as a feedback queue.
    /// </summary>
    /// <typeparam name="TInput1">Element type of the regulating signal (values are ignored).</typeparam>
    /// <typeparam name="TInput2">Element type of the regulated sequence.</typeparam>
    /// <param name="input2">The sequence to regulate.</param>
    /// <param name="input1">The regulating signal; each element closes the current buffer.</param>
    /// <returns>
    /// A sequence that yields the first element of every non-empty buffer. The remaining
    /// elements of a buffer (or its only element) are pushed back into the subject so they
    /// land in a later buffer.
    /// </returns>
    /// <remarks>
    /// The subscription that feeds <paramref name="input2"/> into the subject is owned by the
    /// same resource as the subject itself, so it is released when the consumer unsubscribes
    /// or the sequence terminates. Previously that subscription was discarded, which left
    /// <paramref name="input2"/> pushing into an already disposed subject.
    /// </remarks>
    public static IObservable<TInput2> RegulatedQueue<TInput1, TInput2>(this IObservable<TInput2> input2,
        IObservable<TInput1> input1
        )
    {
        return Observable.Using(
            () => new System.Reactive.Disposables.CompositeDisposable(),
            lifetime =>
            {
                var queue = new Subject<TInput2>();

                // The feed is registered ahead of the subject so both are torn down together
                // with this subscription.
                lifetime.Add(input2.Subscribe(queue));
                lifetime.Add(queue);

                // NOTE(review): Using builds this pipeline once per subscriber, so
                // Publish/RefCount does not share anything between separate subscribers.
                return queue
                    .Buffer(() => input1)
                    .Do(l =>
                    {
                        // Re-queue everything after the head; a lone element is re-queued as-is
                        // so that it can be repeated on the next trigger.
                        foreach (var n in l.Skip(l.Count > 1 ? 1 : 0))
                        {
                            queue.OnNext(n);
                        }
                    })
                    .Where(l => l.Count > 0)
                    .Select(l => l.First())
                    .Publish()
                    .RefCount();
            });
    }
}
/// <summary>
/// Console demo: regulates a bursty random source with a one-second ticker
/// through <c>RegulatedQueue</c> and prints every stream.
/// </summary>
class Program
{
    static void Main(string[] args)
    {
        var random = new Random();

        // Regulating signal: one tick per second, shared between subscribers.
        var source1 = Observable
            .Interval(TimeSpan.FromSeconds(1))
            .Publish()
            .RefCount();

        // Regulated data: every two seconds a burst of three random numbers below 200.
        var source2 = Observable
            .Interval(TimeSpan.FromMilliseconds(2000))
            .Select(tick => Enumerable.Range(1, 3).Select(index => random.Next(200)).ToObservable())
            .SelectMany(burst => burst)
            .Publish()
            .RefCount();

        source1.Subscribe(tick => Console.WriteLine("Source1 " + tick));
        source2.Subscribe(value => Console.WriteLine("Source2 " + value));

        // Two independent subscribers to the regulated output.
        var merged = source2.RegulatedQueue(source1);
        merged.Subscribe(value => Console.WriteLine("Merged1 " + value));
        merged.Subscribe(value => Console.WriteLine("Merged2 " + value));

        // Keep the process alive until a key is pressed.
        Console.ReadKey();
    }
}
/// <summary>
/// Console demo of the feedback-queue idea written inline: a subject buffers the data
/// stream, each tick closes a buffer, and the unconsumed tail is pushed back in.
/// </summary>
static void Main(string[] args)
{
    var random = new Random();

    // Regulating signal: one tick per second.
    var source1 = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Publish()
        .RefCount();

    // Data: every seven seconds a burst of three random numbers below 200.
    var source2 = Observable
        .Interval(TimeSpan.FromMilliseconds(7000))
        .Select(tick => Enumerable.Range(1, 3).Select(index => random.Next(200)).ToObservable())
        .SelectMany(burst => burst)
        .Publish()
        .RefCount();

    source1.Subscribe(tick => Console.WriteLine("Source1 " + tick));
    source2.Subscribe(value => Console.WriteLine("Source2 " + value));

    //THIS BIT
    var queue = new Subject<int>();
    source2.Subscribe(queue);

    var merged = queue
        .Buffer(() => source1)
        .Do(batch =>
        {
            // Push back everything after the head; a single element is pushed back unchanged.
            var toSkip = batch.Count > 1 ? 1 : 0;
            foreach (var item in batch.Skip(toSkip))
            {
                queue.OnNext(item);
            }
        })
        .Where(batch => batch.Count > 0)
        .Select(batch => batch.First());

    merged.Subscribe(value => Console.WriteLine("Merged " + value));

    // Keep the process alive until a key is pressed.
    Console.ReadKey();
}
Testcode:
// Virtual-time scheduler so the recorded timings are deterministic.
var scheduler = new TestScheduler();

// Trigger stream: the values 1..7, one per second.
var input1 = scheduler.CreateColdObservable<int>(
    Enumerable.Range(1, 7)
        .Select(i => ReactiveTest.OnNext((i * 1000).Ms(), i))
        .ToArray());

// Data stream: two bursts of three letters each.
var input2 = scheduler.CreateColdObservable<string>(
    new[]
    {
        ReactiveTest.OnNext(1400.Ms(), "a"),
        ReactiveTest.OnNext(1500.Ms(), "b"),
        ReactiveTest.OnNext(1600.Ms(), "c"),
        ReactiveTest.OnNext(5500.Ms(), "d"),
        ReactiveTest.OnNext(5600.Ms(), "e"),
        ReactiveTest.OnNext(5700.Ms(), "f"),
    });

// The feedback-queue pipeline under test.
var feedback = new Subject<string>();
input2.Subscribe(feedback);

var result = feedback
    .Buffer(() => input1)
    .Do(batch =>
    {
        // Push back everything after the head; a single element is pushed back unchanged.
        var toSkip = batch.Count > 1 ? 1 : 0;
        foreach (var item in batch.Skip(toSkip))
        {
            feedback.OnNext(item);
        }
    })
    .Where(batch => batch.Count > 0)
    .Select(batch => batch[0]);

result
    .Timestamp(scheduler)
    .Select(t => $"{t.Timestamp.Ticks} ticks: {t.Value}")
    .Dump(); //Linqpad
expected output:
//14000000 enqueue a
//15000000 enqueue b
//16000000 enqueue c
20000000 ticks: a
30000000 ticks: b
40000000 ticks: c
50000000 ticks: c
//55000000 enqueue d
//56000000 enqueue e
//57000000 enqueue f
60000000 ticks: c //should really be d, but there's no handling for fake-empty ejection
70000000 ticks: d
80000000 ticks: e
90000000 ticks: f
100000000 ticks: f
110000000 ticks: f
120000000 ticks: f
130000000 ticks: f
140000000 ticks: f
...
actual output:
20000000 ticks: a
30000000 ticks: b
40000000 ticks: c
50000000 ticks: b
60000000 ticks: c
70000000 ticks: b
80000000 ticks: c
90000000 ticks: c
100000000 ticks: b
110000000 ticks: c
120000000 ticks: c
130000000 ticks: b
140000000 ticks: c
150000000 ticks: b
160000000 ticks: c
170000000 ticks: b
180000000 ticks: c
190000000 ticks: c
Upvotes: 1
Reputation: 3697
Would this do the trick? There is probably a better way to do this with buffers, so it might be worth revisiting this.
var random = new Random();

// Trigger: one tick per second.
var source1 = Observable
    .Interval(TimeSpan.FromSeconds(1))
    .Publish()
    .RefCount();

// Data: every seven seconds a burst of three random numbers below 200.
var source2 = Observable
    .Interval(TimeSpan.FromMilliseconds(7000))
    .Select(tick => Enumerable.Range(1, 3).Select(index => random.Next(200)).ToObservable())
    .SelectMany(burst => burst)
    .Publish()
    .RefCount();

source1.Subscribe(tick => Console.WriteLine("Source1 " + tick));
source2.Subscribe(value => Console.WriteLine("Source2 " + value));

// Tag each message with its origin (2 = data, 1 = trigger) so both fit in one stream.
var taggedData = source2.Select(value => Tuple.Create(2, value));
var taggedTriggers = source1.Select(tick => Tuple.Create(1, (int)tick));

var merged = taggedData
    .Merge(taggedTriggers)
    // State: (last value handed out, pending items, tag of the message just processed).
    .Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, message) =>
    {
        var pending = state.Item2;
        var isTrigger = message.Item1 == 1;

        if (!isTrigger)
        {
            // Data item: store it and keep the previously handed-out value.
            pending.Enqueue(message.Item2);
            return Tuple.Create(state.Item1, pending, message.Item1);
        }

        // Trigger: hand out the next pending item, or repeat the previous one.
        int? next = pending.Count > 0 ? pending.Dequeue() : state.Item1;
        return Tuple.Create(next, pending, message.Item1);
    })
    // Only triggers produce output, and only once a value exists.
    .Where(state => state.Item3 == 1 && state.Item1 != null)
    .Select(state => state.Item1);

merged.Subscribe(value => Console.WriteLine("Merged " + value));
UPDATE Fixed code for OP:
/// <summary>
/// Queues the elements of one sequence and releases them one at a time whenever a
/// second sequence signals.
/// </summary>
public class DiscriminatedUnion
{
    /// <summary>
    /// Emits one queued element of <paramref name="input2"/> for every element of
    /// <paramref name="input1"/>.
    /// </summary>
    /// <typeparam name="TInput1">Element type of the trigger sequence (values are ignored).</typeparam>
    /// <typeparam name="TInput2">Element type of the queued sequence.</typeparam>
    /// <param name="input1">Trigger sequence; each element releases one queued item.</param>
    /// <param name="input2">Data sequence whose elements are queued in arrival order.</param>
    /// <returns>
    /// On every trigger: the next queued item, or, if the queue is empty, the item released
    /// most recently. Triggers that arrive before any item has been released emit nothing.
    /// </returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="input1"/> or <paramref name="input2"/> is null.
    /// </exception>
    /// <remarks>
    /// Whether a value has been released is tracked with an explicit flag instead of comparing
    /// against <c>default(TInput2)</c>. That comparison threw a NullReferenceException for
    /// reference types and silently dropped legitimate default values such as 0.
    /// The queue is created per subscription, so independent subscribers do not share state.
    /// </remarks>
    public static IObservable<TInput2> Process<TInput1, TInput2>(
        IObservable<TInput1> input1,
        IObservable<TInput2> input2)
    {
        // Validate eagerly; the pipeline below is only built at subscription time.
        if (input1 == null)
        {
            throw new ArgumentNullException(nameof(input1));
        }

        if (input2 == null)
        {
            throw new ArgumentNullException(nameof(input2));
        }

        return Observable.Defer<TInput2>(() =>
        {
            var pending = new Queue<TInput2>();

            // Tag every message: (isTrigger, payload). The payload of a trigger is unused.
            var items = input2.Select(item => Tuple.Create(false, item));
            var triggers = input1.Select(_ => Tuple.Create(true, default(TInput2)));

            return items
                .Merge(triggers)
                // State: (last released value, has a value been released, emit on this step).
                .Scan(
                    Tuple.Create(default(TInput2), false, false),
                    (state, message) =>
                    {
                        if (!message.Item1)
                        {
                            // Data item: queue it, never emit.
                            pending.Enqueue(message.Item2);
                            return Tuple.Create(state.Item1, state.Item2, false);
                        }

                        if (pending.Count > 0)
                        {
                            // Trigger with queued data: release the next item.
                            return Tuple.Create(pending.Dequeue(), true, true);
                        }

                        // Trigger with an empty queue: repeat the last item, if there is one.
                        return Tuple.Create(state.Item1, state.Item2, state.Item2);
                    })
                .Where(state => state.Item3)
                .Select(state => state.Item1);
        });
    }
}
Upvotes: 1
Reputation: 14350
Here's a testable representation of your problem (or marble diagram), using NuGet package Microsoft.Reactive.Testing:
// Virtual-time scheduler so the marble diagram can be replayed deterministically.
var scheduler = new TestScheduler();

// Trigger stream: the values 1..7, one per second.
var input1 = scheduler.CreateColdObservable<int>(
    Enumerable.Range(1, 7)
        .Select(i => ReactiveTest.OnNext((i * 1000).Ms(), i))
        .ToArray());

// Data stream: two bursts of three letters each.
var input2 = scheduler.CreateColdObservable<string>(
    new[]
    {
        ReactiveTest.OnNext(1400.Ms(), "a"),
        ReactiveTest.OnNext(1500.Ms(), "b"),
        ReactiveTest.OnNext(1600.Ms(), "c"),
        ReactiveTest.OnNext(5500.Ms(), "d"),
        ReactiveTest.OnNext(5600.Ms(), "e"),
        ReactiveTest.OnNext(5700.Ms(), "f"),
    });
which uses this extension method:
/// <summary>
/// Conversions from human-friendly durations to <see cref="TimeSpan"/> ticks.
/// </summary>
public static class TickExtensions
{
    /// <summary>
    /// Converts a whole number of milliseconds to the equivalent number of ticks.
    /// </summary>
    /// <param name="ms">Duration in milliseconds.</param>
    /// <returns>The same duration expressed in ticks.</returns>
    public static long Ms(this int ms) => ms * TimeSpan.TicksPerMillisecond;
}
The problem is basically a state-machine problem that involves two observables of different types. The best way to solve this is with a Discriminated Union type, which doesn't exist in C#, so we'll create one. @Sentinel's answer did this with a Tuple, and that can work as well:
/// <summary>
/// A two-case discriminated union: holds either a <typeparamref name="T1"/> or a
/// <typeparamref name="T2"/>, together with a flag telling which one.
/// </summary>
/// <typeparam name="T1">Type of the first case.</typeparam>
/// <typeparam name="T2">Type of the second case.</typeparam>
public class DUnion<T1, T2>
{
    // Single place where all three pieces of state are assigned.
    private DUnion(bool isType1, T1 first, T2 second)
    {
        IsType1 = isType1;
        Type1Item = first;
        Type2Item = second;
    }

    /// <summary>Creates a union holding the first case; the second slot is left at its default.</summary>
    public DUnion(T1 t1)
        : this(true, t1, default(T2))
    {
    }

    /// <summary>Creates a union holding the second case; the first slot is left at its default.</summary>
    public DUnion(T2 t2)
        : this(false, default(T1), t2)
    {
    }

    /// <summary>True when the instance was built from a <typeparamref name="T1"/>.</summary>
    public bool IsType1 { get; }

    /// <summary>True when the instance was built from a <typeparamref name="T2"/>.</summary>
    public bool IsType2
    {
        get { return !IsType1; }
    }

    /// <summary>The first-case value, or the default of <typeparamref name="T1"/> for a second-case instance.</summary>
    public T1 Type1Item { get; }

    /// <summary>The second-case value, or the default of <typeparamref name="T2"/> for a first-case instance.</summary>
    public T2 Type2Item { get; }
}
We can then take our two differently-typed streams, Select and Merge them into one discriminated union stream, where we can manage the state with Scan. Your state logic is a bit tricky, but doable.
Here's the resulting observable (uses NuGet package System.Collections.Immutable):
// Merge both inputs into one stream of unions, then run a small state machine over it.
// State: the pending queue, the item to emit, whether the queue's only element has already
// been emitted and is merely being kept for repetition ("fake empty"), and whether this
// step emits at all.
var result = input1.Select(i => new DUnion<int, string>(i))
    .Merge(input2.Select(s => new DUnion<int, string>(s)))
    .Scan(
        (queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false),
        (state, dItem) =>
        {
            if (dItem.IsType2)
            {
                // New string: drop the kept-for-repetition element first, then enqueue. Never emits.
                var kept = state.isFakeEmptyState ? state.queue.Dequeue() : state.queue;
                return (kept.Enqueue(dItem.Type2Item), (string)null, false, false);
            }

            // Integer trigger from here on.
            if (state.queue.IsEmpty)
            {
                // Nothing has ever been queued, so there is nothing to emit.
                return (state.queue, (string)null, false, false);
            }

            var head = state.queue.Peek();
            var rest = state.queue.Dequeue();

            // Last element: emit it but keep it queued so it can be repeated.
            // Otherwise: emit the head and advance the queue.
            return rest.IsEmpty
                ? (state.queue, head, true, true)
                : (rest, head, false, true);
        })
    .Where(t => t.emit)
    .Select(t => t.item);
This can then be tested as follows:
// Record everything `result` produces while the scheduler plays back virtual time.
var recorder = scheduler.CreateObserver<string>();
result.Subscribe(recorder);

// Runs the scheduled work; the cold inputs were subscribed by the line above.
scheduler.Start();

recorder.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.
Update: I thought about this a bit, and I think it makes sense to throw some operators around the Discriminated Union functionality. This way you don't have to explicitly deal with the type:
/// <summary>
/// Operators that merge two differently-typed sequences into one stream of
/// discriminated unions and fold over it with one handler per case.
/// </summary>
public static class DUnionExtensions
{
    /// <summary>
    /// A two-case discriminated union: holds either a <typeparamref name="T1"/> or a
    /// <typeparamref name="T2"/>, together with a flag telling which one.
    /// </summary>
    public class DUnion<T1, T2>
    {
        // Single place where all three pieces of state are assigned.
        private DUnion(bool isType1, T1 first, T2 second)
        {
            IsType1 = isType1;
            Type1Item = first;
            Type2Item = second;
        }

        /// <summary>Creates a union holding the first case; the second slot is left at its default.</summary>
        public DUnion(T1 t1)
            : this(true, t1, default(T2))
        {
        }

        /// <summary>Creates a union holding the second case; the first slot is left at its default.</summary>
        public DUnion(T2 t2)
            : this(false, default(T1), t2)
        {
        }

        /// <summary>True when the instance was built from a <typeparamref name="T1"/>.</summary>
        public bool IsType1 { get; }

        /// <summary>True when the instance was built from a <typeparamref name="T2"/>.</summary>
        public bool IsType2
        {
            get { return !IsType1; }
        }

        /// <summary>The first-case value, or the default of <typeparamref name="T1"/>.</summary>
        public T1 Type1Item { get; }

        /// <summary>The second-case value, or the default of <typeparamref name="T2"/>.</summary>
        public T2 Type2Item { get; }
    }

    /// <summary>
    /// Merges two sequences into a single sequence of unions, wrapping each element in
    /// the case that matches its origin.
    /// </summary>
    /// <param name="a">Sequence whose elements become first-case unions.</param>
    /// <param name="b">Sequence whose elements become second-case unions.</param>
    /// <returns>The merged sequence of unions.</returns>
    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        var firstCases = a.Select(x => new DUnion<T1, T2>(x));
        var secondCases = b.Select(x => new DUnion<T1, T2>(x));
        return firstCases.Merge(secondCases);
    }

    /// <summary>
    /// Scans a sequence of unions, dispatching each element to the handler for its case.
    /// </summary>
    /// <param name="source">The union sequence to fold over.</param>
    /// <param name="initialState">The accumulator's starting value.</param>
    /// <param name="type1Handler">Computes the next state from a first-case element.</param>
    /// <param name="type2Handler">Computes the next state from a second-case element.</param>
    /// <returns>The sequence of intermediate states, one per source element.</returns>
    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
        TState initialState,
        Func<TState, T1, TState> type1Handler,
        Func<TState, T2, TState> type2Handler)
    {
        return source.Scan(initialState, (state, u) =>
        {
            if (u.IsType1)
            {
                return type1Handler(state, u.Type1Item);
            }

            return type2Handler(state, u.Type2Item);
        });
    }
}
With those extension methods, the solution changes to this, which I think reads better:
// State: the pending queue, the item to emit, whether the queue's only element has already
// been emitted and is merely being kept for repetition ("fake empty"), and whether this
// step emits at all.
var result = input1
    .Union(input2)
    .ScanUnion(
        (queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false),
        (state, _) =>
        {
            // Trigger handler.
            if (state.queue.IsEmpty)
            {
                // Nothing has ever been queued, so there is nothing to emit.
                return (state.queue, (string)null, false, false);
            }

            var head = state.queue.Peek();
            var rest = state.queue.Dequeue();

            // Last element: emit it but keep it queued (enter the fake-empty state).
            // Otherwise: emit the head and advance the queue.
            return rest.IsEmpty
                ? (state.queue, head, true, true)
                : (rest, head, false, true);
        },
        (state, s) =>
        {
            // Data handler: drop the kept-for-repetition element first, then enqueue. Never emits.
            var kept = state.isFakeEmptyState ? state.queue.Dequeue() : state.queue;
            return (kept.Enqueue(s), (string)null, false, false);
        })
    .Where(t => t.emit)
    .Select(t => t.item);
If you're having trouble with the named Tuple syntax, then you can use the old tuples:
// Same state machine using System.Tuple. State layout:
//   Item1 = pending queue, Item2 = item to emit,
//   Item3 = fake-empty flag (the only queued element was already emitted),
//   Item4 = whether this step emits.
var result = input1
    .Union(input2)
    .ScanUnion(
        Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
        (state, _) =>
        {
            // Trigger handler.
            var queue = state.Item1;
            if (queue.IsEmpty)
            {
                // Nothing has ever been queued, so there is nothing to emit.
                return Tuple.Create(queue, (string)null, false, false);
            }

            var head = queue.Peek();
            var rest = queue.Dequeue();

            // Last element: emit it but keep it queued (enter the fake-empty state).
            // Otherwise: emit the head and advance the queue.
            return rest.IsEmpty
                ? Tuple.Create(queue, head, true, true)
                : Tuple.Create(rest, head, false, true);
        },
        (state, s) =>
        {
            // Data handler: drop the kept-for-repetition element first, then enqueue. Never emits.
            var kept = state.Item3 ? state.Item1.Dequeue() : state.Item1;
            return Tuple.Create(kept.Enqueue(s), (string)null, false, false);
        })
    .Where(t => t.Item4)
    .Select(t => t.Item2);
Upvotes: 2