Matt Austin

Simple way to concurrently subscribe to observable collection with limited subscribers

I've been trying to implement a simple producer-consumer pattern using Rx and observable collections. I also need to be able to throttle the number of subscribers easily. I have seen lots of references to LimitedConcurrencyLevelTaskScheduler in the Parallel Extensions samples, but I can't seem to get it to use multiple threads.

I think I'm doing something silly, so I was hoping someone could explain what. In the unit test below, I expect multiple (two) threads to be used to consume the strings in the blocking collection. What am I doing wrong?

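using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestTools.UnitTesting;
// LimitedConcurrencyLevelTaskScheduler is not part of the BCL; it comes from
// the ParallelExtensionsExtras samples and is assumed to be in the project.

[TestClass]
public class LimitedConcurrencyLevelTaskSchedulerTestscs
{
    private ConcurrentBag<string> _testStrings = new ConcurrentBag<string>();
    private ConcurrentBag<int> _threadIds = new ConcurrentBag<int>();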

    [TestMethod]
    public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
    {

        // Set up the command queue for processing combinations
        var commandQueue = new BlockingCollection<string>();

        var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
        var scheduler = new TaskPoolScheduler(taskFactory);

        commandQueue.GetConsumingEnumerable()
            .ToObservable(scheduler)
            .Subscribe(Go, ex => { throw ex; });

        var iterationCount = 100;
        for (int i = 0; i < iterationCount; i++)
        {
            commandQueue.Add(string.Format("string {0}", i));
        }
        commandQueue.CompleteAdding();

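        // IsCompleted only means adding has finished and the queue is empty;
        // Go may still be running for the last item when this loop exits.
        while (!commandQueue.IsCompleted)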
        {
            Thread.Sleep(100);
        }

        Assert.AreEqual(iterationCount, _testStrings.Count);
        Assert.AreEqual(2, _threadIds.Distinct().Count());
    }

    private void Go(string testString)
    {
        _testStrings.Add(testString);
        _threadIds.Add(Thread.CurrentThread.ManagedThreadId);
    }
}

Upvotes: 1

Answers (2)

Enigmativity

Everyone seems to go through the same learning curve with Rx. The thing to understand is that Rx doesn't do parallel processing unless you explicitly write a query that forces parallelism. Schedulers do not introduce parallelism.

Rx has a contract of behaviour: an observable produces zero or more values in series (regardless of how many threads might be used), one after another with no overlap, optionally followed by either a single error or a single completion message, and then nothing else.

This is often written as OnNext*(OnError|OnCompleted).
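To make the grammar concrete, here is a small checker for it. ContractCheckingObserver is a hypothetical helper written only against the BCL's IObserver<T> interface (no Rx types); it counts OnNext values and records a violation whenever a notification arrives after OnError/OnCompleted or overlaps another notification.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical helper (not part of Rx): records any notification that breaks
// the OnNext*(OnError|OnCompleted) grammar.
public sealed class ContractCheckingObserver<T> : IObserver<T>
{
    private int _inCallback;   // 1 while a notification is being handled
    private int _terminated;   // 1 once OnError or OnCompleted has been seen
    private int _count;        // number of OnNext calls received

    public ConcurrentQueue<string> Violations { get; } = new ConcurrentQueue<string>();
    public int Count => Volatile.Read(ref _count);

    public void OnNext(T value) => Handle("OnNext", terminal: false);
    public void OnError(Exception error) => Handle("OnError", terminal: true);
    public void OnCompleted() => Handle("OnCompleted", terminal: true);

    private void Handle(string kind, bool terminal)
    {
        // Notifications must never overlap, even if they arrive on different threads.
        if (Interlocked.Exchange(ref _inCallback, 1) == 1)
            Violations.Enqueue(kind + " overlapped another notification");

        // Nothing may follow a terminal message.
        if (Volatile.Read(ref _terminated) == 1)
            Violations.Enqueue(kind + " after termination");

        if (terminal)
            Volatile.Write(ref _terminated, 1);
        else
            Interlocked.Increment(ref _count);

        Volatile.Write(ref _inCallback, 0);
    }
}
```

A well-behaved sequence such as OnNext, OnNext, OnCompleted leaves Violations empty, while OnCompleted followed by OnNext records one violation.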

All that schedulers do is define the rule that determines which thread a new value is processed on when the scheduler is not already processing values for the current observable.

Now take your code:

var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
var scheduler = new TaskPoolScheduler(taskFactory);

This says that the scheduler will run values for a subscription on at most two threads at a time. But that doesn't mean it will spread the values across both threads. Remember, since values are produced in series, one after another, it is better to re-use an existing thread than to pay the high cost of creating a new one. So what Rx does is re-use the existing thread if a new value is scheduled on the scheduler before the current value has finished being processed.

This is the key - it re-uses the thread if a new value is scheduled before the processing of existing values is complete.
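That reuse rule can be sketched as a serial work queue. SerialDispatcher below is a hypothetical illustration, not Rx's actual scheduler code: posting to an idle dispatcher asks the thread pool for a thread, but posting while a drain loop is already running just appends the item, and that same loop processes it afterwards on the same thread.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical illustration (not Rx's implementation) of serial dispatch:
// work items run one after another, and a drain loop that is already running
// picks up newly posted items instead of a second thread being started.
public sealed class SerialDispatcher
{
    private readonly ConcurrentQueue<Action> _queue = new ConcurrentQueue<Action>();
    private int _pending; // posted items that have not finished running yet

    public void Post(Action work)
    {
        _queue.Enqueue(work);
        // Only the post that moves _pending from 0 to 1 starts a drain loop;
        // every other post is handled by the loop that is already running.
        if (Interlocked.Increment(ref _pending) == 1)
            ThreadPool.QueueUserWorkItem(_ => Drain());
    }

    private void Drain()
    {
        do
        {
            // Items are enqueued before they are counted, so while _pending > 0
            // the queue is never empty here.
            Action work;
            _queue.TryDequeue(out work);
            work();
        }
        while (Interlocked.Decrement(ref _pending) > 0);
    }
}
```

Because only the post that finds the dispatcher idle requests a thread, a burst of posts made while the first item is still running is processed by a single drain loop, one item at a time.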

So your code does this:

commandQueue.GetConsumingEnumerable()
    .ToObservable(scheduler)
    .Subscribe(Go, ex => { throw ex; });

It means that the scheduler will only create a thread when the first value comes along. But by the time that expensive thread creation is complete, the code that adds values to the commandQueue has also finished, so all the values are already queued and a single thread can process them more efficiently than creating a costly second one.

To avoid this you need to construct the query to introduce parallelism.

Here's how:

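[TestMethod]
public void WhenConsumingFromBlockingCollection_GivenLimitOfTwoThreads_TwoThreadsAreUsed()
{
    var taskFactory = new TaskFactory(new LimitedConcurrencyLevelTaskScheduler(2));
    var scheduler = new TaskPoolScheduler(taskFactory);

    var iterationCount = 100;

    // Each value gets its own Observable.Start, i.e. an independent unit of
    // work on the scheduler, so up to two of them can run at the same time.
    // SelectMany merges the inner results back into a single sequence.
    Observable
        .Range(0, iterationCount)
        .SelectMany(n => Observable
            .Start(() => n.ToString(), scheduler)
            .Do(x => Go(x)))
        .Wait(); // blocks until the merged sequence completes

    // Same checks as the question's test.
    Assert.AreEqual(iterationCount, _testStrings.Count);
    Assert.AreEqual(2, _threadIds.Distinct().Count());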
}

Now, I've used the Do(...)/.Wait() combo to give you the equivalent of a blocking .Subscribe(...) method.

This results in both of your assert conditions evaluating to true.
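The underlying point, that a concurrency cap only matters when several independent work items are queued, can be checked without Rx. This sketch swaps in the BCL's ConcurrentExclusiveSchedulerPair (.NET 4.5 and later) for the LimitedConcurrencyLevelTaskScheduler sample, since both cap how many tasks run at once; MaxObservedConcurrency is a hypothetical helper. It compares one task that loops over every item (roughly what a single ToObservable subscription does) with one task per item (roughly what SelectMany plus Observable.Start does).

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ConcurrencyDemo
{
    // Hypothetical helper: runs a small piece of work itemCount times on a
    // scheduler capped at two concurrent tasks and returns the highest number
    // of work calls observed running at the same moment.
    public static int MaxObservedConcurrency(int itemCount, bool oneTaskPerItem)
    {
        // BCL scheduler that allows at most 2 tasks to run concurrently.
        var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 2);
        var factory = new TaskFactory(pair.ConcurrentScheduler);

        int running = 0, maxRunning = 0;
        Action work = () =>
        {
            int now = Interlocked.Increment(ref running);
            // Lock-free "max so far" update.
            int seen;
            while (now > (seen = Volatile.Read(ref maxRunning)) &&
                   Interlocked.CompareExchange(ref maxRunning, now, seen) != seen) { }
            Thread.Sleep(5); // simulate a little work so overlaps can happen
            Interlocked.Decrement(ref running);
        };

        Task[] tasks = oneTaskPerItem
            // One task per item: the scheduler has many independent work items.
            ? Enumerable.Range(0, itemCount).Select(_ => factory.StartNew(work)).ToArray()
            // One task looping over every item: nothing else to run in parallel.
            : new[] { factory.StartNew(() => { for (int i = 0; i < itemCount; i++) work(); }) };

        Task.WaitAll(tasks);
        return maxRunning;
    }
}
```

With one looping task the observed maximum is always 1, whatever the cap; only the one-task-per-item version can reach the cap of 2, and even then the thread pool does not guarantee that it will.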

Upvotes: 7

Matt Austin

I have found that by modifying the subscription as follows, I can add five subscribers, but only two threads will process the contents of the collection, so this serves my purpose.

var observable = commandQueue.GetConsumingEnumerable().ToObservable(scheduler);
for (int i = 0; i < 5; i++)
    observable.Subscribe(Go, ex => { throw ex; });
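This most likely works because, assuming observable is commandQueue.GetConsumingEnumerable().ToObservable(scheduler) as in the question, each subscription gets its own enumerator over the consuming enumerable, and BlockingCollection<T> hands each item to only one of those enumerators, so the subscriptions compete for items. If the goal is just a fixed number of consumers, the same competing-consumers shape can be sketched with plain tasks and no Rx. CompetingConsumers.Consume is a hypothetical helper, not an existing API.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class CompetingConsumers
{
    // Hypothetical helper: drains `queue` with exactly `consumerCount`
    // consumers and returns once CompleteAdding() has been called and every
    // item has been handled.
    public static void Consume(BlockingCollection<string> queue, int consumerCount, Action<string> handler)
    {
        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Each consumer takes items from the shared queue; every item
                // is handed to exactly one consumer.
                foreach (string item in queue.GetConsumingEnumerable())
                    handler(item);
            }, TaskCreationOptions.LongRunning)) // hint: this is a blocking loop
            .ToArray();

        Task.WaitAll(consumers);
    }
}
```

In the question's test, Consume(commandQueue, 2, Go) could replace both the Rx subscription and the IsCompleted polling loop, provided the producer runs first or on another thread. A fast consumer may still drain everything before the second one starts, so asserting exactly two distinct thread IDs is not guaranteed here either.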

I'd be interested to know if there is a better or more elegant way to achieve this!

Upvotes: 0
