Vlad M

Multithreaded file processing

I want to optimize this code:

    public static void ProcessTo(this StreamReader sr, StreamWriter sw, Action<StreamWriter, string> action, FileProcessOptions fpo = null)
    {
        if (fpo == null)
        {
            fpo = new FileProcessOptions();
        }

        List<string> buffer = new List<string>(fpo.BuferSize);

        while (!sr.EndOfStream)
        {
            buffer.Clear();

            while (!sr.EndOfStream && buffer.Count < fpo.BuferSize)
            {
                buffer.Add(sr.ReadLine());
            }

            if (fpo.UseThreads)
            {
                buffer.AsParallel().ForAll(line => action(sw, line));
            }
            else
            {
                buffer.ForEach(line => action(sw, line));
            }
        }
    }

I process large amounts of data and want to parallelize the processing. The data is usually compressed (archived), so it is very important to use multiple threads to process the data stream.

Answers (2)

Vlad M

Final solution:

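    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Linq;

    public static IEnumerable<string> GetEnumirator(this StreamReader sr)
    {
        // Lazily yield lines; PLINQ synchronizes access to this enumerator,
        // so ReadLine is never called from two threads at once.
        while (!sr.EndOfStream)
        {
            yield return sr.ReadLine();
        }
    }

    public static void ProcessParalel(this StreamReader sr, Action<string> action)
    {
        // Run 'action' for every line on worker threads; order is not preserved.
        sr.GetEnumirator().AsParallel().ForAll(action);
    }

    public static void ProcessTo(this StreamReader sr, BinaryWriter bw, Action<BinaryWriter, string> action, FileProcessOptions fpo = null)
    {
        // fpo is not used in this version.
        sr.ProcessParalel(line =>
        {
            using (MemoryStream ms = new MemoryStream())
            {
                // Each thread encodes its record into a private buffer, no lock needed.
                BinaryWriter lbw = new BinaryWriter(ms);
                action(lbw, line);
                lbw.Flush();

                // Hold the shared lock only while copying the finished bytes.
                // (WriteTo copies the whole buffer regardless of Position.)
                lock (bw)
                {
                    ms.WriteTo(bw.BaseStream);
                }
            }
        });
    }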

With a compressed input stream, I got a 3x speedup.
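For illustration, here is a self-contained sketch of the same approach applied to a gzip-compressed input. It assumes the input was written with GZipStream, and the class name `GzipLineProcessor`, the method `Run`, and the tab-separated record layout (one float followed by ints) are all hypothetical. As in the solution above, each record is encoded into a private MemoryStream and the lock is held only while copying bytes, so records come out in completion order, not input order.

```csharp
using System;
using System.Collections.Generic;
using System.Globalization;
using System.IO;
using System.IO.Compression;
using System.Linq;

public static class GzipLineProcessor
{
    // Lazily yields lines; decompression happens on whichever thread
    // PLINQ uses to pull the next chunk from this iterator.
    static IEnumerable<string> ReadLines(StreamReader sr)
    {
        string line;
        while ((line = sr.ReadLine()) != null)
            yield return line;
    }

    // Hypothetical record format: "<float>\t<int>\t<int>..." -> binary record.
    public static void Run(Stream compressedInput, Stream output)
    {
        using var gz = new GZipStream(compressedInput, CompressionMode.Decompress);
        using var sr = new StreamReader(gz);
        var sync = new object();

        ReadLines(sr).AsParallel().ForAll(line =>
        {
            // Encode the record into a private buffer without holding any lock.
            using var ms = new MemoryStream();
            using (var bw = new BinaryWriter(ms, System.Text.Encoding.UTF8, leaveOpen: true))
            {
                string[] parts = line.Split('\t');
                bw.Write(float.Parse(parts[0], CultureInfo.InvariantCulture));
                for (int i = 1; i < parts.Length; i++)
                    bw.Write(int.Parse(parts[i], CultureInfo.InvariantCulture));
            }

            // Hold the lock only while copying the finished bytes.
            lock (sync)
            {
                ms.WriteTo(output);
            }
        });
    }
}
```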

Jim Mischel

If you don't pass a StreamReader, and instead just pass the file name, you could write:

    Parallel.ForEach(File.ReadLines(filename), (line) => action(sw, line));

You can still do this if you pass a StreamReader. You just have to create an enumerable that reads from it, something like what's done in the question "Recommended behaviour of GetEnumerator() when implementing IEnumerable<T> and IEnumerator<T>". Using that, you'd write:

    LineReaderEnumerable myEnumerable = new LineReaderEnumerable(sr);
    Parallel.ForEach(myEnumerable, (line) => action(sw, line));
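The linked question isn't reproduced here, so the following is only a minimal sketch of what `LineReaderEnumerable` might look like, assuming it simply wraps an existing StreamReader. The implementation is hypothetical; a `yield`-based iterator takes care of the IEnumerator<string> bookkeeping. Because it consumes the reader, it can be enumerated only once, which is all Parallel.ForEach needs.

```csharp
using System.Collections;
using System.Collections.Generic;
using System.IO;

// Hypothetical wrapper that exposes a StreamReader as IEnumerable<string>.
// It consumes the reader, so it can only be enumerated once.
public sealed class LineReaderEnumerable : IEnumerable<string>
{
    private readonly StreamReader _reader;

    public LineReaderEnumerable(StreamReader reader)
    {
        _reader = reader;
    }

    public IEnumerator<string> GetEnumerator()
    {
        string line;
        // ReadLine returns null at the end of the stream.
        while ((line = _reader.ReadLine()) != null)
        {
            yield return line;
        }
    }

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```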

However, this has a potential problem: multiple threads could then be writing to that StreamWriter at the same time, and StreamWriter is not thread-safe. Concurrent writes can throw an exception or corrupt the output. If you're synchronizing access to the output file (using a lock, for example), then you're okay here.
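If you'd rather not manage the lock yourself, the framework's `TextWriter.Synchronized` returns a thread-safe wrapper around any TextWriter, a StreamWriter included. This is a swapped-in alternative to an explicit `lock`, sketched below with a hypothetical `SynchronizedOutput.ProcessAll` helper and a placeholder `transform` function. Each individual call on the wrapper is synchronized, but separate calls from one thread can still interleave with other threads' calls, so each record should be written with a single call.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

public static class SynchronizedOutput
{
    // Processes lines in parallel and writes one output line per input line.
    // 'transform' stands in for the real per-line work (hypothetical).
    public static void ProcessAll(IEnumerable<string> lines, TextWriter output, Func<string, string> transform)
    {
        // Every member call on the wrapper is synchronized, so concurrent
        // WriteLine calls cannot corrupt the writer's internal state.
        TextWriter safe = TextWriter.Synchronized(output);

        Parallel.ForEach(lines, line =>
        {
            string result = transform(line);   // runs without any lock
            safe.WriteLine(result);            // one atomic call per record
        });

        safe.Flush();
    }
}
```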

One other problem you'll run into is the order in which things are output. It's almost certain that if you read lines in the order [1, 2, 3, 4, ... n], the output order is going to be different. You might get [1, 2, 4, 3, 6, 5, 7, 9, 8 ... n, n-1]. If output order is important, you have to come up with a way to make sure that things are output in the proper order.
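One way to keep the original order, assuming the per-line work can be expressed as a function from an input line to output text, is PLINQ's `AsOrdered()`: the transformation runs in parallel, and a single thread writes the merged results in source order, so the writer needs no lock. `OrderedProcessing.ProcessInOrder` and `transform` are hypothetical names. Preserving order makes PLINQ buffer results that finish early, so expect somewhat lower throughput and higher memory use than the unordered version.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;

public static class OrderedProcessing
{
    // Transforms lines in parallel but writes results in input order.
    // 'transform' is a hypothetical stand-in for the per-line work.
    public static void ProcessInOrder(IEnumerable<string> lines, TextWriter output, Func<string, string> transform)
    {
        IEnumerable<string> results = lines
            .AsParallel()
            .AsOrdered()          // preserve source order in the results
            .Select(transform);   // the expensive part runs on worker threads

        // Enumerating the query with foreach merges results back onto this
        // single thread, so no lock is needed around the writer.
        foreach (string result in results)
        {
            output.WriteLine(result);
        }
    }
}
```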

Regarding the lock, you have:

    sr.ProcessParalel(line =>
    {
        string[] ls = line.Split('\t');
        lock (sw)
        {
            sw.Write(float.Parse(ls[0]));
            sw.Write(int.Parse(ls[1]) * 10 + 1);
            for (int i = 2; i < ls.Length; i++)
            {
                sw.Write(int.Parse(ls[i]));
            }
        }
    });

The problem isn't the lock. The problem is that you're holding the lock for too long. The way you have it written, the code is effectively single-threaded because all the threads are waiting on that lock to do their processing. You need to change your processing so that the lock is held for as short a time as possible.

Build your output into a StringBuilder, convert it to a string, and then output that string. For example:

    string[] ls = line.Split('\t');

    // Build the whole record without holding the lock (needs System.Text).
    StringBuilder sb = new StringBuilder();
    sb.Append(float.Parse(ls[0]));
    sb.Append(' ');
    sb.Append(int.Parse(ls[1]) * 10 + 1);
    for (int i = 2; i < ls.Length; i++)
    {
        sb.Append(' ');
        sb.Append(int.Parse(ls[i]));
    }
    var sout = sb.ToString();

    // Lock only for the write itself.
    lock (sw)
    {
        sw.Write(sout);
    }

You could do much the same thing with a StringWriter.
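The StringWriter variant could look like the following sketch. `RecordFormatter` and `FormatRecord` are hypothetical names, and passing CultureInfo.InvariantCulture is an added assumption so that floats are formatted with '.' regardless of the machine's locale.

```csharp
using System;
using System.Globalization;
using System.IO;

public static class RecordFormatter
{
    // Formats one tab-separated input line into a space-separated output
    // record, mirroring the StringBuilder version. Runs without a lock.
    public static string FormatRecord(string line)
    {
        string[] ls = line.Split('\t');
        using var w = new StringWriter(CultureInfo.InvariantCulture);
        w.Write(float.Parse(ls[0], CultureInfo.InvariantCulture));
        w.Write(' ');
        w.Write(int.Parse(ls[1], CultureInfo.InvariantCulture) * 10 + 1);
        for (int i = 2; i < ls.Length; i++)
        {
            w.Write(' ');
            w.Write(int.Parse(ls[i], CultureInfo.InvariantCulture));
        }
        return w.ToString();
    }
}
```

Inside the parallel loop you would call `FormatRecord(line)` outside the lock and take the lock only around `sw.Write(...)`.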