Reputation: 13
I need to process incoming XML files (they will be created by another application directly in a specific folder), and I need to do it fast.
There can be up to 200,000 files per day, and my current assumption is to use .NET 4 and the TPL.
My current service concept is:
In a loop, I check the folder for new files; if I find any, I put them into a queue, which is processed by another loop that takes files from the queue and creates a new task (thread) for each of them. The number of simultaneous tasks should be configurable. The first part is easy, but creating two main loops with a queue between them is something new for me.
And the question: how do I create two loops (one for checking the folder and adding files, and a second for taking files from the queue and processing them in parallel) and add a queue for communication between them?
For the first part (folder checking), the suggested solution is to use FileSystemWatcher. Now the second part needs to be discussed (maybe some kind of task scheduler).
Upvotes: 1
Views: 1696
Reputation: 85
I am quite surprised that no one has asked yet, but considering that what you're trying to achieve is some kind of messaging between two applications, have you considered using WCF?
Upvotes: 2
Reputation: 38112
Sounds like the missing piece in your puzzle is a BlockingCollection:
FileSystemWatcher watcher;
BlockingCollection<string> bc;
private readonly object _lock = new object();
Task[] tasks;

void PrepareWatcher()
{
    watcher = new FileSystemWatcher(@"c:\");
    watcher.Created += (s, e) =>
    {
        lock (_lock) // Prevents a race condition when stopping
        {
            if (!bc.IsAddingCompleted)
                bc.Add(e.FullPath);
        }
    };
}

void StartProcessing(int taskCount)
{
    tasks = new Task[taskCount];
    bc = new BlockingCollection<string>();
    for (int i = 0; i < taskCount; i++)
        tasks[i] = Task.Factory.StartNew(() =>
        {
            foreach (var x in bc.GetConsumingEnumerable())
                ProcessXml(x);
        }, TaskCreationOptions.LongRunning);
    watcher.EnableRaisingEvents = true;
}

void ProcessXml(string path)
{
    // Do your processing here...
    // Note many events will be called multiple times, see:
    // http://weblogs.asp.net/ashben/archive/2003/10/14/31773.aspx
}

void StopProcessing()
{
    watcher.EnableRaisingEvents = false;
    lock (_lock) // The above line doesn't guarantee no more events will be raised,
                 // and Add() and CompleteAdding() can't be called concurrently
    {
        bc.CompleteAdding();
    }
    Task.WaitAll(tasks);
    foreach (var task in tasks)
        task.Dispose();
    bc.Dispose();
    tasks = null;
}
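To see the queue mechanics in isolation: consumers block on GetConsumingEnumerable until items arrive, and drain cleanly once CompleteAdding is called. A standalone sketch (the file names and the uppercasing are stand-ins for real work, not part of the answer above):

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

class Demo
{
    static void Main()
    {
        var bc = new BlockingCollection<string>();
        var results = new ConcurrentBag<string>();

        // Consumers block until items arrive; the foreach ends
        // once CompleteAdding() has been called and the queue is empty.
        var consumers = Enumerable.Range(0, 2)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                foreach (var item in bc.GetConsumingEnumerable())
                    results.Add(item.ToUpperInvariant());
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        // Producer side: stand-in for the FileSystemWatcher.Created events.
        foreach (var name in new[] { "a.xml", "b.xml", "c.xml" })
            bc.Add(name);
        bc.CompleteAdding();

        Task.WaitAll(consumers);
        Console.WriteLine(results.Count); // 3
    }
}
```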
Upvotes: 3
Reputation: 20320
You may not need loops, and I'm not sure parallel processing is necessary either. That would be useful if you want to process a batch of new files. A FileSystemWatcher on the folder where new files will appear will give you an event to add a file to the queue.
Add an event for "item added to queue" to trigger a thread to process an individual file.
If you knock up a simple class (file, state, detected time, etc.), you'd have a detection thread adding to the queue, a thread pool to process the files, and on success you remove them from the queue.
You might find this previous question useful: threadsafe "lists" in .NET 4.
Particularly if you want to process all new files since X.
Note that if you aren't going to use FileSystemWatcher and just get files from the folder, a Processed folder to move them to (and maybe a Failed folder as well) would be a good idea. Reading in 200,000 filenames to check whether you've processed them would pretty much remove any benefit of processing them in parallel.
Even if you do use FileSystemWatcher, I'd recommend it. Just moving a file back into the "to process" folder (or after an edit, in the case of failures) will trigger it to be reprocessed. Another advantage: say you are processing into a database and it all goes nipples up and your last backup was at time X. You restore and then simply move all the files you did process back into the "to process" folder.
You can also do test runs with known input and check the db's state before and after.
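A minimal sketch of the Processed/Failed folder idea. The folder names and the HandleFile helper are my own for illustration, not a standard API:

```csharp
using System;
using System.IO;

class FileRouter
{
    // Folder names are illustrative assumptions.
    const string ProcessedDir = "Processed";
    const string FailedDir = "Failed";

    static void HandleFile(string path, Action<string> process)
    {
        try
        {
            process(path);
            Move(path, ProcessedDir);
        }
        catch (Exception)
        {
            // Keep the file around so it can be edited and retried.
            Move(path, FailedDir);
        }
    }

    static void Move(string path, string folder)
    {
        var dir = Path.Combine(Path.GetDirectoryName(path), folder);
        Directory.CreateDirectory(dir);
        File.Move(path, Path.Combine(dir, Path.GetFileName(path)));
    }

    static void Main()
    {
        var work = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
        Directory.CreateDirectory(work);
        var ok = Path.Combine(work, "good.xml");
        var bad = Path.Combine(work, "bad.xml");
        File.WriteAllText(ok, "<doc/>");
        File.WriteAllText(bad, "<doc/>");

        HandleFile(ok, p => { /* processing succeeded */ });
        HandleFile(bad, p => { throw new Exception("parse error"); });

        Console.WriteLine(File.Exists(Path.Combine(work, ProcessedDir, "good.xml"))); // True
        Console.WriteLine(File.Exists(Path.Combine(work, FailedDir, "bad.xml")));     // True
    }
}
```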
Further to comment.
The ThreadPool, which is used by Task, has a thread limit, but that limit is shared by all foreground and background tasks in your app.
After comment.
If you want to limit the number of concurrent tasks...
Here's a starter for ten that you can easily improve upon for tuning and boosting.
In your class that manages kicking off tasks from the file queue, something like:
private object _canRunLock;
private int _maxTasks;
private int _activeTasks;

public MyTaskManager(int argMaxTasks)
{
    _maxTasks = argMaxTasks;
    _canRunLock = new object();
    _activeTasks = 0;
}

public bool CanRunTask(MyTask argTask)
{
    lock (_canRunLock)
    {
        if (_activeTasks < _maxTasks)
        {
            ExecuteTask(argTask);
            _activeTasks++;
            return true;
        }
    }
    return false;
}

public void TaskCompleted()
{
    lock (_canRunLock)
    {
        if (_activeTasks > 0)
        {
            _activeTasks--;
        }
        else
        {
            throw new InvalidOperationException("Okay, how did this happen?");
        }
    }
}
Simple and safe (I think). You could have another property, pause or disable, to check as well. You might want to make the above a singleton ( :( ), or at least bear in mind what happens if you run more than one...
The best advice I can give is to start simple, open, and decoupled, and then complicate as necessary; it would be easy to start optimising prematurely here. It's a good idea not to have a load of threads all waiting on, say, the file system or a backend, but I doubt the number of processors is ever going to be a bottleneck, so your maxTasks is a bit thumb-in-the-air. Some sort of self-tuning between a lower and an upper limit might be better than one fixed number.
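For what it's worth, the counting-under-a-lock pattern above is roughly what SemaphoreSlim (available in .NET 4) gives you out of the box. A standalone sketch, where the task count, sleep, and peak tracking are all illustrative stand-ins for real file processing:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

class Throttle
{
    static void Main()
    {
        const int maxTasks = 3;             // the configurable cap
        var gate = new SemaphoreSlim(maxTasks);
        var sync = new object();
        int active = 0, peak = 0;

        var work = new Task[10];
        for (int i = 0; i < work.Length; i++)
        {
            work[i] = Task.Factory.StartNew(() =>
            {
                gate.Wait();                // blocks while maxTasks are running
                try
                {
                    lock (sync) { active++; if (active > peak) peak = active; }
                    Thread.Sleep(50);       // stand-in for processing one file
                    lock (sync) { active--; }
                }
                finally { gate.Release(); } // always release, even on failure
            });
        }
        Task.WaitAll(work);

        // The semaphore guarantees concurrency never exceeded the cap.
        Console.WriteLine(peak <= maxTasks); // True
    }
}
```

Releasing in a finally block plays the role of TaskCompleted above, without the risk of forgetting to call it on the error path.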
Upvotes: 0
Reputation: 4177
IMO what you want is something like a cron job. A version of the algorithm could be:
// For every job run (called periodically via cron/scheduler):
//
// your program
//
if job_is_running {
    // Still busy...
    // don't process anything, just return
    return
}

// Build the array of files
//
Array a = new Array()
for each file in folder {
    a.append(file)
}

// Process each file
//
for each item in a {
    process_item(item)
    // Move it (or delete it)
    //
    remove_from_input_folder(item)
}
Now, you can call remove_from_input_folder() before processing, to avoid double processing if the system crashes.
I had to do something like that for a phone company a while ago and this was the most comfortable solution we got :)
Update: The parallel bit
Looping through the files to build the array takes negligible time compared to the actual processing, so you can easily convert the second loop into a worker-based parallel variant.
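In .NET terms, that second loop maps naturally onto Parallel.ForEach with a configurable degree of parallelism. A sketch with made-up file names standing in for a real folder scan (the commented-out calls mark where the pseudocode's steps would go):

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

class Batch
{
    static void Main()
    {
        // Stand-in for the snapshot of the input folder.
        var files = new List<string> { "1.xml", "2.xml", "3.xml", "4.xml" };
        var done = new ConcurrentBag<string>();

        Parallel.ForEach(
            files,
            new ParallelOptions { MaxDegreeOfParallelism = 2 }, // configurable
            file =>
            {
                // process_item(item) would go here
                done.Add(file);
                // remove_from_input_folder(item) would go here
            });

        Console.WriteLine(done.Count); // 4
    }
}
```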
HTH
Upvotes: 0
Reputation: 1435
I think you can check for new files arriving with FileSystemWatcher. There is an article at http://www.codeproject.com/Articles/25443/Watching-Folder-Activity-in-C-NET.
FileSystemWatcher saves you from polling the specific folder in a loop.
Hope this helps.
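A minimal, self-contained sketch of FileSystemWatcher in action (the temp folder, the *.xml filter, and the 5-second timeout are illustrative choices, not requirements):

```csharp
using System;
using System.IO;
using System.Threading;

class WatchDemo
{
    static void Main()
    {
        // A throwaway folder to watch, instead of the app's real input folder.
        var folder = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
        Directory.CreateDirectory(folder);

        var seen = new ManualResetEventSlim();
        using (var watcher = new FileSystemWatcher(folder, "*.xml"))
        {
            watcher.Created += (s, e) => seen.Set(); // fires when a file appears
            watcher.EnableRaisingEvents = true;

            // Simulate the other application dropping a file in.
            File.WriteAllText(Path.Combine(folder, "incoming.xml"), "<doc/>");

            // No polling loop needed: we just wait for the event.
            Console.WriteLine(seen.Wait(TimeSpan.FromSeconds(5))); // True
        }
    }
}
```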
Upvotes: 0