Silvestre

Reputation: 854

Best way to send bulk emails in parallel

I am new to TPL (Task Parallel Library) and I'm having a hard time trying to configure my process to run tasks in parallel.

I'm working on an application to send bulk emails (thousands per minute, that's the idea), but when I look at processor performance, it's not good: I'm pretty sure there's a lot of overhead because I'm not using the Task library properly.

Here's my code:

public async void MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);
    
    foreach (var batch in batches.AsParallel()
        .WithDegreeOfParallelism(Environment.ProcessorCount))
    {
         await Task.WhenAll(from emailToProcess in batch 
                    select ProcessSingleEmail(emailToProcess));
        _emailsToProcessRepository.MarkBatchAsProcessed(batch);
    }
}

private async Task ProcessSingleEmail(EmailToProcess emailToProcess)
{
    try
    {
        MailMessage mail = GetMail(emailToProcess); //static light method
        await _smtpClient.SendAsync(mail);
        emailToProcess.Processed = true;
    }
    catch (Exception e)
    {
        _logger.Error(ErrorHelper.GetExceptionMessage(e, 
                    string.Format("Error sending Email ID #{0} : ", 
                    emailToProcess.Id)), e);
    }
}

(I know it might look awful: please feel free to roast me ☺)

I need it to behave this way: records are processed in batches (by the way, I'm using a library that provides the "Batch" method), because I need to mark each batch of records as processed in the database once the process finishes sending them.

The process is actually doing what I want, except it is slow as hell. And as you can see in perfmon, the processors are not working at very high capacity:

[Image: perfmon screenshot of processor usage]

What's the best way to do this? Any advice?

EDIT: I realize that what I have is an overhead problem. Is there any tool or easy way to detect and correct it?

Upvotes: 3

Views: 5023

Answers (1)

NeddySpaghetti

Reputation: 13495

What you are doing is I/O bound, not CPU bound, so limiting the number of concurrent tasks to the number of processors is likely hurting your performance. Try starting more tasks in parallel.
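To illustrate why the processor count is the wrong limit for I/O-bound work, here is a minimal sketch. It assumes Task.Delay is a fair stand-in for the network wait of one SMTP call; IoBoundDemo and FakeSendAsync are hypothetical names used only for this illustration and are not part of your code.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class IoBoundDemo
{
    // Hypothetical stand-in for one SMTP round trip (assumption):
    // it waits for latencyMs without occupying a thread.
    private static Task FakeSendAsync(int latencyMs)
    {
        return Task.Delay(latencyMs);
    }

    // Starts 'count' fake sends at once and returns the total elapsed time.
    public static async Task<TimeSpan> SendAllAsync(int count, int latencyMs)
    {
        var stopwatch = Stopwatch.StartNew();
        await Task.WhenAll(
            Enumerable.Range(0, count).Select(i => FakeSendAsync(latencyMs)));
        return stopwatch.Elapsed;
    }
}
```

Calling IoBoundDemo.SendAllAsync(200, 50) should finish in far less time than the 10 seconds that 200 sequential 50 ms waits would take, regardless of how many cores the machine has, because the waits overlap instead of queuing behind each other.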

For example, the MainProcess code below processes all emails asynchronously but keeps at most 100 emails in flight at once. It uses a ForEachAsync extension method whose dop parameter caps the degree of parallelism, so I would experiment with making that value larger.

You may also want to make the MarkBatchAsProcessed method asynchronous if possible, as a blocking database call is likely to limit performance as well.

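using System;
using System.Collections.Concurrent; // Partitioner
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class Extensions
{
    // Drains one partition, awaiting each item before moving to the next.
    public static async Task ExecuteInPartition<T>(IEnumerator<T> partition, Func<T, Task> body)
    {
        using (partition)
            while (partition.MoveNext())
                await body(partition.Current);
    }

    // Splits the source into dop partitions and drains them concurrently,
    // so at most dop calls to body are in flight at any time.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select ExecuteInPartition(partition, body));
    }
}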

public async Task MainProcess()
{
    // Keep at most 100 sends in flight at any time
    await emailsToProcess.ForEachAsync(100, m => ProcessSingleEmail(m));

    // Runs only after every send has completed
    _emailsToProcessRepository.MarkBatchAsProcessed(emailsToProcess);
}

You should also avoid async methods that return void: the caller cannot await or compose them, and exceptions thrown inside them cannot be caught by the caller. They are intended mostly for event handlers, so I changed MainProcess to return Task.
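As a small illustration of the exception point, the sketch below shows a caller catching a failure from a Task-returning method. AsyncVoidDemo and FailingSendAsync are hypothetical names, and the failure is simulated. The async void variant is described only in a comment, since actually running it without a synchronization context would normally take the process down.

```csharp
using System;
using System.Threading.Tasks;

public static class AsyncVoidDemo
{
    // Hypothetical stand-in for a send that fails (assumption).
    public static async Task FailingSendAsync()
    {
        await Task.Delay(10);
        throw new InvalidOperationException("Simulated send failure");
    }

    // Because FailingSendAsync returns Task, the exception is stored in
    // the task and rethrown at the await, where the caller can catch it.
    // Had it been declared 'async void', there would be nothing to await
    // and this try/catch would never see the exception.
    public static async Task<bool> CallerSeesFailureAsync()
    {
        try
        {
            await FailingSendAsync();
            return false;
        }
        catch (InvalidOperationException)
        {
            return true;
        }
    }
}
```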

Update

The number 100 in the code above means that at any one time there will be a maximum of 100 concurrent tasks, so it's more like a sliding window than a batch. If you want to process the emails in batches, you can do something like this (assuming each batch has a Count property):

public async Task MainProcess()
{
    var batches = emailsToProcess.Batch(CONST_BATCHES_SIZE);

    foreach (var batch in batches)
    {
        // Send every email in this batch concurrently and wait for all of them
        await batch.ForEachAsync(batch.Count, m => ProcessSingleEmail(m));

        // Mark the batch only after all of its sends have finished
        _emailsToProcessRepository.MarkBatchAsProcessed(batch);
    }
}
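For comparison, the sliding-window behaviour mentioned at the start of this update can also be sketched with a SemaphoreSlim instead of a partitioner. This is a different technique from the ForEachAsync extension above, offered only as a rough alternative; Throttle and ForEachThrottledAsync are hypothetical names.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class Throttle
{
    // Runs body for every item while allowing at most maxConcurrency
    // bodies to be in flight at the same time.
    public static async Task ForEachThrottledAsync<T>(
        IEnumerable<T> source, int maxConcurrency, Func<T, Task> body)
    {
        var gate = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();

        foreach (var item in source)
        {
            // Wait for a free slot before starting the next item.
            await gate.WaitAsync();
            tasks.Add(RunAsync(item, body, gate));
        }

        // Completes once every started item has finished.
        await Task.WhenAll(tasks);
    }

    private static async Task RunAsync<T>(T item, Func<T, Task> body, SemaphoreSlim gate)
    {
        try
        {
            await body(item);
        }
        finally
        {
            // Free the slot even if body throws.
            gate.Release();
        }
    }
}
```

With this shape a new send starts as soon as any earlier one finishes, whereas the batch loop above waits for a whole batch to complete before starting the next one.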

Upvotes: 8
