Just Curious

Reputation: 67

ConcurrentQueue(Of T) VS List(Of T) with Synclock statement in a multi-threading application

I have a Public Shared queItems As Queue(Of String) that is used by many background threads. Whenever a thread wants to remove and return the string at the beginning of the queue using Dequeue, it calls this method:

Public Function NextItem() As String
    Dim _item As String = Nothing
    SyncLock Form1.queItems
        If Form1.queItems.Count = 0 Then Return Nothing
        _item = Form1.queItems.Dequeue()
        Form1.queItems.Enqueue(_item)
    End SyncLock

    Return _item
End Function

I was later introduced to the ConcurrentQueue(Of T) class, and I made the next version of NextItem() As String using Public Shared queItems As ConcurrentQueue(Of String), as shown here:

Public Function NextItem2() As String
    Dim _item As String = Nothing
here:
    If Form1.queItems.Count = 0 Then Return Nothing
    If Not Form1.queItems.TryDequeue(_item) Then
        Thread.Sleep(100)
        GoTo here
    End If

    Return _item
End Function

The first version is faster than the second by around 20% on my machine.

But are they equivalent when it comes to thread safety?

And which version is preferable?

Upvotes: 3

Views: 1369

Answers (5)

Scott Hannen

Reputation: 29222

Without knowing anything else about what's happening, I'd suspect that the second one is slower because it's deliberately sleeping for 100ms. It's hard to tell from this how often that is happening. Take it out and try the test again. I bet that 20% difference will disappear.

If you're going to be simultaneously enqueuing and dequeuing, then I'd use the ConcurrentQueue. That's exactly what it's for.
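A minimal sketch of that scenario (the module and variable names here are just for illustration): one thread enqueues while another dequeues, with no explicit locking anywhere.

```vbnet
Imports System.Collections.Concurrent
Imports System.Threading

Module ConcurrentQueueDemo
    ' Shared between the producer thread and the main thread.
    Private ReadOnly _queue As New ConcurrentQueue(Of String)()

    Sub Main()
        ' Producer thread enqueues; no SyncLock needed,
        ' ConcurrentQueue handles the synchronization internally.
        Dim producer As New Thread(
            Sub()
                For i As Integer = 1 To 5
                    _queue.Enqueue("item " & i)
                Next
            End Sub)
        producer.Start()

        ' Consumer: TryDequeue simply returns False when the queue is
        ' momentarily empty, so no Count check or lock is required.
        Dim item As String = Nothing
        Dim consumed As Integer = 0
        While consumed < 5
            If _queue.TryDequeue(item) Then
                Console.WriteLine(item)
                consumed += 1
            End If
        End While

        producer.Join()
    End Sub
End Module
```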

Upvotes: 1

T.S.

Reputation: 19350

With the old queue, you usually do this (pseudocode):

Public Class SyncQueue

    Private Shared _queue As New Queue(Of String)
    Private Shared _lock As New Object()

    Public Shared Function Dequeue() As String
        Dim item As String = Nothing
        SyncLock _lock
            If _queue.Count > 0 Then
                item = _queue.Dequeue()
            End If
            Return item
        End SyncLock
    End Function

    Public Shared Sub Enqueue(item As String)
        SyncLock _lock
            _queue.Enqueue(item)
        End SyncLock
    End Sub

End Class

Whilst with ConcurrentQueue(Of T), all of this is taken care of for you. And you should use the Try... methods:

If (queue.TryDequeue(item)) Then
    ' do something with the item    
End If

Upvotes: 1

Enigmativity

Reputation: 117064

I'm trying to decipher your code. There are quite a few weird things going on.

In the first block you are accessing a Shared queue from your Form1 class in an instance method called NextItem(). It isn't clear if NextItem() is defined in your Form1 class or elsewhere. By this design it seems that you are anticipating multiple instances of Form1 (or the class where NextItem() is defined) to all share the single queue. That's a little odd.

I'm going to assume that you can use an instance variable for your queue.

Also, you are enqueuing right after dequeuing. That also seems wrong.

So given that, this is what I think your first method should look like to be as thread-safe as possible:

Private _queItems As Queue(Of String)
Private _queItemsLock As New Object()

Public Function NextItem() As String
    SyncLock _queItemsLock
        If _queItems.Count > 0 Then
            Return _queItems.Dequeue()
        Else
            Return Nothing
        End If
    End SyncLock
End Function

In your second block of code you have a lot going on that makes it a bit confusing, but this is what I think it should look like:

Private _queItems As ConcurrentQueue(Of String)

Public Function NextItem2() As String
    Dim _item As String = Nothing
    If _queItems.TryDequeue(_item) Then
        Return _item
    Else
        Return Nothing
    End If
End Function

Now, having said all that, I think there is a better choice for you than Queue(Of T) and ConcurrentQueue(Of String). The class BufferBlock(Of String) (in the System.Threading.Tasks.Dataflow namespace) is probably easier to work with.

You could define this code:

Private _bufferBlock As New BufferBlock(Of String)()

'Non-blocking call - immediately returns with a value or `Nothing`
Public Function NextItem3() As String
    Dim _item As String = Nothing
    If _bufferBlock.TryReceive(_item) Then
        Return _item
    Else
        Return Nothing
    End If
End Function

'Blocking call - waits for value to be available
Public Function NextItem4() As String
    Return _bufferBlock.Receive()
End Function

'Awaitable call
Public Async Function NextItem5() As Task(Of String)
    Return Await _bufferBlock.ReceiveAsync()
End Function

...and you could then use it like this:

Async Sub Main()
    Dim t1 = Task.Factory.StartNew(Function () NextItem4())
    Dim t2 = Task.Factory.StartNew(Function () NextItem4())
    _bufferBlock.Post("Alpha")
    _bufferBlock.Post("Beta")
    _bufferBlock.Post("Gamma")
    _bufferBlock.Post("Delta")
    Dim x = Await NextItem5()
    Dim y = Await _bufferBlock.ReceiveAsync()
    Console.WriteLine(t1.Result)
    Console.WriteLine(t2.Result)
    Console.WriteLine(x)
    Console.WriteLine(y)
End Sub

...and this gives me:

Alpha
Beta
Gamma
Delta

NB: Sometimes the order of the results changes, as the asynchronous nature of this code means that t1 and t2 can complete in any order.

Personally, I think that the ability to call Await _bufferBlock.ReceiveAsync() has the greatest potential to simplify your code.

At the end of the day, I would look at whatever methods you come up with and time them properly to determine which is faster. We can't time your code, as we don't have it.

Upvotes: 1

Jonathan Allen

Reputation: 70317

I'm willing to guess that the reason the second version is slower is that you are using it incorrectly. You don't need to check the Count if you are using TryDequeue, it takes care of that for you.

Fixing your code:

Public Function NextItem2() As String
    Dim _item As String
    Form1.queItems.TryDequeue(_item)
    Return _item
End Function

Reading the count from a ConcurrentQueue is actually very expensive.

When you add an item to a queue, it only has to look at one end of the queue. When you remove an item, it only looks at the other end. This means you can have one thread adding and another removing and they won't interfere with each other (unless maybe the queue is empty).

When you want the count, it actually has to look at both ends and count the items between. Which in turn requires either locking both ends, or continuously retrying until it gets a clean read. (You can examine the code yourself, but it's hard to understand.)

http://referencesource.microsoft.com/#mscorlib/system/Collections/Concurrent/ConcurrentQueue.cs
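Relatedly, if you only need a quick emptiness check, the IsEmpty property is the cheap one: it only has to examine the head of the queue, unlike Count. A small sketch, using queItems as in the question:

```vbnet
' IsEmpty only peeks at the head segment, so it is much cheaper than
' Count, which has to capture both ends of the queue consistently.
If Not Form1.queItems.IsEmpty Then
    ' There is probably something to dequeue, but another thread may
    ' still beat us to it - TryDequeue remains the authoritative check.
End If
```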


This is literally the code they use for TryDequeue:

public bool TryDequeue(out T result)
{
    while (!IsEmpty)
    {
        Segment head = m_head;
        if (head.TryRemove(out result))
            return true;
        //since method IsEmpty spins, we don't need to spin in the while loop
    }
    result = default(T);
    return false;
}

As you can see, it will set the result parameter to 0/null if the queue is empty, hence we don't need to explicitly check the function's return value.

Now if you wanted a different default, then you would write:

Public Function NextItem2() As String
    Dim _item As String
    If Not Form1.queItems.TryDequeue(_item) Then _item = [my default]
    Return _item
End Function

Upvotes: 1

Drunken Code Monkey

Reputation: 1839

The concurrent collections implement partitioning internally to avoid having to lock the whole collection. What that means is that in situations where many threads are queuing and dequeuing (high contention), part of the collection can be "locked" while other parts (think of them as blocks of data) are still accessible. This is not possible using a simple SyncLock, because it locks the whole collection, preventing access from other threads. In situations where only a few threads are involved, a SyncLock and a plain collection will likely be faster because you bypass the overhead of partitioning. In situations where you have 8 or more threads all competing for the same collection, a concurrent collection will most likely be orders of magnitude faster than list + SyncLock.

http://www.codethinked.com/net-40-and-system_collections_concurrent_concurrentqueue

EDIT: I in fact just tested this on my dual core machine using the following:

Imports System.Threading

Public Class Form1

    Private _lock As New Object

    Private _queue As New Queue(Of Integer)

    Private _concurrentQueue As New Concurrent.ConcurrentQueue(Of Integer)


    Private Sub Form1_Load(sender As Object, e As EventArgs) Handles Me.Load

        Dim sw As New Stopwatch()

        Dim lstResQueue As New List(Of Integer)
        Dim lstResConcurrent As New List(Of Integer)

        For i = 1 To 10
            Dim t As New Thread(AddressOf TestLockedQueue)

            sw.Start()
            t.Start()
            While t.IsAlive : Thread.Sleep(0) : End While
            sw.Stop()

            lstResQueue.Add(sw.ElapsedMilliseconds)

            sw.Reset()

            t = New Thread(AddressOf TestConcurrentQueue)

            sw.Start()
            t.Start()
            While t.IsAlive : Thread.Sleep(0) : End While
            sw.Stop()

            lstResConcurrent.Add(sw.ElapsedMilliseconds)

            sw.Reset()
        Next

        MessageBox.Show(String.Format("Average over 10 runs: " & vbCrLf & _
                                      "Queue(Of Integer) with lock: {0}" & vbCrLf & _
                                      "ConcurrentQueue(Of Integer): {1}",
                                      lstResQueue.Average, lstResConcurrent.Average))
    End Sub


    Private Sub TestLockedQueue()
        Parallel.For(0, 5000000,
                     New ParallelOptions() With {.MaxDegreeOfParallelism = 16},
                     Sub(i)
                         Dim a = 0

                         SyncLock _lock
                             Try
                                 _queue.Enqueue(i)
                             Catch ex As Exception
                             End Try
                         End SyncLock

                         SyncLock _lock
                             Try
                                 a = _queue.Dequeue()
                             Catch ex As Exception
                             End Try
                         End SyncLock

                         Dim b = a
                     End Sub)
    End Sub

    Private Sub TestConcurrentQueue()
        Parallel.For(0, 5000000,
                     New ParallelOptions() With {.MaxDegreeOfParallelism = 16},
                     Sub(i)
                         Dim a = 0

                         Try
                             _concurrentQueue.Enqueue(i)
                         Catch ex As Exception
                         End Try

                         _concurrentQueue.TryDequeue(a)

                         Dim b = a
                     End Sub)
    End Sub

End Class

And the ConcurrentQueue is invariably twice as fast. It would be interesting to see the results on a beefier 8-core processor. My results are 10 seconds versus 4.5 seconds. I am guessing the ConcurrentQueue won't scale well beyond 2 cores, while a ConcurrentBag would scale all the way up.

Upvotes: 1
