Martin Ongtangco

Reputation: 23535

Proper Queue threading technique in C#?

I want to implement a Windows service that captures flat delimited files dropped into a folder and imports them into the database. What I originally envisioned is to have a FileSystemWatcher watch for new files and create a new thread for each import.

I want to know how I should properly implement an algorithm for this and what technique I should use. Am I going in the right direction?

Upvotes: 1

Views: 721

Answers (4)

Brian Gideon

Reputation: 48979

The traditional approach is to create a finite set of threads (could be as few as 1) and have them watch a blocking queue. The code in the FileSystemWatcher¹ event handlers will enqueue work items while the worker thread(s) dequeue and process them. It might look like the following, which uses the BlockingCollection class, available in .NET 4.0 or as part of the Reactive Extensions download.

Note: The code is left short and concise for brevity. You will have to expand and harden it yourself.

using System.Collections.Concurrent;
using System.IO;
using System.Threading;

public class Example
{
  private readonly BlockingCollection<string> m_Queue = new BlockingCollection<string>();

  public Example()
  {
    // A single background worker thread drains the queue.
    var thread = new Thread(Process);
    thread.IsBackground = true;
    thread.Start();
  }

  // Wire this up to the FileSystemWatcher's Created event.
  private void FileSystemWatcher_Event(object sender, FileSystemEventArgs args)
  {
    m_Queue.Add(args.FullPath);
  }

  private void Process()
  {
    while (true)
    {
      // Take blocks until an item is available.
      string file = m_Queue.Take();
      // Process the file here.
    }
  }
}

You could take advantage of the Task class in the TPL for a more modern and ThreadPool-like approach. You would start a new task for each file (or perhaps batch them) that needs to be processed. The only gotcha I see with this approach is that it would be harder to control the number of database connections being opened simultaneously. It's definitely not a showstopper, and it might be of no concern.
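
For illustration, a minimal sketch of that Task-based variant (assuming .NET 4.0; ImportFile is a hypothetical method that does the actual database work) might look like this:

// Requires: using System.Threading.Tasks;
private void FileSystemWatcher_Event(object sender, FileSystemEventArgs args)
{
  string file = args.FullPath;
  // Each file gets its own task, which runs on a thread pool thread.
  Task.Factory.StartNew(() => ImportFile(file));
}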

¹ The FileSystemWatcher has been known to be a little flaky, so it is often advised to use a secondary method of discovering file changes in case they get missed by the FileSystemWatcher. Your mileage may vary on this issue.

Upvotes: 1

Ed Power

Reputation: 8531

I've fielded a service that does this as well. I poll via a timer whose elapsed event handler acts as a supervisor, adding new files to a queue and launching a configurable number of threads that consume the queue. Once the files are processed, it restarts the timer.

Each thread, including the event handler, traps and reports all exceptions. The service is always running, and I use a separate UI app to tell the service to start and stop the timer. This approach has been rock solid, and the service has never crashed in several years of processing.
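
A rough sketch of that timer-driven supervisor, assuming .NET 4.0, might look like the following; the folder path, interval, worker count, and ProcessFile are illustrative placeholders, not the original service's code.

using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;

public class ImportSupervisor
{
  private readonly System.Timers.Timer m_Timer = new System.Timers.Timer(5000) { AutoReset = false };
  private const string DropFolder = @"C:\Drop";
  private const int WorkerCount = 4;

  public void Start()
  {
    m_Timer.Elapsed += OnElapsed;
    m_Timer.Start();
  }

  private void OnElapsed(object sender, System.Timers.ElapsedEventArgs e)
  {
    try
    {
      // Supervisor: snapshot the current files and let a fixed number of threads drain the queue.
      var queue = new ConcurrentQueue<string>(Directory.GetFiles(DropFolder));
      var workers = new Thread[WorkerCount];
      for (int i = 0; i < workers.Length; i++)
      {
        workers[i] = new Thread(() =>
        {
          string file;
          while (queue.TryDequeue(out file))
            ProcessFile(file);
        });
        workers[i].Start();
      }
      foreach (var worker in workers)
        worker.Join();
    }
    catch (Exception)
    {
      // Trap and report here; the supervisor should never let an exception escape.
    }
    finally
    {
      // Restart the timer only after the whole batch has been processed.
      m_Timer.Start();
    }
  }

  private void ProcessFile(string file)
  {
    // Import the file here.
  }
}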

Upvotes: 1

Brian Rasmussen

Reputation: 116501

Creating a thread per message will most likely be too expensive. If you can use .NET 4, you could start a Task for each message. That would run the code on a thread pool thread and thus reduce the overhead of creating threads.

You could also do something similar with asynchronous delegates if .NET 4 is not an option. However, the code gets a bit more complicated in that case. That would utilize the thread pool as well and save you the overhead of creating a new thread for each message.
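
An illustrative sketch of the asynchronous delegate variant for pre-.NET 4.0 (ImportFile is an assumed method that does the actual work):

// Requires: using System; using System.IO;
private void FileSystemWatcher_Event(object sender, FileSystemEventArgs args)
{
  Action<string> import = ImportFile;
  // BeginInvoke runs the delegate on a thread pool thread; EndInvoke must be
  // called to observe any exception and release the async resources.
  import.BeginInvoke(args.FullPath, ar => import.EndInvoke(ar), null);
}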

Upvotes: 0

Andreas Paulsson

Reputation: 7813

I developed a product like this for a customer. The service monitored a number of folders for new files, and when files were discovered they were read, processed (printed on barcode printers), archived, and deleted.

We used a "discoverer" layer that discovered files using FileSystemWatcher or polling depending on environment (since FileSystemWatcher is not reliable when monitoring e.g. samba shares), a "file reader" layer and a "processor" layer.

The "discoverer" layer discovered files and put the filenames in a list that the "file reader" layer processed. The "discoverer" layer signaled that there were new files to process by settings an event that the "file reader" layer were waiting on.

The "file reader" layer then read the files (using retry functionality since you may get notifications for new files before the files has been completely written by the process that create the file).

After the "file reader" layer has read the file, a new "processor" thread were created using the ThreadPool.QueueWorkItem to process the file contents.

When a file had been processed, the original file was copied to an archive and deleted from the original location. The archive was also cleaned up regularly to keep it from flooding the server, and it was great for troubleshooting.
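
The archiving step could be as simple as the following sketch (the archive path is an assumed example):

// Requires: using System.IO;
File.Copy(file, Path.Combine(@"C:\Archive", Path.GetFileName(file)), true);
File.Delete(file);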

This has been used in production in a number of different environments for over two years now and has proved to be very reliable.

Upvotes: 1
