Reputation: 387
I have following code which throws SemaphoreFullException
, I don't understand why ?
If I change _semaphore = new SemaphoreSlim(0, 2)
to
_semaphore = new SemaphoreSlim(0, int.MaxValue)
then all works fine. Can anyone please find fault with this code and explain to me.
class BlockingQueue<T>
{
private Queue<T> _queue = new Queue<T>();
private SemaphoreSlim _semaphore = new SemaphoreSlim(0, 2);
public void Enqueue(T data)
{
if (data == null) throw new ArgumentNullException("data");
lock (_queue)
{
_queue.Enqueue(data);
}
_semaphore.Release();
}
public T Dequeue()
{
_semaphore.Wait();
lock (_queue)
{
return _queue.Dequeue();
}
}
}
public class Test
{
private static BlockingQueue<string> _bq = new BlockingQueue<string>();
public static void Main()
{
for (int i = 0; i < 100; i++)
{
_bq.Enqueue("item-" + i);
}
for (int i = 0; i < 5; i++)
{
Thread t = new Thread(Produce);
t.Start();
}
for (int i = 0; i < 100; i++)
{
Thread t = new Thread(Consume);
t.Start();
}
Console.ReadLine();
}
private static Random _random = new Random();
private static void Produce()
{
while (true)
{
_bq.Enqueue("item-" + _random.Next());
Thread.Sleep(2000);
}
}
private static void Consume()
{
while (true)
{
Console.WriteLine("Consumed-" + _bq.Dequeue());
Thread.Sleep(1000);
}
}
}
Upvotes: 1
Views: 6650
Reputation: 133975
If you want to use the semaphore to control the number of concurrent threads, you're using it wrong. You should acquire the semaphore when you dequeue an item, and release the semaphore when the thread is done processing that item.
What you have right now is a system that allows only two items to be in the queue at any one time. Initially, your semaphore has a count of 2. Each time you enqueue an item, the count is reduced. After two items, the count is 0 and if you try to release again you're going to get a semaphore full exception.
If you really want to do this with a semaphore, you need to remove the Release
call from the Enqueue
method. And add a Release
method to the BlockingQueue
class. You then would write:
private static void Consume()
{
while (true)
{
Console.WriteLine("Consumed-" + _bq.Dequeue());
Thread.Sleep(1000);
bq.Release();
}
}
That would make your code work, but it's not a very good solution. A much better solution would be to use BlockingCollection<T>
and two persistent consumers. Something like:
private BlockingCollection<int> bq = new BlockingCollection<int>();
void Test()
{
// create two consumers
var c1 = new Thread(Consume);
var c2 = new Thread(Consume);
c1.Start();
c2.Start();
// produce
for (var i = 0; i < 100; ++i)
{
bq.Add(i);
}
bq.CompleteAdding();
c1.Join();
c2.Join();
}
void Consume()
{
foreach (var i in bq.GetConsumingEnumerable())
{
Console.WriteLine("Consumed-" + i);
Thread.Sleep(1000);
}
}
That gives you two persistent threads consuming the items. The benefit is that you avoid the cost of spinning up a new thread (or having the RTL assign a pool thread) for each item. Instead, the threads do non-busy waits on the queue. You also don't have to worry about explicit locking, etc. The code is simpler, more robust, and much less likely to contain a bug.
Upvotes: 3