Reputation: 1286
I have a static collection of shared resources (text processor classes) in my C# application, each of which responds to a keyword. Each processor is stateful, so it is not safe to call from several threads at once. I am looking for a good way to organize these so that access to them is thread-safe.
For instance, I could embed a sync object in each processor:
using System.Collections.Generic;
using System.Threading;

public abstract class TextProcessor
{
    // Per-processor lock object; callers lock on it before calling Run.
    protected internal readonly object sync = new object();
    public abstract void Run( string text );
}

public class DocumentProcessor
{
    private static Dictionary<string,TextProcessor> s_procMap = ...;

    public void Run( string[] words1, string[] words2 )
    {
        ThreadPool.QueueUserWorkItem( Process, words1 );
        ThreadPool.QueueUserWorkItem( Process, words2 );
    }

    private void Process( object arg )
    {
        // QueueUserWorkItem hands the state over as object, so cast it back.
        var words = (string[])arg;
        foreach( var word in words )
        {
            var proc = s_procMap[word];
            lock( proc.sync )
            {
                proc.Run( word );
            }
        }
    }
}
Assuming that I can't rewrite my processors to be stateless (which is an option but would be a last resort due to the volume of refactoring), is there a better way to express this?
Upvotes: 2
Views: 108
Reputation: 10507
If you really don't want to change the processor... then I would make a wrapper.
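using System.Threading.Tasks.Dataflow;

public class ProcWrapper
{
    private readonly TextProcessor _proc;
    private readonly ActionBlock<string> _actBlock;

    public ProcWrapper(TextProcessor proc)
    {
        _proc = proc;
        // By default the block handles one item at a time, so Run is never called concurrently.
        _actBlock = new ActionBlock<string>(word =>
        {
            _proc.Run(word);
        });
    }

    public void AddWord(string word)
    {
        _actBlock.Post(word);
    }

    public void WaitForCompletion()
    {
        // Completion only finishes after Complete() is called and the queue has drained.
        _actBlock.Complete();
        _actBlock.Completion.Wait();
    }
}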
And use it much as you did before:
Dictionary<string,ProcWrapper> s_procMap = ...;

void Run( string[] words )
{
    // NOTE: This assumes the same thread will access s_procMap.
    foreach(var word in words)
        s_procMap[word].AddWord(word);
}
I think this is close to what you're looking for.
Take a look at http://msdn.microsoft.com/en-us/library/hh228603.aspx for more info on DataFlow with TPL, and http://msdn.microsoft.com/en-us/library/hh194684.aspx for info specific to ActionBlock<T>.
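As a rough illustration of the wrapper idea, the sketch below posts to a single ActionBlock<string> from several threads and then shuts it down. CountingProcessor and ConcurrentPostSketch are hypothetical names invented for this example; CountingProcessor stands in for a real stateful TextProcessor. It assumes the System.Threading.Tasks.Dataflow package is available to the project.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical stateful processor; its counter is deliberately unsynchronized.
public sealed class CountingProcessor
{
    public int Count { get; private set; }
    public void Run(string text) { Count++; }
}

public static class ConcurrentPostSketch
{
    public static async Task<int> RunAsync(string[] words)
    {
        var proc = new CountingProcessor();

        // One block per processor. The default MaxDegreeOfParallelism is 1,
        // so the delegate never runs concurrently with itself.
        var block = new ActionBlock<string>(w => proc.Run(w));

        // Post from many threads at once; Post only enqueues the item.
        Parallel.ForEach(words, w => { block.Post(w); });

        block.Complete();        // signal that no more input is coming
        await block.Completion;  // finishes once the queue has drained
        return proc.Count;
    }
}
```

Because the block serializes the calls to Run, the unsynchronized counter should still end up equal to the number of posted words, with no lock in the calling code.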
Upvotes: 1
Reputation: 171178
The problem with the current approach is that a single busy TextProcessor can halt processing of the remaining words. If you instead gave each TextProcessor a (thread-safe) queue of words then you could enqueue work without blocking and have the TextProcessor dequeue and process the words in order.
Look at ActionBlock of the TPL Dataflow library. It handles the queueing, starting of a processing Task and the eventual shutdown of that Task in case no more work is left.
Note that when no work is queued, no thread is blocked. This is an important scalability property.
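A minimal sketch of the queue-per-processor idea, assuming two keywords, "slow" and "fast", each backed by its own ActionBlock<string>. The class name, the keywords, and the handlers are all made up for illustration; the handlers stand in for stateful TextProcessor instances, and the sleep simulates a busy one.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class QueuePerProcessorSketch
{
    // Returns, per keyword, the order in which that keyword's queue handled its items.
    public static async Task<Dictionary<string, List<string>>> RunAsync()
    {
        var seen = new Dictionary<string, List<string>>
        {
            ["slow"] = new List<string>(),
            ["fast"] = new List<string>(),
        };

        // Each list is only ever touched by its own block, one item at a time.
        var blocks = new Dictionary<string, ActionBlock<string>>
        {
            ["slow"] = new ActionBlock<string>(item => { Thread.Sleep(50); seen["slow"].Add(item); }),
            ["fast"] = new ActionBlock<string>(item => seen["fast"].Add(item)),
        };

        // Post only enqueues, so the loop is not held up while "slow" is busy.
        foreach (var item in new[] { "slow:1", "fast:1", "slow:2", "fast:2" })
        {
            var key = item.Split(':')[0];
            blocks[key].Post(item);
        }

        foreach (var b in blocks.Values) b.Complete();
        await Task.WhenAll(blocks.Values.Select(b => b.Completion));
        return seen;
    }
}
```

Each block keeps its items in posting order, and the slow handler delays only its own queue, which is the property the lock-based loop lacks.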
Upvotes: 2