AlexandruC

Reputation: 3637

Program design issue: FileSystemWatcher, multithreading, C#

I am developing a C# program that uses FileSystemWatcher to monitor the PDF files that are added to a monitor_directory. Every time a file is added to the directory, I add its path to a BlockingQueue. Another thread dequeues from that queue in an infinite while loop, waiting there until a file path is added, and then processes the file. The final step of processing a PDF file is moving it to the output directory.

The dispatching thread:

    private static void ThreadProc(object param)
    {

        FileMonitorManager _this = (FileMonitorManager)param;
        FileProcessingManager processingManager = new FileProcessingManager();
        processingManager.RegisterProcessor(new ExcelFileProcessor());
        processingManager.RegisterProcessor(new PdfFileProcessor());

        while (true)
        {
            try
            {
                var path = (string)_this.FileQueue.Dequeue();
                if (path == null)
                    break;
                bool b = processingManager.Process(path);
                if (!b)
                {
                    _this.FileQueue.Enqueue(path);
                    Console.WriteLine("\n\nError on file: " + path);
                }
                else
                    Console.WriteLine("\n\nSuccess on file: " + path);

            }
            catch (System.Exception e)
            {
                Console.WriteLine(e.Message);
            }
        }
    }

The Process function tests if the file exists, does some processing and moves the PDF file to the output directory.

I have faced two problems:

  1. The On_Create event handler from the FileSystemWatcher is triggered TWICE, so the BlockingQueue holds the same entry twice. To handle this, the processing routine checks whether the file has already been moved to the output directory (moving the file there is the final step of the processing). If it has not been moved, I carry on with the processing; if it has, I exit.
  2. If for some reason I get an error accessing the content of the file, saying that the file is being used by another process, I return FALSE from the Process function and add the file path to the queue again.

This works, but it is rather slow. How can I make this multithreaded while taking into consideration the two issues I've been facing?

Edit: What if I get the event and add the path to the queue, the path gets dequeued so the queue is empty, and then I get the same event again? The queue is empty, so the path gets added again, and basically the same event is processed TWICE.

Upvotes: 1

Views: 1677

Answers (2)

Pragmateek

Reputation: 13408

1) FileSystemWatcher notifies you twice because the file is updated in two steps: first the data, then the metadata. So you could check that the latest write has not already been taken into account, using something like:

File.GetLastWriteTime(file);

Or you could check for duplicates.
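
As a minimal sketch of the last-write-time check, assuming the event handler consults a small filter before enqueuing: the class `DuplicateEventFilter` and its methods are hypothetical names, not part of the question's code. Note that if the two notifications arrive with different write times, this filter alone will let both through.

```csharp
using System;
using System.Collections.Generic;
using System.IO;

// Hypothetical helper: remembers the last write time already queued for each
// path, so a repeated event for the same write is ignored.
public sealed class DuplicateEventFilter
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, DateTime> _lastQueued =
        new Dictionary<string, DateTime>(StringComparer.OrdinalIgnoreCase);

    // Returns true unless this exact (path, last write time) pair was the
    // most recent one accepted for that path.
    public bool ShouldEnqueue(string path, DateTime lastWriteUtc)
    {
        lock (_gate)
        {
            DateTime previous;
            if (_lastQueued.TryGetValue(path, out previous) && previous == lastWriteUtc)
                return false;
            _lastQueued[path] = lastWriteUtc;
            return true;
        }
    }

    // Convenience overload that reads the timestamp from disk.
    public bool ShouldEnqueue(string path)
    {
        return ShouldEnqueue(path, File.GetLastWriteTimeUtc(path));
    }

    // Call once a file has been moved to the output directory, so the
    // dictionary does not grow without bound.
    public void Forget(string path)
    {
        lock (_gate) { _lastQueued.Remove(path); }
    }
}
```

In the event handler this would be used as `if (filter.ShouldEnqueue(e.FullPath)) queue.Enqueue(e.FullPath);`. Because the filter is independent of the queue contents, it also covers the case from the question's edit, where the first entry has already been dequeued when the second event arrives.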

2) You're not using multithreading: you process one file at a time. You could spawn some threads to execute the Process method, e.g. with:

ThreadPool.QueueUserWorkItem
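
A rough sketch of how that call might be wired in, assuming `Process` is safe to call from several threads at once (the question does not say). `PoolDispatch`, `process` and `onFailure` are hypothetical names; `process` stands in for `processingManager.Process` and `onFailure` for re-enqueuing the path.

```csharp
using System;
using System.Threading;

public static class PoolDispatch
{
    // Queues one file for processing on a thread-pool thread.
    public static void Dispatch(string path, Func<string, bool> process, Action<string> onFailure)
    {
        ThreadPool.QueueUserWorkItem(state =>
        {
            var p = (string)state;
            try
            {
                // A false result means the file could not be processed yet.
                if (!process(p))
                    onFailure(p);
            }
            catch (Exception e)
            {
                // Catch here: an unhandled exception on a pool thread
                // would terminate the process.
                Console.WriteLine(e.Message);
            }
        }, path);
    }
}
```

The dispatching loop would then call `PoolDispatch.Dispatch(path, processingManager.Process, _this.FileQueue.Enqueue)` right after each `Dequeue`, so the loop itself never blocks on file processing.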

Upvotes: 1

Kevin

Reputation: 4636

FileSystemWatcher is notoriously chatty.

I think this is what I would do...

  1. Check whether the BlockingQueue already has an entry for the file in question before adding it a second time from the On_Create call.
  2. Do you expect to have a lot of null paths in your queue? Hopefully the null check is just a precaution, but don't Enqueue null paths if you can help it.
  3. In your worker threads, just Dequeue and process.
  4. If a worker thread gets an error while processing a file, you can Enqueue it again. You may instead want to set it aside as an exceptional case: if you get enough unprocessable files, they can hog your queue and slow you down as well.
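
Point 4 could be sketched roughly as below, assuming a fixed cap on attempts per file. `RetryPolicy`, `ShouldRetry`, `Succeeded` and `SetAside` are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Concurrent;

// Hypothetical helper: counts failed attempts per path and sets a path aside
// once it has failed maxAttempts times, so it stops cycling through the queue.
public sealed class RetryPolicy
{
    private readonly int _maxAttempts;
    private readonly ConcurrentDictionary<string, int> _attempts =
        new ConcurrentDictionary<string, int>();
    private readonly ConcurrentQueue<string> _setAside = new ConcurrentQueue<string>();

    public RetryPolicy(int maxAttempts) { _maxAttempts = maxAttempts; }

    // Paths that exceeded the attempt limit, for later inspection.
    public ConcurrentQueue<string> SetAside { get { return _setAside; } }

    // Call after Process(path) returned false. Returns true if the path
    // should be re-enqueued, false if it has been set aside.
    public bool ShouldRetry(string path)
    {
        int attempts = _attempts.AddOrUpdate(path, 1, (key, n) => n + 1);
        if (attempts < _maxAttempts)
            return true;

        int ignored;
        _attempts.TryRemove(path, out ignored);
        _setAside.Enqueue(path);
        return false;
    }

    // Call after Process(path) returned true, to drop the counter.
    public void Succeeded(string path)
    {
        int ignored;
        _attempts.TryRemove(path, out ignored);
    }
}
```

In the worker, the failure branch would become `if (retry.ShouldRetry(path)) _this.FileQueue.Enqueue(path);`. For the "file is being used by another process" case, a short delay before re-enqueuing may also help, since an immediate retry is likely to hit the same lock.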

An easy way to do this multithreaded is to just start a new task each time you Dequeue a path...

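    // Requires: using System.Threading.Tasks;
    // Assumes processingManager.Process can be called from several threads at once.
    while (true)
    {
        // Dequeue on the dispatching thread; a null path is the signal to stop.
        var path = (string) _this.FileQueue.Dequeue();
        if (path == null)
            break;

        // Hand the dequeued path to a task so the loop can keep dequeuing.
        Task.Factory.StartNew(() =>
            {
                try
                {
                    bool b = processingManager.Process(path);
                    if (!b)
                    {
                        _this.FileQueue.Enqueue(path);
                        Console.WriteLine("\n\nError on file: " + path);
                    }
                    else
                        Console.WriteLine("\n\nSuccess on file: " + path);
                }
                catch (System.Exception e)
                {
                    Console.WriteLine(e.Message);
                }
            });
    }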

For production code you'll also want to pass a cancellation token into the Task and have a mechanism for stopping the loop and Tasks.
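
One possible shape for that, as a sketch only: it assumes the queue is a `BlockingCollection<string>` from `System.Collections.Concurrent` rather than the question's own BlockingQueue type, and `CancellableConsumer`, `Run` and `process` are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public static class CancellableConsumer
{
    // Starts a long-running consumer that stops when the token is cancelled
    // or when queue.CompleteAdding() has been called and the queue is drained.
    public static Task Run(
        BlockingCollection<string> queue,
        Func<string, bool> process,
        CancellationToken token)
    {
        return Task.Factory.StartNew(() =>
        {
            try
            {
                // Blocks while the queue is empty; throws
                // OperationCanceledException once the token is cancelled.
                foreach (string path in queue.GetConsumingEnumerable(token))
                {
                    if (!process(path))
                        Console.WriteLine("Error on file: " + path);
                }
            }
            catch (OperationCanceledException)
            {
                // Expected on shutdown; nothing to clean up in this sketch.
            }
        }, TaskCreationOptions.LongRunning);
    }
}
```

The caller keeps a `CancellationTokenSource`, passes `cts.Token` to `Run`, and on shutdown calls `cts.Cancel()` followed by `task.Wait()`. This replaces the null-path sentinel with an explicit stop signal.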

Upvotes: 3
