arynaq
arynaq

Reputation: 6870

Type erasure of generic types, better alternative to dynamic

I am trying to create a worker pool that can take as input any Func<T> or Action, return a Task and schedule the result of that task in some thread at some future time.

I am aware I can use ThreadPool or Task.Factory but I am doing this for the sake of learning.

Now my implementation below relies on me being able to queue TaskCompletionSource<T> by wrapping it around in dynamic inside TaskWrapper. I don't feel comfortable doing this (as I can imagine this has a non-negligible runtime cost) but I don't know of any alternatives.

public class WorkerHub
{
    private readonly ConcurrentQueue<TaskWrapper> _tasks;
    private readonly Timer _timer;

    public WorkerHub()
    {
        _timer = new Timer();
        _tasks = new ConcurrentQueue<TaskWrapper>();
    }

    public Task<TResult> Post<TResult>(Func<TResult> func)
    {
        var cts = new TaskCompletionSource<TResult>();
        var wrapper = new TaskWrapper {CompletionSource = cts, Function = func};
        _tasks.Enqueue(wrapper);
        return cts.Task;
    }

    public Task Post(Action action)
    {
        var cts = new TaskCompletionSource<bool>();
        var wrapper = new TaskWrapper {CompletionSource = cts, Function = action, isVoid = true};
        _tasks.Enqueue(wrapper);
        return cts.Task;
    }

    private TaskWrapper Pop()
    {
        _tasks.TryDequeue(out var wrapper);
        return wrapper;
    }


    public void Start()
    {
        _timer.Enabled = true;
        _timer.AutoReset = true;

        _timer.Interval = 2500;
        _timer.Elapsed += (sender, args) =>
        {
            var wrapper = Pop();
            if (wrapper != null) wrapper.CompletionSource.SetResult(wrapper.isVoid ? true : wrapper.Function());
        };
        _timer.Start();
    }

    public void Stop()
    {
    }

    private class TaskWrapper
    {
        public bool isVoid { get; set; }
        public dynamic Function { get; set; }
        public dynamic CompletionSource { get; set; }
    }

What is the "proper" way of being able to bind to the different types of completion sources and the different types of input functions in the same collection?

Upvotes: 1

Views: 389

Answers (2)

Damien_The_Unbeliever
Damien_The_Unbeliever

Reputation: 239714

I think moving some functionality into TaskWrapper and defining a generic implementation that derives from it makes a lot of sense:

private class TaskWrapper
{
    private readonly Action _function;
    private readeonly TaskCompletionSource<bool> _cts;
    public TaskWrapper (Action function, TaskCompletionSource<bool> cts) {
       _function = function;
       _cts = cts;
    }
    protected TaskWrapper () {
       _function = null;
       _cts = null;
    }
    public virtual void DoWork() {
       _function();
       cts.SetResult(true);
    }
}
private class TaskWrapper<T> : TaskWrapper {
    private readonly Func<T> _function;
    private readeonly TaskCompletionSource<T> _cts;
    public TaskWrapper (Func<T> function, TaskCompletionSource<T> cts) : base() {
       _function = function;
       _cts = cts;
    }
    public override void DoWork(){
       _cts.SetResult(_function());
    }
}

And now your dispatcher can just call DoWork without knowing whether the generic was the one used during construction.


I'd also note that your current implementation should make you feel more than uncomfortable. It fails to call Function for the Action items.

Upvotes: 2

Nkosi
Nkosi

Reputation: 247153

Another approach that does not involve holding on to the delegate or task completion source is via lambda expressions

public class WorkerHub {
    private readonly ConcurrentQueue<TaskWrapper> _tasks;
    private readonly Timer _timer;

    public WorkerHub() {
        _timer = new Timer();
        _tasks = new ConcurrentQueue<TaskWrapper>();
    }

    public Task<TResult> Post<TResult>(Func<TResult> func) {
        var cts = new TaskCompletionSource<TResult>();

        Action handler = () => {
            cts.SetResult(func());
        };

        var wrapper = new TaskWrapper { Invoke = handler };
        _tasks.Enqueue(wrapper);
        return cts.Task;
    }

    public Task Post(Action action) {
        var cts = new TaskCompletionSource<bool>();
        Action handler = () => {
            action();
            cts.SetResult(true);
        };
        var wrapper = new TaskWrapper { Invoke = handler };
        _tasks.Enqueue(wrapper);
        return cts.Task;
    }

    private TaskWrapper Pop()
    {
        _tasks.TryDequeue(out var wrapper);
        return wrapper;
    }


    public void Start() {
        _timer.Enabled = true;
        _timer.AutoReset = true;

        _timer.Interval = 2500;
        _timer.Elapsed += (sender, args) => {
            var wrapper = Pop();
            if (wrapper != null)
                wrapper.Invoke();
        };
        _timer.Start();
    }

    public void Stop() {
    }

    private class TaskWrapper {
        public Action Invoke { get; set; }
    }
}

an Action delegate is created to handle the desired behavior and that is given to the wrapper to be invoked when needed.

The wrapper now becomes redundant and can be removed altogether

public class WorkerHub {
    private readonly ConcurrentQueue<Action> _tasks;
    private readonly Timer _timer;

    public WorkerHub() {
        _timer = new Timer();
        _tasks = new ConcurrentQueue<Action>();
    }

    public Task<TResult> Post<TResult>(Func<TResult> func) {
        var cts = new TaskCompletionSource<TResult>();
        Action handler = () => {
            cts.SetResult(func());
        };
        _tasks.Enqueue(handler);
        return cts.Task;
    }

    public Task Post(Action action) {
        var cts = new TaskCompletionSource<bool>();
        Action handler = () => {
            action();
            cts.SetResult(true);
        };
        _tasks.Enqueue(handler);
        return cts.Task;
    }

    public void Start() {
        _timer.Enabled = true;
        _timer.AutoReset = true;

        _timer.Interval = 2500;
        _timer.Elapsed += (sender, args) => {
            Action handler = null;
            if (_tasks.TryDequeue(out  handler) && handler != null)
                handler.Invoke();
        };
        _timer.Start();
    }

    public void Stop() {
    }
}

Yes, there are more refactors that can be done to improve this design, but this should be enough to get the general idea across

Upvotes: 4

Related Questions