Reputation: 119
I need to process every record from an input file asynchronously, using a batching concept. For example, say my input file has 100 records and my batch size is set to 10: I need to process the records in 10 batches (10 records per batch), and all these batches should be processed asynchronously. The batch size of 10 is not fixed and might vary.
I have a read function which reads each record from the file, and once 10 records are read, I need to call an async method which processes these records using a task. But the actual thread should continue reading the next set of records and fill the next batch (read the next 10 records); once they are read, it should call the same async method to process those records using another task, and this should continue until all records are read.
Right now, I am able to read the records, fill in each batch, and then process the batches one after the other, but I wanted to do it asynchronously.
I am providing a snippet of my code below:
public async Task ProcessRecordAsync(InputFile inputFile)
{
    int recordCount = 0;
    List<Task> TaskList = new List<Task>();
    while (/* condition to check if records are present */)
    {
        object getRecordVal = ReadInputRecord();
        if (++recordCount >= 10)
        {
            var LastTask = new Task(async () => await ProcessRecordAsync());
            LastTask.Start();
            TaskList.Add(LastTask);
        }
    }
    Task.WaitAll(TaskList.ToArray());
}
ProcessRecordAsync() --> This is the function which processes the input records.
I think I am going wrong somewhere in how I am calling the task. Once a batch is filled, I want to call the ProcessRecordAsync function using a task, while the main thread continues to read records and fill the next set of batches. With this code, I am getting an exception.
This is the error:
System.InvalidOperationException: Collection was modified; enumeration operation may not execute. at System.ThrowHelper.ThrowInvalidOperationException(ExceptionResource resource) at System.Collections.Generic.List`1.Enumerator.MoveNextRare()
Is this the right way of handling multiple tasks?
Upvotes: 0
Views: 914
Reputation: 43399
What you are trying to implement is the producer-consumer pattern. The most familiar way to implement this pattern is by using the BlockingCollection class. There is not much to learn beyond the Add, CompleteAdding, and GetConsumingEnumerable methods. Although easy to learn, it is not the most powerful tool. It is not very efficient when dealing with small workloads, and being blocking by nature, it isn't good for scalability either. There is also no native support for batching: you must do everything yourself by managing lists or arrays. I have tried to make a chunky implementation, with mediocre success.
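For illustration, here is a minimal sketch of that manual approach; the record type (string), the batch size of 10, and the record values are assumptions made up for the example:
// Requires: using System.Collections.Concurrent;
var queue = new BlockingCollection<string>(boundedCapacity: 100);

// Consumer: drains the collection and builds each batch by hand.
var consumer = Task.Run(() =>
{
    var batch = new List<string>(10);
    foreach (var item in queue.GetConsumingEnumerable())
    {
        batch.Add(item);
        if (batch.Count == 10)
        {
            Console.WriteLine($"Processing batch of {batch.Count}");
            batch.Clear();
        }
    }
    if (batch.Count > 0)
        Console.WriteLine($"Processing final batch of {batch.Count}");
});

// Producer: adds the records, then signals that no more are coming.
for (int i = 1; i <= 25; i++) queue.Add($"record {i}");
queue.CompleteAdding();
await consumer;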
Recently I invested the time to learn the TPL Dataflow library, and I can clearly say that it is the right tool for the job. You need to define two blocks, one to do the batching (BatchBlock) and one to do the processing (ActionBlock). Then form a small pipeline by linking the blocks together, feed the data to the first block, and have it processed automatically in batches by the second block. Finally, call Complete on the first block and await the Completion of the second block. The library creates and manages all the Tasks needed to do the job. The performance is optimal; only the API is unfamiliar, not particularly intuitive, and a bit verbose when you have to configure each block by providing options to its constructor.
// Requires the System.Threading.Tasks.Dataflow package
// and: using System.Threading.Tasks.Dataflow;
var batchBlock = new BatchBlock<string>(batchSize: 10);
var actionBlock = new ActionBlock<string[]>(batch =>
{
    // Do something with the batch. For example:
    Console.WriteLine("Processing batch");
    foreach (var line in batch)
    {
        Console.WriteLine(line);
    }
});
batchBlock.LinkTo(actionBlock,
    new DataflowLinkOptions() { PropagateCompletion = true });
foreach (var line in File.ReadLines(@".\..\..\_Data.txt"))
{
    await batchBlock.SendAsync(line).ConfigureAwait(false);
}
batchBlock.Complete();
await actionBlock.Completion.ConfigureAwait(false);
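As a sketch of the configuration mentioned above (the specific option values here are assumptions, not recommendations), the processing block could be made to work on several batches concurrently by passing an options object to its constructor:
var parallelActionBlock = new ActionBlock<string[]>(
    batch => Console.WriteLine($"Processing batch of {batch.Length}"),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 4, // process up to 4 batches at a time
        BoundedCapacity = 8 // limit how many batches are buffered
    });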
Upvotes: 3
Reputation: 226
I haven't used MoreLinq or the TPL Dataflow library that others have suggested...
If you wanted to stick with async/await, something like this would get the job done (though there are likely some optimizations to be found):
async Task Main()
{
    await BatchProcessAsync(GetValues(), ProcessElementAsync);
}

public async Task BatchProcessAsync<T>(
    IEnumerable<T> elements,
    Func<T, Task> operationAsync,
    int batchSize = 10)
{
    using (var en = elements.GetEnumerator())
    {
        var ops = new List<Task>();
        while (en.MoveNext())
        {
            ops.Add(operationAsync(en.Current));
            if (ops.Count == batchSize)
            {
                await Task.WhenAll(ops);
                ops.Clear();
            }
        }
        // process any remaining operations
        if (ops.Any()) { await Task.WhenAll(ops); }
    }
}

public async Task ProcessElementAsync(string element)
{
    Print($"Processing element: {element}...");
    await Task.Delay(300);
    Print($"Completed element: {element}.");

    void Print(string output)
        => Console.WriteLine($"[{DateTime.Now:s}] {output}");
}

public IEnumerable<string> GetValues(int maxValues = 100)
    => Enumerable.Range(1, maxValues).Select(i => $"Element #{i}");
EDIT: After posting, I re-read the original question and realized that this implementation assumes you have already read all the records, so it misses the part about the main thread continuing to read records from the input file. However, it should not be difficult to apply the same batching technique to reading the records from the input file in batches and then feeding them in to be batch processed, as sketched below.
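For instance, since File.ReadLines enumerates the file lazily, feeding it directly into BatchProcessAsync should interleave reading with processing batch by batch (the file name here is just a placeholder):
// "input.txt" is a placeholder; File.ReadLines yields lines lazily,
// so each line is read from disk only as the enumerator advances.
await BatchProcessAsync(File.ReadLines("input.txt"), ProcessElementAsync);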
Upvotes: 3