Reputation: 67
I have a Public Shared queItems As Queue(Of String) that is used by many background threads. Whenever a thread wants to remove and return the string at the beginning of the queue, it calls Dequeue through this function:
Public Function NextItem() As String
Dim _item As String = Nothing
SyncLock Form1.queItems
If Form1.queItems.Count = 0 Then Return Nothing
_item = Form1.queItems.Dequeue()
Form1.queItems.Enqueue(_item)
End SyncLock
Return _item
End Function
I was later introduced to the ConcurrentQueue(Of T) class, and I wrote the next version of NextItem() As String using Public Shared queItems As ConcurrentQueue(Of String), as shown here:
Public Function NextItem2() As String
Dim _item As String = Nothing
here:
If Form1.queItems.Count = 0 Then Return Nothing
If Not Form1.queItems.TryDequeue(_item) Then
Thread.Sleep(100)
GoTo here
End If
Return _item
End Function
The first version is about 20% faster than the second version on my machine.
But are the two equivalent in terms of thread safety?
And which version is preferred?
Upvotes: 3
Views: 1369
Reputation: 29222
Without knowing anything else about what's happening, I'd suspect that the second one is slower because it's deliberately sleeping for 100ms. It's hard to tell from this how often that is happening. Take it out and try the test again. I bet that 20% difference will disappear.
If you're going to be enqueuing and dequeuing simultaneously, then I'd use the ConcurrentQueue. That's exactly what it's for.
Upvotes: 1
Reputation: 19350
With the old queue, you usually do this (pseudocode):
Public Class SyncQueue
    ' Shared, because the Shared methods below access it
    Private Shared _queue As New Queue(Of String)()
    Private Shared _lock As New Object()
    Public Shared Function Dequeue() As String
        Dim item As String = Nothing
        SyncLock _lock
            ' Queue.Dequeue throws on an empty queue, so check first
            If _queue.Count > 0 Then
                item = _queue.Dequeue()
            End If
            Return item
        End SyncLock
    End Function
    Public Shared Sub Enqueue(item As String)
        SyncLock _lock
            _queue.Enqueue(item)
        End SyncLock
    End Sub
End Class
With ConcurrentQueue(Of T), on the other hand, all of this is taken care of for you, and you should use the Try... methods:
If (queue.TryDequeue(item)) Then
' do something with the item
End If
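As a minimal sketch of that pattern, a consumer can simply loop on TryDequeue until it returns False. The module name TryDequeueSketch and the helper DrainAll are hypothetical names made up for this illustration; only ConcurrentQueue(Of T), Enqueue and TryDequeue come from the framework.

```vbnet
Imports System
Imports System.Collections.Concurrent
Imports System.Collections.Generic

Module TryDequeueSketch
    ' Hypothetical helper: removes and returns everything currently in the queue.
    Public Function DrainAll(queue As ConcurrentQueue(Of String)) As List(Of String)
        Dim drained As New List(Of String)()
        Dim item As String = Nothing
        ' TryDequeue returns False once the queue is empty,
        ' so no separate Count check is needed.
        While queue.TryDequeue(item)
            drained.Add(item)
        End While
        Return drained
    End Function

    Sub Main()
        Dim queue As New ConcurrentQueue(Of String)()
        queue.Enqueue("Alpha")
        queue.Enqueue("Beta")
        For Each s In DrainAll(queue)
            Console.WriteLine(s)
        Next
    End Sub
End Module
```

No SyncLock appears anywhere in the sketch, because each Enqueue and TryDequeue call is thread-safe on its own.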
Upvotes: 1
Reputation: 117064
I'm trying to decipher your code. There are quite a few weird things going on.
In the first block you are accessing a Shared queue from your Form1 class in an instance method called NextItem(). It isn't clear if NextItem() is defined in your Form1 class or elsewhere. By this design it seems that you are anticipating multiple instances of Form1 (or the class where NextItem() is defined) to all share the single queue. That's a little odd.
I'm going to assume that you can use an instance variable for your queue.
Also, you are enqueuing right after dequeuing. That also seems wrong.
So given that, this is what I think your first method should look like to be as thread-safe as possible:
Private _queItems As New Queue(Of String)()
Private _queItemsLock As New Object()
Public Function NextItem() As String
    SyncLock _queItemsLock
        ' Only dequeue when there is something to take
        If _queItems.Count > 0 Then
            Return _queItems.Dequeue()
        Else
            Return Nothing
        End If
    End SyncLock
End Function
In your second block of code you have a lot going on that makes it a bit confusing, but this is what I think it should look like:
Private _queItems As New ConcurrentQueue(Of String)()
Public Function NextItem2() As String
Dim _item As String = Nothing
If _queItems.TryDequeue(_item) Then
Return _item
Else
Return Nothing
End If
End Function
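If the re-enqueue in the question was actually intentional (say, to cycle through the items round-robin), a rough sketch under that assumption might look like the following. The module name RotateSketch and the helper NextItemRotating are hypothetical. Note that each call is thread-safe on its own, but the dequeue/enqueue pair is not atomic, so another thread can briefly see the queue without the item.

```vbnet
Imports System
Imports System.Collections.Concurrent

Module RotateSketch
    ' Hypothetical helper: returns the head item and moves it to the tail.
    Public Function NextItemRotating(queue As ConcurrentQueue(Of String)) As String
        Dim item As String = Nothing
        If queue.TryDequeue(item) Then
            ' Not atomic with the dequeue above: other threads may
            ' briefly observe the queue without this item.
            queue.Enqueue(item)
        End If
        Return item ' Nothing when the queue was empty
    End Function

    Sub Main()
        Dim queue As New ConcurrentQueue(Of String)({"Alpha", "Beta"})
        Console.WriteLine(NextItemRotating(queue))
        Console.WriteLine(NextItemRotating(queue))
        Console.WriteLine(NextItemRotating(queue))
    End Sub
End Module
```

Run single-threaded, the three calls return Alpha, Beta and then Alpha again. If the rotation has to be atomic with respect to other threads, the SyncLock version is the safer choice.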
Now, having said all that, I think that there is a better choice for you than Queue(Of String) and ConcurrentQueue(Of String). The class BufferBlock(Of String) (in the System.Threading.Tasks.Dataflow namespace) is probably easier to work with.
You could define this code:
Private _bufferBlock As New BufferBlock(Of String)()
'Non-blocking call - returns immediately with a value or `Nothing`
Public Function NextItem3() As String
Dim _item As String = Nothing
If _bufferBlock.TryReceive(_item) Then
Return _item
Else
Return Nothing
End If
End Function
'Blocking call - waits for value to be available
Public Function NextItem4() As String
Return _bufferBlock.Receive()
End Function
'Awaitable call
Public Async Function NextItem5() As Task(Of String)
Return Await _bufferBlock.ReceiveAsync()
End Function
...and you could then use it like this:
Async Sub Main
Dim t1 = Task.Factory.StartNew(Function () NextItem4())
Dim t2 = Task.Factory.StartNew(Function () NextItem4())
_bufferBlock.Post("Alpha")
_bufferBlock.Post("Beta")
_bufferBlock.Post("Gamma")
_bufferBlock.Post("Delta")
Dim x = Await NextItem5()
Dim y = Await _bufferBlock.ReceiveAsync()
Console.WriteLine(t1.Result)
Console.WriteLine(t2.Result)
Console.WriteLine(x)
Console.WriteLine(y)
End Sub
...and this gives me:
Alpha
Beta
Gamma
Delta
NB: Sometimes the order of the results changes, as the asynchronous nature of this code means that t1 & t2 can complete in either order.
Personally, I think that the ability to call Await _bufferBlock.ReceiveAsync() has the greatest potential to simplify your code.
At the end of the day I would look at whatever methods you come up with and time them properly to determine which is faster. We can't time your code as we don't have it.
Upvotes: 1
Reputation: 70317
I'm willing to guess that the reason the second version is slower is that you are using it incorrectly. You don't need to check the Count if you are using TryDequeue; it takes care of that for you.
Fixing your code:
Public Function NextItem2() As String
Dim _item As String
Form1.queItems.TryDequeue(_item)
Return _item
End Function
Reading the count from a ConcurrentQueue is actually very expensive.
When you add an item to a queue, it only has to look at one end of the queue. When you remove an item, it only looks at the other end. This means you can have one thread adding and another removing and they won't interfere with each other (unless maybe the queue is empty).
When you want the count, it actually has to look at both ends and count the items between, which in turn requires either locking both ends or continuously retrying until it gets a clean read. (You can examine the code yourself, but it's hard to understand.)
http://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs
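If you still want an explicit emptiness check before dequeuing, ConcurrentQueue(Of T) has an IsEmpty property, and Microsoft's documentation recommends it over comparing Count to 0. Here is a minimal sketch; the module name IsEmptySketch and the helper NextOrNothing are hypothetical names for illustration only.

```vbnet
Imports System
Imports System.Collections.Concurrent

Module IsEmptySketch
    ' Hypothetical helper: same contract as NextItem2, without touching Count.
    Public Function NextOrNothing(queue As ConcurrentQueue(Of String)) As String
        ' IsEmpty is only a hint: another thread may add or remove
        ' items right after this check.
        If queue.IsEmpty Then Return Nothing
        Dim item As String = Nothing
        ' TryDequeue remains the authoritative answer.
        If queue.TryDequeue(item) Then Return item
        Return Nothing
    End Function

    Sub Main()
        Dim queue As New ConcurrentQueue(Of String)()
        Console.WriteLine(NextOrNothing(queue) Is Nothing)
        queue.Enqueue("Alpha")
        Console.WriteLine(NextOrNothing(queue))
    End Sub
End Module
```

The IsEmpty check is optional here, since TryDequeue alone already handles the empty case. It is only worth keeping when you want to skip other work up front.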
This is literally the code they use for TryDequeue:
public bool TryDequeue(out T result)
{
while (!IsEmpty)
{
Segment head = m_head;
if (head.TryRemove(out result))
return true;
//since method IsEmpty spins, we don't need to spin in the while loop
}
result = default(T);
return false;
}
As you can see, it will set the result parameter to 0/null if the queue is empty. Hence the reason we don't need to explicitly check the function's return value.
Now if you wanted a different default, then you would write:
Public Function NextItem2() As String
Dim _item As String
If Not Form1.queItems.TryDequeue(_item) Then _item = [my default]
Return _item
End Function
Upvotes: 1
Reputation: 1839
The concurrent collections implement partitioning internally to avoid having to lock the whole collection. What that means is that in situations where there are many threads queuing and dequeuing (high contention), part of the collection can be "locked" while other parts (think of them as blocks of data) are still accessible. This is not possible using a simple SyncLock, because doing so locks the whole collection, preventing access from other threads. In situations where there are only a few threads involved, a SyncLock and list will likely be faster because you are bypassing the overhead of partitioning the collection. In situations where you have 8 or more threads all competing for the same collection, a concurrent collection will most likely be orders of magnitude faster than list + SyncLock.
http://www.codethinked.com/net-40-and-system_collections_concurrent_concurrentqueue
EDIT: I in fact just tested this on my dual core machine using the following:
Imports System.Threading
Public Class Form1
Private _lock As New Object
Private _queue As New Queue(Of Integer)
Private _concurrentQueue As New Concurrent.ConcurrentQueue(Of Integer)
Private Sub Form1_Load(sender As Object, e As EventArgs) Handles Me.Load
Dim sw As New Stopwatch()
Dim lstResQueue As New List(Of Integer)
Dim lstResConcurrent As New List(Of Integer)
For i = 1 To 10
Dim t As New Thread(AddressOf TestLockedQueue)
sw.Start()
t.Start()
While t.IsAlive : Thread.Sleep(0) : End While
sw.Stop()
lstResQueue.Add(sw.ElapsedMilliseconds)
sw.Reset()
t = New Thread(AddressOf TestConcurrentQueue)
sw.Start()
t.Start()
While t.IsAlive : Thread.Sleep(0) : End While
sw.Stop()
lstResConcurrent.Add(sw.ElapsedMilliseconds)
' Reset so the next iteration's locked-queue timing starts from zero
sw.Reset()
Next
MessageBox.Show(String.Format("Average over 10 runs: " & vbCrLf & _
"Queue(Of Integer) with lock: {0}" & vbCrLf & _
"ConcurrentQueue(Of Integer): {1}",
lstResQueue.Average, lstResConcurrent.Average))
End Sub
Private Sub TestLockedQueue()
Parallel.For(0, 5000000,
New ParallelOptions() With {.MaxDegreeOfParallelism = 16},
Sub(i)
Dim a = 0
SyncLock _lock
Try
_queue.Enqueue(i)
Catch ex As Exception
End Try
End SyncLock
SyncLock _lock
Try
a = _queue.Dequeue()
Catch ex As Exception
End Try
End SyncLock
Dim b = a
End Sub)
End Sub
Private Sub TestConcurrentQueue()
Parallel.For(0, 5000000,
New ParallelOptions() With {.MaxDegreeOfParallelism = 16},
Sub(i)
Dim a = 0
Try
_concurrentQueue.Enqueue(i)
Catch ex As Exception
End Try
_concurrentQueue.TryDequeue(a)
Dim b = a
End Sub)
End Sub
End Class
And the ConcurrentQueue is invariably twice as fast. It would be interesting to see the results on a more beefy 8-core processor. My results are 10 seconds versus 4.5 seconds. I am guessing the ConcurrentQueue won't scale well beyond 2 cores, while the ConcurrentBag will scale all the way up.
Upvotes: 1