Logging device
Logging device

Reputation: 1

Calling async functions on parallel threads in c#

I have a three async function that I want to call from multiple threads in parallel at the same time. Till now I have tried the following approach -

int numOfThreads = 4; 
var taskList = List<Task>(); 
using(fs = new FileStream(inputFilePath, FileMode.OpenOrCreate,FileAccess.ReadWrite,FileShare.ReadWrite))
{
    for(int i=1; i<= numOfThreads ; i++) 
   {
      taskList.Add(Task.Run( async() => {
       byte[] buffer = new byte[length]; // length could be upto a few thousand
       await Function1Async(); // Reads from the file into a byte array
       long result = await Function2Aync(); // Does some async operation with that byte array data
       await Function3Async(result); // Writes the result into the file 
      }
   }
}
Task.WaitAll(taskList.toArray());  

However, not all of the tasks complete before the execution reaches an end. I have limited experience with threading in c#. What am I doing wrong in my code? Or should I take an alternative approach?

EDIT - So I made some changes to my approach. I got rid of the Function3Async for now -

for(int i=1;i<=numOfThreads; i++) 
{
   using(fs = new FileStream(----))
   {
      taskList.Add(Task.Run( async() => {
       byte[] buffer = new byte[length]; // length could be upto a few thousand
       await Function1Async(buffer); // Reads from the file into a byte array
       Stream data = new MemoryStream(buffer); 
       /** Write the Stream into a file and return 
        * the offset at which the write operation was done
        */
       long blockStartOffset = await Function2Aync(data); 

       Console.WriteLine($"Block written at - {blockStartOffset}");
      }
   }
}
Task.WaitAll(taskList.toArray());

Now all threads seem to proceed to completion but the Function2Async seems to randomly write some Japanese characters to the output file. I guess it is some threading issue perhaps? Here is the implementation of the Function2Async ->

public async Task<long> Function2Async(Stream data)
{
        long offset = getBlockOffset(); 
        using(var outputFs = new FileStream(fileName,
            FileMode.OpenOrCreate,
            FileAccess.ReadWrite,
            FileShare.ReadWrite))
        {
            outputFs.Seek(offset, SeekOrigin.Begin);
            await data.CopyToAsync(outputFs);     
        }
         return offset;
}

Upvotes: 0

Views: 74

Answers (1)

nvoigt
nvoigt

Reputation: 77285

In your example you have passed neither fs nor buffer into Function1Async but your comment says it reads from fs into buffer, so I will assume that is what happens.

You cannot read from a stream in parallel. It does not support that. If you find one that supports it, it will be horribly inefficient, because that is how hard disk storage works. Even worse if it is a network drive.

Read from the stream into your buffers first and in sequence, then let your threads loose and run your logic. In parallel, on the already existing buffers in memory.

Writing by the way would have the same problem if you wrote to the same file. If you write to one file per buffer, that's fine, otherwise, do it sequentially.

Upvotes: 1

Related Questions