Reputation: 2965
I'm trying to crawl several URLs concurrently. Each request may add more URLs to the ConcurrentBag for crawling. At the moment I've got a nasty while (true) that starts a new Parallel.ForEach to process any new URLs.

Is there any way I can add to the contents of the ConcurrentBag so the Parallel.ForEach will see there are new items in it and continue iterating through them?
ConcurrentBag<LinkObject> URLSToCheck = new ConcurrentBag<LinkObject>();
while (true)
{
    Parallel.ForEach(URLSToCheck, new ParallelOptions { MaxDegreeOfParallelism = 5 }, URL =>
    {
        Checker Checker = new Checker();
        URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);
        List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);
        foreach (var URLToAdd in URLsToAdd)
        {
            URLSToCheck.Add(new LinkObject { sourceURL = URLToAdd.sourceURL, destinationURL = URLToAdd.destinationURL });
        }
    });
    if (URLSToCheck.Count == 0) break;
}
Upvotes: 4
Views: 2163
Reputation: 15941
You can take a look at BlockingCollection.
BlockingCollection provides an implementation of the producer/consumer pattern: your producers add to the blocking collection, and your Parallel.ForEach consumes from it.
To make this work you have to implement a custom partitioner for the BlockingCollection, because the default partitioner buffers items in chunks instead of handing them to the loop as they arrive (the reason is explained here: https://blogs.msdn.microsoft.com/pfxteam/2010/04/06/parallelextensionsextras-tour-4-blockingcollectionextensions/).
Partitioner:
class BlockingCollectionPartitioner<T> : Partitioner<T>
{
    private BlockingCollection<T> _collection;

    internal BlockingCollectionPartitioner(BlockingCollection<T> collection)
    {
        if (collection == null)
            throw new ArgumentNullException("collection");
        _collection = collection;
    }

    public override bool SupportsDynamicPartitions
    {
        get { return true; }
    }

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        if (partitionCount < 1)
            throw new ArgumentOutOfRangeException("partitionCount");

        var dynamicPartitioner = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount)
            .Select(_ => dynamicPartitioner.GetEnumerator())
            .ToArray();
    }

    public override IEnumerable<T> GetDynamicPartitions()
    {
        return _collection.GetConsumingEnumerable();
    }
}
Then you use it like this:
BlockingCollection<LinkObject> URLSToCheck = new BlockingCollection<LinkObject>();
Parallel.ForEach(
    new BlockingCollectionPartitioner<LinkObject>(URLSToCheck),
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    URL =>
    {
        //....
    });
From another thread (or from inside the loop body itself) you add to the URLSToCheck collection with URLSToCheck.Add(...).
When there are no more URLs to process, call URLSToCheck.CompleteAdding() and the Parallel.ForEach will stop automatically once the collection drains.
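Here is a sketch of how the pieces can fit together for the crawler in the question. The awkward part is deciding *when* to call CompleteAdding, because the workers are also the producers. One way (my addition, not part of the answer above) is to keep an Interlocked counter of outstanding URLs and complete the collection when it reaches zero; LinkObject, Checker, and its methods are the question's own (assumed) types:

```csharp
// Sketch only: LinkObject, Checker, downloadFullURL and findInternalUrls
// are the types from the question, assumed to exist as shown there.
BlockingCollection<LinkObject> URLSToCheck = new BlockingCollection<LinkObject>();
int pending = 0; // URLs enqueued but not yet fully processed

void Enqueue(LinkObject link)
{
    Interlocked.Increment(ref pending);
    URLSToCheck.Add(link);
}

// Seed the crawl with the starting URL(s).
Enqueue(new LinkObject { sourceURL = "seed", destinationURL = "http://example.com" });

Parallel.ForEach(
    new BlockingCollectionPartitioner<LinkObject>(URLSToCheck),
    new ParallelOptions { MaxDegreeOfParallelism = 5 },
    URL =>
    {
        Checker checker = new Checker();
        URLDownloadResult result = checker.downloadFullURL(URL.destinationURL);
        foreach (var link in checker.findInternalUrls(URL.sourceURL, result.html))
            Enqueue(link);

        // This URL is done. If nothing else is pending, no worker can ever
        // produce another item, so complete the collection to end the loop.
        if (Interlocked.Decrement(ref pending) == 0)
            URLSToCheck.CompleteAdding();
    });
```

The increment must happen before the Add, so the counter can never momentarily read zero while an item is still in flight.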
Upvotes: 5
Reputation: 9365
TPL Dataflow can be handy here. With ActionBlock it can be done nicely:
// Declare the variable first, then assign it, so the lambda can
// capture the block and post newly found URLs back to itself.
ActionBlock<LinkObject> actionBlock = null;
actionBlock = new ActionBlock<LinkObject>(URL =>
{
    Checker Checker = new Checker();
    URLDownloadResult result = Checker.downloadFullURL(URL.destinationURL);
    List<LinkObject> URLsToAdd = Checker.findInternalUrls(URL.sourceURL, result.html);
    URLsToAdd.ForEach(url => actionBlock.Post(url));
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 5 });
Then post your initial URLs to the actionBlock:
actionBlock.Post(url1);
actionBlock.Post(url2);
...
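One thing this snippet leaves open is shutdown: the block never learns that the crawl is over. ActionBlock exposes Complete() and a Completion task for this; when you know no more URLs will be posted (for example, using the same kind of pending-work counter as in the BlockingCollection answer), you can drain the block like this sketch:

```csharp
// Sketch: signal and await completion once no further URLs will be posted.
actionBlock.Complete();        // the block rejects any further Posts
actionBlock.Completion.Wait(); // blocks until all queued URLs are processed
```

In an async context you would `await actionBlock.Completion` instead of calling Wait().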
Upvotes: 2