Chris Wright

Reputation: 301

Use multiple tasks to retrieve all records from a large collection

I am working on an application which calls an external service and has to add all entries of the external collection into a local collection. The problem currently is that the external collection can exceed 1000 records, but the returned search results can only include up to twenty items.

For the sake of speed I figured using a collection of Tasks would be the way forward, so I came up with the code below:

int totalCount = returnedCol.total_count;
while (totalCount > myDict.Count)
{
    int numberOfTasks = // logic to calculate how many tasks to run

    List<Task> taskList = new List<Task>();

    for (int i = 1; i <= numberOfTasks; i++)
    {
        Interlocked.Add(ref pageNumber, pageSize);

        Task<SearchResponse> testTask = Task.Run(() =>
        {
            return ExternalCall.GetData(pageNumber, pageSize);
        });

        Thread.Sleep(100);

        taskList.Add(testTask);
        testTask.ContinueWith(o =>
        {
            foreach (ExternalDataRecord dataiwant in testTask.Result.dataiwant)
            {
                if (!myDict.ContainsKey(dataiwant.id))
                    myDict.GetOrAdd(dataiwant.id, dataiwant);
            }
        });
    }
    Task.WaitAll(taskList.ToArray());
}

However, this does not yield all results. The pageNumber variable is incrementing correctly each time, but it seems that not all task results are being analysed (the same logic run on a single thread against a smaller data set returns all expected results). I have also tried declaring individual tasks in a chain (rather than in a loop), and then all of the test data is returned. It seems that the higher the value I pass to Thread.Sleep(), the more results are added to the local collection, but this isn't ideal, as it makes the process take longer!

Currently in a sample of 600 records I'm only getting about 150-200 added to the myDict collection. Am I missing something obvious?

Upvotes: 10

Views: 2853

Answers (2)

Berin Loritsch

Reputation: 11463

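You're missing the fact that ContinueWith() returns another task, and you aren't adding that task to your taskList. As a result, Task.WaitAll() only waits for the GetData calls to finish, not for the continuations that copy their results into myDict.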
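To see the difference, here is a minimal, self-contained sketch (not from the original post), written as a C# 9+ top-level program. A hypothetical FakeGetData stands in for ExternalCall.GetData and returns 20 ids per page. The sketch keeps the task returned by ContinueWith and waits on those, so every page's records are in the dictionary when Task.WaitAll returns. It also copies the loop counter into a local before the lambda captures it, because a for-loop variable is shared across iterations.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical stand-in for ExternalCall.GetData: returns the ids on one page.
static int[] FakeGetData(int page, int pageSize) =>
    Enumerable.Range(page * pageSize, pageSize).ToArray();

var results = new ConcurrentDictionary<int, int>();
var continuations = new List<Task>();

for (int i = 0; i < 10; i++)
{
    int page = i; // copy the loop value so each task reads its own page number
    Task<int[]> fetch = Task.Run(() => FakeGetData(page, 20));

    // ContinueWith returns a *new* task; keep that one, not just the antecedent.
    Task store = fetch.ContinueWith(t =>
    {
        foreach (int id in t.Result)
            results.TryAdd(id, id);
    });
    continuations.Add(store);
}

// Waiting on the continuations guarantees every page has been stored.
Task.WaitAll(continuations.ToArray());
Console.WriteLine(results.Count); // 200
```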

A better approach would be to use async/await, which has been available since .NET 4.5 (C# 5). It is a lighter-weight way to express the same continuation logic.

You would change the algorithm to be more like this:

public async Task Process()
{
    int totalCount = returnedCol.total_count;

    while (totalCount > myDict.Count)
    {
        int numberOfTasks = // logic to calculate how many tasks to run

        List<Task> taskList = new List<Task>();

        for (int i = 1; i <= numberOfTasks; i++)
        {
            Interlocked.Add(ref pageNumber, pageSize);

            // Passing pageNumber as an argument hands ProcessPage its own copy of the current value.
            taskList.Add(ProcessPage(pageNumber, pageSize));
        }

        await Task.WhenAll(taskList.ToArray());
    }
}

private async Task ProcessPage(int pageNumber, int pageSize)
{
    // Run the synchronous external call on the thread pool and await its result.
    SearchResponse result = await Task.Run(() =>
        ExternalCall.GetData(pageNumber, pageSize)).ConfigureAwait(false);

    // myDict is assumed to be a ConcurrentDictionary, since pages are processed concurrently.
    foreach (ExternalDataRecord dataiwant in result.dataiwant)
    {
        myDict.GetOrAdd(dataiwant.id, dataiwant);
    }
}
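The pattern can be exercised without a real service. The sketch below is a C# 9+ top-level program built on assumptions: FakeGetData is a hypothetical stand-in that serves 600 records by offset, and tasksPerBatch = 5 stands in for the elided "logic to calculate how many tasks to run". Each batch of ProcessPage calls is awaited with Task.WhenAll before the loop re-checks myDict.Count.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical service: 600 records with ids 0..599, at most `size` per call.
const int totalCount = 600;
const int pageSize = 20;
const int tasksPerBatch = 5; // stands in for the "how many tasks to run" logic
var myDict = new ConcurrentDictionary<int, string>();

int[] FakeGetData(int offset, int size) =>
    Enumerable.Range(offset, Math.Max(0, Math.Min(size, totalCount - offset))).ToArray();

async Task ProcessPage(int offset, int size)
{
    // Run the synchronous call on the thread pool and await its result.
    int[] page = await Task.Run(() => FakeGetData(offset, size)).ConfigureAwait(false);
    foreach (int id in page)
        myDict.GetOrAdd(id, $"record {id}");
}

int nextOffset = 0;
while (totalCount > myDict.Count)
{
    var taskList = new List<Task>();
    for (int i = 0; i < tasksPerBatch; i++)
    {
        // The current offset is passed by value, so each call gets its own copy.
        taskList.Add(ProcessPage(nextOffset, pageSize));
        nextOffset += pageSize;
    }
    // Storing the records happens inside ProcessPage, so this waits for it too.
    await Task.WhenAll(taskList);
}

Console.WriteLine(myDict.Count); // 600
```

Because the code that stores the records lives inside ProcessPage, awaiting the ProcessPage tasks is enough to know the records have been stored; there is no separate continuation task to lose track of.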

The async keyword lets you use await inside the method. The compiler turns the code that follows each await into a continuation, which is essentially what your ContinueWith call was doing by hand. If you really want ExternalCall.GetData to run on another thread, wrap it in Task.Run and await the result, as ProcessPage does.
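Passing pageNumber as an argument to ProcessPage, as the answer does, matters too. In the question's code the Task.Run lambda captures the pageNumber variable itself, so it reads whatever value the variable holds when the task actually starts, which later loop iterations may already have advanced. That would plausibly explain both the missing pages and why a longer Thread.Sleep() helps; this is an inference from the question's code, not something the answer states. The deterministic sketch below uses plain delegates instead of tasks and invokes them after the loop, to mimic tasks that start late.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

int pageNumber = 0;
const int pageSize = 20;

var readsShared = new List<Func<int>>();
var readsCopy = new List<Func<int>>();
for (int i = 0; i < 5; i++)
{
    pageNumber += pageSize;
    int offset = pageNumber;            // snapshot taken in this iteration
    readsShared.Add(() => pageNumber);  // reads the shared variable later, when invoked
    readsCopy.Add(() => offset);        // reads this iteration's own copy
}

// Invoke the lambdas after the loop, as a task that starts late would.
int[] shared = readsShared.Select(f => f()).ToArray();
int[] copied = readsCopy.Select(f => f()).ToArray();
Console.WriteLine(string.Join(",", shared)); // 100,100,100,100,100
Console.WriteLine(string.Join(",", copied)); // 20,40,60,80,100
```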

Upvotes: 1

StriplingWarrior

Reputation: 156524

I think if you take a more functional and less imperative approach to your code, you'll be a lot less likely to run into hard-to-understand issues. I think something like this would have the same effect you're going for:

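int totalCount = returnedCol.total_count;
int pageCount = (totalCount + pageSize - 1) / pageSize; // round up so a partial last page is fetched
var tasks = Enumerable.Range(1, pageCount)
    .Select(async page => {
        await Task.Delay(page * 100); // stagger the requests 100 ms apart
        return ExternalCall.GetData(page, pageSize);
    })
    .ToArray();
// WhenAll yields one SearchResponse per page; flatten them into records.
// (Assumes myDict can be a plain Dictionary rather than a ConcurrentDictionary.)
myDict = (await Task.WhenAll(tasks))
    .SelectMany(response => response.dataiwant)
    .ToDictionary(dataiwant => dataiwant.id);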

The above code assumes you still want to wait 100ms between requests for throttling purposes. If you just had that Thread.Sleep() there to try fixing issues you were having, you could further simplify it:

int totalCount = returnedCol.total_count;
int pageCount = (totalCount + pageSize - 1) / pageSize;
var tasks = Enumerable.Range(1, pageCount)
    .Select(page => Task.Run(() => ExternalCall.GetData(page, pageSize)))
    .ToArray();
myDict = (await Task.WhenAll(tasks))
    .SelectMany(response => response.dataiwant)
    .ToDictionary(dataiwant => dataiwant.id);
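Here is a runnable sketch of the same Range/Select/WhenAll shape, written as a C# 9+ top-level program under assumptions: a hypothetical FakeGetData serves 610 records from 1-based pages of 20, so the last page is only partly full, and ids are plain ints. It also shows why the page count should be rounded up and why the per-page arrays need to be flattened with SelectMany before ToDictionary.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical service: 610 records, 1-based pages of `size` records each.
const int totalCount = 610;
const int pageSize = 20;

int[] FakeGetData(int page, int size)
{
    int start = (page - 1) * size;
    return Enumerable.Range(start, Math.Min(size, totalCount - start)).ToArray();
}

// Round up: 610 / 20 would give 30 pages and silently drop the last 10 records.
int pageCount = (totalCount + pageSize - 1) / pageSize;

Task<int[]>[] tasks = Enumerable.Range(1, pageCount)
    .Select(page => Task.Run(() => FakeGetData(page, pageSize)))
    .ToArray();

int[][] pages = await Task.WhenAll(tasks);

// Flatten the pages, then key each record by its id.
var myDict = pages
    .SelectMany(ids => ids)
    .ToDictionary(id => id, id => $"record {id}");

Console.WriteLine($"{pageCount} pages, {myDict.Count} records"); // 31 pages, 610 records
```

Note that ToDictionary throws if two pages ever return the same id, whereas the question's GetOrAdd call silently skipped duplicates.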

Upvotes: 2
