NokhodSiah
NokhodSiah

Reputation: 55

useing thread (task) for doing work contain I/O

I need to read data from a file,process and write result to another file. I use backgroundworker to show process state .I write something like this to use in DoWork event of backgroundworker

private void ProcData(string fileToRead,string fileToWrite)
        {
            byte[] buffer = new byte[4 * 1024];

            //fileToRead & fileToWrite have same size
            FileInfo fileInfo = new FileInfo(fileToRead);

            using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open))
            using (BinaryReader binaryReader = new BinaryReader(streamReader))
            using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open))
            using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
            {
                while (streamWriter.Position < fileInfo.Length)
                {
                    if (streamWriter.Position + buffer.Length > fileInfo.Length)
                    {
                        buffer = new byte[fileInfo.Length - streamWriter.Position];
                    }

                    //read
                    buffer = binaryReader.ReadBytes(buffer.Length);

                    //proccess
                    Proc(buffer);

                    //write
                    binaryWriter.Write(buffer);

                    //report if procentage changed
                    //...

                }//while
            }//using
        }

but it is 5 more time slower than just reading from fileToRead and writing to fileToWrite so I think about threading. I read some question in site and try something like this base on this question

private void ProcData2(string fileToRead, string fileToWrite)
        {
            int threadNumber = 4; //for example

            Task[] tasks = new Task[threadNumber];

            long[] startByte = new long[threadNumber];
            long[] length = new long[threadNumber];

            //divide file to threadNumber(4) part
            //and update startByte & length

            var parentTask = Task.Run(() =>
            {
                for (int i = 0; i < threadNumber; i++)
                {
                    tasks[i] = Task.Factory.StartNew(() =>
                    {
                        Proc2(fileToRead, fileToWrite, startByte[i], length[i]);
                    });
                }
            });

            parentTask.Wait();

            Task.WaitAll(tasks);

        }
        //
        private void Proc2(string fileToRead,string fileToWrite,long fileStartByte,long partLength)
        { 
            byte[] buffer = new byte[4 * 1024];

            using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open,FileAccess.Read,FileShare.Read))
            using (BinaryReader binaryReader = new BinaryReader(streamReader))
            using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open,FileAccess.Write,FileShare.Write))
            using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
            {
                streamReader.Seek(fileStartByte, SeekOrigin.Begin);
                streamWriter.Seek(fileStartByte, SeekOrigin.Begin);

                while (streamWriter.Position < fileStartByte+partLength)
                {
                    if (streamWriter.Position + buffer.Length > fileStartByte+partLength)
                    {
                        buffer = new byte[fileStartByte+partLength - streamWriter.Position];
                    }

                    //read
                    buffer = binaryReader.ReadBytes(buffer.Length);

                    //proccess
                    Proc(buffer);

                    //write
                    binaryWriter.Write(buffer);

                    //report if procentage changed
                    //...

                }//while
            }//using
        }

but I think it have some problem and by each time switching task it needs to seek again. I think about reading file, use threading for Proc() and then writing result, but it seems wrong. How can I do it properly?(reading a buffer from a file, process and write it on other file by using task)

//===================================================================

base on Pete Kirkham post I modified my method. I do not know why ,but it did not work for me. I added new method for who it may help them. thanks every body

 private void ProcData3(string fileToRead, string fileToWrite)
        {
            int bufferSize = 4 * 1024;
            int threadNumber = 4;//example
            List<byte[]> bufferPool = new List<byte[]>();
            Task[] tasks = new Task[threadNumber];

            //fileToRead & fileToWrite have same size
            FileInfo fileInfo = new FileInfo(fileToRead);

            using (FileStream streamReader = new FileStream(fileToRead, FileMode.Open))
            using (BinaryReader binaryReader = new BinaryReader(streamReader))
            using (FileStream streamWriter = new FileStream(fileToWrite, FileMode.Open))
            using (BinaryWriter binaryWriter = new BinaryWriter(streamWriter))
            {
                while (streamWriter.Position < fileInfo.Length)
                {
                    //read
                    for (int g = 0; g < threadNumber; g++)
                    {
                        if (streamWriter.Position + bufferSize <= fileInfo.Length)
                        {
                            bufferPool.Add(binaryReader.ReadBytes(bufferSize));
                        }
                        else
                        {
                            bufferPool.Add(binaryReader.ReadBytes((int)(fileInfo.Length - streamWriter.Position)));
                            break;
                        }
                    }

                    //do
                    var parentTask = Task.Run(() =>
                    {
                        for (int th = 0; th < bufferPool.Count; th++)
                        {
                            int index = th;

                            //threads
                            tasks[index] = Task.Factory.StartNew(() =>
                            {
                                Proc(bufferPool[index]);
                            });

                        }//for th
                    });

                    //stop parent task(run childs)
                    parentTask.Wait();

                    //wait till all task be done
                    Task.WaitAll(tasks);

                    //write
                    for (int g = 0; g < bufferPool.Count; g++)
                    {
                        binaryWriter.Write(bufferPool[g]);
                    }

                    //report if procentage changed
                    //...

                }//while
            }//using
        }

Upvotes: 2

Views: 140

Answers (1)

Pete Kirkham
Pete Kirkham

Reputation: 49331

Essentially you want a split the processing of the data up into parallel tasks, but you don't want want to split the IO up.

How this happens depends on the size of your data. If it is small enough to fit into memory, then you can read it all into an input array and create an output array, then create tasks to process some of the input array and populate some of the output array, then write the whole output array to file.

If the data is too large for this, then you need to put a limit on the amount of data read and written at a time. So you have your main flow which starts off by reading N blocks of data and creating N tasks to process them. You then wait for the tasks to complete in order, and each time one completes you write the block of output and read a new block of input and create another task. Some experimentation will be required for a good value for N and block size which means tasks tend to complete in about the same rate as the IO works at.

Upvotes: 1

Related Questions