dotnetster

Reputation: 1611

async i/o and process results as they become available

I have a simple console app where I want to call many URLs in a loop and put the results in a database table. I am using .NET 4.5 and async I/O to fetch the URL data. Here is a simplified version of what I am doing. All methods are async except for the database operation. Do you see any issues with this? Are there better ways of optimizing it?

    private async Task Run(){
        var items = repo.GetItems(); // sync method to get list from database
        var tasks = new List<Task>();

        // add each call to task list and process result as it becomes available 
        // rather than waiting for all downloads
        foreach(Item item in items){
            tasks.Add(GetFromWeb(item.url).ContinueWith(response => { AddToDatabase(response.Result);}));
        }
        await Task.WhenAll(tasks); // wait for all tasks to complete.
    }

    private async Task<string> GetFromWeb(string url) {
        HttpResponseMessage response = await GetAsync(url);
        return await response.Content.ReadAsStringAsync();
    }

    private void AddToDatabase(string item){
        // add data to database.
    }

Upvotes: 0

Views: 230

Answers (3)

Aron

Reputation: 15772

Just thought I'd throw my hat in as well with the Rx solution:

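using System.Reactive.Concurrency;     // Scheduler
using System.Reactive.Linq;            // ToObservable, SelectMany, Do
using System.Reactive.Threading.Tasks; // ToTask

private Task Run()
{
    return repo.GetItems()
        .ToObservable(Scheduler.Default)
        // start each download and surface its result as soon as it completes
        .SelectMany(item => GetFromWeb(item.url))
        .Do(AddToDatabase)
        // ToTask faults on an empty sequence, so supply a default element
        .DefaultIfEmpty()
        .ToTask();
}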

Upvotes: 0

Servy

Reputation: 203825

Your solution is pretty much correct, with just two minor mistakes (both of which cause compiler errors). First, you don't call ContinueWith on the result of List.Add; you need to call ContinueWith on the task and then add the continuation to your list. This is solved by just moving a parenthesis. Second, you need to call Result on the response Task.

Here is the section with the two minor changes:

tasks.Add(GetFromWeb(item.url)
    .ContinueWith(response => { AddToDatabase(response.Result);}));

Another option is to leverage a method that takes a sequence of tasks and orders them by the order that they are completed. Here is my implementation of such a method:

public static IEnumerable<Task<T>> Order<T>(this IEnumerable<Task<T>> tasks)
{
    var taskList = tasks.ToList();

    var taskSources = new BlockingCollection<TaskCompletionSource<T>>();

    var taskSourceList = new List<TaskCompletionSource<T>>(taskList.Count);
    foreach (var task in taskList)
    {
        var newSource = new TaskCompletionSource<T>();
        taskSources.Add(newSource);
        taskSourceList.Add(newSource);

        task.ContinueWith(t =>
        {
            var source = taskSources.Take();

            if (t.IsCanceled)
                source.TrySetCanceled();
            else if (t.IsFaulted)
                source.TrySetException(t.Exception.InnerExceptions);
            else if (t.IsCompleted)
                source.TrySetResult(t.Result);
        }, CancellationToken.None, TaskContinuationOptions.PreferFairness, TaskScheduler.Default);
    }

    return taskSourceList.Select(tcs => tcs.Task);
}

Using this your code can become:

private async Task Run()
{
    IEnumerable<Item> items = repo.GetItems(); // sync method to get list from database

    foreach (var task in items.Select(item => GetFromWeb(item.url))
        .Order())
    {
        await task.ConfigureAwait(false);
        AddToDatabase(task.Result);
    }
}

Upvotes: 1

Stephen Cleary

Reputation: 457207

Your solution is acceptable. But you should check out TPL Dataflow, which allows you to set up a dataflow "mesh" (or "pipeline") and then shove the data through it.

For a problem this simple, Dataflow won't really add much other than getting rid of the ContinueWith (I always find manual continuations awkward). But if you plan to add more steps or change your data flow in the future, Dataflow should be something you consider.
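To give a rough idea of what that looks like, here is a minimal sketch of a two-block pipeline. It is only an illustration under some assumptions: the class and method names (DataflowSketch, RunPipeline) and the maxDownloads parameter are made up for this example, the getFromWeb and addToDatabase delegates stand in for the GetFromWeb and AddToDatabase methods from the question, and it needs the System.Threading.Tasks.Dataflow NuGet package. EnsureOrdered is only available in newer versions of that package; without it, a TransformBlock hands results on in input order rather than completion order.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet package: System.Threading.Tasks.Dataflow

public static class DataflowSketch
{
    // getFromWeb and addToDatabase are hypothetical stand-ins for the
    // question's GetFromWeb and AddToDatabase methods.
    public static async Task RunPipeline(
        IEnumerable<string> urls,
        Func<string, Task<string>> getFromWeb,
        Action<string> addToDatabase,
        int maxDownloads = 8)
    {
        // Stage 1: download up to maxDownloads URLs concurrently.
        var download = new TransformBlock<string, string>(
            getFromWeb,
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = maxDownloads,
                // Pass results on as they complete instead of in input order
                // (assumes a package version that has this option).
                EnsureOrdered = false
            });

        // Stage 2: write to the database one item at a time
        // (an ActionBlock's MaxDegreeOfParallelism defaults to 1).
        var store = new ActionBlock<string>(addToDatabase);

        // Completion (or a fault) of stage 1 flows through to stage 2.
        download.LinkTo(store, new DataflowLinkOptions { PropagateCompletion = true });

        foreach (var url in urls)
            download.Post(url);

        download.Complete();    // no more input
        await store.Completion; // finishes once every result has been stored
    }
}
```

With the question's methods this would be called along the lines of `await DataflowSketch.RunPipeline(items.Select(i => i.url), GetFromWeb, AddToDatabase);`. If a download throws, the fault propagates and the awaited Completion task throws, so you would still want to decide how to handle individual failures.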

Upvotes: 1
