Reputation: 73
I'm trying to do a parallel SqlBulkCopy to multiple targets over WAN, many of which may be having slow connections and/or connection cutoffs; their connection speed varies from 2 to 50 mbits download, and I am sending from a connection with 1000 mbit upload; a lot of the targets need multiple retries to correctly finish.
I'm currently using a Parallel.ForEach
on the GetConsumingEnumerable()
of a BlockingCollection (queue
); however I either stumbled upon some bug, or I am having problems fully understanding its purpose, or simply got something wrong..
The code never calls the CompleteAdding()
method of the blockingcollection,
it seems that somewhere in the parallel-foreach-loop some of the targets get lost.
Even if there are different approaches to this, and disregarding the kind of work it is doing in the loop, the blockingcollection shouldn't behave the way it does in this example, should it?
In the foreach-loop, I do the work, and add the target to a results
-collection in case it completed successfully, or re-add the target to the BlockingCollection in case of an error until the target reached the max retries threshold; at that point I add it to the results
-collection.
In an additional Task, I loop until the count of the results
-collection equals the initial count of the targets; then I do the CompleteAdding()
on the blocking collection.
I already tried using a locking object for the operations on the results
-collection (using a List<int>
instead) and the queue, with no luck, but that shouldn't be necessary anyways. I also tried adding the retries to a separate collection, and re-adding those to the BlockingCollection in a different Task instead of in the parallel.foreach.
Just for fun I also tried compiling with .NET from 4.5 to 4.8, and different C# language versions.
Here is a simplified example:
List<int> targets = new List<int>();
for (int i = 0; i < 200; i++)
{
targets.Add(0);
}
BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
ConcurrentBag<int> results = new ConcurrentBag<int>();
targets.ForEach(f => queue.Add(f));
// Bulkcopy in die Filialen:
Task.Run(() =>
{
while (results.Count < targets.Count)
{
Thread.Sleep(2000);
Console.WriteLine($"Completed: {results.Count} / {targets.Count} | queue: {queue.Count}");
}
queue.CompleteAdding();
});
int MAX_RETRIES = 10;
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 50 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
try
{
// simulate a problem with the bulkcopy:
throw new Exception();
results.Add(target);
}
catch (Exception)
{
if (target < MAX_RETRIES)
{
target++;
if (!queue.TryAdd(target))
Console.WriteLine($"{target.ToString("D3")}: Error, can't add to queue!");
}
else
{
results.Add(target);
Console.WriteLine($"Aborted after {target + 1} tries | {results.Count} / {targets.Count} items finished.");
}
}
});
I expected the count of the results
-collection to be the exact count of the targets
-list in the end, but it seems to never reach that number, which results in the BlockingCollection never being marked as completed, so the code never finishes.
I really don't understand why not all of the targets get added to the results
-collection eventually! The added count always varies, and is mostly just shy of the expected final count.
EDIT: I removed the retry-part, and replaced the ConcurrentBag with a simple int-counter, and it still doesn't work most of the time:
List<int> targets = new List<int>();
for (int i = 0; i < 500; i++)
targets.Add(0);
BlockingCollection<int> queue = new BlockingCollection<int>(new ConcurrentQueue<int>());
//ConcurrentBag<int> results = new ConcurrentBag<int>();
int completed = 0;
targets.ForEach(f => queue.Add(f));
var thread = new Thread(() =>
{
while (completed < targets.Count)
{
Thread.Sleep(2000);
Console.WriteLine($"Completed: {completed} / {targets.Count} | queue: {queue.Count}");
}
queue.CompleteAdding();
});
thread.Start();
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
Interlocked.Increment(ref completed);
});
Upvotes: 3
Views: 1131
Reputation: 131364
Parallel.ForEach
is meant for data parallelism (ie processing 100K rows using all 8 cores), not concurrent operations. This is essentially a pub/sub and async problem, if not a pipeline problem. There's nothing for the CPU to do in this case, just start the async operations and wait for them to complete.
.NET handles this since .NET 4.5 through the Dataflow classes and lately, the lower-level System.Threading.Channel namespace.
In its simplest form, you can create an ActionBlock<> that takes a buffer and target connection and publishes the data. Let's say you use this method to send the data to a server :
async Task MyBulkCopyMethod(string connectionString,DataTable data)
{
using(var bcp=new SqlBulkCopy(connectionString))
{
//Set up mappings etc.
//....
await bcp.WriteToServerAsync(data);
}
}
You can use this with an ActionBlock class with a configured degree of parallelism. Dataflow classes like ActionBlock have their own input, and where appropriate, output buffers, so there's no need to create a separate queue :
class DataMessage
{
public string Connection{get;set;}
public DataTable Data {get;set;}
}
...
var options=new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 50,
BoundedCapacity = 8
};
var block=new ActionBlock<DataMessage>(msg=>MyBulkCopyMethod(msg.Connection,msg.Data, options);
We can start posting messages to the block now. By setting the capacity to 8 we ensure the input buffer won't get filled with large messages if the block is too slow. MaxDegreeOfParallelism
controls how may operations run concurrently. Let's say we want to send the same data to many servers :
var data=.....;
var servers=new[]{connString1, connString2,....};
var messages= from sv in servers
select new DataMessage{ ConnectionString=sv,Table=data};
foreach(var msg in messages)
{
await block.SendAsync(msg);
}
//Tell the block we are done
block.Complete();
//Await for all messages to finish processing
await block.Completion;
Retries
One possibility for retries is to use a retry loop in the worker function. A better idea would be to use a different block and post failed messages there.
var block=new ActionBlock<DataMessage>(async msg=> {
try {
await MyBulkCopyMethod(msg.Connection,msg.Data, options);
}
catch(SqlException exc) when (some retry condition)
{
//Post without awaiting
retryBlock.Post(msg);
});
When the original block completes we want to tell the retry block to complete as well, no matter what :
block.Completion.ContinueWith(_=>retryBlock.Complete());
Now we can await the retryBlock
to complete.
That block could have a smaller DOP and perhaps a delay between attempts :
var retryOptions=new ExecutionDataflowBlockOptions {
MaxDegreeOfParallelism = 5
};
var retryBlock=new ActionBlock<DataMessage>(async msg=>{
await Task.Delay(1000);
try {
await MyBulkCopyMethod(msg.Connection,msg.Data, options);
}
catch (Exception ....)
{
...
}
});
This pattern can be repeated to create multiple levels of retry, or different conditions. It can also be used to create different priority workers by giving a larger DOP to high priority workers, or a larger delay to low priority workers
Upvotes: 1
Reputation: 73
Sorry, found the answer: the default partitioner used by blockingcollection and parallel foreach is chunking and buffering, which results in the foreach loop to forever wait for enough items for the next chunk.. for me, it sat there for a whole night, without processing the last few items!
So, instead of:
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(queue.GetConsumingEnumerable(), options, target =>
{
Interlocked.Increment(ref completed);
});
you have to use:
var partitioner = Partitioner.Create(queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering);
ParallelOptions options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.ForEach(partitioner, options, target =>
{
Interlocked.Increment(ref completed);
});
Upvotes: 4