Reputation: 4249
My scenario: I need to process a list of elements. Processing each element is highly time-consuming (1-10 seconds). Instead of a sequential loop such as
List<int> retval = new List<int>();
foreach (int item in myList)
    retval.Add(ProcessItem(item));
return retval;
I want to process the items in parallel.
I know .NET has a number of approaches for parallel processing: which is the best one? (Note: I'm stuck on framework version 3.5, so I cannot use Task, async and all the fancy features that come with .NET 4...)
Here is my attempt using delegates:
private void DoTest(int processingTaskDuration)
{
    List<int> itemsToProcess = new List<int>();
    for (int i = 1; i <= 20; i++)
        itemsToProcess.Add(i);
    TestClass tc = new TestClass(processingTaskDuration);
    DateTime start = DateTime.Now;
    List<int> result = tc.ProcessList(itemsToProcess);
    TimeSpan elapsed = DateTime.Now - start;
    System.Diagnostics.Debug.WriteLine(string.Format("elapsed (msec)= {0}", (int)elapsed.TotalMilliseconds));
}
public class TestClass
{
    static int s_Counter = 0;
    static object s_lockObject = new Object();
    int m_TaskMsecDuration = 0;

    public TestClass() :
        this(5000)
    {
    }

    public TestClass(int taskMsecDuration)
    {
        m_TaskMsecDuration = taskMsecDuration;
    }

    public int LongOperation(int itemToProcess)
    {
        int currentCounter = 0;
        lock (s_lockObject)
        {
            s_Counter++;
            currentCounter = s_Counter;
        }
        System.Diagnostics.Debug.WriteLine(string.Format("LongOperation\tStart\t{0}\t{1}\t{2}", currentCounter, System.Threading.Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.ffffff")));
        // time consuming task, e.g 5 seconds
        Thread.Sleep(m_TaskMsecDuration);
        int retval = itemToProcess * 2;
        System.Diagnostics.Debug.WriteLine(string.Format("LongOperation\tEnd \t{0}\t{1}\t{2}", currentCounter, System.Threading.Thread.CurrentThread.ManagedThreadId, DateTime.Now.ToString("HH:mm:ss.ffffff")));
        return retval;
    }

    delegate int LongOperationDelegate(int itemToProcess);

    public List<int> ProcessList(List<int> itemsToProcess)
    {
        List<IAsyncResult> asyncResults = new List<IAsyncResult>();
        LongOperationDelegate del = LongOperation;
        foreach (int item in itemsToProcess)
        {
            IAsyncResult res = del.BeginInvoke(item, null, null);
            asyncResults.Add(res);
        }
        // list of waitHandles to wait for
        List<WaitHandle> waitHandles = new List<WaitHandle>();
        asyncResults.ForEach(el => waitHandles.Add(el.AsyncWaitHandle));
        // wait for processing every item
        WaitHandle.WaitAll(waitHandles.ToArray());
        // retrieve result of processing
        List<int> retval = new List<int>();
        asyncResults.ForEach(res =>
        {
            int singleProcessingResult = del.EndInvoke(res);
            retval.Add(singleProcessingResult);
        });
        return retval;
    }
}
And here is some output (column #3 is a progressive counter, used to match the start of a call with its end; column #4 is the thread ID; the last column is a timestamp):
LongOperation Start 1 6 15:11:18.331619
LongOperation Start 2 12 15:11:18.331619
LongOperation Start 3 13 15:11:19.363722
LongOperation Start 4 14 15:11:19.895775
LongOperation Start 5 15 15:11:20.406826
LongOperation Start 6 16 15:11:21.407926
LongOperation Start 7 17 15:11:22.410026
LongOperation End 1 6 15:11:23.360121
LongOperation End 2 12 15:11:23.361122
LongOperation Start 8 12 15:11:23.363122
LongOperation Start 9 6 15:11:23.365122
LongOperation Start 10 18 15:11:23.907176
LongOperation End 3 13 15:11:24.365222
LongOperation Start 11 13 15:11:24.366222
LongOperation End 4 14 15:11:24.897275
LongOperation Start 12 14 15:11:24.898275
LongOperation Start 13 19 15:11:25.407326
LongOperation End 5 15 15:11:25.408326
LongOperation Start 14 15 15:11:25.412327
LongOperation Start 15 20 15:11:26.407426
LongOperation End 6 16 15:11:26.410426
LongOperation Start 16 16 15:11:26.410426
LongOperation Start 17 21 15:11:27.408526
LongOperation End 7 17 15:11:27.411527
LongOperation Start 18 17 15:11:27.413527
LongOperation End 8 12 15:11:28.365622
LongOperation Start 19 12 15:11:28.366622
LongOperation End 9 6 15:11:28.366622
LongOperation Start 20 6 15:11:28.389624
LongOperation End 10 18 15:11:28.908676
LongOperation End 11 13 15:11:29.367722
LongOperation End 12 14 15:11:29.899775
LongOperation End 13 19 15:11:30.411827
LongOperation End 14 15 15:11:30.413827
LongOperation End 15 20 15:11:31.407926
LongOperation End 16 16 15:11:31.411927
LongOperation End 17 21 15:11:32.413027
LongOperation End 18 17 15:11:32.416027
LongOperation End 19 12 15:11:33.389124
LongOperation End 20 6 15:11:33.391124
elapsed (msec)= 15075
So:
Is the delegate approach the right one?
Did I implement it right?
If so, why does the 3rd operation start one second after the first two (and so on)?
I mean, I'd like the whole processing to complete in more or less the time of a single operation, but it seems the system uses the thread pool in a strange way. After all, I'm asking for 20 threads, and it waits before spawning the 3rd one right after the first two calls.
Upvotes: 0
Views: 1352
Reputation: 4249
I found the answer to my third question:
If so, why does the 3rd operation start one second after the first two (and so on)?
The problem seems to be in the default way ThreadPool manages thread spawning: see http://msdn.microsoft.com/en-us/library/0ka9477y%28v=VS.90%29.aspx. Quote:
The thread pool has a built-in delay (half a second in the .NET Framework version 2.0) before starting new idle threads. If your application periodically starts many tasks in a short time, a small increase in the number of idle threads can produce a significant increase in throughput. Setting the number of idle threads too high consumes system resources needlessly.
It seems that a call to ThreadPool.SetMinThreads with a proper value helps a lot. I added the following method and call it at the start of my ProcessList:
private void SetUpThreadPool(int numThreadDesired)
{
    int currentWorkerThreads;
    int currentCompletionPortThreads;
    ThreadPool.GetMinThreads(out currentWorkerThreads, out currentCompletionPortThreads);
    //System.Diagnostics.Debug.WriteLine(string.Format("ThreadPool.GetMinThreads: workerThreads = {0}, completionPortThreads = {1}", currentWorkerThreads, currentCompletionPortThreads));
    const int MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM = 20;
    // cap the requested minimum so a long list cannot demand an excessive number of threads
    int numMinThreadToSet = Math.Min(numThreadDesired, MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM);
    // only ever raise the minimum, and pass the capped value
    if (currentWorkerThreads < numMinThreadToSet)
        ThreadPool.SetMinThreads(numMinThreadToSet, currentCompletionPortThreads);
}
public List<int> ProcessList(List<int> itemsToProcess)
{
    SetUpThreadPool(itemsToProcess.Count);
    ...
}
Now all threads (up to 20) start at the same moment, without delay. I think 20 is a good compromise for MAXIMUM_VALUE_FOR_SET_MIN_THREAD_PARAM: not too high, and it fits my particular requirements.
I'm still wondering about the main questions:
- Is the delegate approach the right one?
- Did I implement it right?
Thanks to everyone for helping.
Upvotes: 0
Reputation: 597
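void Main()
{
    var list = new List<int> { 1, 2, 3 };
    int processes = list.Count;
    foreach (var item in list)
    {
        // Copy the loop variable: before C# 5 every iteration shares one variable,
        // so the lambdas could otherwise all see the same (last) item.
        int current = item;
        ThreadPool.QueueUserWorkItem(s =>
        {
            ProcessItem(current);
            // Decrement atomically; a plain processes-- can lose updates across threads.
            Interlocked.Decrement(ref processes);
        });
    }
    // Poll until every queued work item has finished.
    while (Thread.VolatileRead(ref processes) > 0) { Thread.Sleep(10); }
}

static void ProcessItem(int item)
{
    Thread.Sleep(100); // do work
}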
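As a possible variation on the snippet above, the polling loop can be replaced by a blocking wait on a ManualResetEvent that the last finishing work item signals. This is only a sketch under .NET 3.5 constraints; the class name WaitWithEventSketch and the ProcessAll helper are hypothetical names introduced for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class WaitWithEventSketch
{
    // Hypothetical helper: runs 'work' for each item on the thread pool
    // and returns only after every work item has finished.
    public static void ProcessAll(List<int> list, Action<int> work)
    {
        if (list.Count == 0) return; // nothing queued, so nothing would ever signal

        int pending = list.Count;
        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            foreach (int item in list)
            {
                int current = item; // per-iteration copy for the closure
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        work(current);
                    }
                    finally
                    {
                        // The work item that brings the count to zero wakes the waiter.
                        if (Interlocked.Decrement(ref pending) == 0)
                            allDone.Set();
                    }
                });
            }
            allDone.WaitOne(); // blocks without burning CPU on polling
        }
    }
}
```

A call might look like `WaitWithEventSketch.ProcessAll(list, ProcessItem);`. Using a single event plus a counter also sidesteps the 64-handle limit of WaitHandle.WaitAll.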
Upvotes: 0
Reputation: 218827
I think the 3.5 backport of Reactive Extensions comes with an implementation of Parallel.ForEach() that you should be able to use. The port should contain only what was needed to get Rx to work on 3.5, but that should be enough.
Others have tried implementing it as well, basically just queuing work items on ThreadPool.
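To illustrate what "queuing work items on ThreadPool" could look like when results are needed, here is a minimal sketch that maps a function over a list and returns the results in input order. It is not the Rx backport's API or anyone's published implementation; ParallelSketch and Map are hypothetical names, and the error handling (rethrowing only the first failure) is an assumption made to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

static class ParallelSketch
{
    // Hypothetical helper: applies 'work' to every item on the thread pool
    // and returns the results in the same order as the input.
    public static List<TOut> Map<TIn, TOut>(IList<TIn> items, Func<TIn, TOut> work)
    {
        if (items.Count == 0) return new List<TOut>();

        TOut[] results = new TOut[items.Count]; // each work item writes only its own slot
        int pending = items.Count;
        Exception firstError = null;

        using (ManualResetEvent allDone = new ManualResetEvent(false))
        {
            for (int i = 0; i < items.Count; i++)
            {
                int index = i; // fresh variable per iteration for the closure
                ThreadPool.QueueUserWorkItem(delegate
                {
                    try
                    {
                        results[index] = work(items[index]);
                    }
                    catch (Exception ex)
                    {
                        // Remember only the first failure; an unhandled exception
                        // on a pool thread would otherwise terminate the process.
                        Interlocked.CompareExchange(ref firstError, ex, null);
                    }
                    finally
                    {
                        if (Interlocked.Decrement(ref pending) == 0)
                            allDone.Set();
                    }
                });
            }
            allDone.WaitOne();
        }

        if (firstError != null)
            throw new InvalidOperationException("A work item failed.", firstError);
        return new List<TOut>(results);
    }
}
```

With the question's class, a call might look like `List<int> result = ParallelSketch.Map(itemsToProcess, tc.LongOperation);`. Start-up latency still depends on the pool's minimum thread count, so the SetMinThreads adjustment described earlier in the thread would still apply.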
Upvotes: 2