Reputation: 76
I would like a thread pool that partitions, or "buckets", work so that tasks are distributed across threads but tasks for a given id are never processed in parallel.
For example, if I have 5 unique ids and 5 threads, I would expect tasks for id "1" to always be assigned to thread 1, tasks for id "2" to thread 2, tasks for id "3" to thread 3, and so on.
If there are more ids than threads, each thread can be assigned more than one id. For example, with 10 ids and 5 threads, thread 1 can be assigned ids "1" and "5", thread 2 can be assigned ids "2" and "6", and so on.
Upvotes: 1
Views: 472
Reputation: 76
The cleanest way I have found to solve this problem is to use a ConcurrentExclusiveSchedulerPair. It is part of the TPL and works much like a reader/writer lock: it exposes an ExclusiveScheduler and a ConcurrentScheduler. Tasks on the ConcurrentScheduler may run concurrently with each other, tasks on the ExclusiveScheduler run one at a time, and no task runs on the ConcurrentScheduler while a task is running on the ExclusiveScheduler.
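To make the exclusivity concrete, here is a minimal sketch that queues several tasks on an ExclusiveScheduler and records how many were ever running at once. The class and method names (ExclusiveSchedulerDemo, MaxObservedConcurrency) are made up for illustration, and Thread.Sleep just stands in for real work.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveSchedulerDemo
{
    // Hypothetical helper: runs taskCount tasks on the given scheduler and
    // returns the highest number of tasks observed running at the same time.
    public static int MaxObservedConcurrency(TaskScheduler scheduler, int taskCount)
    {
        int running = 0;
        int maxRunning = 0;
        var gate = new object();

        var tasks = Enumerable.Range(0, taskCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                lock (gate)
                {
                    running++;
                    maxRunning = Math.Max(maxRunning, running);
                }
                Thread.Sleep(10); // simulate work
                lock (gate) { running--; }
            }, CancellationToken.None, TaskCreationOptions.None, scheduler))
            .ToArray();

        Task.WaitAll(tasks);
        return maxRunning;
    }

    public static void Main()
    {
        var pair = new ConcurrentExclusiveSchedulerPair();
        // Tasks on the exclusive scheduler never overlap, so this prints 1.
        Console.WriteLine(MaxObservedConcurrency(pair.ExclusiveScheduler, 20));
    }
}
```

Passing TaskScheduler.Default instead would normally report a value above 1 on a multi-core machine, which is the behaviour the exclusive scheduler is there to prevent.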
You can implement a partitioned scheduler by maintaining a pool of ExclusiveScheduler objects that are assigned to the unique ids using a sharding algorithm.
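    using System;
    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    public class TaskSchedulerPool
    {
        // One lazily created exclusive scheduler per partition.
        private readonly List<Lazy<TaskScheduler>> _taskSchedulers;

        public TaskSchedulerPool(int maxSize)
        {
            _taskSchedulers = Enumerable.Range(1, maxSize)
                .Select(
                    _ => new Lazy<TaskScheduler>(() => new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler))
                .ToList();
        }

        // Maps a key to a partition by hash code; equal keys always get the same scheduler.
        public TaskScheduler GetTaskScheduler(object o)
        {
            // Mask off the sign bit rather than using Math.Abs, which throws for int.MinValue.
            var partition = (o.GetHashCode() & int.MaxValue) % _taskSchedulers.Count;
            return _taskSchedulers[partition].Value;
        }
    }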
When you are creating tasks you can get a scheduler for a given id and use that to schedule the task.
    Task.Factory.StartNew(
        () => Console.WriteLine("Doing work here"),
        cancel.Token,
        TaskCreationOptions.None,
        pool.GetTaskScheduler(key));
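The exclusive scheduler ensures that tasks for a given key are never processed concurrently. Keys that hash to the same partition share a scheduler, so their tasks are serialized with each other as well.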
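As a rough check of that guarantee, the sketch below schedules several tasks per key and counts any case where two tasks for the same key are in flight together. To stay self-contained it uses a compact stand-in for the pool above (CreateSchedulers and SchedulerFor) rather than TaskSchedulerPool itself. Those helpers, PartitionedDemo and RunAndCheck are all hypothetical names, and the partition, key, and task counts are arbitrary.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class PartitionedDemo
{
    // Compact stand-in for the pool: one exclusive scheduler per partition.
    private static TaskScheduler[] CreateSchedulers(int count) =>
        Enumerable.Range(0, count)
            .Select(_ => new ConcurrentExclusiveSchedulerPair().ExclusiveScheduler)
            .ToArray();

    // Same sharding rule as the pool: non-negative hash code modulo partition count.
    private static TaskScheduler SchedulerFor(TaskScheduler[] schedulers, object key) =>
        schedulers[(key.GetHashCode() & int.MaxValue) % schedulers.Length];

    // Returns true if no key ever had two of its tasks running at the same time.
    public static bool RunAndCheck(int partitions, int keys, int tasksPerKey)
    {
        var schedulers = CreateSchedulers(partitions);
        var inFlight = new ConcurrentDictionary<int, int>();
        int violations = 0;
        var tasks = new List<Task>();

        for (int i = 0; i < tasksPerKey; i++)
        {
            for (int key = 1; key <= keys; key++)
            {
                int id = key; // copy the loop variable for the closure
                tasks.Add(Task.Factory.StartNew(() =>
                {
                    // More than one in-flight task for this id means an overlap.
                    if (inFlight.AddOrUpdate(id, 1, (_, n) => n + 1) > 1)
                        Interlocked.Increment(ref violations);
                    Thread.Sleep(5); // simulate work
                    inFlight.AddOrUpdate(id, 0, (_, n) => n - 1);
                }, CancellationToken.None, TaskCreationOptions.None, SchedulerFor(schedulers, id)));
            }
        }

        Task.WaitAll(tasks.ToArray());
        return violations == 0;
    }

    public static void Main()
    {
        // 10 ids spread over 5 partitions, 8 tasks per id.
        Console.WriteLine(RunAndCheck(5, 10, 8)); // prints True
    }
}
```

Tasks for different partitions still run in parallel on the thread pool. Note that the scheme pins each id to a scheduler, not to a specific thread, which is enough to rule out parallel processing for a given id.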
Upvotes: 2