cubesnyc

Reputation: 1545

Looping Async Task List in C#

I am trying to parse data from several websites continuously. I would like each site to be parsed independently, in a loop and asynchronously, until the program is closed. I am not sure what the structure should be for this kind of logic.

Right now I am following this pattern.

async public void ParseAll(List<Site> SiteList)
{
    List<Task> TaskList = new List<Task>();

    foreach(Site s in SiteList)
    {
        TaskList.Add(s.ParseData());
    }

    await Task.WhenAll(TaskList);
}

The issue is that if I construct a loop around this method, then the sites that are updated first will have to wait until the whole list is finished before the method can run again. Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finished its ParseData method, but I am not sure if that's possible, or if that's the best way.

Upvotes: 2

Views: 5896

Answers (4)

noseratio

Reputation: 61656

Theoretically, what I would like to do is just put each site back on the bottom of the TaskList when it finished its ParseData

Looks like you need to maintain a queue of sites to be processed. Below is my take on this, using SemaphoreSlim. This way you can also limit the number of concurrent tasks to be less than the actual number of sites, or add new sites on the fly. A CancellationToken is used to stop the processing from outside. The use of async void is justified here IMO, because QueueSiteAsync keeps track of the tasks it starts.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace AsyncLoop
{
    class Program
    {
        public class Site
        {
            public string Url { get; set; }
            public async Task ParseDataAsync(CancellationToken token)
            {
                // simulate download and parse
                int delay = new Random(Environment.TickCount).Next(100, 1000);
                await Task.Delay(delay, token);
                Console.WriteLine("Processed: #{0}, delay: {1}", this.Url, delay);
            }
        }

        object _lock = new Object();
        HashSet<Task> _pending = new HashSet<Task>(); // sites in progress
        SemaphoreSlim _semaphore;

        async void QueueSiteAsync(Site site, CancellationToken token)
        {
            Func<Task> processSiteAsync = async () =>
            {
                await _semaphore.WaitAsync(token).ConfigureAwait(false);
                try 
                {           
                    await site.ParseDataAsync(token);
                    QueueSiteAsync(site, token);
                }
                finally
                {
                    _semaphore.Release();
                }
            };

            var task = processSiteAsync();
            lock (_lock)
                _pending.Add(task);
            try
            {
                await task;
                lock (_lock)
                    _pending.Remove(task);
            }
            catch
            {
                if (!task.IsCanceled && !task.IsFaulted)
                    throw; // non-task error, re-throw

                // leave the faulted task in the pending list and exit
                // ProcessAllSites will pick it up
            }
        }

        public async Task ProcessAllSites(
            Site[] sites, int maxParallel, CancellationToken token)
        {
            _semaphore = new SemaphoreSlim(Math.Min(sites.Length, maxParallel));

            // start all sites
            foreach (var site in sites)
                QueueSiteAsync(site, token);

            // wait for cancellation
            try
            {
                await Task.Delay(Timeout.Infinite, token);
            }
            catch (OperationCanceledException)
            {
            }

            // wait for pending tasks
            Task[] tasks;
            lock (_lock)
                tasks = _pending.ToArray();
            await Task.WhenAll(tasks);
        }

        // testing
        static void Main(string[] args)
        {
            // cancel processing in 10s
            var cts = new CancellationTokenSource(millisecondsDelay: 10000); 
            var sites = Enumerable.Range(0, count: 10).Select(i => 
                new Site { Url = i.ToString() });
            try
            {
                new Program().ProcessAllSites(
                    sites.ToArray(), 
                    maxParallel: 5, 
                    token: cts.Token).Wait();
            }
            catch (AggregateException ex)
            {
                foreach (var innerEx in ex.InnerExceptions)
                    Console.WriteLine(innerEx.Message);
            }
        }
    }
}

You may also want to separate download and parsing into separate pipelines, check this for more details.

Upvotes: 3

antlersoft

Reputation: 14786

If you want to visit the site again as soon as it is complete, you probably want to use Task.WhenAny and integrate your outer loop with your inner loop, something like this (assuming the ParseData function will return the Site it is parsing for):

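async public void ParseAll(List<Site> SiteList)
{
    // Start one parse per site up front, outside the loop, so that
    // running parses are not restarted on every iteration.
    List<Task<Site>> TaskList = new List<Task<Site>>();

    foreach(Site s in SiteList)
    {
        TaskList.Add(s.ParseData());
    }

    while (true)
    {
        // Wait until at least one parse has finished.
        await Task.WhenAny(TaskList);
        // Restart the finished sites; keep the ones still running.
        TaskList = TaskList.Select(t => t.IsCompleted ? t.Result.ParseData() : t).ToList();
    }
}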
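
As a rough sketch of the same Task.WhenAny idea with a way to stop the loop: the version below restarts only the task that WhenAny reports as finished. The Site class, the CancellationToken parameter on ParseData, the ParseCount field, and the WhenAnyLoop name are all assumptions made for illustration; none of them come from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Site class.
public class Site
{
    public string Url { get; set; }
    public int ParseCount; // completed parses, for illustration only

    // Returns the site itself so the caller knows which parse finished.
    public async Task<Site> ParseData(CancellationToken token)
    {
        await Task.Delay(50, token); // simulate download and parse
        ParseCount++;
        return this;
    }
}

public static class WhenAnyLoop
{
    public static async Task ParseAllAsync(IEnumerable<Site> sites, CancellationToken token)
    {
        // One in-flight parse per site.
        List<Task<Site>> running = sites.Select(s => s.ParseData(token)).ToList();

        while (running.Count > 0 && !token.IsCancellationRequested)
        {
            // WhenAny hands back the task that completed.
            Task<Site> finished = await Task.WhenAny(running);
            running.Remove(finished);

            // Awaiting rethrows if that parse failed or was cancelled.
            Site site = await finished;

            // Put the site straight back into the set of running parses.
            running.Add(site.ParseData(token));
        }
    }
}
```

A caller would await WhenAnyLoop.ParseAllAsync(sites, cts.Token) and should expect cancellation to surface as an OperationCanceledException in most cases. A failed parse also ends the whole loop here; whether that is desirable depends on the application.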

Upvotes: 0

Servy

Reputation: 203802

It's easy enough to create a method to loop continuously and parse a single site over and over again. Once you have that method, you can call it once on each site in the list:

private async void ParseSite(Site s)
{
    while (true)
    {
        await s.ParseData();
    }
}

public void ParseAll(List<Site> siteList)
{
    foreach (var site in siteList)
    {
        ParseSite(site);
    }
}
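
If the caller needs to stop the loops or observe failures, one possible variant returns a Task instead of using async void. This is only a sketch: the Site class, the CancellationToken parameter on ParseData, the ParseCount field, and the SiteLoop name are assumptions for illustration and are not part of the question.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Site class.
public class Site
{
    public string Url { get; set; }
    public int ParseCount; // completed parses, for illustration only

    public async Task ParseData(CancellationToken token)
    {
        await Task.Delay(50, token); // simulate download and parse
        Interlocked.Increment(ref ParseCount);
    }
}

public static class SiteLoop
{
    // Parses a single site over and over until cancellation is requested.
    private static async Task ParseSiteAsync(Site site, CancellationToken token)
    {
        try
        {
            while (true)
            {
                await site.ParseData(token);
            }
        }
        catch (OperationCanceledException)
        {
            // Cancellation is the normal way out of the loop.
        }
    }

    // Starts one independent loop per site. The returned task completes
    // once every loop has stopped, and faults if any site throws.
    public static Task ParseAllAsync(IEnumerable<Site> sites, CancellationToken token)
    {
        return Task.WhenAll(sites.Select(s => ParseSiteAsync(s, token)));
    }
}
```

A caller could then write await SiteLoop.ParseAllAsync(siteList, cts.Token) and cancel cts when the program shuts down. Each site still loops independently, so a slow site does not hold up the others.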

Upvotes: 1

Venson

Reputation: 1870

Did you try PLINQ?

PLINQ allows you to execute LINQ queries in parallel.

In your case it would look like:

SiteList.AsParallel().ForAll(s => s.ParseData());

Upvotes: -1
