Reputation: 2852
I want to upload huge files (between 2 and 40 GB) to Azure Blob Storage.
First I split each file into chunks (2 MB each). Then I upload the chunks one by one; after each chunk is uploaded successfully I update a temp file so I can resume the upload in case the application is closed.
Now I want to make the upload operation multithreaded. After reviewing the TPL I'm confused about where to start!
Any guidance on where to start with the TPL?
void Upload(int segmentId)
{
    try
    {
        string blockId = GetBlockId(segmentId);
        var segment = GetSegment(FilePath, segmentId, SeqmentSize);
        var md5Hash = CalcMd5Hash(segment);
        var blob = new CloudBlockBlob(_link.Uri);
        using (var memoryStream = new MemoryStream(segment))
        {
            blob.PutBlock(blockId, memoryStream, md5Hash);
        }
        SerializeStatus();
    }
    catch (Exception exception)
    {
        ...
    }
}
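The GetBlockId and CalcMd5Hash helpers referenced above are not shown. A minimal sketch of what they might look like (the names and the exact id format are assumptions; what is fixed by Azure is that block ids within one blob must be base64-encoded strings of equal length, and that PutBlock takes a base64-encoded MD5 of the block content):

```csharp
using System;
using System.Security.Cryptography;
using System.Text;

static class BlockHelpers
{
    // Block ids within a single blob must be base64 strings of identical
    // length, so pad the numeric id to a fixed width before encoding.
    public static string GetBlockId(int segmentId)
    {
        return Convert.ToBase64String(
            Encoding.UTF8.GetBytes(segmentId.ToString("d6")));
    }

    // PutBlock expects the MD5 of the block content, base64-encoded.
    public static string CalcMd5Hash(byte[] segment)
    {
        using (var md5 = MD5.Create())
        {
            return Convert.ToBase64String(md5.ComputeHash(segment));
        }
    }
}
```

Also note that the blob only becomes readable after you commit the uploaded blocks with `blob.PutBlockList(...)`, passing all the block ids in order; uploading the blocks alone is not enough.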
Upvotes: 2
Views: 2483
Reputation: 136369
I built something similar a long time back (though I used an async approach rather than the TPL) where I wanted to upload really large blobs with resumable capability. Here's what I did:
I split the file into chunks and kept a collection with each chunk's id and status, where the status is one of NotStarted, Successful, or Failed; every chunk starts out as NotStarted. Then I processed these chunks in parallel, picking only the ones whose status is NotStarted or Failed. I passed the chunk id as user state so that when I got a callback I could update the collection according to the upload status and serialize the data back. Hope this helps.
Update
Do take a look at this pseudo code and see if this helps you:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace UploadLargeBlob
{
    class Program
    {
        static void Main(string[] args)
        {
            List<ChunkInformation> chunksToUpload = new List<ChunkInformation>();
            CreateChunkCollection(chunksToUpload, "MyVeryLargeFile", 2 * 1024 * 1024);
            int numberOfParallelThreads = 8;
            do
            {
                //Materialize the batch with ToList() so the lazy query is not
                //re-evaluated while chunk statuses are changing.
                var chunksToProcess = chunksToUpload
                    .Where(c => c.Status == ChunkStatus.NotStarted || c.Status == ChunkStatus.Failed)
                    .Take(numberOfParallelThreads)
                    .ToList();
                if (chunksToProcess.Count == 0)
                {
                    break;
                }
                List<Task> tasks = new List<Task>();
                try
                {
                    foreach (var chunk in chunksToProcess)
                    {
                        tasks.Add(Task.Factory.StartNew(() =>
                        {
                            DoUpload(chunk);
                        }, chunk));
                    }
                    Task.WaitAll(tasks.ToArray());
                }
                catch (AggregateException)
                {
                    foreach (var task in tasks)
                    {
                        if (task.Exception != null)
                        {
                            ChunkInformation chunk = task.AsyncState as ChunkInformation;
                            chunk.Status = ChunkStatus.Failed;
                            //Now serialize the data.
                        }
                    }
                }
            }
            while (true); //Note: a chunk that keeps failing is retried forever;
                          //consider a retry limit in real code.
        }

        static void DoUpload(ChunkInformation chunk)
        {
            //Do the actual upload
            //Update chunk status once chunk is uploaded
            chunk.Status = ChunkStatus.Successful;
            //Serialize the data.
        }

        static void CreateChunkCollection(List<ChunkInformation> chunks, string fileName, int chunkSize)
        {
            //Split the file into chunkSize pieces and add a ChunkInformation
            //(Status = NotStarted) to the collection for each piece.
        }
    }

    public class ChunkInformation
    {
        public string Id { get; set; }
        public ChunkStatus Status { get; set; }
    }

    public enum ChunkStatus
    {
        NotStarted,
        Successful,
        Failed
    }
}
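The retry loop above can be exercised without any Azure dependency by stubbing out the upload. A self-contained sketch (the FlakyUpload-style failure simulation and the `Run` wrapper are invented for illustration): every chunk fails once and is marked Failed, then the next pass of the loop picks it up again and it succeeds, so the whole upload finishes in two passes.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public enum ChunkStatus { NotStarted, Successful, Failed }

public class ChunkInformation
{
    public string Id { get; set; }
    public ChunkStatus Status { get; set; }
}

public static class Demo
{
    //Simulated upload: every chunk throws on its first attempt and
    //succeeds on the second, mimicking a transient network failure.
    static readonly HashSet<string> failedOnce = new HashSet<string>();

    static void DoUpload(ChunkInformation chunk)
    {
        lock (failedOnce)
        {
            if (failedOnce.Add(chunk.Id))
                throw new Exception("transient failure for chunk " + chunk.Id);
        }
        chunk.Status = ChunkStatus.Successful;
    }

    //Runs the same pick-batch / upload-in-parallel / mark-failed loop
    //as the pseudo code and returns how many passes it took.
    public static int Run(int chunkCount, int parallelism)
    {
        var chunks = Enumerable.Range(0, chunkCount)
            .Select(i => new ChunkInformation { Id = i.ToString(), Status = ChunkStatus.NotStarted })
            .ToList();
        int passes = 0;
        while (true)
        {
            var batch = chunks
                .Where(c => c.Status == ChunkStatus.NotStarted || c.Status == ChunkStatus.Failed)
                .Take(parallelism)
                .ToList();
            if (batch.Count == 0) break;
            passes++;
            var tasks = batch.Select(c => Task.Factory.StartNew(() => DoUpload(c), c)).ToArray();
            try { Task.WaitAll(tasks); }
            catch (AggregateException)
            {
                foreach (var t in tasks)
                    if (t.Exception != null)
                        ((ChunkInformation)t.AsyncState).Status = ChunkStatus.Failed;
            }
        }
        return passes;
    }
}
```

Because the batch is taken from the collection by status, resuming after a crash is just a matter of deserializing the collection and re-entering the same loop.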
Upvotes: 3