Reputation: 315
I'm using a BlockingCollection
to create a queue where multiple threads can add items to, which are processed by a separate thread. The added item also contains a callback function (delegate) which is called after the item is processed. To add the item I use the TryAdd
method.
This is all working fine, but know I'm wondering if there is any way to await
the processing after running the TryAdd
.
What I want is, that the adding Thread waits for the processing to be done and then continues.
I hope that I described my problem well, so that anyone can give me a hint.
My target framework is Mono/.Net4.5.
Upvotes: 1
Views: 775
Reputation: 63732
The only solution is to coöperate - the processing side must signal you that the item is processed. Implementation of this is quite easy - one way would be this:
public struct SignalizableItem<T>
{
private readonly T _value;
private readonly TaskCompletionSource<object> _signaller;
public SignalizableItem(T value, TaskCompletionSource<object> signaller)
{
_value = value;
_signaller = signaller;
}
public void Process(Action<T> action)
{
try
{
action(_value);
_signaller.SetResult(default(object));
}
catch (Exception ex)
{
_signaller.SetException(ex);
}
}
}
public static class BlockingCollectionExtensions
{
public static Task QueueAndWaitAsync<T>
(this BlockingCollection<SignalizableItem<T>> @this, T value)
{
var tcs = new TaskCompletionSource<object>();
@this.Add(new SignalizableItem<T>(value, tcs));
return tcs.Task;
}
}
The usage is quite simple - on the producer side, you simply do
await collection.QueueAndWaitAsync(value);
On the consumer side, you'll unwrap the value and signal when ready:
var item = collection.Take();
item.Process
(
data =>
{
// Your processing
...
}
);
And of course, the collection will be BlockingCollection<SignalizableItem<YourType>>
instead of BlockingCollection<YourType>
.
You could further simplify the processing by adding another extension method:
public static void Process<T>
(this BlockingCollection<SignalizableItem<T>> @this, Action<T> action)
{
@this.Take().Process(action);
}
It might also be a good idea to implement cancellation (a simple CancellationToken
should work fine) or another form of shutdown.
Something actually usable could end up with
public static void ProcessAll<T>
(this BlockingCollection<SignalizableItem<T>> @this, Action<T> action,
CancellationToken cancellationToken)
{
SignalizableItem<T> val;
while (@this.TryTake(out val, -1, cancellationToken)) val.Process(action);
}
abstracting away the whole processing mechanism, and exposing just the simple action delegate.
Upvotes: 1