Reputation: 2539
Looking for the best approach to reading from a time-consuming data source such as Azure Table Storage, converting the data into JSON or CSV, and writing it to a local file whose name depends on the partition key.
One approach being considered is running the write-to-file task on a timer-elapsed event with a fixed interval.
Upvotes: 0
Views: 1907
Reputation: 127603
For things that do not parallelize well (like I/O), the best thing to do is use the "producer-consumer model".
The way it works is you have one thread handling the non-parallelizable task; all that task does is read into a buffer. Then you have a set of parallel tasks that all read from the buffer and process the data, putting the results into another buffer when they are done. If you then need to write out the result in a non-parallelizable way, you have another single task writing out the result.
using System.Collections.Concurrent;
using System.IO;
using System.Threading.Tasks;

public Stream ProcessData(string filePath)
{
    using(var sourceCollection = new BlockingCollection<string>())
    using(var destinationCollection = new BlockingCollection<SomeClass>())
    {
        //Create a new background task to start reading in the file
        Task.Factory.StartNew(() => ReadInFile(filePath, sourceCollection), TaskCreationOptions.LongRunning);

        //Create a new background task to process the read-in lines as they come in
        Task.Factory.StartNew(() => TransformToClass(sourceCollection, destinationCollection), TaskCreationOptions.LongRunning);

        //Process the newly created objects as they are created, on the same thread that originally called the function
        return TransformToStream(destinationCollection);
    }
}
private static void ReadInFile(string filePath, BlockingCollection<string> collection)
{
    foreach(var line in File.ReadLines(filePath))
    {
        collection.Add(line);
    }

    //This lets the consumer know that we will not be adding any more items to the collection.
    collection.CompleteAdding();
}
private static void TransformToClass(BlockingCollection<string> source, BlockingCollection<SomeClass> dest)
{
    //GetConsumingEnumerable() will take items out of the collection and block the thread
    //if there are no items available and CompleteAdding() has not been called yet.
    Parallel.ForEach(source.GetConsumingEnumerable(),
        line => dest.Add(SomeClass.ExpensiveTransform(line)));
    dest.CompleteAdding();
}
private static Stream TransformToStream(BlockingCollection<SomeClass> source)
{
    var stream = new MemoryStream();
    foreach(var record in source.GetConsumingEnumerable())
    {
        record.Serialize(stream);
    }
    return stream;
}
I highly recommend you read the free book Patterns for Parallel Programming; it goes into some detail about this, and there is an entire section explaining the producer-consumer model in detail.
UPDATE: For a small performance boost, use GetConsumingPartitioner() instead of GetConsumingEnumerable() from Parallel Extension Extras in the Parallel.ForEach loop. ForEach makes some assumptions about the IEnumerable being passed in that cause it to take extra locks it does not need; by passing a partitioner instead of an enumerable, it does not need to take those extra locks.
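As a sketch, assuming the GetConsumingPartitioner() extension method from the ParallelExtensionsExtras samples is referenced in the project, the change to TransformToClass would look like this:
private static void TransformToClass(BlockingCollection<string> source, BlockingCollection<SomeClass> dest)
{
    //GetConsumingPartitioner() comes from the ParallelExtensionsExtras sample library;
    //given a Partitioner instead of a plain IEnumerable, Parallel.ForEach can skip
    //the extra locking it would otherwise take out.
    Parallel.ForEach(source.GetConsumingPartitioner(),
        line => dest.Add(SomeClass.ExpensiveTransform(line)));
    dest.CompleteAdding();
}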
Upvotes: 3