Julian Gold

Reputation: 1286

Synchronizing a collection of shared resources in C#

I have a static collection of shared resources (text processor classes) in my C# application, each of which responds to a keyword. Each processor is stateful, so a single instance cannot safely be used from several threads at once. I am looking for a good way to organize these so that access to them is thread-safe.

For instance, I could embed a sync object in each processor:

public abstract class TextProcessor
{
    protected internal readonly object sync = new object();

    public abstract void Run( string text );
}

public class DocumentProcessor
{
    private static Dictionary<string,TextProcessor> s_procMap = ...;

    public void Run( string [] words1, string[] words2 )
    {
        ThreadPool.QueueUserWorkItem( Process, words1 );
        ThreadPool.QueueUserWorkItem( Process, words2 );
    }

    private void Process( object arg )
    {
        // QueueUserWorkItem hands the state over as object, so cast it back
        var words = (string[])arg;
        foreach( var word in words )
        {
            var proc = s_procMap[word];
            // Only one thread at a time may run a given processor
            lock( proc.sync )
            {
                proc.Run( word );
            }
        }
    }
}

Assuming that I can't rewrite my processors to be stateless (which is an option but would be a last resort due to the volume of refactoring), is there a better way to express this?

Upvotes: 2

Views: 108

Answers (2)

poy

Reputation: 10507

If you really don't want to change the processors, then I would wrap each one so that all calls to it go through a queue:

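// Requires: using System.Threading.Tasks.Dataflow;
public class ProcWrapper
{
  private TextProcessor _proc;
  private ActionBlock<string> _actBlock;

  public ProcWrapper(TextProcessor proc)
  {
    _proc = proc;

    // An ActionBlock handles one message at a time by default,
    // so _proc.Run is never called concurrently.
    _actBlock = new ActionBlock<string>(word =>
    {
      _proc.Run(word);
    });
  }

  public void AddWord(string word)
  {
    _actBlock.Post(word);
  }

  public void WaitForCompletion()
  {
    // Completion only finishes after Complete() has been called;
    // no further words are accepted from this point on.
    _actBlock.Complete();
    _actBlock.Completion.Wait();
  }
}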

Then use it much as you did before:

Dictionary<string,ProcWrapper> s_procMap = ...;

void Run( string [] words )
{
  // NOTE: This assumes the same thread will access s_procMap.
  foreach(var word in words)
    s_procMap[word].AddWord(word);
}

I think this is close to what you're looking for.

Take a look at http://msdn.microsoft.com/en-us/library/hh228603.aspx for more info on TPL Dataflow and http://msdn.microsoft.com/en-us/library/hh194684.aspx for info specific to ActionBlock<T>.

Upvotes: 1

usr

Reputation: 171178

The problem with the current approach is that a single busy TextProcessor can halt processing of the remaining words: the thread waits on that processor's lock even though the words after it may belong to idle processors. If you instead gave each TextProcessor a (thread-safe) queue of words, you could enqueue work without blocking and have the TextProcessor dequeue and process the words in order.

Look at ActionBlock in the TPL Dataflow library. It handles the queueing, starts a processing Task when work arrives, and lets that Task end once no more work is left.

Note that when no work is queued, no thread is blocked. This is an important scalability property.
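
A minimal sketch of that idea, assuming the System.Threading.Tasks.Dataflow package is referenced. CountingProcessor and Demo are hypothetical stand-ins, not part of the question's code; the processor here only counts calls so that lost updates would be visible.

```csharp
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // assumption: Dataflow package is available

// Hypothetical stateful processor standing in for the question's TextProcessor.
public sealed class CountingProcessor
{
    public int Count { get; private set; }   // unsynchronized mutable state

    public void Run(string text) { Count++; }
}

public static class Demo
{
    public static async Task<int> RunAsync()
    {
        var proc = new CountingProcessor();

        // MaxDegreeOfParallelism defaults to 1, so the delegate never runs
        // concurrently with itself and proc needs no lock of its own.
        var block = new ActionBlock<string>(word => proc.Run(word));

        // Many threads enqueue at once; Post only queues the word and returns.
        Parallel.For(0, 1000, i => block.Post("word"));

        block.Complete();        // signal that no more words will arrive
        await block.Completion;  // finishes once the queue has been drained

        return proc.Count;
    }
}
```

Calling Demo.RunAsync() and awaiting the result should yield 1000, since every posted word is processed exactly once. In the question's setting there would be one such block per TextProcessor, stored in the dictionary in place of the processor itself.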

Upvotes: 2
