Reputation: 13888
I am trying to achieve queue-like functionality using Rx. (I know I can accomplish it with a Queue plus locking, but I want to learn and use Rx, as I don't get many opportunities to use it.)
The functionality is: I want to take some action when an event arrives, but only process one event at a time. Once processing finishes, the most recent event should be processed next, but only if it is new.
This is what I have at the moment (using an incrementing flag observable so I can use DistinctUntilChanged, but that feels like a hack).
// Some source
var events = new Subject<string>();
events.Subscribe(s => Console.WriteLine("Event: " + s));
// How can I get rid of this
var counter = 0;
var flag = new Subject<int>();
flag.Subscribe(i => Console.WriteLine("Flag: " + i));
var combined = events
    .CombineLatest(flag, (s, i) => new { Event = s, Flag = i });
var worker = combined
    .DistinctUntilChanged(arg => arg.Flag)
    .DistinctUntilChanged(arg => arg.Event);
worker.Subscribe(x => Console.WriteLine("\r\nDo Work: " + x.Event + "\r\n"));
flag.OnNext(counter++); // Ready
events.OnNext("one"); // Idle, start one
events.OnNext("two");
events.OnNext("three");
events.OnNext("four");
events.OnNext("five"); // Process
flag.OnNext(counter++); // Finished one, start five
events.OnNext("six");
events.OnNext("seven");
events.OnNext("eight"); // Still busy with five, eight is now the latest
flag.OnNext(counter++); // Finished five, start eight
flag.OnNext(counter++); // Finished eight
events.OnNext("nine"); // Should be processed
If you run this code, you will see that the last event does not get processed, even though the actor is idle.
It feels like I am missing something here.
At the moment I am looking at using observables of observables somehow, but I have been trying to figure this out for the last couple of hours :-(
I managed to do it by changing the Subject to a ReplaySubject:
var events = new ReplaySubject<string>(1);
and introducing one more variable. It seems like another hack, but it did let me get rid of the counter.
string lastSeen = null;
flag.Select(x => events.Where(s => s != lastSeen).Take(1))
    .Subscribe(x =>
        x.Subscribe(s =>
        {
            lastSeen = s;
            Console.WriteLine(s);
        })
    );
If someone knows a better way to get rid of string lastSeen or to simplify my nested subscriptions, that would be great.
Upvotes: 0
Views: 222
Reputation: 3259
If you are willing to mix in TPL Dataflow blocks, you can do what you want, though I must say it's a bit non-Rxish.
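using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks.Dataflow;

var source = Observable.Interval(TimeSpan.FromSeconds(1));
// BroadcastBlock keeps only the most recent value; null means no cloning function
var queue = new BroadcastBlock<long>(null);
var subscription = queue.AsObservable().DistinctUntilChanged().Subscribe(l =>
{
    Thread.Sleep(2500); // Simulate slow work
    Console.WriteLine(l);
});
source.SubscribeOn(ThreadPoolScheduler.Instance).Subscribe(queue.AsObserver());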
The BroadcastBlock acts as a queue that only holds the last value. TPL Dataflow blocks have nice support for Rx, so we can subscribe to the events, push them into the BroadcastBlock, and then subscribe off the BroadcastBlock.
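To tie this back to the question, here is a rough sketch of the same wiring with a Subject<string> as the source instead of Observable.Interval. It assumes the System.Reactive and System.Threading.Tasks.Dataflow packages and C# top-level statements. The `processed` queue, the 200 ms work delay and the final 2 second wait are only there for illustration and are not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;
using System.Threading.Tasks.Dataflow;

// Source of events, as in the question
var events = new Subject<string>();

// Holds only the most recent value; null means no cloning function
var latest = new BroadcastBlock<string>(null);

// Illustration only: records what the slow worker actually handled
var processed = new ConcurrentQueue<string>();

// Slow worker: takes whatever the block currently offers
using var work = latest.AsObservable()
    .DistinctUntilChanged()
    .Subscribe(s =>
    {
        Thread.Sleep(200); // Simulate slow work (assumed duration)
        Console.WriteLine("Do Work: " + s);
        processed.Enqueue(s);
    });

// Push every event into the block
using var feed = events.Subscribe(latest.AsObserver());

events.OnNext("one");
events.OnNext("two");
events.OnNext("three");
events.OnNext("four");
events.OnNext("five");

// Crude wait so the background worker can finish before the program exits
Thread.Sleep(2000);
```

Which of the intermediate values get skipped depends on timing, so the exact output can vary between runs. The point is that the worker is not expected to handle every event, only whatever is the latest when it becomes free.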
Upvotes: 1