Reputation: 21
I'm trying to build a stable multithreading system that uses exactly the number of threads I set.
Here's the code I'm currently using:
public void Start()
{
    List<String> list = new List<String>(File.ReadAllLines("urls.txt"));
    int maxThreads = 100;
    var framework = new Sender();
    ThreadPool.SetMinThreads(maxThreads, maxThreads);
    Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = maxThreads }, delegate (string url)
    {
        framework.Send(url, "proxy:port");
    });
    Console.WriteLine("Done.");
}
It is fast and it works, but it exceeds the 100-thread limit. That wouldn't be a problem if the proxies I'm using weren't capped at 100 simultaneous connections; as it is, a lot of requests get cancelled by my proxy provider. Any idea how I can keep that speed without exceeding the limit?
Thanks.
Upvotes: 1
Views: 1584
Reputation: 2204
Your framework.Send method is returning immediately and processing asynchronously. To validate this, I created the following test method, which works as expected:
public static void Main()
{
    List<String> list = new List<String>(Enumerable.Range(0, 10000).Select(i => i.ToString()));
    int maxThreads = 100;
    ThreadPool.SetMinThreads(maxThreads, maxThreads);
    int currentCount = 0;
    int maxCount = 0;
    object locker = new object();
    Parallel.ForEach(list, new ParallelOptions { MaxDegreeOfParallelism = maxThreads }, delegate (string url)
    {
        lock (locker)
        {
            currentCount++;
            maxCount = Math.Max(currentCount, maxCount);
        }
        Thread.Sleep(10);
        lock (locker)
        {
            maxCount = Math.Max(currentCount, maxCount);
            currentCount--;
        }
    });
    Console.WriteLine("Max Threads: " + maxCount); //Max Threads: 100
    Console.Read();
}
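If you want to check whether the same applies to your Sender, a quick way is to time a single call. This is only a sketch; the Sender type and proxy string are taken from your question, and the URL is just a placeholder:

var framework = new Sender();
var sw = System.Diagnostics.Stopwatch.StartNew();
framework.Send("http://example.com", "proxy:port"); // single call under test
sw.Stop();
// If Send fires the request and returns immediately, this prints ~0 ms even though
// the HTTP round trip through the proxy takes much longer.
Console.WriteLine("Send returned after " + sw.ElapsedMilliseconds + " ms");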
Upvotes: 1
Reputation: 131774
Parallel.For/ForEach are meant for data parallelism - processing large amounts of in-memory data that doesn't need to perform IO. In that case there's no reason to use more threads than there are cores to run them.
This question though is about network IO, concurrent connections and throttling. If the proxy provider has a limit, MaxDegreeOfParallelism must be set to a value low enough that the limit isn't exceeded.
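With the question's code that would look like the following sketch. It only works if Sender.Send actually blocks until the response arrives - if it fires and forgets, no degree-of-parallelism setting can cap the number of open connections:

int maxConnections = 100; // whatever the proxy provider actually allows
var options = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
var framework = new Sender();
Parallel.ForEach(File.ReadLines("urls.txt"), options, url =>
{
    // Only limits connections if Send blocks until the response arrives
    framework.Send(url, "proxy:port");
});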
A better solution would be to use an ActionBlock with a limited MaxDegreeOfParallelism and a bound on its input buffer, so it doesn't get flooded with URLs awaiting processing:
static async Task Main()
{
    var maxConnections = 20;
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxConnections,
        BoundedCapacity = maxConnections * 2
    };
    var framework = new Sender();
    var myBlock = new ActionBlock<string>(url =>
    {
        framework.Send(...);
    }, options);
    //ReadLines doesn't load everything, it returns an IEnumerable<string> that loads
    //lines as needed
    var lines = File.ReadLines("urls.txt");
    foreach (var url in lines)
    {
        //Send each line to the block, waiting if the buffer is full
        await myBlock.SendAsync(url);
    }
    //Tell the block we are done
    myBlock.Complete();
    //And wait until it finishes everything
    await myBlock.Completion;
}
Setting BoundedCapacity and MaxDegreeOfParallelism helps with concurrency limits, but not with requests/sec limits. To limit those, one could add a small delay after each request. The block's code would have to change to e.g.:
var delay = 250; // Milliseconds, 4 reqs/sec per connection
var myBlock = new ActionBlock<string>(async url =>
{
    framework.Send(...);
    await Task.Delay(delay);
}, options);
This can be improved further if Sender.Send became an asynchronous method. It could use, for example, HttpClient, which only provides asynchronous methods, so it doesn't block while waiting for a response. The changes would be minimal:
var myBlock = new ActionBlock<string>(async url =>
{
    await framework.SendAsync(...);
    await Task.Delay(delay);
}, options);
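There's no asynchronous Send in the question, so the following is only a sketch of what such a method could look like on top of HttpClient with a web proxy; the AsyncSender/SendAsync names and the overall shape are assumptions, not the original Sender class:

using System.Net;
using System.Net.Http;
using System.Threading.Tasks;

// Hypothetical asynchronous sender built on HttpClient; names are assumptions
public class AsyncSender
{
    private readonly HttpClient _client;

    public AsyncSender(string proxyAddress)
    {
        // Route every request through the given proxy and reuse one HttpClient
        var handler = new HttpClientHandler
        {
            Proxy = new WebProxy(proxyAddress),
            UseProxy = true
        };
        _client = new HttpClient(handler);
    }

    public async Task SendAsync(string url)
    {
        // GetAsync doesn't block the calling thread while the response is in flight
        using (var response = await _client.GetAsync(url))
        {
            response.EnsureSuccessStatusCode();
        }
    }
}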
But the program would use fewer threads and less CPU - each call to await releases the current thread until a response is received. Blocking a thread, on the other hand, starts with a spinwait, which means CPU cycles are wasted waiting for a response before the thread is put to sleep.
Upvotes: 0