Reputation: 5761
I have a process where my main thread is reading a file and splitting it into parts. Those parts then require further processing. I would like to utilize any available threads so that the downstream processing is utilizing as much CPU (or as many cores) as possible. I don't want to create an excessive backlog from the main thread, so I need the main thread to wait to add to the queue until there is another available thread.
I see many articles like "VB.NET 4.0: Looking to execute multiple threads, but wait until all threads are completed before resuming", but those wait for all threads to complete, whereas I just need any one thread to be available.
Is this something I can tackle with the Task Parallel Library, or should I be manually creating threads and monitoring a threadpool?
Using Reader As New StreamReader(FileName)
    Do
        CurrentBlockSize = Reader.ReadBlock(CurrentBuffer, 0, BufferSize)
        'Append only the characters actually read in this block'
        RunningBuffer &= New String(CurrentBuffer, 0, CurrentBlockSize)
        If RunningBuffer.Contains(RowDelimiter) Then
            LineParts = RunningBuffer.Split(RowDelimiter)
            For I As Integer = 0 To LineParts.Count - 1
                If I < LineParts.Count - 1 Then
                    'Make synchronous call that blocks until'
                    'another thread is available to process the line'
                    AddLineToTheProcessingQueue(LineParts(I))
                Else
                    'Keep the trailing partial line for the next block'
                    RunningBuffer = LineParts(I)
                End If
            Next
        End If
    Loop While CurrentBlockSize = BufferSize
End Using
Upvotes: 1
Views: 426
Reputation: 7735
Paste this code into a new Console Application.
Imports System.Threading
Module Module1
    ' I just picked 6 arbitrarily; I'm not sure what a good strategy for picking this number is
    ' worker threads run queued work items; completion port threads handle asynchronous I/O completions
    ' note: SetMaxThreads returns False and changes nothing if a value is below the processor count
    Const MaxWorkerThreads As Integer = 6
    Const MaxCompletionPortThreads As Integer = 6
    Sub Main()
        ThreadPool.SetMaxThreads(MaxWorkerThreads, MaxCompletionPortThreads)
        Dim availableWorkerThreads As Integer
        Dim availableCompletionPortThreads As Integer
        For i As Integer = 0 To 100
            ' GetAvailableThreads returns results via output parameters
            ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)
            Dim tries As Integer = 0
            Do While (availableWorkerThreads = 0)
                ' this loop does not execute if there are available threads
                ' you may want to add a fail-safe to check "tries" in case the child threads get stuck
                tries += 1
                Console.WriteLine(String.Format("waiting to start item {0}, attempt {1}, available threads: {2}, {3}", i, tries, availableWorkerThreads, availableCompletionPortThreads))
                ' polling without Sleep would spin this thread at full CPU while it waits
                Thread.Sleep(1000)
                ' call GetAvailableThreads again for the next test at the top of the loop
                ThreadPool.GetAvailableThreads(availableWorkerThreads, availableCompletionPortThreads)
            Loop
            ' this is how you pass parameters to a work item queued through QueueUserWorkItem
            Dim parameters As Object() = {i}
            ThreadPool.QueueUserWorkItem(AddressOf DoWork, parameters)
            ' the MSDN sample sleeps after QueueUserWorkItem because pool threads are background threads:
            ' if the main thread exits, the process ends before the queued work has run
            Thread.Sleep(500)
        Next
    End Sub
    ' matches the WaitCallback signature: the state object is the array passed to QueueUserWorkItem
    Sub DoWork(state As Object)
        Dim parameters As Object() = DirectCast(state, Object())
        Dim itemNumber As Integer = CInt(parameters(0))
        Dim sleepLength As Integer = itemNumber * 1000
        Console.WriteLine(String.Format("Item: {0} - sleeping for {1} milliseconds.", itemNumber, sleepLength))
        Thread.Sleep(sleepLength)
        Console.WriteLine(String.Format("Item: {0} - done sleeping.", itemNumber))
    End Sub
End Module
Upvotes: 1
Reputation: 244918
I'm not sure exactly why you want to do this, but you can achieve something very similar by using BlockingCollection or a dataflow block with BoundedCapacity set.
For example, if you set the capacity to 1 and your consumers are busy at the moment, you won't be able to add a second item to the queue until one of the consumers finishes its current work and removes that item from the queue. And both versions give you a way to wait until you can add another item to the queue.
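As a rough illustration of the BlockingCollection variant, here is a minimal sketch rather than a definitive implementation. The names `BoundedQueueSketch`, `ProcessAll` and `ProcessLine` are made up for this example, and the `Thread.Sleep` call only stands in for the real per-line work. The producer's `Add` call blocks whenever the queue already holds `capacity` items, so the reading thread cannot run ahead of the consumers.

```vbnet
Imports System.Collections.Concurrent
Imports System.Collections.Generic
Imports System.Threading
Imports System.Threading.Tasks

Module BoundedQueueSketch

    ' Hypothetical stand-in for the real downstream processing of one line.
    Sub ProcessLine(line As String)
        Thread.Sleep(50)
    End Sub

    ' Pushes every line through a bounded queue to a fixed number of consumers
    ' and returns how many lines were processed.
    Function ProcessAll(lines As IEnumerable(Of String),
                        workerCount As Integer,
                        capacity As Integer) As Integer
        Dim processed As Integer = 0

        Using queue As New BlockingCollection(Of String)(capacity)
            ' Consumers: each one takes lines until the queue is marked complete and drained.
            Dim workers As New List(Of Task)
            For w As Integer = 1 To workerCount
                workers.Add(Task.Factory.StartNew(
                    Sub()
                        For Each line As String In queue.GetConsumingEnumerable()
                            ProcessLine(line)
                            Interlocked.Increment(processed)
                        Next
                    End Sub, TaskCreationOptions.LongRunning))
            Next

            ' Producer: Add blocks while the queue is at its bounded capacity.
            For Each line As String In lines
                queue.Add(line)
            Next

            ' Signal that no more lines are coming, then wait for the consumers to finish.
            queue.CompleteAdding()
            Task.WaitAll(workers.ToArray())
        End Using

        Return processed
    End Function

    Sub Main()
        Dim lines As New List(Of String)
        For i As Integer = 1 To 20
            lines.Add("line " & i.ToString())
        Next

        Dim count As Integer = ProcessAll(lines, Environment.ProcessorCount, 1)
        Console.WriteLine("Processed {0} lines.", count)
    End Sub
End Module
```

In the question's loop, the call to `AddLineToTheProcessingQueue` would correspond to `queue.Add`. Using one consumer per core is only an assumption for CPU-bound work; the right worker count and capacity depend on the actual workload.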
Upvotes: 0