Mahmoud Samy

Reputation: 2852

Multithreaded Uploader

I want to upload huge files (between 2 and 40 GB) to Azure Blob Storage.

First, I successfully split each file into chunks (each chunk = 2 MB). Then I upload the chunks one by one; after each chunk is uploaded successfully, I update a temp file so that I can resume the upload in case the application is closed.

Now I want to make the upload operation multithreaded. After reviewing the Task Parallel Library (TPL) I got confused about where to start!

Any guidance on where to start with the TPL?

void Upload(int segmentId)
{
    try
    {
        // Derive the block id for this segment.
        string blockId = GetBlockId(segmentId);

        // Read the segment's bytes from the file and compute its MD5 hash.
        var segment = GetSegment(FilePath, segmentId, SeqmentSize);
        var md5Hash = CalcMd5Hash(segment);

        var blob = new CloudBlockBlob(_link.Uri);
        using (var memoryStream = new MemoryStream(segment))
        {
            // Upload this segment as an uncommitted block.
            blob.PutBlock(blockId, memoryStream, md5Hash);
        }

        // Persist progress so the upload can be resumed later.
        SerializeStatus();
    }
    catch (Exception exception)
    {
        // ...
    }
}
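For context, the helper methods could look roughly like this (a minimal sketch; the Base64 block id scheme and the exact signatures are assumptions, and System, System.IO, and System.Text are assumed to be imported):

string GetBlockId(int segmentId)
{
    // Block ids must be Base64 strings of equal length for every block.
    return Convert.ToBase64String(Encoding.UTF8.GetBytes(segmentId.ToString("d6")));
}

byte[] GetSegment(string filePath, int segmentId, int segmentSize)
{
    // Seek to the segment's offset and read up to segmentSize bytes.
    using (var fileStream = new FileStream(filePath, FileMode.Open, FileAccess.Read))
    {
        fileStream.Seek((long)segmentId * segmentSize, SeekOrigin.Begin);
        var buffer = new byte[segmentSize];
        int bytesRead = fileStream.Read(buffer, 0, buffer.Length);
        if (bytesRead < segmentSize)
        {
            Array.Resize(ref buffer, bytesRead); // the last segment may be shorter
        }
        return buffer;
    }
}

string CalcMd5Hash(byte[] segment)
{
    // PutBlock accepts the MD5 as a Base64 string for integrity checking.
    using (var md5 = System.Security.Cryptography.MD5.Create())
    {
        return Convert.ToBase64String(md5.ComputeHash(segment));
    }
}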

Upvotes: 2

Views: 2483

Answers (1)

Gaurav Mantri

Reputation: 136369

I built something similar a long time ago (though I used an async approach rather than the TPL) where I wanted to upload really large blobs with resume capability. Here's what I did:

  1. First, based on the block size, I split the file into chunks. Each chunk is assigned an id. I then created an object which holds the chunk id and the status of that chunk. For simplicity, I kept the following statuses for a chunk - NotStarted, Successful, and Failed.
  2. I then created a collection of these chunks and serialized that data into a file (see the serialization sketch after this list).
  3. Based on the number of parallel threads (let's say x), I fetched x items from the collection where the status is NotStarted. I then processed these chunks in parallel. I passed the chunk id as user state so that when I got a callback, I updated the collection according to the upload status and serialized the data back.
  4. Once all chunks had been uploaded, I checked if there were any failed chunks. If there were, I retried those chunks.
  5. Once all chunks completed successfully, I simply created a block list from the chunks collection and committed that block list (see the commit sketch below). If the commit block list operation succeeded, I deleted the file containing the chunks data.
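For step 2, a minimal sketch of that status file, assuming XML serialization (the file format and the ChunkStatusFile name are illustrative; any serialization format works):

using System.Collections.Generic;
using System.IO;
using System.Xml.Serialization;

public static class ChunkStatusFile
{
    static readonly XmlSerializer Serializer =
        new XmlSerializer(typeof(List<ChunkInformation>));

    // Persist the chunk collection after every status change so an
    // interrupted upload can be resumed from this file.
    public static void Save(string path, List<ChunkInformation> chunks)
    {
        using (var stream = File.Create(path))
        {
            Serializer.Serialize(stream, chunks);
        }
    }

    public static List<ChunkInformation> Load(string path)
    {
        using (var stream = File.OpenRead(path))
        {
            return (List<ChunkInformation>)Serializer.Deserialize(stream);
        }
    }
}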
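For step 5, committing the block list is then a single call - a sketch, assuming the storage client library's CloudBlockBlob and that each chunk's Id is the Base64 block id already used with PutBlock:

// Commit all uploaded blocks in segment order to finalize the blob.
// Assumes chunksToUpload is kept in segment order.
var blob = new CloudBlockBlob(blobUri); // blobUri: illustrative blob/SAS URI
blob.PutBlockList(chunksToUpload.Select(c => c.Id));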

Hope this helps.

Update

Do take a look at this pseudo code and see if this helps you:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace UploadLargeBlob
{
    class Program
    {
        static void Main(string[] args)
        {
            List<ChunkInformation> chunksToUpload = new List<ChunkInformation>();
            //Populate the collection (the original pseudo code dropped the result).
            CreateChunkCollection("MyVeryLargeFile", 2 * 1024 * 1024, chunksToUpload);
            int numberOfParallelThreads = 8;
            do
            {
                //Pick the next batch of pending (or previously failed) chunks.
                //ToList() snapshots the query so it isn't re-evaluated while
                //statuses change. Note: a chunk that keeps failing will be
                //retried forever here; real code should cap the retry count.
                var chunksToProcess = chunksToUpload.Where(c => c.Status == ChunkStatus.NotStarted || c.Status == ChunkStatus.Failed).Take(numberOfParallelThreads).ToList();
                if (chunksToProcess.Count == 0)
                {
                    break;
                }
                List<Task> tasks = new List<Task>();
                try
                {
                    foreach (var chunk in chunksToProcess)
                    {
                        //Copy to a local to avoid the pre-C# 5 foreach capture bug.
                        var currentChunk = chunk;
                        tasks.Add(Task.Factory.StartNew(() =>
                            {
                                DoUpload(currentChunk);
                            }, currentChunk));
                    }
                    Task.WaitAll(tasks.ToArray());
                }
                catch (AggregateException)
                {
                    foreach (var task in tasks)
                    {
                        if (task.Exception != null)
                        {
                            ChunkInformation chunk = task.AsyncState as ChunkInformation;
                            chunk.Status = ChunkStatus.Failed;
                            //Now serialize the data.
                        }
                    }
                }
            }
            while (true);
        }

        static void DoUpload(ChunkInformation chunk)
        {
            //Do the actual upload

            //Update chunk status once chunk is uploaded
            chunk.Status = ChunkStatus.Successful;

            //Serialize the data (guard with a lock - multiple tasks call this).
        }

        static void CreateChunkCollection(string fileName, int chunkSize, List<ChunkInformation> chunks)
        {
            //Split the file into chunkSize pieces and add a ChunkInformation
            //entry (with status NotStarted) for each piece.
        }
    }

    public class ChunkInformation
    {
        public string Id
        {
            get;
            set;
        }

        public ChunkStatus Status
        {
            get;
            set;
        }
    }

    public enum ChunkStatus
    {
        NotStarted,
        Successful,
        Failed
    }
}
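To tie this back to the Upload method in the question, DoUpload could wrap the existing PutBlock call. A sketch, assuming ChunkInformation also carries a SegmentId and that FilePath, SeqmentSize, _link, and the helper methods come from the question's code:

static readonly object SyncRoot = new object();

static void DoUpload(ChunkInformation chunk)
{
    //Read and hash the segment using the helpers from the question.
    var segment = GetSegment(FilePath, chunk.SegmentId, SeqmentSize);
    var md5Hash = CalcMd5Hash(segment);

    var blob = new CloudBlockBlob(_link.Uri);
    using (var memoryStream = new MemoryStream(segment))
    {
        blob.PutBlock(chunk.Id, memoryStream, md5Hash);
    }

    chunk.Status = ChunkStatus.Successful;

    //Serialize from one thread at a time.
    lock (SyncRoot)
    {
        SerializeStatus();
    }
}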

Upvotes: 3
