Mario

Reputation: 2515

How to balance reader and writer threads in .NET

Problem: Read-in data piles up, waiting to be written.

I have a basic ETL process that reads in a file, transforms the data, and then writes the data out to another file. Since I'm on a multi-core system, I'm trying to perform this using multiple threads. My problem is that the readers are outpacing the writers: many files end up read and their data transformed, but they pile up waiting to be written.

What I want is a balance between the files read and the files written, while still using multiple threads.

I've tried various things in the .NET library (C# 4.0). I think though that there is something I don't understand, and that this must be more complicated than simply using Thread or ThreadPool.QueueUserWorkItem or Task the way they appear in the basic examples I've found.

For example, suppose I try something like this:

Task task = new Task(() => PerformEtl(sourceFile));
task.Start();

If I log the files being read and the files being written, it's something like a 10-to-1 ratio. On a long-running process, this is unsustainable.

There must be some basic multi-threading/multi-processing pattern that I'm ignorant of or can't call to mind. Does anyone know where I should go from here? Thanks.


Solved:

Thanks to @Blam.

Here is some example/pseudo code to illustrate how a producer-consumer pattern can be implemented using the .NET library, as suggested by @Blam.

// Adapted from: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx
BlockingCollection<object> dataItems = new BlockingCollection<object>(10);
List<Task> tasks = new List<Task>();

tasks.Add(
    // Producer.
    Task.Factory.StartNew(() =>
    {
        for (;;)
        {
            string filePath = GetNextFile();
            if (filePath == null) break;

            object data = ProcessData(ReadData(filePath));
            dataItems.Add(data);
        }

        dataItems.CompleteAdding();
    })
);

tasks.Add(
    // Consumer.
    Task.Factory.StartNew(() =>
    {
        while (!dataItems.IsCompleted)
        {
            try
            {
                object data = dataItems.Take();
                WriteData(data);
            }
            catch (InvalidOperationException ioe)
            {
                // Take() throws if the collection is marked complete
                // between the IsCompleted check and the call.
                Console.Error.WriteLine(ioe.Message);
            }
        }
    })
);

Task.WaitAll(tasks.ToArray());

The MSDN discussion is here: https://msdn.microsoft.com/en-us/library/dd997371(v=vs.100).aspx

Upvotes: 4

Views: 197

Answers (1)

paparazzo

Reputation: 45106

I do exactly that, and break it into 3 steps:

  • Read
    There is just one set of drive heads - doing this in parallel does no good
    Close the file and pass the text to the next step
  • Process
  • Write

Use BlockingCollection with an upper bound (bounded capacity)
With an upper bound, the fast steps do not get too far ahead of the slow ones

So you have multiple cores. You are probably IO bound.

You can process (step 2) in parallel but unless you have some complex transforms it will not make a difference.

Try to read and write on different physical devices.
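Putting those three steps together, a minimal sketch of the pipeline might look like the following. It assumes a `sourceFiles` list and hypothetical `Transform`/`WriteResult` helpers; the queues use the bounded capacity mentioned above so no stage runs far ahead of the next.

```csharp
// Sketch only: a 3-stage pipeline joined by two bounded BlockingCollections.
// sourceFiles, Transform, and WriteResult are placeholders for your own code.
var readQueue  = new BlockingCollection<string>(boundedCapacity: 10);
var writeQueue = new BlockingCollection<string>(boundedCapacity: 10);

var reader = Task.Factory.StartNew(() =>
{
    foreach (string path in sourceFiles)        // one reader: a single set of heads
        readQueue.Add(File.ReadAllText(path));  // blocks when the queue is full
    readQueue.CompleteAdding();
});

var processor = Task.Factory.StartNew(() =>
{
    foreach (string text in readQueue.GetConsumingEnumerable())
        writeQueue.Add(Transform(text));        // CPU-bound transform step
    writeQueue.CompleteAdding();
});

var writer = Task.Factory.StartNew(() =>
{
    foreach (string result in writeQueue.GetConsumingEnumerable())
        WriteResult(result);                    // one writer, ideally a different disk
});

Task.WaitAll(reader, processor, writer);
```

`GetConsumingEnumerable()` handles the `IsCompleted`/`Take()` dance for you: it blocks until an item is available and ends cleanly once the producer calls `CompleteAdding()`.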

Upvotes: 4
