RegularNormalDayGuy

Reputation: 735

C# multi-threaded foreach loop

I've recently started working with multi-threaded calls in C#, and I'm unsure whether my approach is correct.

How can I make this go faster? I'm guessing parallelism is the answer, but I have not been successful in integrating that concept into this code.

Edits

Please note this runs on a remote VM and it's a console program, so user experience is not an issue. I just want it to run fast, since the number of links may go up to 200k elements and we want results as soon as possible. I also removed all questions but one, since it's the one I would like help with.

Here is my code, which seems to work:

// Use of my results
public void Main() 
{
  var results = ValidateInternalLinks();
  // Writes results to txt file
  WriteResults(results.Result, "Internal Links");
}

// Validation of data
public async Task<List<InternalLinksModel>> ValidateInternalLinks() 
{
  var tasks = new List<Task>();
  var InternalLinks = new List<InternalLinksModel>();
  // Populate InternalLinks with the data

  foreach (var internalLink in InternalLinks)
  {
    tasks.Add(GetResults(internalLink));
  }

  await Task.WhenAll(tasks);

  return InternalLinks;
}

// Get Results for each piece of data
public async Task GetResults(InternalLinksModel internalLink)
{ 
  var response = await SearchValue(internalLink.SearchValue);
  
  // Analyse response and change result (possible values: SUCCESS, FAILED, [])
  internalLink.PossibleResults = ValidateSearchResult(response);
}

// Http Request
public async Task<ResponseModel> SearchValue(string value) 
{
  // RestSharp API creation and headers addition
  var response = await client.ExecuteTaskAsync(request);

  return JsonConvert.DeserializeObject<ResponseModel>(response.Content);
}

Upvotes: 0

Views: 668

Answers (2)

Yuli Bonner

Reputation: 1269

async/await/WhenAll is the correct way to go; your performance bottleneck is most likely I/O bound (the HTTP requests), not compute bound, and asynchrony is the appropriate tool for that. How many HTTP requests are you making, and are they all to the same server? If so, you may be hitting a connection limit. I'm not very familiar with RestSharp, but you might try increasing the connection limit via ServicePointManager. The more outstanding requests you have (assuming the server can handle them), the faster the WhenAll will complete.

https://learn.microsoft.com/en-us/dotnet/api/system.net.servicepointmanager?view=netframework-4.8
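For example, a rough sketch of raising the limit before firing the requests (the value 100 is purely illustrative; tune it to what the target server can handle):

using System.Net;

// On .NET Framework the default is 2 concurrent connections per host,
// which would throttle a large batch of requests to the same server.
ServicePointManager.DefaultConnectionLimit = 100; // illustrative value, not a recommendation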

All of that said, I would reorganize your code: use Task/WhenAll for your HTTP requests, and process the responses after the WhenAll completes. If you do this, you can determine with certainty whether the HTTP requests are where the bottleneck is, by setting a breakpoint after the WhenAll and observing the execution times. If you can't debug, you can log the execution time instead. This should give you an idea of whether the bottleneck is primarily network I/O. I'm pretty confident it is.
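For example, a rough sketch of logging the time spent in each phase (the Stopwatch usage and messages are just an illustration, not part of your code):

var stopwatch = System.Diagnostics.Stopwatch.StartNew();

await Task.WhenAll(tasks); // all the HTTP requests
Console.WriteLine($"HTTP requests completed in {stopwatch.Elapsed}");

stopwatch.Restart();
// ... deserialize and validate the responses here ...
Console.WriteLine($"Processing completed in {stopwatch.Elapsed}");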

If it turns out that there is a compute bottleneck, you can use a Parallel.ForEach loop to deserialize, validate, and assign.

var internalLinks = new List<InternalLinksModel>();
// Populate internalLinks with the data
// (I'm assuming internalLinks already contains data at this point. If not, I'm not sure I understand your code.)

// You shouldn't need a ConcurrentDictionary, since you'll only be doing reads in parallel.
var dictionary = new Dictionary<Task<IRestResponse>, InternalLinksModel>();

// Make the API calls - I/O bound
foreach (var l in internalLinks)
{
    // Build the RestSharp request from l.SearchValue, as in your SearchValue method
    var request = new RestRequest(l.SearchValue);
    dictionary[client.ExecuteTaskAsync(request)] = l;
}

await Task.WhenAll(dictionary.Keys);
// I/O is done.

// Compute bound - deserialize, validate, assign.
Parallel.ForEach(dictionary.Keys, task =>
{
    var responseModel = JsonConvert.DeserializeObject<ResponseModel>(task.Result.Content);
    dictionary[task].PossibleResults = ValidateSearchResult(responseModel);
});

// Write results to a txt file
WriteResults(dictionary.Values.ToList(), "Internal Links");

Upvotes: 1

Theodor Zoulias

Reputation: 43515

It seems that you have a series of I/O bound and CPU bound jobs that need to run one after the other, with a varying degree of concurrency required for each step. A good tool for dealing with that kind of workload is the TPL Dataflow library. This library is designed in a way that allows forming pipelines (or even complex networks) of data that flows from one block to the next.

I tried to come up with an example that demonstrates using this library, and then realized that your workflow includes a last step where a property (internalLink.PossibleResults) must be updated on the first type of item entering the pipeline. This complicates things quite a lot, because it implies that the first type must be carried along all the steps of the pipeline. The easiest way to do this would probably be to use ValueTuples as the input and output of the blocks. That would make my example too messy though, so I preferred to keep it in its simplest form, since its purpose is mainly to demonstrate the capabilities of the TPL Dataflow library:

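// Requires the System.Threading.Tasks.Dataflow NuGet package
// using System.Threading.Tasks.Dataflow;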
var cts = new CancellationTokenSource();
var restClient = new RestClient();

var block1 = new TransformBlock<InternalLinksModel, IRestResponse>(async item =>
{
    // Build the RestSharp request from item.SearchValue, as in the question's SearchValue method
    return await restClient.ExecuteTaskAsync(new RestRequest(item.SearchValue));
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10, // 10 concurrent REST requests max
    CancellationToken = cts.Token, // Cancel at any time
});

var block2 = new TransformBlock<IRestResponse, ResponseModel>(item =>
{
    return JsonConvert.DeserializeObject<ResponseModel>(item.Content);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 2, // 2 threads max for this CPU bound job
    CancellationToken = cts.Token, // Cancel at any time
});

var block3 = new TransformBlock<ResponseModel, string>(async item =>
{
    return await SearchValue(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 10, // Concurrency 10 for this I/O bound job
    CancellationToken = cts.Token, // Cancel at any time
});

var block4 = new ActionBlock<string>(item =>
{
    ValidateSearchResult(item);
}, new ExecutionDataflowBlockOptions()
{
    MaxDegreeOfParallelism = 1, // 1 thread max for this CPU bound job
    CancellationToken = cts.Token, // Cancel at any time
});

block1.LinkTo(block2, new DataflowLinkOptions() { PropagateCompletion = true });
block2.LinkTo(block3, new DataflowLinkOptions() { PropagateCompletion = true });
block3.LinkTo(block4, new DataflowLinkOptions() { PropagateCompletion = true });

var internalLinks = new List<InternalLinksModel>();
// Populate internalLinks with the data
foreach (var internalLink in internalLinks)
{
    await block1.SendAsync(internalLink);
}
block1.Complete();

await block4.Completion;

Two types of blocks are used in this example, TransformBlock and ActionBlock. An ActionBlock is usually the last block of a pipeline, since it doesn't produce any output. In case your workload is too granular, and the overhead of passing the objects around is comparable with the workload itself, you could start the pipeline with a BatchBlock, and then process the next steps in batches of, say, 10 elements each. It doesn't seem that this is required in your case though, since making web requests and parsing JSON responses are pretty bulky jobs.
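For completeness, here is a rough sketch of the ValueTuple idea mentioned above: each block passes the original InternalLinksModel along with its intermediate result, so that the last block can assign internalLink.PossibleResults. The RestSharp call is adapted from the question, and the degrees of parallelism are arbitrary:

var fetchBlock = new TransformBlock<InternalLinksModel, (InternalLinksModel, IRestResponse)>(async link =>
{
    // Keep the original model together with its HTTP response
    var response = await restClient.ExecuteTaskAsync(new RestRequest(link.SearchValue));
    return (link, response);
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 10 });

var assignBlock = new ActionBlock<(InternalLinksModel Link, IRestResponse Response)>(pair =>
{
    var model = JsonConvert.DeserializeObject<ResponseModel>(pair.Response.Content);
    pair.Link.PossibleResults = ValidateSearchResult(model);
}, new ExecutionDataflowBlockOptions() { MaxDegreeOfParallelism = 2 });

fetchBlock.LinkTo(assignBlock, new DataflowLinkOptions() { PropagateCompletion = true });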

Upvotes: 2
