Chris L

Reputation: 2292

How to process files in a directory concurrently in .NET

I'm having issues processing files in parallel within a directory. I've read several similar questions and examples, but I can't work out why my code throws an exception.

My directory gets populated by other processes and will contain thousands of files at any one time. Each file has to be parsed and validated, which takes time (filesystem/network I/O, etc.). I need this step to be done in parallel; the rest has to be done serially.

Here's my code:

public void run()
{
    XmlMessageFactory factory = new XmlMessageFactory();
    DirectoryInfo dir = new DirectoryInfo(m_sourceDir);
    Dictionary<string, int> retryList = new Dictionary<string, int>();
    ConcurrentQueue<Tuple<XmlMsg,FileInfo>> MsgQueue = new
                                      ConcurrentQueue<Tuple<XmlMsg,FileInfo>>();

    //start worker to handle messages
    System.Threading.ThreadPool.QueueUserWorkItem(o =>
        {
            XmlMsg msg;
            Tuple<XmlMsg, FileInfo> item;
            while (true)
            {
                if (!MsgQueue.TryDequeue(out item))
                {
                    System.Threading.Thread.Sleep(5000);
                    continue;
                }
                try
                {
                    msg = item.Item1;
                    /* processing on msg happens here */
                    handleMessageProcessed(item.Item2, ref retryList);
                }
                catch (Exception e)
                {
                    //if this method is called it gives the 
                    //exception below
                    handleMessageFailed(item.Item2, e.ToString()); 
                }
            }
        }
    );

    while (true)
    {
        FileInfo[] files = dir.GetFiles(m_fileTypes);
        Partitioner<FileInfo> partitioner = Partitioner.Create(files, true);
        Parallel.ForEach(partitioner, f =>
        {
            XmlMsg msg = factory.getMessage(messageType);
            try
            {
                msg.loadFile(f.FullName);
                MsgQueue.Enqueue(new Tuple<XmlMsg, FileInfo>(msg, f));
            }
            catch (Exception e)
            {
                handleMessageFailed(f, e.ToString());
            }
        });
    }
}

static void handleMessageFailed(FileInfo f, string message)
{
    //Error here:
    //"The process cannot access the file because it is
    //being used by another process." (System.IO.IOException)
    f.MoveTo(m_failedDir + f.Name);
}

Given that I'm using a ConcurrentQueue, how can the code end up attempting to access a file twice at the same time?

I have a test setup with 5000 files, and this happens at least once per run, on a different file each time. When I inspect the directory, the source file causing the exception has already been processed and moved to the "processed" directory.

Upvotes: 2

Views: 1542

Answers (2)

Chris L

Reputation: 2292

After a fair bit of head scratching, the problem turned out to be annoyingly simple. The parallel scan of the directory was completing before the serial processing of the queued files, so the outer loop was restarting and re-adding files to the queue that were already in it.

For completeness here's the modified section of code:

while (true)
{
    FileInfo[] files = dir.GetFiles(m_fileTypes);
    Partitioner<FileInfo> partitioner = Partitioner.Create(files, true);
    Parallel.ForEach(partitioner, f =>
    {
        XmlMsg msg = factory.getMessage(messageType);
        try
        {
            msg.loadFile(f.FullName);
            MsgQueue.Enqueue(new Tuple<XmlMsg, FileInfo>(msg, f));
        }
        catch (Exception e)
        {
            handleMessageFailed(f, e.ToString());
        }
    });
    //Added check to wait for the queue to drain before
    //re-scanning the directory
    while (MsgQueue.Count > 0)
    {
        System.Threading.Thread.Sleep(5000);
    }
}
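For what it's worth, the `Thread.Sleep` polling in both the worker and the drain check can be avoided entirely with a `BlockingCollection<T>`, which blocks the consumer until items arrive and exits cleanly once the producer signals completion. A minimal sketch, not part of the original fix; `QueueSketch`, `Drain`, and the string items are illustrative stand-ins for the real `Tuple<XmlMsg, FileInfo>` pipeline:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class QueueSketch
{
    // Consume everything the producer adds. GetConsumingEnumerable blocks
    // until items arrive and ends once CompleteAdding is called, so no
    // Thread.Sleep polling is needed on either side.
    public static List<string> Drain(IEnumerable<string> fileNames)
    {
        var queue = new BlockingCollection<string>(boundedCapacity: 100);
        var processed = new List<string>();

        Task consumer = Task.Run(() =>
        {
            foreach (string name in queue.GetConsumingEnumerable())
            {
                processed.Add(name); // stand-in for the real message handling
            }
        });

        foreach (string name in fileNames)
        {
            queue.Add(name); // producer side, e.g. inside Parallel.ForEach
        }

        queue.CompleteAdding(); // lets the consumer loop finish
        consumer.Wait();
        return processed;
    }
}
```

Because `CompleteAdding` marks the end of input, the "wait for the queue to deplete" loop above is replaced by a single `consumer.Wait()`.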

Upvotes: 1

Matthew Watson

Reputation: 109792

I suspect a problem in XmlMsg.loadFile()

I think that you may have code like this in it:

public void loadFile(string filename)
{
    FileStream file = File.OpenRead(filename);

    // Do something with file

    file.Close();
}

If an exception occurs in the "do something with file" part, the file won't be closed because file.Close() will never be executed. Then you'll get the "file in use" exception inside handleMessageFailed().

If so, the solution is to access the file in a using block as follows; then it will be closed even if an exception occurs:

public void loadFile(string filename)
{
    using (FileStream file = File.OpenRead(filename))
    {
        // Do something with file
    }
}

But assuming that this does turn out to be the problem, when you start using real files produced by external processes, you may have another issue if the external processes still have the files open when your worker threads try to process them.
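One common mitigation for that (not part of the answer above) is to try opening each file exclusively before queuing it, and skip it until the writing process releases it; the file will simply be picked up on the next directory scan. A sketch under that assumption; `IsReadyForProcessing` is an illustrative helper name, not an existing API:

```csharp
using System;
using System.IO;

static class FileGate
{
    // Returns true if we could open the file exclusively, i.e. no other
    // process still has it open. FileShare.None requests exclusive access,
    // so a file still being written will raise a sharing-violation
    // IOException here instead of later in the processing pipeline.
    public static bool IsReadyForProcessing(string path)
    {
        try
        {
            using (File.Open(path, FileMode.Open, FileAccess.Read, FileShare.None))
            {
                return true;
            }
        }
        catch (IOException)
        {
            // Writer still has the file open; try again on the next scan.
            return false;
        }
    }
}
```

Inside the `Parallel.ForEach` body, a file that fails this check would be skipped rather than routed to `handleMessageFailed`.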

Upvotes: 0
