user1869878

Reputation: 83

C# Lock Threading Issue

The task here is quite simple (or so I thought...). I want to fill a queue with methods to be executed (all of which will return an object result), and then I want to have some arbitrary number of threads pull from this queue, execute the methods, and add the results to some other collection (a dictionary in this case) which will be returned when all the work is complete. A main method will be called in the main thread which will start the processing and should block until all the threads are finished doing whatever they're doing and return the collection with the results. So I put together this class:

public class BackgroundWorkManager
{
    public delegate object ThreadTask();

    private Thread[] workers;
    private ManualResetEvent workerThreadMre;
    private ManualResetEvent mainThreadMre;
    private Queue<WorkItem> workQueue;
    private Dictionary<string, object> results;
    private object writeLock;
    private int activeTasks;

    private struct WorkItem
    {
        public string name;
        public ThreadTask task;

        public WorkItem(string name, ThreadTask task)
        {
            this.name = name;
            this.task = task;
        }
    }

    private void workMethod()
    {
        while (true)
        {
            workerThreadMre.WaitOne();

            WorkItem task;

            lock (workQueue)
            {
                if (workQueue.Count == 0)
                {
                    workerThreadMre.Reset();
                    continue;
                }

                task = workQueue.Dequeue();
            }

            object result = task.task();

            lock (writeLock)
            {
                results.Add(task.name, result);
                activeTasks--;

                if (activeTasks == 0)
                    mainThreadMre.Set();
            }
        }
    }

    public BackgroundWorkManager()
    {
        workers = new Thread[Environment.ProcessorCount];
        workerThreadMre = new ManualResetEvent(false);
        mainThreadMre = new ManualResetEvent(false);
        workQueue = new Queue<WorkItem>();
        writeLock = new object();
        activeTasks = 0;

        for (int i = 0; i < Environment.ProcessorCount; i++)
        {
            workers[i] = new Thread(workMethod);
            workers[i].Priority = ThreadPriority.Highest;
            workers[i].Start();
        }
    }

    public void addTask(string name, ThreadTask task)
    {
        workQueue.Enqueue(new WorkItem(name, task));
    }

    public Dictionary<string, object> process()
    {
        results = new Dictionary<string, object>();

        activeTasks = workQueue.Count;

        mainThreadMre.Reset();
        workerThreadMre.Set();
        mainThreadMre.WaitOne();
        workerThreadMre.Reset();

        return results;
    }
}

This works fine if I use the object once to process a queue of methods, but if I try something like this:

BackgroundWorkManager manager = new BackgroundWorkManager();

for (int i = 0; i < 20; i++)
{
    manager.addTask("result1", (BackgroundWorkManager.ThreadTask)delegate
    {
        return (object)(1);
    });

    manager.process();
}

Things break. I either get a deadlock or an exception saying that the dictionary I'm writing the results to already contains the key (yet the Visual Studio debugger says it's empty). Adding a 'Thread.Sleep(1)' to the work method seems to fix it, which is bizarre. This is my first time working with threads, so I'm not sure whether I'm horribly abusing locks or something else is wrong. If anyone could provide some insight into what I'm doing wrong, it would be greatly appreciated.

Upvotes: 2

Views: 227

Answers (2)

Roman Bezrabotny

Reputation: 186

A version using the Parallel class:

List<Func<object>> actions = new List<Func<object>>();

actions.Add(delegate { return (object)(1); });
actions.Add(delegate { return (object)(1); });
actions.Add(delegate { return (object)(1); });

Dictionary<string, object> results = new Dictionary<string,object>();

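Parallel.ForEach(actions, f =>
{
    // Run the delegate outside the lock so the work actually executes in parallel.
    object result = f();

    // Dictionary<TKey, TValue> is not thread-safe, so only the write is serialized.
    lock (results)
    {
        results.Add(Guid.NewGuid().ToString(), result);
    }
});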
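To keep the task names from the question and return the results once everything has finished, the Parallel approach could be sketched as follows. This is only a minimal sketch under stated assumptions, not the answerer's code: the class name `ParallelRunner`, the method name `RunAll`, and the decision to let a later task with a duplicate name overwrite an earlier result are all hypothetical choices made for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class ParallelRunner
{
    // Runs every named task in parallel and blocks until all have finished.
    public static IDictionary<string, object> RunAll(
        IEnumerable<KeyValuePair<string, Func<object>>> tasks)
    {
        var results = new ConcurrentDictionary<string, object>();

        // Parallel.ForEach returns only after every iteration has completed,
        // so no manual reset events or task counters are needed.
        Parallel.ForEach(tasks, item =>
        {
            // The delegate runs without holding a lock; the indexer is thread-safe.
            // Assumption: a duplicate name overwrites the earlier result.
            results[item.Key] = item.Value();
        });

        return results;
    }
}
```

A caller would build a list of `KeyValuePair<string, Func<object>>` entries and pass it to `RunAll`. Because the method creates a fresh dictionary on every call, calling it repeatedly in a loop does not share state between runs.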

Upvotes: 1

Yuval Itzchakov

Reputation: 149628

There are plenty of ways to implement the producer-consumer pattern. For example, you can simplify your code drastically by using an ActionBlock<T> (part of TPL Dataflow):

var concurrentDictionary = new ConcurrentDictionary<string, object>();

ActionBlock<Func<object>> actionBlock = new ActionBlock<Func<object>>((func) => 
{
    var obj = func();
    concurrentDictionary.AddOrUpdate("someKey", obj, (s,o) => o);
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism =
                                       Environment.ProcessorCount });

And then simply post your delegates:

// tasks is assumed to be an IEnumerable<Func<object>> defined elsewhere.
foreach (var task in tasks)
{
    actionBlock.Post(task);
}
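Posting alone does not wait for the work to finish, whereas the question wants the caller to block until every result is in. A minimal sketch of how that might look is below, assuming the System.Threading.Tasks.Dataflow NuGet package is referenced. The class name `DataflowRunner`, the method name `RunAll`, and the use of named key/value pairs as input are hypothetical and not part of the original answer.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow; // assumption: the TPL Dataflow package is installed

// Hypothetical helper, not part of the original answer.
public static class DataflowRunner
{
    // Posts every named task to an ActionBlock and blocks until all are processed.
    public static IDictionary<string, object> RunAll(
        IEnumerable<KeyValuePair<string, Func<object>>> tasks)
    {
        var results = new ConcurrentDictionary<string, object>();

        var block = new ActionBlock<KeyValuePair<string, Func<object>>>(
            item =>
            {
                // Assumption: a duplicate name overwrites the earlier result.
                results[item.Key] = item.Value();
            },
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = Environment.ProcessorCount
            });

        foreach (var item in tasks)
        {
            block.Post(item);
        }

        // Signal that no more items will arrive, then wait for the queue to drain.
        block.Complete();
        block.Completion.Wait();

        return results;
    }
}
```

`Completion.Wait()` rethrows any exception raised by a delegate, wrapped in an `AggregateException`, so a failing task surfaces in the caller rather than being lost. In asynchronous code, awaiting `block.Completion` would be the usual alternative to blocking.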

Upvotes: 1
