developer82

Reputation: 13733

Set a limit to parallel tasks received by azure DownloadToStreamAsync

I have a bunch of files (about 10k) that I need to download from Windows Azure Storage. To download them in parallel instead of one at a time, I'm using the blob's DownloadToStreamAsync method, which returns a Task object. I then call ContinueWith on the task with a delegate that saves the stream to a file.

Here is the code:

foreach (var File in ServerFiles)
{
    string sFileName = File.Uri.LocalPath.ToString();
    CloudBlockBlob oBlob = BiActionscontainer.GetBlockBlobReference(sFileName.Replace("/" + Container + "/", ""));

    MemoryStream ms = new MemoryStream();
    BlobRequestOptions f = new BlobRequestOptions();
    Task downloadTask = oBlob.DownloadToStreamAsync(ms);

    downloadTask.ContinueWith((Task task) =>
    {
         ms.Position = 0;
         lock(lockObject)
         {
              using (FileStream file = new FileStream(ResultPath, FileMode.Append, FileAccess.Write))
              {
                   byte[] bytes = ms.ToArray();
                   file.Write(bytes, 0, bytes.Length);
              }
         }
         ms.Dispose();
    });
}

This code is part of a tool that runs on one of our servers (not on Azure), a Windows Server 2003 machine. The problem is that on that server I get "The operation has timed out." from Microsoft.WindowsAzure.Storage, so I figured that so many files making requests at the same time could be choking the bandwidth.

So I was wondering: how can I limit the number of downloads that run in parallel in a scenario like this, where I get the Task object from a third-party library, and still queue the rest of the tasks coming in?

Upvotes: 2

Views: 600

Answers (1)

svick

Reputation: 244978

You can use SemaphoreSlim for this: set it up with the number of concurrent requests you want, await WaitAsync() before starting each request, call Release() after each request finishes, and finally wait for the remaining tasks.

Encapsulated in a helper method, it could look like this:

public static async Task ForEachAsync<T>(
    this IEnumerable<T> items, Func<T, Task> action, int maxDegreeOfParallelism)
{
    var semaphore = new SemaphoreSlim(maxDegreeOfParallelism);

    var tasks = new List<Task>();

    foreach (var item in items)
    {
        await semaphore.WaitAsync();

        Func<T, Task> loopAction = async x =>
        {
            try
            {
                await action(x);
            }
            finally
            {
                // release the slot even if the action throws,
                // so a failed download doesn't leak a semaphore count
                semaphore.Release();
            }
        };

        tasks.Add(loopAction(item));
    }

    await Task.WhenAll(tasks);
}

Usage (with some changes to your code, mostly to simplify it and also to make it more asynchronous):

var fileLock = new SemaphoreSlim(1, 1);

await ServerFiles.ForEachAsync(async file =>
{
    string sFileName = file.Uri.LocalPath.ToString();
    CloudBlockBlob oBlob = BiActionscontainer.GetBlockBlobReference(sFileName.Replace("/" + Container + "/", ""));

    using (var ms = new MemoryStream())
    {
        await oBlob.DownloadToStreamAsync(ms);

        ms.Position = 0;

        // C# doesn't allow await inside a lock statement, so a
        // SemaphoreSlim(1, 1) serializes the writes to the result file instead
        await fileLock.WaitAsync();
        try
        {
            using (var output = new FileStream(ResultPath, FileMode.Append, FileAccess.Write))
            {
                await ms.CopyToAsync(output);
            }
        }
        finally
        {
            fileLock.Release();
        }
    }
}, maxDegreeOfParallelism: 10);

An alternative implementation would use ActionBlock from TPL Dataflow. It knows how to do everything that's needed here; you just need to set it up:

public static Task ForEachAsync<T>(
    this IEnumerable<T> items, Func<T, Task> action, int maxDegreeOfParallelism)
{
    var block = new ActionBlock<T>(
        action,
        new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = maxDegreeOfParallelism
        });

    foreach (var item in items)
    {
        block.Post(item);
    }

    block.Complete();
    return block.Completion;
}
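The Dataflow version is a drop-in replacement for the semaphore-based helper, so the call site stays the same. Here is a minimal self-contained sketch of how it behaves (it requires the System.Threading.Tasks.Dataflow NuGet package; the simulated "downloads" with Task.Delay stand in for the real blob requests and are just an illustration, not your actual code):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static class EnumerableExtensions
{
    public static Task ForEachAsync<T>(
        this IEnumerable<T> items, Func<T, Task> action, int maxDegreeOfParallelism)
    {
        var block = new ActionBlock<T>(
            action,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDegreeOfParallelism
            });

        foreach (var item in items)
        {
            block.Post(item);
        }

        block.Complete();
        return block.Completion;
    }
}

class Demo
{
    static int concurrent;

    static async Task Main()
    {
        // Simulated downloads: each logs how many are in flight at once.
        // With MaxDegreeOfParallelism = 3, the count never exceeds 3.
        await Enumerable.Range(1, 20).ForEachAsync(async i =>
        {
            int now = Interlocked.Increment(ref concurrent);
            Console.WriteLine($"item {i} started, {now} in flight");
            await Task.Delay(100);
            Interlocked.Decrement(ref concurrent);
        }, maxDegreeOfParallelism: 3);
    }
}
```

One difference worth knowing: the semaphore-based helper stops pulling items from the source while all slots are busy, whereas this version posts every item into the block's queue up front and the block throttles execution.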

Upvotes: 2
