I'm trying to work out how to efficiently read some data from a file, do some parallel work (per line), then write the updated line back to the file system.
I know I can do this one line at a time, but I was hoping to process a few lines at a time, or, if one line is 'busy' waiting for the async work to complete, move on to the next line, and so on.
Here's some sample data and logic...
Header
SomeId#1, SomeId#2, SomeId#3, Name, Has this line been processed and cleaned(true/false)
File Data
444,2,12,Leia Organa, true
121,33333,4,Han Solo, true
1,2,3,Jane Doe, false
1,4,11,John Doe, false
So the first 2 lines have already been processed and I will skip them. The 3rd and 4th lines need to be processed. When the data has been checked, I wish to save it back to the file, like:
1,33333,3,Jane Doe, true
So this is the general logic...
DoWorkAsync() <-- which could take a second or 5
I was just hoping that I didn't have to wait for DoWorkAsync() to complete before I can save and then read the next line. I was hoping that I could start reading the next line, and if the previous line finishes, fine: save that line to the same line number in the file and move on again to the next line.
It's like I could have 5 or 10 lines all working at the same time .. waiting for the results to come back from the 3rd party api ... working in parallel or whatever.
Can this be done in .NET? I'm sure .NET has the functionality for this .. I just can't see the pattern to do this.
NOTE: I usually do async/await for I/O intensive operations (like hitting the filesystem or calling some 3rd party api endpoint) vs Parallel.ForEach which I use for cpu intensive work.
NOTE: Why the true/false at the end of the line? Because I can't process all the lines at once. I have API limits.
Other ideas were to have two files, one for PENDING and one for PROCESSED.
Reputation: 18265
Here is a stub of a parallel processor that uses async/await while processing lines in batches.
This approach ensures that the same order is preserved when writing.
    using System;
    using System.Collections.Generic;
    using System.IO;
    using System.Threading.Tasks;

    public async Task ProcessFile()
    {
        // Number of lines processed concurrently in each batch.
        const int parallelism = 5;
        using (var readStream = File.OpenRead(@"c:\myinputfile"))
        {
            // put HERE your logic for skipping to a specific line
            // e.g. readStream.Seek(lastPosition, SeekOrigin.Begin);
            using (var reader = new StreamReader(readStream))
            {
                while (!reader.EndOfStream)
                {
                    // Start up to `parallelism` work items without awaiting them individually.
                    var tasks = new List<Task<string>>();
                    for (var i = 0; i < parallelism; i++)
                    {
                        var line = await reader.ReadLineAsync();
                        tasks.Add(DoWorkAsync(line));
                        if (reader.EndOfStream)
                            break;
                    }
                    // Task.WhenAll returns the results in the same order as `tasks`,
                    // so the output keeps the input line order.
                    var results = await Task.WhenAll(tasks);
                    using (var writeStream = File.Open(@"d:\myresultfile", FileMode.Append))
                    using (var writer = new StreamWriter(writeStream))
                    {
                        foreach (var line in results)
                            await writer.WriteLineAsync(line);
                    }
                }
            }
        }
    }

    public async Task<string> DoWorkAsync(string line)
    {
        // Simulates a slow third-party call taking between 1 and 5 seconds.
        await Task.Delay(new Random().Next(1000, 5000));
        // do some work and return line with last parameter = true
        return line.Replace("false", "true"); // e.g.
    }
It surely needs improvement, but it should give you a good base for writing your own.
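One possible improvement: the batch approach above waits for the slowest line in each batch before starting the next one. If you would rather "move on to the next line" as soon as any slot frees up, a SemaphoreSlim can act as a gate. The following is only a minimal sketch under some assumptions: ThrottledLineProcessor, ProcessAsync and RunAsync are hypothetical names I made up, the work delegate stands in for your DoWorkAsync, and a line counts as already processed when it ends with "true".

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the original answer.
public static class ThrottledLineProcessor
{
    // Runs `work` for every line with at most `maxConcurrency` calls in flight.
    // Results come back in the same order as the input lines.
    public static async Task<string[]> ProcessAsync(
        IEnumerable<string> lines,
        Func<string, Task<string>> work,
        int maxConcurrency)
    {
        using (var gate = new SemaphoreSlim(maxConcurrency))
        {
            var tasks = new List<Task<string>>();
            foreach (var line in lines)
            {
                // Blocks here only while `maxConcurrency` lines are still busy.
                await gate.WaitAsync();
                tasks.Add(RunAsync(line, work, gate));
            }
            // Task.WhenAll preserves the order of `tasks`.
            return await Task.WhenAll(tasks);
        }
    }

    private static async Task<string> RunAsync(
        string line, Func<string, Task<string>> work, SemaphoreSlim gate)
    {
        try
        {
            // Assumption: lines ending in "true" were processed earlier, so skip them.
            if (line.TrimEnd().EndsWith("true", StringComparison.OrdinalIgnoreCase))
                return line;
            return await work(line);
        }
        finally
        {
            // Free the slot so the next line can start.
            gate.Release();
        }
    }
}
```

You could feed it File.ReadLines(path) and write the returned array to a second file, which is close to the PENDING / PROCESSED idea from the question. Writing back to "the same line number" in the original file is awkward whenever the new line has a different length than the old one, so a separate output file is probably the simpler route.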