user2864740
user2864740

Reputation: 61875

BlockingCollection<T>.TakeFromAny, for collections with a different generic type

There is a BlockingCollection<T>.TakeFromAny method in .NET. It first tries a fast-acquire Take and then defaults to a 'slow' method waiting on underlying Handles. I would like to use this to listen to both upstream producers supplying "Messages" and downstream producers supplying "Results".

The following code is not type-valid and naturally fails to compile:

object anyValue;
var collection = new List<BlockingCollection<object>>();
// following fails: cannot convert
//    from 'System.Collections.Concurrent.BlockingCollection<Message>'
//    to 'System.Collections.Concurrent.BlockingCollection<object>'
collection.Add(new BlockingCollection<Message>());
// fails for same reason
collection.Add(new BlockingCollection<Result>());
BlockingCollection<object>.TakeFromAny(collection.ToArray(), out anyValue);

It would be possible to deal only with new BlockingCollection<object>() instances and cast on Take to avoid the compilation type error, although such rubs me wrong(er) - especially because the typing is lost through the method interfaces. Using a wrapping composition type would solve the latter; fsvo 'solve'.


Nothing below here is directly related to the question, although it provides context - for those who are interested. Higher level constructs (eg. Rx or TPL Dataflow) are not available to the code which is providing core infrastructure functionality.

Here is a a basic flow model. The producer, proxy, and workers run on separate threads (the workers could run on the same thread, depending on what the task scheduler does).

[producer]   message -->   [proxy]   message --> [worker 1]
             <-- results             <-- results
                                     message --> [worker N..]
                                     <-- results

The expectation is the proxy listens for Messages (incoming) and Results (going back out). The proxy does some work such as transformation and grouping and it uses the results as feedback.

Having the proxy as a separate thread isolates it from the initial production source which does all sorts of monkey business. The worker tasks are for parallelism, not asynchronicity, and threading (after the contention is reduced/eliminated though the grouping in the proxy) should allow good scaling.

The queues are established between the proxy and workers (instead of direct tasks with a single input/result) because, while a worker is executing, there may be additional incoming work messages that it can process before it ends. This is to ensure the worker can prolong/reuse the contexts it establishes over a stream of related work.

Upvotes: 3

Views: 1366

Answers (1)

svick
svick

Reputation: 244767

I think the best option here is to change the type of both blocking collections to BlockingCollection<object>, which you already mentioned, including its cons.

If you can't or don't want to do that, another solution would be to have a merged BlockingCollection<object> and a thread for each source collection that moves items from its collection to the merged one:

var producerCollection = new BlockingCollection<Message>();
var consumerCollection = new BlockingCollection<Results>();

var combinedCollection = new BlockingCollection<object>();

var producerCombiner = Task.Run(() =>
{
    foreach (var item in producerCollection.GetConsumingEnumerable())
    {
        combinedCollection.Add(item);
    }
});

var consumerCombiner = Task.Run(() =>
{
    foreach (var item in consumerCollection.GetConsumingEnumerable())
    {
        combinedCollection.Add(item);
    }
});

Task.WhenAll(producerCombiner, consumerCombiner)
    .ContinueWith(_ => combinedCollection.CompleteAdding());

foreach (var item in combinedCollection.GetConsumingEnumerable())
{
    // process item here
}

It's not very efficient, since it blocks two additional threads just to do this, but it's the best alternative I could think of without using reflection to get access to the handles used by TakeFromAny.

Upvotes: 2

Related Questions