jlarsch
jlarsch

Reputation: 2307

Reactive Rx zip queue in .Net

I am fairly new to the concept of reactive programming. I am using Bonsai, which exposes some but not all .Net rx commands through c#.

I am trying to get a behavior like this marble diagram:

input1: ---1--------2--------3--------4--------5--------6--------7
input2: -------abc----------------------------------def-----------
result: ------------a--------b--------c--------c---------d-------e

Basically, input 2 generates waves of events that should be stored in a queue. Input 1 acts as a trigger to emit single items from this queue.

When the queue is empty, the last item of the queue should be emitted. I tried various combinations of zip and combineLatest but I cannot get the desired behavior.

I also tried an implementation of WithLatestFrom based on this post, but I realize in retrospect this is also not going to produce the desired behavior.

public IObservable<Tuple<TSource, TOther>> Process<TSource, TOther>(
            IObservable<TSource> source,
            IObservable<TOther> other)
        {


            // return source1.WithLatestFrom(source2, (xs, ys) => Tuple.Create(xs, ys));
            return source.Publish(os => other.Select(a => os.Select(b => Tuple.Create(b, a))).Switch());
        }

Are there any operators or combinations of operators that will produce this behavior? I can do the implementation to Bonsai once I understand which operators to use.

UPDATE 1: 2018/05/18

Based on Sentinel's post, I wrote a new class DiscriminatedUnion inside the Bonsai namespace. I didn't manage to specify the appropriate types though. The compiler states 'type arguments for Merge cannot be inferred' (in .Merge(input1.Select...). Where do I add the correct type specification?

using System.Reactive.Linq;
using System.ComponentModel;
using System.Collections.Immutable;    
namespace Bonsai.Reactive
{
    [Combinator]
   // [XmlType(Namespace = Constants.XmlNamespace)]
    [Description("Implementation of Discriminated Union")]
    public class DiscriminatedUnion
    {
        public IObservable<int?> Process<TInput1, TInput2>(
           IObservable<TInput1> input1,
            IObservable<TInput2> input2)
        {
            var merged =
                        input2.Select(s2 => Tuple.Create(2, (TInput2)s2))
                        .Merge(input1.Select(s1 => Tuple.Create(1, (TInput1)s1)))
                        .Scan(Tuple.Create((int?)null, new Queue<int>(), 0), (state, val) =>
                        {
                            int? next = state.Item1;
                            if (val.Item1 == 1)
                            {
                                if (state.Item2.Count > 0)
                                {
                                    next = state.Item2.Dequeue();
                                }
                            }
                            else
                            {
                                state.Item2.Enqueue(val.Item2);
                            }
                            return Tuple.Create(next, state.Item2, val.Item1);
                        })
                        .Where(x => (x.Item1 != null && x.Item3 == 1))
                        .Select(x => x.Item1);
            return merged;
        }
    }
}

Upvotes: 2

Views: 993

Answers (3)

Sentinel
Sentinel

Reputation: 3697

I love these Rx puzzles. Can't believe one get paid to do this. So I came up with a slightly different approach. I think there are some weaknesses with race conditions here, but I would be curious what you think and how these can be eliminated.

The basic idea is to think of the queue as a recursive buffer-until over source1, where the buffer is replayed into the queue sans first element.

UPDATE

Based on shlomo's observation that publish().refcount() is needed, I updated the code and turned the solution into an extension "RegulatedQueue". Please see the below code. Input2 is the source to regulate via a queue, Input1 is the regulating signal.

public static class RxHelpers
{
    public static IObservable<TInput2> RegulatedQueue<TInput1, TInput2>(this IObservable<TInput2> input2,
       IObservable<TInput1> input1
        )
    {
        return Observable.Using(() => new Subject<TInput2>(),
        queue =>
        {
            input2.Subscribe(queue);
            return queue
                .Buffer(() => input1)
                .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
                .Where(l => l.Count > 0)
                .Select(l => l.First()).
                Publish().
                RefCount();
        });
    }
}


class Program
{


    static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(2000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        var merged = source2.RegulatedQueue(source1);

        merged.Subscribe(x => Console.WriteLine("Merged1 " + x));
        merged.Subscribe(x => Console.WriteLine("Merged2 " + x));






        Console.ReadKey();

    }
}

OBSOLETE

  static void Main(string[] args)
    {
        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));

        //THIS BIT
         Subject<int> queue = new Subject<int>();
        source2.Subscribe(queue);
        var merged=queue
            .Buffer(() => source1)
            .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
            .Where(l=>l.Count > 0)
            .Select(l => l.First());





            merged.Subscribe(x => Console.WriteLine("Merged "+x));







        Console.ReadKey();

    }

