Reputation: 23
I think the title I've given is a bit confusing, but it's hard to express what I'm trying to ask.
Basically, I am writing a program in C# using .NET that uses the Google Cloud API to upload data.
I am trying to do this efficiently and have used Parallel.ForEach with success, but I need finer control. I collect the files to be uploaded into one list, which I want to sort by file size and then split into, say, 3 lists of equal total size (in terms of gigabytes, not file count).
One of these lists would hold about a third of the total upload size but consist of the largest files, and therefore the smallest number of files. The next list would total the same number of gigabytes but consist of a greater number of smaller files, and the last list would consist of many, many small files that again add up to the same total size as the other two.
I then want to assign a set number of threads to each upload. (E.g. I want the list of largest files to have 5 threads assigned, the middle one to have 3, and the list of small files to have only 2 threads.) Is it possible to set up these 3 lists to be iterated over in parallel, while controlling how many threads are allocated to each?
What is the best method to do so?
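To make it more concrete, here is roughly how I was planning to build the three lists (just a sketch of the idea, not my actual code; the folder path is hypothetical):
// Sketch: sort descending by size, then fill three buckets so each ends up
// with roughly a third of the total bytes.
// (needs System.IO, System.Linq, System.Collections.Generic)
var files = new DirectoryInfo(@"C:\DataToUpload")   // hypothetical source folder
    .EnumerateFiles("*", SearchOption.AllDirectories)
    .OrderByDescending(f => f.Length)
    .ToList();

long total = files.Sum(f => f.Length);
long targetPerBucket = total / 3;

var buckets = new List<List<FileInfo>>
    { new List<FileInfo>(), new List<FileInfo>(), new List<FileInfo>() };
var bucketBytes = new long[3];
int current = 0;

foreach (var file in files)
{
    buckets[current].Add(file);
    bucketBytes[current] += file.Length;
    // Move to the next bucket once this one holds ~1/3 of the bytes,
    // but never past the last bucket.
    if (bucketBytes[current] >= targetPerBucket && current < 2)
        current++;
}
// buckets[0]: few large files, buckets[2]: many small files - each with ~equal total size.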
Upvotes: 2
Views: 1645
Reputation: 43921
Here is another idea. You could have a single list, but upload the files with a dynamic degree of parallelism. This would be easy to implement if the SemaphoreSlim class had a WaitAsync method that could reduce the CurrentCount by a value other than 1. You could then initialize the SemaphoreSlim with a large initialCount like 1000, and then call WaitAsync with a value relative to the size of each file. Let's call this value weight. The semaphore would guarantee that the total weight of all files currently being uploaded would not exceed 1000. This could be a single huge file with a weight of 1000, or 10 medium files each weighing 100, or a mix of small, medium and large files with a total weight of around 1000. The degree of parallelism would constantly change depending on the weight of the next file in the list.
This is an example of the code that you'd have to write:
var semaphore = new SemaphoreSlim(1000);

var tasks = Directory.GetFiles(@"D:\FilesToUpload")
    .Select(async filePath =>
    {
        var fi = new FileInfo(filePath);
        var weight = (int)Math.Min(1000, fi.Length / 1_000_000);
        await semaphore.WaitAsync(weight); // Imaginary overload that accepts a weight
        try
        {
            await cloudService.UploadFile(filePath);
        }
        finally
        {
            semaphore.Release(weight);
        }
    })
    .ToArray();

await Task.WhenAll(tasks);
Below is a custom AsyncSemaphorePlus class that provides the missing overload. It is based on Stephen Toub's AsyncSemaphore class from the blog post Building Async Coordination Primitives, Part 5: AsyncSemaphore. It is slightly modernized with features like Task.CompletedTask and TaskCreationOptions.RunContinuationsAsynchronously, which were not available at the time the blog post was written.
public class AsyncSemaphorePlus
{
    private readonly object _locker = new object();
    private readonly Queue<(TaskCompletionSource<bool>, int)> _queue
        = new Queue<(TaskCompletionSource<bool>, int)>();
    private int _currentCount;

    public int CurrentCount { get { lock (_locker) return _currentCount; } }

    public AsyncSemaphorePlus(int initialCount)
    {
        if (initialCount < 0)
            throw new ArgumentOutOfRangeException(nameof(initialCount));
        _currentCount = initialCount;
    }

    public Task WaitAsync(int count)
    {
        lock (_locker)
        {
            if (_currentCount - count >= 0)
            {
                // Enough capacity is available; acquire it synchronously.
                _currentCount -= count;
                return Task.CompletedTask;
            }
            else
            {
                // Not enough capacity; queue the request until Release frees some up.
                var tcs = new TaskCompletionSource<bool>(
                    TaskCreationOptions.RunContinuationsAsynchronously);
                _queue.Enqueue((tcs, count));
                return tcs.Task;
            }
        }
    }

    public void Release(int count)
    {
        lock (_locker)
        {
            _currentCount += count;
            // Complete as many queued waiters as the restored capacity allows, in FIFO order.
            while (_queue.Count > 0)
            {
                var (tcs, weight) = _queue.Peek();
                if (weight > _currentCount) break;
                (tcs, weight) = _queue.Dequeue();
                _currentCount -= weight;
                tcs.SetResult(true);
            }
        }
    }
}
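With this class in place, the example at the top needs only a trivial change; the weight calculation and the try/finally stay exactly as they are:
// Instead of: var semaphore = new SemaphoreSlim(1000);
var semaphore = new AsyncSemaphorePlus(1000);

// ...and WaitAsync(weight) is now a real method instead of an imaginary overload:
await semaphore.WaitAsync(weight);
try
{
    await cloudService.UploadFile(filePath);
}
finally
{
    semaphore.Release(weight);
}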
Update: This approach is intended for uploading a medium/large number of files. It is not suitable for an extremely large number of files, because all of the uploading tasks are created upfront. If there are, say, 100,000,000 files to upload, then the memory required to store the state of all those tasks may exceed the available RAM of the machine. For uploading that many files the solution proposed by Panagiotis Kanavos is probably preferable, because it can easily be modified to use bounded dataflow blocks fed with SendAsync instead of Post, so that the memory required for the whole operation is kept under control.
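As a rough sketch of that modification (reusing the cloudService.UploadFile call from the snippet above; the capacity and DOP values are arbitrary), a bounded ActionBlock fed with SendAsync would look something like this:
// Requires System.Threading.Tasks.Dataflow.
// BoundedCapacity limits how many file paths are queued in memory at once;
// SendAsync waits asynchronously for a free slot instead of buffering everything.
var uploadBlock = new ActionBlock<string>(
    filePath => cloudService.UploadFile(filePath),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10,
        BoundedCapacity = 100
    });

foreach (var filePath in Directory.EnumerateFiles(@"D:\FilesToUpload"))
{
    await uploadBlock.SendAsync(filePath); // waits while the block is full
}

uploadBlock.Complete();
await uploadBlock.Completion;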
Upvotes: 0
Reputation: 131722
Parallel.ForEach and PLINQ are meant for data parallelism - processing big chunks of data using multiple cores. They're meant for scenarios where you have eg 1GB of data in memory (or a very fast IEnumerable source) and want to process it using all cores. In such scenarios, you need to partition the data into independent chunks and have one worker crunch one chunk at a time, to limit the synchronization overhead.
What you describe though is concurrent uploads for a large number of files. That's pure IO, not data parallelism. Most of the time will be spent loading the data from disk or writing it to the network. This is a job for Task.Run and async/await. To upload multiple files concurrently, you could use an ActionBlock or a Channel to queue the files and upload them asynchronously. With channels you have to write a bit of worker boilerplate but you get greater control, especially in cases where you want to use eg the same client instance for multiple calls. An ActionBlock is essentially stateless.
Finally, you describe queues with different DOP based on size, which is a very nice idea when you have both big and small files. You can do that by using multiple ActionBlock instances, each with a different DOP, or multiple Channel workers, each with a different DOP.
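For what it's worth, a minimal sketch of the Channel variant (one worker group, using the UploadFile method shown below; the worker count and capacity are arbitrary) could look like this:
// Requires System.Threading.Channels. One such group per size bucket,
// each with its own worker count, gives the per-size DOP described above.
var channel = Channel.CreateBounded<FileInfo>(100);

// A fixed number of workers = the degree of parallelism for this group.
var workers = Enumerable.Range(0, 5)
    .Select(async _ =>
    {
        await foreach (var file in channel.Reader.ReadAllAsync())
        {
            await UploadFile(service, file);
        }
    })
    .ToArray();

// Producer: queue the files, then signal that no more are coming.
foreach (var file in new DirectoryInfo(@"D:\FilesToUpload").EnumerateFiles())
{
    await channel.Writer.WriteAsync(file);
}
channel.Writer.Complete();

await Task.WhenAll(workers);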
Dataflows
Let's say you already have a method that uploads a single file :
//Adapted from the Google SDK example
async Task UploadFile(DriveService service, FileInfo file)
{
    using var uploadStream = file.OpenRead();
    var insertRequest = service.Files.Insert(
        new File { Title = file.Name },
        uploadStream,
        "image/jpeg");
    await insertRequest.UploadAsync();
}
You can create three different ActionBlock instances, each with a different DOP :
var small = new ActionBlock<FileInfo>(
    file => UploadFile(service, file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 15
    });

var medium = new ActionBlock<FileInfo>(
    file => UploadFile(service, file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 10
    });

var big = new ActionBlock<FileInfo>(
    file => UploadFile(service, file),
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = 2
    });
And post different files to different blocks based on size :
var directory = new DirectoryInfo(...);
var files = directory.EnumerateFiles(...);
foreach (var file in files)
{
    switch (file.Length)
    {
        case long x when x < 1024:
            small.Post(file);
            break;
        case long x when x < 10240:
            medium.Post(file);
            break;
        default:
            big.Post(file);
            break;
    }
}
Or, in C# 8 :
foreach (var file in files)
{
    var block = file.Length switch
    {
        long x when x < 1024 => small,
        long x when x < 10240 => medium,
        _ => big
    };
    block.Post(file);
}
When iteration completes, we need to tell the blocks we are done by calling Complete() on each one, and then wait for all of them to finish with :
small.Complete();
medium.Complete();
big.Complete();
await Task.WhenAll(small.Completion, medium.Completion, big.Completion);
Upvotes: 2