Peter Morris

Reputation: 23224

Feedback wanted on threading code

Here's some code that someone wants to use in a production app (not me, honest) - I'd like some independent feedback on it please.

 public class JobProcessor<TJob> : IJobProcessor<TJob> where TJob : class
 {
  private readonly IJobQueue<TJob> theJobQueue =
   new NullQueue<TJob>();

  private Thread processorThread;

  private bool shutdownRequested;

  private readonly IJobObserver<TJob> theObserver = new NullObserver<TJob>();

  public AutoResetEvent threadStartedEvent = new AutoResetEvent(false);

  private int processorThreadId = 0;

  private volatile TJob currentJob = null;

  public JobProcessor(IJobQueue<TJob> jobQueue, IJobObserver<TJob> observer)
  {
   if (observer != null)
   {
    theObserver = observer;
   }
   shutdownRequested = false;
   theJobQueue = jobQueue;
   CreateAndRunThread();
  }

  private void CreateAndRunThread()
  {
   processorThread = new Thread(Run)
                      {
                       Name = "Tpk Processor Thread", IsBackground = true
                      };
   processorThread.SetApartmentState(ApartmentState.STA);
   processorThread.Priority = ThreadPriority.BelowNormal;
   processorThread.Start();
  }

  public void Shutdown()
  {
   threadStartedEvent.WaitOne();

   shutdownRequested = true;

   theJobQueue.Interrupt(processorThreadId);
  }

  public TJob CurrentJob()
  {
   return currentJob;
  }

  private void Run()
  {
   processorThreadId = Thread.CurrentThread.ManagedThreadId;

   threadStartedEvent.Set();

   while (!BufferClearedAndShutDown())
   {
    try
    {
     ProcessNextMessage();
    }
    catch (ThreadAbortException)
    {
     CreateAndRunThread();
     break;
    }
    catch (Exception e)
    {  }
   }
  }

  private void ProcessNextMessage()
  {
   currentJob = theJobQueue.RetrieveJob();
   if (currentJob != null)
   {
    theObserver.ProcessMessage(this, currentJob);
   }
   currentJob = null;
  }

  private bool BufferClearedAndShutDown()
  {
   return theJobQueue.IsEmpty && shutdownRequested;
  }
 }

Upvotes: 0

Views: 454

Answers (5)

Peter Morris

Reputation: 23224

Now that some people have already answered I feel it is safe to add my own comments without biasing anyone :-)

  1. Why would you want to wait for the thread to start before shutting it down?
  2. Recording the thread ID and using it as a way to identify which thread can shut it down?
  3. currentJob is marked volatile but not shutdownRequested (and it's not set within a lock)
  4. Why mix a queue + a worker in one? Why not have a queue separate, and then one or more threads working with it.
  5. I can't think why he'd be expecting a ThreadAbortException so as to ditch the current worker thread and create a new one.
  6. Despite the comment at the top of the code about how the class could simply be extended to utilise multiple worker threads I don't think that this design makes moving to multiple worker threads very easy at all (unlike an approach where the queue / workers are separate).

I'm no multi-thread guru or anything so I wanted to post it just to make sure it wasn't just me that thought this code looked a bit dodgy.
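
To make point 4 concrete, here is a minimal sketch of a queue that lives apart from its workers. It assumes .NET 4's BlockingCollection<T> is available, which the original code does not use, and WorkerPool is a hypothetical name invented for the illustration. It is a sketch under those assumptions, not a drop-in replacement.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Hypothetical sketch: the queue is a separate object, and any number of
// worker threads consume from it. None of these names come from the
// original code.
public static class WorkerPool
{
    // Starts workerCount threads that each drain the shared queue until
    // CompleteAdding() has been called and the queue is empty.
    public static Thread[] Start<TJob>(
        BlockingCollection<TJob> queue, int workerCount, Action<TJob> process)
    {
        var workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++)
        {
            workers[i] = new Thread(() =>
            {
                // GetConsumingEnumerable blocks while the queue is empty and
                // finishes once the queue is marked complete and drained.
                foreach (TJob job in queue.GetConsumingEnumerable())
                {
                    process(job);
                }
            })
            { IsBackground = true };
            workers[i].Start();
        }
        return workers;
    }
}
```

With this shape, going from one worker to several is just a different workerCount, and shutting down is a call to CompleteAdding() on the queue followed by Join on each thread. No thread IDs or start events are needed.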

Upvotes: 0

Matt

Reputation: 4301

There's a lot of context missing from the code, but I'd just like to add my two pennies to the above, which I agree with.

On Randy's item 2, I would just use a lock statement rather than a memory barrier. This is the MSDN-recommended approach unless you are running on multiprocessor Itanium boxes (which have weak memory ordering).

http://msdn.microsoft.com/en-us/library/system.threading.thread.memorybarrier.aspx

I wholly agree with item 6. A public method exposing a reference to an item in the job queue? That can't be good, can it? Especially when you are discussing thread safety. If you need to expose some information from this class to the outside world, make it immutable and expose that instead. Much better encapsulation.
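
A minimal sketch of the lock-based approach to the shutdown flag might look like this. ShutdownFlag is a hypothetical helper invented for the illustration; the original class simply holds a bool field.

```csharp
using System;

// Hypothetical sketch: every read and write of the flag goes through the
// same lock, so no explicit memory barrier or volatile keyword is needed.
public sealed class ShutdownFlag
{
    private readonly object sync = new object();
    private bool requested;

    // Called by the thread that wants the worker to stop.
    public void Request()
    {
        lock (sync) { requested = true; }
    }

    // Polled by the worker loop.
    public bool IsRequested
    {
        get { lock (sync) { return requested; } }
    }
}
```

The worker loop would then test IsRequested instead of reading the field directly.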

Upvotes: 0

Marc Gravell

Reputation: 1062510

Is this just a producer/consumer queue? There are lots of pre-rolled examples - I posted one here myself a few days ago (but I can't find it at the moment)... put it this way - the version I posted was much simpler - i.e. "obviously no bugs" rather than "no obvious bugs". I'll see if I can find it...

(edit: found it)

The point is, with that version the worker thread just does:

T item;
while(queue.TryDequeue(out item)) {
    // process item
}
// queue has been closed and drained
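
For readers without the linked post, a queue that supports the loop above could be sketched roughly as follows. This is not necessarily the version referred to above: ClosableQueue and its members are hypothetical names, and the only assumption carried over is that TryDequeue blocks until an item arrives and returns false once the queue is closed and empty.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical sketch of a closable producer/consumer queue built on
// Monitor.Wait and Monitor.Pulse.
public sealed class ClosableQueue<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private bool closed;

    // Adds an item and wakes one waiting consumer.
    public void Enqueue(T item)
    {
        lock (items)
        {
            if (closed) throw new InvalidOperationException("Queue is closed.");
            items.Enqueue(item);
            Monitor.Pulse(items);
        }
    }

    // Marks the queue closed and wakes every waiting consumer so that
    // each one can notice the closed state and exit.
    public void Close()
    {
        lock (items)
        {
            closed = true;
            Monitor.PulseAll(items);
        }
    }

    // Blocks until an item is available, or returns false once the queue
    // is both closed and empty.
    public bool TryDequeue(out T item)
    {
        lock (items)
        {
            while (items.Count == 0)
            {
                if (closed)
                {
                    item = default(T);
                    return false;
                }
                Monitor.Wait(items);
            }
            item = items.Dequeue();
            return true;
        }
    }
}
```

Shutdown then amounts to calling Close(): items already queued are still handed out, and each worker falls out of its while loop when nothing is left.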

Upvotes: 2

Randy Kern

Reputation: 11

Very hard to give you useful feedback without knowing the semantics of IJobQueue, IJobObserver, or IJobProcessor, but here are a few details that stand out:

  1. processorThread doesn't seem like it is really needed; can/should just be a local in CreateAndRunThread
  2. shutdownRequested should be marked volatile; alternatively, call Thread.MemoryBarrier() after setting shutdownRequested to true, or use Thread.VolatileWrite to set the shutdownRequested field
  3. why wait for the thread to start before asking it to shutdown?
  4. don't know why you need a threadStartedEvent, what is it used for? Making it public especially is a bit scary
  5. without knowing how IJobQueue.Interrupt is implemented, hard to say if there are issues there or not
  6. Who uses CurrentJob, and why? feels risky
  7. While catching your own ThreadAbortException will catch most cases, it won't catch everything. You could use a separate monitor thread, which calls Thread.Join, and after double checking BufferClearedAndShutDown() invokes CreateAndRunThread()
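
Items 2 and 3 could be sketched like this: a volatile flag that the worker loop polls, and a Shutdown that does not wait for any "started" event. StoppableWorker is a hypothetical class made up for the illustration, and the Sleep call only stands in for real work.

```csharp
using System;
using System.Threading;

// Hypothetical sketch: a worker loop controlled by a volatile flag.
public sealed class StoppableWorker
{
    // volatile makes the worker re-read the field on every iteration
    // rather than relying on a cached value.
    private volatile bool shutdownRequested;
    private readonly Thread thread;

    public StoppableWorker()
    {
        thread = new Thread(Run) { IsBackground = true };
        thread.Start();
    }

    public bool IsRunning
    {
        get { return thread.IsAlive; }
    }

    // Sets the flag and waits for the worker to exit. This is safe to
    // call even if the worker has not yet entered its loop.
    public void Shutdown()
    {
        shutdownRequested = true;
        thread.Join();
    }

    private void Run()
    {
        while (!shutdownRequested)
        {
            Thread.Sleep(1); // placeholder for real work
        }
    }
}
```

Because the flag is checked at the top of the loop, a shutdown requested before the thread has started simply makes the loop exit on its first check.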

Upvotes: 1

Alex Reitbort

Reputation: 13696

Is your thread trying to catch ThreadAbortException and then recreate itself? I am not sure that is possible, but in any case it is not a nice way to play with the OS.

And you will lose jobs if an exception happens after currentJob = theJobQueue.RetrieveJob(); but before theObserver.ProcessMessage(this, currentJob);

And unless your jobQueue is thread-safe, you should add locking around access to it.
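
One way to address both points is sketched below, assuming a plain Queue<T> stands in for the job queue (the real IJobQueue is not shown in the question). SafeProcessing and TryProcessNext are hypothetical names.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical sketch: lock around every queue access, and put the job
// back if processing fails so that it is not silently lost.
public static class SafeProcessing
{
    // Returns false if the queue was empty, true if a job was processed.
    // If the handler throws, the job is re-queued and the exception is
    // rethrown to the caller.
    public static bool TryProcessNext<TJob>(Queue<TJob> queue, Action<TJob> process)
    {
        TJob job;
        lock (queue)
        {
            if (queue.Count == 0) return false;
            job = queue.Dequeue();
        }
        try
        {
            process(job);
            return true;
        }
        catch
        {
            // Note: the job goes to the back of the queue, so ordering is
            // not preserved after a failure.
            lock (queue) { queue.Enqueue(job); }
            throw;
        }
    }
}
```

Whether a failed job should be retried, moved to a separate error queue, or dropped with a log entry depends on the application; the sketch only shows that the decision can be made explicit instead of losing the job by accident.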

Upvotes: 2
