raysefo

Reputation: 472

How to handle Parallel.ForEachAsync completed tasks if there is an error?

I am working on a worker service that calls a web API of mine, which in turn calls an external API. My web API is an ASP.NET Core web API that saves the requests and responses to a database. The external API returns online game codes (I send the game and the number of codes I want), with a maximum of 10 codes per request. The user enters the total number of codes and the game number in a Blazor Server web application. Behind the scenes, this total is split into batches of 20 codes, and each batch is saved to the database. The batch size is 20 because the worker service sends 2 parallel requests to the web API with Parallel.ForEachAsync. The worker service checks the database for a record with status = 0, picks it up, and starts processing it.

For example:

The screenshot below shows a request for 20 coupons waiting to be processed. When the worker service picks up this record (status = 0) from the database and sends it to my ASP.NET Core web API, my API calls the external API 10 codes at a time, because I want the maximum number of game codes per request. In this scenario, 2 requests are sent in parallel and 2 responses are returned. I run 2 tasks with Parallel.ForEachAsync in the worker to speed things up a little, because sometimes a total of 20,000 game codes is requested.

[Screenshot: the pending request record in the database table]

Here is the portion of the worker service where I call my API. If the responses are successful, I set status = 1 in the database table above to mark the job as completed. If both tasks succeed, there is no problem: the row is updated to status = 1. If both tasks fail, there is also no problem: the status stays 0 and the worker service retries after a random time interval. But if one task succeeds and the other fails, the update method still sets status = 1, which is wrong. Only one successful response (10 game codes) was received from the external API and saved, yet the record is marked as completed for all 20 codes, because the outcome of each task is never checked separately.

var num = 20;
var firstNum = 10;
var secondNum = 10;

if (num < 20)
{
    firstNum = (num + 1) / 2;
    secondNum = num - firstNum;
}

var quantities = new List<int> { firstNum, secondNum };
var cts = new CancellationTokenSource();
ParallelOptions parallelOptions = new()
{
    MaxDegreeOfParallelism = 2,
    CancellationToken = cts.Token
};
try
{
    await Parallel.ForEachAsync(quantities, parallelOptions, async (quantity, ct) =>
    {
        var content = new FormUrlEncodedContent(new[]
        {
            new KeyValuePair<string, string>("productCode", productCode),
            new KeyValuePair<string, string>("quantity", quantity.ToString()),
            new KeyValuePair<string, string>("clientTrxRef", bulkId.ToString())
        });
        try
        {
            using var response = await httpClient.PostAsync(
                _configuration["Razer:Production"], content, ct);

            if ((int)response.StatusCode == 200)
            {
                var coupon = await response.Content.ReadFromJsonAsync<Root>(cancellationToken: ct);

                await UpdateData(id);
            }
            else
            {
                //logging
            }
        }
        catch (HttpRequestException ex)
        {
            //logging
        }
                           
    });
}
catch (OperationCanceledException ex)
{
    //logging
}

Is there a way to get the statuses of each task so that I can build logic for my problem?

Upvotes: 1

Views: 576

Answers (1)

Michal Diviš

Reputation: 2206

I think you could solve this by treating the whole batch as a single unit of work, much like a transaction. Basically, retrieve all the data first, and save it to the database only if every request succeeds. Otherwise, exit and save nothing.

Does this help?

private async Task RetrieveAndSaveCouponsAsync(CancellationToken ct)
{
    // code was omitted here because you haven't shown all of your solution

    var quantities = new List<int> { 10, 10 };

    // create a task that retrieves coupons for each quantity
    var tasks = quantities
        .Select(quantity => GetCodesAsync("someProductCode", quantity, 123, ct))
        .ToList();

    try
    {
        // await all the tasks
        await Task.WhenAll(tasks);
    }
    catch
    {
        // extract the exceptions from the faulted tasks
        var exceptions = tasks
            .Where(x => x.Exception is not null)
            .Select(x => x.Exception!);

        // log exceptions (LogError needs a message alongside the exception)
        foreach (var ex in exceptions)
        {
            _logger.LogError(ex, "Coupon request failed");
        }

        return;
    }

    // check if any of them failed
    if (tasks.Any(x => x.Result is null))
    {
        // one or more tasks failed, log error and don't save anything into db
        _logger.LogError("one or more tasks failed");
        return;
    }

    // extract the coupons from the tasks
    IEnumerable<Root> coupons = tasks.Select(x => x.Result).Cast<Root>();

    // save all coupons to the database
    foreach (var coupon in coupons)
    {
        await SaveCouponToDatabaseAsync(coupon);
    }
}

private async Task<Root?> GetCodesAsync(string productCode, int quantity, int bulkId, CancellationToken ct)
{
    var httpClient = new HttpClient(); // realistically, you'd get the HttpClient from IHttpClientFactory here

    var content = new FormUrlEncodedContent(new[]
    {
        new KeyValuePair<string, string>("productCode", productCode),
        new KeyValuePair<string, string>("quantity", quantity.ToString()),
        new KeyValuePair<string, string>("clientTrxRef", bulkId.ToString())
    });

    using var response = await httpClient.PostAsync(_configuration["Razer:ProductionMock"], content, ct);

    if ((int)response.StatusCode == 200)
    {
        // ReadFromJsonAsync can return null, so guard the property access
        var coupon = await response.Content.ReadFromJsonAsync<Root>(cancellationToken: ct);
        _logger.LogInformation("REFERENCE ID: {referenceId}", coupon?.ReferenceId);
        return coupon;
    }

    _logger.LogError("Purchase ServiceError: {statusCode}", (int)response.StatusCode);
    return null; // I'm returning null in case the request fails; you should decide how to handle this in your app
}
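
If you would rather keep Parallel.ForEachAsync, one option is to record the outcome of each iteration in a thread-safe collection and update the record only after the loop, once every outcome is a success. Below is a minimal sketch of that idea, not a drop-in replacement. `BatchRunner`, `RequestOutcome`, `RunAsync`, and `AllSucceeded` are hypothetical names, and `sendAsync` stands in for your HTTP call (it should return true for a 200 response).

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BatchRunner
{
    // Hypothetical result type: one entry per quantity that was sent.
    public sealed record RequestOutcome(int Quantity, bool Succeeded, string? Error);

    // Runs sendAsync for every quantity (at most two at a time) and records
    // the outcome of each call instead of acting on it inside the loop.
    public static async Task<IReadOnlyList<RequestOutcome>> RunAsync(
        IEnumerable<int> quantities,
        Func<int, CancellationToken, Task<bool>> sendAsync,
        CancellationToken cancellationToken = default)
    {
        // ConcurrentBag is safe to add to from parallel iterations.
        var outcomes = new ConcurrentBag<RequestOutcome>();
        var options = new ParallelOptions
        {
            MaxDegreeOfParallelism = 2,
            CancellationToken = cancellationToken
        };

        await Parallel.ForEachAsync(quantities, options, async (quantity, ct) =>
        {
            try
            {
                bool ok = await sendAsync(quantity, ct);
                outcomes.Add(new RequestOutcome(quantity, ok, ok ? null : "non-success status"));
            }
            catch (Exception ex) when (ex is not OperationCanceledException)
            {
                // A failed request becomes a recorded outcome, not a lost exception.
                outcomes.Add(new RequestOutcome(quantity, false, ex.Message));
            }
        });

        return outcomes.ToList();
    }

    // True only if every expected request reported success.
    public static bool AllSucceeded(IReadOnlyCollection<RequestOutcome> outcomes, int expectedCount) =>
        outcomes.Count == expectedCount && outcomes.All(o => o.Succeeded);
}
```

In the worker, `UpdateData(id)` would then run after the loop, and only when `AllSucceeded(outcomes, 2)` returns true. Keep in mind that the codes from a successful request have already been issued, so a retry probably needs to request only the quantity that failed.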

Upvotes: 1
