Michal Ciechan
Michal Ciechan

Reputation: 13888

Rx - CombineLatest Queue

I am trying to achieve Queue like functionality using Rx (I know I can accomplish it using a Queue + Locking, but trying to learn + use Rx as I don't get many opportunities to use it).

Functionality is, I want to take some action on an event to do, but only want to process one at a time, and once finished, process the next/last one (if it is new).

This is what I have at the moment (using an incrementing flag observable so I can do DistinctUntilChanged, but that feels like a hack).

        // Some source
        var events = new Subject<string>();
        events.Subscribe(s => Console.WriteLine("Event: " + s));

        // How can I get rid of this
        var counter = 0;

        var flag = new Subject<int>();
        flag.Subscribe(i => Console.WriteLine("Flag: " + i));
        var combined = events
            .CombineLatest(flag, (s, i) => new {Event = s, Flag = i});

        var worker = combined
            .DistinctUntilChanged(arg => arg.Flag)
            .DistinctUntilChanged(arg => arg.Event);

        worker.Subscribe(x => Console.WriteLine("\r\nDo Work: " + x.Event + "\r\n"));


        flag.OnNext(counter++); // Ready
        events.OnNext("one"); // Idle, Start eight

        events.OnNext("two");
        events.OnNext("three");
        events.OnNext("four");

        events.OnNext("five"); // Process
        flag.OnNext(counter++); // Finished one, start five

        events.OnNext("six");
        events.OnNext("seven");

        events.OnNext("eight"); // Idle, Start eight
        flag.OnNext(counter++); // Finished five, start eight
        flag.OnNext(counter++); // Finished eight
        
        events.OnNext("nine"); // Should be processed

If you run this code, you will see that the last event does not get processed, even though the actor is idle.

Feels like I am missing something here....

At the moment I am looking at using Observables of Observables somehow.... but been trying to figure this out for last couple or so hours :-(

Edit

Managed to do it by changing subject to ReplaySubject

        var events = new ReplaySubject<string>(1);

and introducing one more variable, but seems like another hack, but it did let me get rid of the counter.

        string lastSeen = null;
        flag.Select(x => events.Where(s => s != lastSeen).Take(1))
            .Subscribe(x =>
                x.Subscribe(s =>
                {
                    lastSeen = s;
                    Console.WriteLine(s);
                })
            );

If someone knows a better way where I can get rid of string lastSeen or simplify my nesting/subscriptions that would be great.

Upvotes: 0

Views: 222

Answers (1)

CharlesNRice
CharlesNRice

Reputation: 3259

If you are willing to mix in TPL DataFlow Blocks you can do what you want. Though I must say it's a bit non-Rxish.

var source = Observable.Interval(TimeSpan.FromSeconds(1));
var queue = new BroadcastBlock<long>(null);
var subscription = queue.AsObservable().DistinctUntilChanged().Subscribe(l =>
{
    Thread.Sleep(2500);
    Console.WriteLine(l);
});

source.SubscribeOn(ThreadPoolScheduler.Instance).Subscribe(queue.AsObserver());

The BroadcastBlock is the queue that will only hold the last value. The TPL DataFlow Blocks do have nice support with Rx. So we can subscribe to the event and put that into the BroadcastBlock and then subscribe off the BroadcastBlock.

Upvotes: 1

Related Questions