Mr J

Reputation: 2965

Add items to ConcurrentBag used to Parallel.ForEach c#

I'm trying to crawl several URLs concurrently. Each request may add more URLs to the ConcurrentBag for crawling. At the moment I've got a nasty while(true) that starts a new Parallel.ForEach to process any new URLs.

Is there any way I can add to the contents of the ConcurrentBag so the Parallel.ForEach will see there are new items in it and continue iterating through these new items?

ConcurrentBag<LinkObject> URLSToCheck = new ConcurrentBag<LinkObject>();

while (true)
{
    Parallel.ForEach(URLSToCheck, new ParallelOptions { MaxDegreeOfParallelism = 5 }, URL =>
    {
        Checker Checker = new Checker();

        URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);

        List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);

        foreach (var URLToAdd in URLsToAdd)
        {
            URLSToCheck.Add(new LinkObject { sourceURL = URLToAdd.sourceURL, destinationURL = URLToAdd.destinationURL });
        }
    });

    if(URLSToCheck.Count == 0)break;
}

Upvotes: 4

Views: 2163

Answers (2)

Alberto

Reputation: 15941

You can take a look at BlockingCollection.

The BlockingCollection provides an implementation of the producer/consumer pattern: your producer will add to the blocking collection and your Parallel.ForEach will consume from the collection.

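To do so you will have to implement a custom partitioner for the BlockingCollection. The default partitioner used by Parallel.ForEach buffers items into chunks, which works poorly with an enumerable that blocks while waiting for new items (the reason is explained in more detail here: https://blogs.msdn.microsoft.com/pfxteam/2010/04/06/parallelextensionsextras-tour-4-blockingcollectionextensions/)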

Partitioner:

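using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;

class BlockingCollectionPartitioner<T> : Partitioner<T>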
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions 
    {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");

        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount).Select(_ => dynamicPartitioner.GetEnumerator()).ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}

Then you will use it like:

BlockingCollection<LinkObject> URLSToCheck = new BlockingCollection<LinkObject>();

Parallel.ForEach(
    new BlockingCollectionPartitioner<LinkObject>(URLSToCheck), 
    new ParallelOptions { MaxDegreeOfParallelism = 5 }, URL =>
       {
            //....
       });

From another thread (or from inside the loop body), add to the URLSToCheck collection:

URLSToCheck.Add(...)

When there are no more URLs to process, call URLSToCheck.CompleteAdding(). The Parallel.ForEach will then stop on its own once the collection has been drained.
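In a crawler the consumers are also the producers, so nothing outside the loop knows when the work is finished. One way to decide when to call CompleteAdding() is to count the items that are queued or in flight and complete the collection when that count drops to zero. The following is a minimal sketch of that idea, not the only way to do it. FakeSite, CrawlSketch and Crawl are hypothetical names that stand in for the real download and link-extraction code, and it uses the built-in Partitioner.Create(..., EnumerablePartitionerOptions.NoBuffering) (available since .NET 4.5) in place of the custom partitioner above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class CrawlSketch
{
    // Hypothetical stand-in for Checker.findInternalUrls: page -> links found on it.
    static readonly Dictionary<string, string[]> FakeSite = new Dictionary<string, string[]>
    {
        { "/",  new[] { "/a", "/b" } },
        { "/a", new[] { "/b", "/c" } },
        { "/b", new[] { "/" } },
        { "/c", new string[0] },
    };

    public static ICollection<string> Crawl(string start)
    {
        var queue = new BlockingCollection<string>();
        var seen = new ConcurrentDictionary<string, bool>(); // avoids crawling a URL twice
        int pending = 0;                                     // items queued or being processed

        seen.TryAdd(start, true);
        Interlocked.Increment(ref pending);
        queue.Add(start);

        // NoBuffering hands items out one at a time instead of in chunks.
        var source = Partitioner.Create(
            queue.GetConsumingEnumerable(),
            EnumerablePartitionerOptions.NoBuffering);

        Parallel.ForEach(source, new ParallelOptions { MaxDegreeOfParallelism = 5 }, url =>
        {
            foreach (var link in FakeSite[url])
            {
                if (seen.TryAdd(link, true))
                {
                    // Count the new item before queueing it so pending never hits 0 early.
                    Interlocked.Increment(ref pending);
                    queue.Add(link);
                }
            }

            // This item is done; if nothing else is queued or running, end the loop.
            if (Interlocked.Decrement(ref pending) == 0)
                queue.CompleteAdding();
        });

        return seen.Keys;
    }
}
```

Calling CrawlSketch.Crawl("/") returns once every reachable page has been visited. The counter is incremented before each Add and decremented only after an item's links have been queued, so it can only reach zero when the queue is empty and no worker is still processing.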

Upvotes: 5

Ofir Winegarten

Reputation: 9365

TPL Dataflow (the System.Threading.Tasks.Dataflow package) can be handy here. With an ActionBlock it can be done nicely:

// Declare the variable first so the lambda below can refer to the block itself
ActionBlock<LinkObject> actionBlock = null;

actionBlock = new ActionBlock<LinkObject>(URL =>
{
    Checker Checker = new Checker();
    URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);
    List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);
    // Post returns bool, so it needs a lambda rather than a method group here
    URLsToAdd.ForEach(link => actionBlock.Post(link));
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

Then post your initial URLs to the actionBlock:

actionBlock.Post(url1);
actionBlock.Post(url2);
...
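As written, the block never signals that it has finished, so the caller cannot tell when the crawl is done. A possible way to handle that, sketched here under assumptions, is to track how many items are queued or in flight and call Complete() when the count reaches zero, then wait on the block's Completion task. FakeSite, DataflowCrawlSketch and Crawl are hypothetical names used only for illustration; the in-memory dictionary replaces the real download step.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow; // from the System.Threading.Tasks.Dataflow package

static class DataflowCrawlSketch
{
    // Hypothetical stand-in for the download + link extraction step.
    static readonly Dictionary<string, string[]> FakeSite = new Dictionary<string, string[]>
    {
        { "/",  new[] { "/a", "/b" } },
        { "/a", new[] { "/b", "/c" } },
        { "/b", new[] { "/" } },
        { "/c", new string[0] },
    };

    public static ICollection<string> Crawl(string start)
    {
        var seen = new ConcurrentDictionary<string, bool>(); // avoids posting a URL twice
        int pending = 0;                                     // items posted but not yet finished
        ActionBlock<string> block = null;

        block = new ActionBlock<string>(url =>
        {
            foreach (var link in FakeSite[url])
            {
                if (seen.TryAdd(link, true))
                {
                    // Count the item before posting it.
                    Interlocked.Increment(ref pending);
                    block.Post(link);
                }
            }

            // Last outstanding item: tell the block that no more input is coming.
            if (Interlocked.Decrement(ref pending) == 0)
                block.Complete();
        }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });

        seen.TryAdd(start, true);
        Interlocked.Increment(ref pending);
        block.Post(start);

        // Blocks until Complete() was called and all items were processed;
        // async code would await block.Completion instead.
        block.Completion.Wait();
        return seen.Keys;
    }
}
```

The seen dictionary also matters for termination: without it, pages that link back to each other would keep re-posting one another and the counter would never reach zero.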

Upvotes: 2
