Sebastian

Reputation: 315

Awaitable adding to a BlockingCollection

I'm using a BlockingCollection to build a queue that multiple threads add items to and that a separate thread processes. Each item also carries a callback function (a delegate), which is called after the item has been processed. I add items with the TryAdd method.

This all works fine, but now I'm wondering whether there is a way to await the processing after calling TryAdd. What I want is for the adding thread to wait until the processing is done and then continue.

I hope I've described my problem well enough that someone can give me a hint.

My target framework is Mono/.NET 4.5.

Upvotes: 1

Views: 775

Answers (1)

Luaan

Reputation: 63732

The only solution is to coöperate - the processing side must signal you when the item has been processed. Implementing this is quite easy - one way would be:

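using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Pairs a queued value with the TaskCompletionSource its producer is waiting on.
public struct SignalizableItem<T>
{
  private readonly T _value;
  private readonly TaskCompletionSource<object> _signaller;

  public SignalizableItem(T value, TaskCompletionSource<object> signaller)
  {
    _value = value;
    _signaller = signaller;
  }

  // Runs the action on the value, then completes the producer's task.
  public void Process(Action<T> action)
  {
    try
    {
      action(_value);
      _signaller.SetResult(default(object));
    }
    catch (Exception ex)
    {
      // A failure in the action is passed on to the waiting producer.
      _signaller.SetException(ex);
    }
  }
}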

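public static class BlockingCollectionExtensions
{
  // Queues the value and returns a task that completes once it has been processed.
  // Note that Add itself blocks if the collection is bounded and currently full.
  public static Task QueueAndWaitAsync<T>
     (this BlockingCollection<SignalizableItem<T>> @this, T value)
  {
    var tcs = new TaskCompletionSource<object>();
    @this.Add(new SignalizableItem<T>(value, tcs));
    return tcs.Task;
  }
}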

The usage is quite simple - on the producer side, you simply do

await collection.QueueAndWaitAsync(value);

On the consumer side, you'll unwrap the value and signal when ready:

var item = collection.Take();

item.Process(data =>
{
  // Your processing
  ...
});

And of course, the collection will be BlockingCollection<SignalizableItem<YourType>> instead of BlockingCollection<YourType>.
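
To show how the pieces fit together, here is a minimal end-to-end sketch. It repeats the wrapper and the extension method in condensed form only so that it compiles on its own. The QueueDemo class, its Run method and the doubling "work" are hypothetical names made up for illustration. Because the question mentions a thread that should wait, the producer here blocks with Wait() rather than using await.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Condensed copy of the wrapper above, repeated so the sketch is self-contained.
public struct SignalizableItem<T>
{
  private readonly T _value;
  private readonly TaskCompletionSource<object> _signaller;

  public SignalizableItem(T value, TaskCompletionSource<object> signaller)
  {
    _value = value;
    _signaller = signaller;
  }

  public void Process(Action<T> action)
  {
    try { action(_value); _signaller.SetResult(null); }
    catch (Exception ex) { _signaller.SetException(ex); }
  }
}

public static class BlockingCollectionExtensions
{
  public static Task QueueAndWaitAsync<T>
    (this BlockingCollection<SignalizableItem<T>> @this, T value)
  {
    var tcs = new TaskCompletionSource<object>();
    @this.Add(new SignalizableItem<T>(value, tcs));
    return tcs.Task;
  }
}

// Hypothetical demo: one consumer thread, one blocking producer.
public static class QueueDemo
{
  public static int Run(int input)
  {
    var collection = new BlockingCollection<SignalizableItem<int>>();
    int result = 0;

    // Consumer: take one item, process it, signal the producer.
    var consumer = new Thread(() => collection.Take().Process(x => result = x * 2));
    consumer.Start();

    // Producer: queue the value, then block until the consumer has signalled.
    // Inside an async method this line would be
    // "await collection.QueueAndWaitAsync(input);" instead.
    collection.QueueAndWaitAsync(input).Wait();

    consumer.Join();
    return result;
  }
}
```

Two things to keep in mind with this sketch. If the processing action throws, Wait() surfaces the exception wrapped in an AggregateException, whereas await rethrows the original exception. And on .NET 4.5 the code after an await may end up running on the consumer thread, since continuations of a TaskCompletionSource task can run synchronously inside SetResult, so it's best to keep that code short or move it to another thread.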

You could further simplify the processing by adding another extension method:

public static void Process<T>
  (this BlockingCollection<SignalizableItem<T>> @this, Action<T> action)
{
  @this.Take().Process(action);
}

It might also be a good idea to implement cancellation (a simple CancellationToken should work fine) or another form of shutdown.

Something actually usable could end up with

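public static void ProcessAll<T>
  (this BlockingCollection<SignalizableItem<T>> @this, Action<T> action,
   CancellationToken cancellationToken)
{
  SignalizableItem<T> val;
  // Blocks until an item is available. TryTake returns false once adding is
  // completed and the collection is empty; it throws
  // OperationCanceledException when the token is cancelled.
  while (@this.TryTake(out val, Timeout.Infinite, cancellationToken))
  {
    val.Process(action);
  }
}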

abstracting away the whole processing mechanism, and exposing just the simple action delegate.
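
One thing to watch with any form of shutdown: items that are still queued when the consumer stops are never signalled, so their producers would wait forever. The sketch below is one possible way to handle that, not part of the code above. It assumes a hypothetical variant of the wrapper, here called CancellableItem<T>, with an extra Cancel method, plus a hypothetical CancelPending helper that drains the queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical variant of SignalizableItem<T> that can also be cancelled.
public struct CancellableItem<T>
{
  private readonly T _value;
  private readonly TaskCompletionSource<object> _signaller;

  public CancellableItem(T value, TaskCompletionSource<object> signaller)
  {
    _value = value;
    _signaller = signaller;
  }

  public void Process(Action<T> action)
  {
    try { action(_value); _signaller.TrySetResult(null); }
    catch (Exception ex) { _signaller.TrySetException(ex); }
  }

  // Completes the producer's task as cancelled without running any processing.
  public void Cancel()
  {
    _signaller.TrySetCanceled();
  }
}

public static class ShutdownExtensions
{
  // Stops further adds, then cancels everything still in the queue.
  // Returns the number of items that were cancelled.
  public static int CancelPending<T>(this BlockingCollection<CancellableItem<T>> @this)
  {
    @this.CompleteAdding();

    int cancelled = 0;
    CancellableItem<T> item;
    // The non-blocking TryTake returns false as soon as the queue is empty.
    while (@this.TryTake(out item))
    {
      item.Cancel();
      cancelled++;
    }
    return cancelled;
  }
}
```

The consumer would call CancelPending after its processing loop has ended, for example in a catch block for OperationCanceledException. A producer awaiting a cancelled item then gets a TaskCanceledException instead of hanging.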

Upvotes: 1