Testcode:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

Subject<string> queue = new Subject<string>();
input2.Subscribe(queue);
var result = queue
    .Buffer(() => input1)
    .Do(l => { foreach (var n in l.Skip(l.Count > 1 ? 1 : 0)) queue.OnNext(n); })
    .Where(l => l.Count > 0)
    .Select(l => l[0]);

result.Timestamp(scheduler)
    .Select(t => $"{t.Timestamp.Ticks} ticks: {t.Value}")
    .Dump(); //Linqpad

expected output:

//14000000 enqueue a
//15000000 enqueue b
//16000000 enqueue c
20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: c 
//55000000 enqueue d
//56000000 enqueue e
//57000000 enqueue f
60000000 ticks: c //should really be d, but there's no handling for fake-empty ejection
70000000 ticks: d 
80000000 ticks: e 
90000000 ticks: f 
100000000 ticks: f 
110000000 ticks: f 
120000000 ticks: f 
130000000 ticks: f 
140000000 ticks: f 
...

actual output:

20000000 ticks: a 
30000000 ticks: b 
40000000 ticks: c 
50000000 ticks: b 
60000000 ticks: c 
70000000 ticks: b 
80000000 ticks: c 
90000000 ticks: c 
100000000 ticks: b 
110000000 ticks: c 
120000000 ticks: c 
130000000 ticks: b 
140000000 ticks: c 
150000000 ticks: b 
160000000 ticks: c 
170000000 ticks: b 
180000000 ticks: c 
190000000 ticks: c 

Upvotes: 1

Sentinel
Sentinel

Reputation: 3697

Would this do the trick? There is a probably a better way to do this buffers so it might be worth revisiting this.

        Random r = new Random();
        var source1 = Observable.Interval(TimeSpan.FromSeconds(1)).Publish().RefCount();
        var source2 = Observable.Interval(TimeSpan.FromMilliseconds(7000)).Select(x => Enumerable.Range(1, 3).Select(y => r.Next(200)).ToObservable()).SelectMany(x => x).Publish().RefCount();

        source1.Subscribe(x => Console.WriteLine("Source1 " + x));
        source2.Subscribe(x => Console.WriteLine("Source2 " + x));



        var merged =
            source2.Select(s2 => Tuple.Create(2, s2))
            .Merge(source1.Select(s1 => Tuple.Create(1, (int)s1)))
            .Scan(Tuple.Create((int?)null, new Queue<int>(),0), (state, val) =>
                 {
                     int? next = state.Item1;
                     if (val.Item1 == 1)
                     {
                         if (state.Item2.Count > 0)
                         {
                             next = state.Item2.Dequeue();
                         }
                     }
                     else
                     {
                         state.Item2.Enqueue(val.Item2);

                     }
                     return Tuple.Create(next, state.Item2,val.Item1);
                 })
            .Where(x=>(x.Item1!=null && x.Item3==1))
            .Select(x => x.Item1);



        merged.Subscribe(x => Console.WriteLine("Merged "+x));

