Reputation: 6870
I am trying to create a worker pool that can take as input any Func<T>
or Action
, return a Task and schedule the result of that task in some thread at some future time.
I am aware I can use ThreadPool
or Task.Factory
but I am doing this for the sake of learning.
Now my implementation below relies on me being able to queue TaskCompletionSource<T>
by wrapping it around in dynamic
inside TaskWrapper
. I don't feel comfortable doing this (as I can imagine this has a non-negligible runtime cost) but I don't know of any alternatives.
public class WorkerHub
{
private readonly ConcurrentQueue<TaskWrapper> _tasks;
private readonly Timer _timer;
public WorkerHub()
{
_timer = new Timer();
_tasks = new ConcurrentQueue<TaskWrapper>();
}
public Task<TResult> Post<TResult>(Func<TResult> func)
{
var cts = new TaskCompletionSource<TResult>();
var wrapper = new TaskWrapper {CompletionSource = cts, Function = func};
_tasks.Enqueue(wrapper);
return cts.Task;
}
public Task Post(Action action)
{
var cts = new TaskCompletionSource<bool>();
var wrapper = new TaskWrapper {CompletionSource = cts, Function = action, isVoid = true};
_tasks.Enqueue(wrapper);
return cts.Task;
}
private TaskWrapper Pop()
{
_tasks.TryDequeue(out var wrapper);
return wrapper;
}
public void Start()
{
_timer.Enabled = true;
_timer.AutoReset = true;
_timer.Interval = 2500;
_timer.Elapsed += (sender, args) =>
{
var wrapper = Pop();
if (wrapper != null) wrapper.CompletionSource.SetResult(wrapper.isVoid ? true : wrapper.Function());
};
_timer.Start();
}
public void Stop()
{
}
private class TaskWrapper
{
public bool isVoid { get; set; }
public dynamic Function { get; set; }
public dynamic CompletionSource { get; set; }
}
What is the "proper" way of being able to bind to the different types of completion sources and the different types of input functions in the same collection?
Upvotes: 1
Views: 389
Reputation: 239714
I think moving some functionality into TaskWrapper
and defining a generic implementation that derives from it makes a lot of sense:
private class TaskWrapper
{
private readonly Action _function;
private readeonly TaskCompletionSource<bool> _cts;
public TaskWrapper (Action function, TaskCompletionSource<bool> cts) {
_function = function;
_cts = cts;
}
protected TaskWrapper () {
_function = null;
_cts = null;
}
public virtual void DoWork() {
_function();
cts.SetResult(true);
}
}
private class TaskWrapper<T> : TaskWrapper {
private readonly Func<T> _function;
private readeonly TaskCompletionSource<T> _cts;
public TaskWrapper (Func<T> function, TaskCompletionSource<T> cts) : base() {
_function = function;
_cts = cts;
}
public override void DoWork(){
_cts.SetResult(_function());
}
}
And now your dispatcher can just call DoWork
without knowing whether the generic was the one used during construction.
I'd also note that your current implementation should make you feel more than uncomfortable. It fails to call Function
for the Action
items.
Upvotes: 2
Reputation: 247153
Another approach that does not involve holding on to the delegate or task completion source is via lambda expressions
public class WorkerHub {
private readonly ConcurrentQueue<TaskWrapper> _tasks;
private readonly Timer _timer;
public WorkerHub() {
_timer = new Timer();
_tasks = new ConcurrentQueue<TaskWrapper>();
}
public Task<TResult> Post<TResult>(Func<TResult> func) {
var cts = new TaskCompletionSource<TResult>();
Action handler = () => {
cts.SetResult(func());
};
var wrapper = new TaskWrapper { Invoke = handler };
_tasks.Enqueue(wrapper);
return cts.Task;
}
public Task Post(Action action) {
var cts = new TaskCompletionSource<bool>();
Action handler = () => {
action();
cts.SetResult(true);
};
var wrapper = new TaskWrapper { Invoke = handler };
_tasks.Enqueue(wrapper);
return cts.Task;
}
private TaskWrapper Pop()
{
_tasks.TryDequeue(out var wrapper);
return wrapper;
}
public void Start() {
_timer.Enabled = true;
_timer.AutoReset = true;
_timer.Interval = 2500;
_timer.Elapsed += (sender, args) => {
var wrapper = Pop();
if (wrapper != null)
wrapper.Invoke();
};
_timer.Start();
}
public void Stop() {
}
private class TaskWrapper {
public Action Invoke { get; set; }
}
}
an Action
delegate is created to handle the desired behavior and that is given to the wrapper to be invoked when needed.
The wrapper now becomes redundant and can be removed altogether
public class WorkerHub {
private readonly ConcurrentQueue<Action> _tasks;
private readonly Timer _timer;
public WorkerHub() {
_timer = new Timer();
_tasks = new ConcurrentQueue<Action>();
}
public Task<TResult> Post<TResult>(Func<TResult> func) {
var cts = new TaskCompletionSource<TResult>();
Action handler = () => {
cts.SetResult(func());
};
_tasks.Enqueue(handler);
return cts.Task;
}
public Task Post(Action action) {
var cts = new TaskCompletionSource<bool>();
Action handler = () => {
action();
cts.SetResult(true);
};
_tasks.Enqueue(handler);
return cts.Task;
}
public void Start() {
_timer.Enabled = true;
_timer.AutoReset = true;
_timer.Interval = 2500;
_timer.Elapsed += (sender, args) => {
Action handler = null;
if (_tasks.TryDequeue(out handler) && handler != null)
handler.Invoke();
};
_timer.Start();
}
public void Stop() {
}
}
Yes, there are more refactors that can be done to improve this design, but this should be enough to get the general idea across
Upvotes: 4