Reputation: 789
I want to implement a queue which is capable of taking events/items from multiple producers in multiple threads, and consume them all on single thread. this queue will work in some critical environment, so I am quite concerned with it's stability.
I have implemented it using Rx capabilities, but I have 2 questions:
The code below illustrates my approach, using a simple TestMethod. It's output shows that all values are put in from different threads, but are processed on another single thread.
[TestMethod()]
public void RxTest()
{
Subject<string> queue = new Subject<string>();
queue
.ObserveOnDispatcher()
.Subscribe(s =>
{
Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
},
() => Dispatcher.CurrentDispatcher.InvokeShutdown());
for (int j = 0; j < 10; j++)
{
ThreadPool.QueueUserWorkItem(o =>
{
for (int i = 0; i < 100; i++)
{
Thread.Sleep(10);
queue.OnNext(string.Format("value: {0}, from thread: {1}", i.ToString(), Thread.CurrentThread.ManagedThreadId));
}
queue.OnCompleted();
});
}
Dispatcher.Run();
}
Upvotes: 3
Views: 1899
Reputation: 117064
Take a look at EventLoopScheduler
. It's built-in to RX and I think it does everything you want.
You can take any number of observables, call .ObserveOn(els)
(els
is your instance of an EventLoopScheduler
) and you're now marshalling multiple observable from multiple threads onto a single thread and queuing each call to OnNext
serially.
Upvotes: 3
Reputation: 15618
I'm not sure about the behaviour of Subject
in heavily multithreaded scenarios. I can imagine though that something like BlockingCollection
(and its underlying ConcurrentQueue
) are well worn in the situations you're talking about. And simple to boot.
var queue = new BlockingCollection<long>();
// subscribing
queue.GetConsumingEnumerable()
.ToObservable(Scheduler.NewThread)
.Subscribe(i => Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId));
// sending
Observable.Interval(TimeSpan.FromMilliseconds(500), Scheduler.ThreadPool)
.Do(i => Debug.WriteLine("Value: {0}, Sent on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId))
.Subscribe(i => queue.Add(i));
You certainly don't want to touch queues and locks. The ConcurrentQueue
implementation is excellent and will certainly handle the size queues you're talking about effectively.
Upvotes: 4