Reputation: 107
I have a list of commands that is processed concurrently on multiple threads. The list is static, each thread generates its output and does not interfere with other threads, so everything works ok so far.
Some of the commands require complex calculations on a dataset that accompanies the command. The result of the calculations is the same for all threads. At this moment each thread performs the calculations when it reaches the command but this is a waste of time and resources.
What I would like to do is to perform the calculation only once and share the results between threads. The thread that reaches first the command starts the calculation, the other threads that reach the command wait till the calculation is complete and then use the result.
I have little experience to thread synchronization and I do not know what synchronization primitives I should use for this scenario and where I should put the lock on the calculation. Can you tell me what classes (for synchronization) should I use in this scenario and where the thread should wait and on what object?
My code looks like this:
private void ThreadFunc(object state)
{
Context ctx = (Context)state;
Command cmd = ctx.CommandList;
Processor proc = ctx.Processor;
while (cmd != null)
{
switch(cmd.Type)
{
case CommandType.Simple:
proc.ExecuteSimpleCommand(cmd);
break;
case CommandType.Complex:
cmd.Data = ComputeData(cmd.Dataset);
proc.ExecuteComplexCommand(cmd);
break;
}
cmd = cmd.Next;
}
}
The ComputeData method performs the complex calculation and the result is stored on the command.
The code has a problem at this moment because each thread sets the Data property on the same Command object but since the calculation result is the same for all threads the code works.
I was thinking of something like this but I'm not sure if it correct:
case CommandType.Complex:
lock (cmd)
{
if (cmd.Data == null)
{
cmd.Data = ComputeData(cmd.Dataset);
}
}
proc.ExecuteComplexCommand(cmd);
break;
EDIT: I'm limited on .NET 2.0 at this moment.
EDIT 2: The list is fixed, its elements do not change. The threads only read the list, they do not modify the list.
I'll try an example: the list contains the elements A, B and C and it is processed by threads T1 and T2. When T1 reaches B, it calls ComputeData method and stores the result in a property of B. When T2 reaches B it waits till the calculation is over (assuming T1 reached B before and it already called ComputeData) and it uses the result. This is what I want to achieve.
Upvotes: 1
Views: 134
Reputation: 31156
Basically you're attempting to grab the head of the linked list and then move to the next location. Obviously the 'grab' is the issue here; it's easy to put a loop around it.
With locks
The easy solution is to use a lock:
private static object lockObject = new object();
// ...
Command current;
lock (lockObject)
{
current = CommandList;
CommandList = CommandList.Next;
}
// use current.
Alternatively, you can use a spinlock.
Without locks
While it's always tricky to make thread-safe code without locks, here's my attempt (WARNING: there might be bugs in here; I haven't thoroughly checked my code!):
Thread.MemoryBarrier(); // read barrier
var list = CommandList;
if (list != null)
{
var next = list.Next;
if (Interlocked.CompareExchange(ref CommandList, next, list) == list)
{
// execute code on 'list'.
}
else
{
// something changed. Try again.
}
}
Let's change the question...
Sometimes the dialogue makes things more confusing...
The thread that reaches first the command starts the calculation, the other threads that reach the command wait till the calculation is complete and then use the result.
and
Simple commands can run in parallel without problems. complex commands can also run in parallel (this is what I'm doing now) but the same calculation is performed on each thread and this is a waste of time.
So let's get this straight: Let's assume we have chain A->B->C with A and C simple commands and B a complex command. We want A,B to run in parallel and want to run C after B has finished. While executing B we should all wait until its done.
One simple solution comes to mind: let's assume that simple commands have no data and complex commands do have data. You've also said that the list is created before invoking it all. This means we don't have to do a lot of synchronization.
Basically you can do this as follows:
var current = this.CommandList;
while (current != null)
{
if (current.Data != null) // is it a complex command? (B)
{
lock (current.Data) // all threads wait here except one
{
if (current.Executed) // execute it once.
{
// Go ahead and execute it, single threaded
// [code]
current.Executed = true;
}
}
}
else
{
bool executeHere = false;
// simple command.
lock (lockObject) // shared lock object
{
executeHere = !command.Executed; // execute it in this thread?
command.Executed = true;
}
// will be true in 1 thread only, but multiple A's/C's can be executed in parallel.
if (executeHere)
{
// execute simple command
// [code]
}
}
current = current.Next;
}
Upvotes: 2
Reputation: 4410
Try this:
namespace ConsoleApplication6 {
public class Context
{
public Command CommandList { get; set; }
public Processor Processor { get; set; }
}
public class Processor
{
public void ExecuteSimpleCommand(Command command)
{
}
public void ExecuteComplexCommand(Command command) {
}
}
public enum CommandType
{
Simple,
Complex
}
public class Command {
public CommandType Type { get; set; }
public Command Next { get; set; }
public object Data { get; set; }
}
class Program
{
private readonly object signalObject = new object();
private object computationResult;
private int sharedPriority = -1;
static void Main(string[] args) {
}
private void ThreadFunc(object state) {
Context ctx = (Context)state;
Command cmd = ctx.CommandList;
Processor proc = ctx.Processor;
while (cmd != null) {
switch (cmd.Type) {
case CommandType.Simple:
proc.ExecuteSimpleCommand(cmd);
break;
case CommandType.Complex:
//each thread will atomically increment the sharedPriority; only the thread with priority=0 (we start from -1) will do the heavy computation
cmd.Data = ComputeData(null, Interlocked.Increment(ref sharedPriority)); //pass around your input
proc.ExecuteComplexCommand(cmd);
break;
}
cmd = cmd.Next;
}
}
private object ComputeData(object input, int priority)
{
// we allow only the thread with the priority 0 to perform the heavy computation
if (priority != 0)
{
// for all other threads, it will wait for the thread that got the chance to do the heavy computation to signal
Monitor.Wait(signalObject);
return computationResult;
}
//heavy computation here
computationResult = new object(); // .. implement your heavy logic here
Monitor.PulseAll(signalObject); // we've computed the result, let all other threads to do use the heavy computation result
return computationResult;
}
}
}
You gotta make sure that the computationResult
is not mutated (only read) by your threads, otherwise you will get some nasty race conditions.
Upvotes: 0
Reputation: 44680
I don't quite understand the question. But the common situation is when you have a list of work items and you want to process them in multiple threads. In C# there are a few thread-safe collection that you could use. Such as ConcurrentDictionary
, ConcurrentBag
, ConcurrentQueue
, ConcurrentStack
etc. Please see MSDN for more details.
All these classes implement locking inside. You just need to choose the one that is more suitable for your situation. Here is a ConcurrentQueue
example:
class Program
{
static long _total;
static ConcurrentQueue<int> _queued;
static void Main(string[] args)
{
IEnumerable<int> numbers = Enumerable.Range(1, 1000000);
_queued = new ConcurrentQueue<int>(numbers);
_total = 0;
Task task1 = Task.Run(() => ProcessQueue());
Task task2 = Task.Run(() => ProcessQueue());
Task.WaitAll(
Task.Run(() => ProcessQueue()),
Task.Run(() => ProcessQueue()));
Console.WriteLine("Total: {0}", _total);
}
static void ProcessQueue()
{
int value;
while (_queued.TryDequeue(out value))
{
Interlocked.Add(ref _total, value);
}
}
}
Upvotes: -1