UPDATE Fixed code for OP:

 public class DiscriminatedUnion
{
    public static IObservable<TInput2> Process<TInput1, TInput2>(
       IObservable<TInput1> input1,
        IObservable<TInput2> input2)
    {
        var merged =
                    input2.Select(s2 => Tuple.Create(2, (object)s2))
                    .Merge(input1.Select(s1 => Tuple.Create(1, (object)s1)))
                    .Scan(Tuple.Create(default(TInput2), new Queue<TInput2>(), 0), (state, val) =>
                    {
                        TInput2 next = state.Item1;
                        if (val.Item1 == 1)
                        {
                            if (state.Item2.Count > 0)
                            {
                                next = state.Item2.Dequeue();
                            }
                        }
                        else
                        {
                            state.Item2.Enqueue((TInput2)val.Item2);
                        }
                        return Tuple.Create(next, state.Item2, val.Item1);
                    })
                    .Where(x => (!x.Item1.Equals(default(TInput2)) && x.Item3 == 1))
                    .Select(x => x.Item1);
        return merged;
    }
}

Upvotes: 1

Shlomo
Shlomo

Reputation: 14350

Here's a testable representation of your problem (or marble diagram), using NuGet package Microsoft.Reactive.Testing:

var scheduler = new TestScheduler();
var input1 = scheduler.CreateColdObservable<int>(
    ReactiveTest.OnNext(1000.Ms(), 1),
    ReactiveTest.OnNext(2000.Ms(), 2),
    ReactiveTest.OnNext(3000.Ms(), 3),
    ReactiveTest.OnNext(4000.Ms(), 4),
    ReactiveTest.OnNext(5000.Ms(), 5),
    ReactiveTest.OnNext(6000.Ms(), 6),
    ReactiveTest.OnNext(7000.Ms(), 7)
);
var input2 = scheduler.CreateColdObservable<string>(
    ReactiveTest.OnNext(1400.Ms(), "a"),
    ReactiveTest.OnNext(1500.Ms(), "b"),
    ReactiveTest.OnNext(1600.Ms(), "c"),
    ReactiveTest.OnNext(5500.Ms(), "d"),
    ReactiveTest.OnNext(5600.Ms(), "e"),
    ReactiveTest.OnNext(5700.Ms(), "f")
);

which uses this extension method:

public static class TickExtensions
{
    public static long Ms(this int ms)
    {
        return TimeSpan.FromMilliseconds(ms).Ticks;
    }
}

The problem is basically a state-machine problem that involves two observables of different types. The best way to solve this is with a Discriminated Union type, which doesn't exist in C#, so we'll create one. @Sentinel's answer did this with a Tuple, and that can work as well:

public class DUnion<T1, T2>
{
    public DUnion(T1 t1) 
    { 
        Type1Item = t1;
        Type2Item = default(T2);
        IsType1 = true;
    }

    public DUnion(T2 t2) 
    { 
        Type2Item = t2;
        Type1Item = default(T1);
        IsType1 = false;
    }

    public bool IsType1 { get; }
    public bool IsType2 => !IsType1;

    public T1 Type1Item { get; }
    public T2 Type2Item { get; }
}

We can then take our two differently-typed streams, Select and Merge them into one discriminated union stream, where we can manage the state with Scan. Your state logic is a bit tricky, but doable:

  • if a number arrives and there's no items in the queue, do nothing
  • if a number arrives and there's items in the queue, emit the first item in the queue.
    • If there's more than one item, remove the recent emmision from the queue.
    • If the queue only has one item, don't remove it, and go into 'fake-empty' state.
  • if a string arrives, stick it in the queue.
    • If the queue is 'fake-empty', eject the last item and exit 'fake-empty' state.

Here's the resulting observable (uses NuGet package System.Collections.Immutable):

