Reputation: 2515
Problem: Read-in data piles up, waiting to be written.
I have a basic ETL process that reads in a file, transforms the data, and then writes the data out to another file. Since I'm on a multi-core system, I'm trying to perform this using multiple threads. My problem is that the readers are outpacing the writers: many files end up read and their data transformed, but they pile up waiting to be written.
What I want is a balance between the files read and the files written, while still using multiple threads.
I've tried various things in the .NET library (C# 4.0). I think, though, that there is something I don't understand, and that this must be more complicated than simply using Thread, ThreadPool.QueueUserWorkItem, or Task the way they appear in the basic examples I've found.
For example, suppose I try something like this:
Task task = new Task(() => PerformEtl(sourceFile));
task.Start();
If I log the files being read and the files being written, it's something like a 10-to-1 ratio. On a long-running process, this is unsustainable.
There must be some basic multi-threading/multi-processing pattern that I'm ignorant of or can't call to mind. Does anyone know where I should go from here? Thanks.
Solved:
Thanks to @Blam.
Here is some example/pseudo code to illustrate how a producer-consumer pattern can be implemented using the .NET library, as suggested by @Blam.
// Adapted from: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx
BlockingCollection<object> dataItems = new BlockingCollection<object>(10);
List<Task> tasks = new List<Task>();

tasks.Add(
    // Producer: read and transform files, queueing the results for the writer.
    Task.Factory.StartNew(() =>
    {
        for (;;)
        {
            string filePath = GetNextFile();
            if (filePath == null) break;
            object data = ProcessData(ReadData(filePath));
            dataItems.Add(data); // Blocks when the collection already holds 10 items.
        }
        dataItems.CompleteAdding();
    })
);

tasks.Add(
    // Consumer: write out the transformed data.
    Task.Factory.StartNew(() =>
    {
        while (!dataItems.IsCompleted)
        {
            try
            {
                object data = dataItems.Take();
                WriteData(data);
            }
            catch (InvalidOperationException ioe)
            {
                // Take() throws if adding completed between the
                // IsCompleted check and the call.
                Console.Error.WriteLine(ioe.Message);
            }
        }
    })
);

Task.WaitAll(tasks.ToArray());
The MSDN discussion is here: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx
Upvotes: 4
Views: 197
Reputation: 45106
I do exactly that, and I break it into 3 steps: read, process, write.
Use a BlockingCollection with an upper bound (bounded capacity).
With an upper bound, the fast steps cannot get too far ahead of the slow ones.
So you have multiple cores, but you are probably I/O bound.
You can process (step 2) in parallel, but unless you have some complex transforms it will not make a difference.
Try to read and write on different physical devices.
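To illustrate, here is a minimal sketch of the 3-step pipeline described above, using two bounded BlockingCollections so that neither the reader nor the transformer can run far ahead of the writer. All names (EtlPipeline, Run) and the uppercase "transform" are illustrative stand-ins for real file reads, transforms, and writes, not code from the original post:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EtlPipeline
{
    // Runs read -> transform -> write concurrently; returns the "written" items.
    public static List<string> Run(IEnumerable<string> inputs)
    {
        // Bounded queues: Add() blocks once a queue holds 10 items,
        // so the fast stages wait for the slow ones.
        var readQueue = new BlockingCollection<string>(10);
        var writeQueue = new BlockingCollection<string>(10);
        var written = new List<string>();

        var reader = Task.Factory.StartNew(() =>
        {
            foreach (var raw in inputs)            // stand-in for reading files
                readQueue.Add(raw);
            readQueue.CompleteAdding();
        });

        var transformer = Task.Factory.StartNew(() =>
        {
            foreach (var raw in readQueue.GetConsumingEnumerable())
                writeQueue.Add(raw.ToUpperInvariant()); // stand-in transform
            writeQueue.CompleteAdding();
        });

        var writer = Task.Factory.StartNew(() =>
        {
            foreach (var item in writeQueue.GetConsumingEnumerable())
                written.Add(item);                 // stand-in for writing a file
        });

        Task.WaitAll(reader, transformer, writer);
        return written;
    }

    public static void Main()
    {
        foreach (var line in Run(new[] { "a", "b", "c" }))
            Console.WriteLine(line);
    }
}
```

GetConsumingEnumerable() is a tidier alternative to the IsCompleted/Take() loop: it blocks until an item is available and ends cleanly once CompleteAdding() has been called and the queue drains, so no exception handling is needed.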
Upvotes: 4