deafsheep

Reputation: 789

Rx queue implementation and Dispatcher's buffer

I want to implement a queue that can take events/items from multiple producers on multiple threads and consume them all on a single thread. This queue will work in a somewhat critical environment, so I am quite concerned with its stability.

I have implemented it using Rx capabilities, but I have 2 questions:

  1. Is this implementation OK, or is it flawed in some way I do not know of? (The alternative would be a manual implementation with a Queue and locks.)
  2. What is the Dispatcher's buffer length? Can it handle 100k queued items?

The code below illustrates my approach using a simple TestMethod. Its output shows that all values are put in from different threads but are processed on a single, separate thread.

[TestMethod()]
public void RxTest()
{
    Subject<string> queue = new Subject<string>();

    queue
        .ObserveOnDispatcher()
        .Subscribe(s =>
                        {
                            Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", s, Thread.CurrentThread.ManagedThreadId);
                        },
                    // shut the dispatcher down once the sequence completes
                    () => Dispatcher.CurrentDispatcher.InvokeShutdown());

    // count producers so OnCompleted is called only once, after the last
    // producer has finished; calling it from whichever thread finishes first
    // would terminate the sequence early and drop the remaining values
    int pendingProducers = 10;

    for (int j = 0; j < 10; j++)
    {
        ThreadPool.QueueUserWorkItem(o =>
        {
            for (int i = 0; i < 100; i++)
            {
                Thread.Sleep(10);
                queue.OnNext(string.Format("value: {0}, from thread: {1}", i, Thread.CurrentThread.ManagedThreadId));
            }

            if (Interlocked.Decrement(ref pendingProducers) == 0)
                queue.OnCompleted();
        });
    }

    Dispatcher.Run();
}

Upvotes: 3

Views: 1899

Answers (2)

Enigmativity

Reputation: 117064

Take a look at EventLoopScheduler. It's built into Rx and I think it does everything you want.

You can take any number of observables, call .ObserveOn(els) (where els is your instance of EventLoopScheduler), and you're now marshalling multiple observables from multiple threads onto a single thread, with each call to OnNext queued and delivered serially.
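
For example, here's a minimal sketch of that approach, reusing the question's Subject and producer loop (the Thread.Sleep at the end is just a crude wait for the demo; a real test would synchronize properly):

var queue = new Subject<string>();
var scheduler = new EventLoopScheduler();   // owns a single dedicated thread

using (queue.ObserveOn(scheduler)
            .Subscribe(s => Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}",
                                            s, Thread.CurrentThread.ManagedThreadId)))
{
    for (int j = 0; j < 10; j++)
    {
        ThreadPool.QueueUserWorkItem(o =>
        {
            for (int i = 0; i < 100; i++)
                queue.OnNext(string.Format("value: {0}, from thread: {1}",
                                           i, Thread.CurrentThread.ManagedThreadId));
        });
    }

    Thread.Sleep(2000);
}

scheduler.Dispose(); // shuts down the scheduler's thread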

Upvotes: 3

yamen

Reputation: 15618

I'm not sure about the behaviour of Subject in heavily multithreaded scenarios. I can imagine, though, that something like BlockingCollection (and its underlying ConcurrentQueue) is well worn in the situations you're talking about. And simple to boot.

var queue = new BlockingCollection<long>();

// subscribing
queue.GetConsumingEnumerable()
     .ToObservable(Scheduler.NewThread)
     .Subscribe(i => Debug.WriteLine("Value: {0}, Observed on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId));

// sending
Observable.Interval(TimeSpan.FromMilliseconds(500), Scheduler.ThreadPool)
          .Do(i => Debug.WriteLine("Value: {0}, Sent on ThreadId: {1}", i, Thread.CurrentThread.ManagedThreadId))
          .Subscribe(i => queue.Add(i));

You certainly don't want to roll your own queue with locks. The ConcurrentQueue implementation is excellent and will comfortably handle queues of the size you're talking about.
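
If the 100k backlog is the worry, a BlockingCollection can also be constructed with a bounded capacity, so producers block instead of letting the queue grow without limit (a hypothetical sketch; the 100,000 figure just mirrors the question):

// Hypothetical: Add blocks once 100,000 items are waiting, giving you back-pressure
var queue = new BlockingCollection<long>(boundedCapacity: 100000);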

Upvotes: 4