var result = input1.Select(i => new DUnion<int, string>(i))
    .Merge(input2.Select(s => new DUnion<int, string>(s)))
    .Scan((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), (state, dItem) => dItem.IsType1
        ? state.queue.IsEmpty   
            ? (state.queue, null, false, false)     //Is integer, but empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty //Is integer, at least one item: dequeue unless only one item, then emit either way
                ? (state.queue,           state.queue.Peek(), true,  true)
                : (state.queue.Dequeue(), state.queue.Peek(), false, true)
        : state.isFakeEmptyState //Is new string, just add to queue, don't emit
            ? (state.queue.Dequeue().Enqueue(dItem.Type2Item), null, false, false) 
            : (state.queue.Enqueue(dItem.Type2Item),   (string)null, false, false) 
    )
    .Where(t => t.emit)
    .Select(t => t.item);

This can then be tested as follows:

var observer = scheduler.CreateObserver<string>();
result.Subscribe(observer);
scheduler.Start();
observer.Messages.Dump(); //Linqpad. Can replace with Console.Writeline loop.

Update: I thought about this a bit, and I think it makes sense to throw some operators around the Discriminated Union functionality. This way you don't have to explicitly deal with the type:

public static class DUnionExtensions
{
    public class DUnion<T1, T2>
    {
        public DUnion(T1 t1)
        {
            Type1Item = t1;
            Type2Item = default(T2);
            IsType1 = true;
        }

        public DUnion(T2 t2)
        {
            Type2Item = t2;
            Type1Item = default(T1);
            IsType1 = false;
        }

        public bool IsType1 { get; }
        public bool IsType2 => !IsType1;

        public T1 Type1Item { get; }
        public T2 Type2Item { get; }
    }

    public static IObservable<DUnion<T1, T2>> Union<T1, T2>(this IObservable<T1> a, IObservable<T2> b)
    {
        return a.Select(x => new DUnion<T1, T2>(x))
            .Merge(b.Select(x => new DUnion<T1, T2>(x)));
    }

    public static IObservable<TState> ScanUnion<T1, T2, TState>(this IObservable<DUnion<T1, T2>> source,
            TState initialState,
            Func<TState, T1, TState> type1Handler,
            Func<TState, T2, TState> type2Handler)
        {
            return source.Scan(initialState, (state, u) => u.IsType1
                ? type1Handler(state, u.Type1Item)
                : type2Handler(state, u.Type2Item)
            );
        }
}

With those extension methods, the solution changes to this, which I think reads better:

var result = input1
    .Union(input2)
    .ScanUnion((queue: ImmutableQueue<string>.Empty, item: (string)null, isFakeEmptyState: false, emit: false), 
        (state, _) => state.queue.IsEmpty
            ? (state.queue, null, false, false)     //empty queue, so don't emit item
            : state.queue.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? (state.queue, state.queue.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : (state.queue.Dequeue(), state.queue.Peek(), false, true),
        (state, s) => state.isFakeEmptyState 
            ? (state.queue.Dequeue().Enqueue(s), null, false, false)
            : (state.queue.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.emit)
    .Select(t => t.item); 

If you're having trouble with the named Tuple syntax, then you can use the old tuples:

var result = input1
    .Union(input2)
    .ScanUnion(Tuple.Create(ImmutableQueue<string>.Empty, (string)null, false, false),
        (state, _) => state.Item1.IsEmpty
            ? Tuple.Create(state.Item1, (string)null, false, false)     //empty queue, so don't emit item
            : state.Item1.Dequeue().IsEmpty         //At least one item: dequeue unless only one item, then emit either way
                ? Tuple.Create(state.Item1, state.Item1.Peek(), true, true) //maintain last item, enter Fake-EmptyState
                : Tuple.Create(state.Item1.Dequeue(), state.Item1.Peek(), false, true),
        (state, s) => state.Item3
            ? Tuple.Create(state.Item1.Dequeue().Enqueue(s), (string)null, false, false)
            : Tuple.Create(state.Item1.Enqueue(s), (string)null, false, false)
    )
    .Where(t => t.Item4)
    .Select(t => t.Item2);

Upvotes: 2

Related Questions