user2331234

Reputation: 149

Queue and Dequeue in multiple threads

I'm creating an indexer that enqueues items which need to be processed. The indexer adds items to its processor in bursts: for example, it might add 100 items, add nothing for 3 minutes, and then add another 50 items.

public class Processer
{
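    // Initialized here so that AddItem() never sees a null queue.
    private readonly ConcurrentQueue<Item> items = new ConcurrentQueue<Item>();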

    public void AddItem(Item item)
    {
        this.items.Enqueue(item);
    }
}

The items will come in at random intervals, so I will create a separate thread to dequeue and process these items.

What would be the best option to use?

  1. Don't use a Collection, but use the ThreadPool:

    public void AddItem(Item item)
    {
        ThreadPool.QueueUserWorkItem(function, item);
    }
    

    This queues the items automatically and processes them, but I have less control: when 20 items are found, the queued work nearly stops my indexer from running until the thread pool has finished it.

  2. Use a long running Task:

    public Processer()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }
    
    public void DequeueItems()
    {
        while(true)
        {
            Item item = null;
            while(this.items.TryDequeue(out item))
            {
                this.store.ExecuteIndex((AbstractIndexCreationTask)item);
            }
    
            Thread.Sleep(100); 
        }
    }
    

    But I dislike the while(true) loop and the Thread.Sleep I have to use: the queue will run dry after some time, and the loop then has to keep rechecking whether there are new items.

  3. Use a short running task:

    public Processer()
    {
    
    }
    private void Run()
    {
        this.task = Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.PreferFairness,
            TaskScheduler.Default);
    }
    public void AddItem(Item item)
    {
        this.items.Enqueue(item);
        if(this.task == null || this.task.IsCompleted)
            this.Run();
    }
    public void DequeueItems()
    {
        Item item = null;
        while(this.items.TryDequeue(out item))
        {
            this.store.ExecuteIndex((AbstractIndexCreationTask)item);
        }
    }
    

    This might be nicer, since it doesn't sleep or use a dirty while(true) loop. But starting a thread is an "expensive" operation, and I don't know whether I can miss items: I check IsCompleted, but at that moment the task could be just leaving its while loop, so one item would be left unprocessed.

  4. Your option. Since MSDN recommends using the TPL, I decided not to use Threads directly, but maybe there are better ways of dealing with this problem.

Changelog

  1. Changed to BlockingCollection
  2. Changed back to ConcurrentQueue

Some things I've checked:

Upvotes: 3

Views: 17560

Answers (2)

Romano Zumbé

Reputation: 8079

I think a Semaphore might be the right thing for you. You'll find a pretty good explanation of it here.

In addition, I would suggest using a ConcurrentQueue.
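
As a rough illustration of how a semaphore and a ConcurrentQueue could be combined, here is a minimal sketch. It assumes SemaphoreSlim (the lightweight in-process semaphore) rather than Semaphore. The class name SemaphoreProcesser, the empty Item class, and the process callback (standing in for the question's store.ExecuteIndex) are hypothetical names introduced only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Item type.
public class Item { }

public class SemaphoreProcesser
{
    private readonly ConcurrentQueue<Item> items = new ConcurrentQueue<Item>();

    // Counts queued items. It starts at 0, so the consumer blocks until the first AddItem().
    private readonly SemaphoreSlim available = new SemaphoreSlim(0);

    // Hypothetical callback that does the real work (e.g. calls store.ExecuteIndex).
    private readonly Action<Item> process;

    public SemaphoreProcesser(Action<Item> process)
    {
        this.process = process;
        Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
    }

    public void AddItem(Item item)
    {
        this.items.Enqueue(item);   // enqueue first...
        this.available.Release();   // ...then signal that one more item is ready
    }

    private void DequeueItems()
    {
        while (true)
        {
            this.available.Wait();  // blocks without polling until an item exists

            Item item;
            if (this.items.TryDequeue(out item))
            {
                this.process(item);
            }
        }
    }
}
```

Because Release() is called once per enqueued item, the consumer wakes exactly as often as there are items, so no Thread.Sleep polling is needed. The sketch leaves out cancellation and error handling.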

Upvotes: 2

svick

Reputation: 244757

I think the simplest solution here is to use BlockingCollection (probably using its GetConsumingEnumerable()) along with a long-running Task. When there's nothing to do, this will waste a Thread, but a single wasted Thread is not that bad.
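
A minimal sketch of that approach might look like the following. The class name BlockingProcesser, the empty Item class, and the process callback (standing in for the question's store.ExecuteIndex) are assumptions made for illustration, not part of the original code.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Item type.
public class Item { }

public class BlockingProcesser : IDisposable
{
    // With no arguments, BlockingCollection<T> wraps a ConcurrentQueue<T> (FIFO order).
    private readonly BlockingCollection<Item> items = new BlockingCollection<Item>();
    private readonly Task task;

    // process is a hypothetical callback that does the real work.
    public BlockingProcesser(Action<Item> process)
    {
        this.task = Task.Factory.StartNew(() =>
        {
            // Blocks while the collection is empty; the loop ends once
            // CompleteAdding() has been called and the collection is drained.
            foreach (Item item in this.items.GetConsumingEnumerable())
            {
                process(item);
            }
        },
        CancellationToken.None,
        TaskCreationOptions.LongRunning,
        TaskScheduler.Default);
    }

    public void AddItem(Item item)
    {
        this.items.Add(item);
    }

    public void Dispose()
    {
        this.items.CompleteAdding(); // no more items will be accepted
        this.task.Wait();            // let the consumer finish the backlog
        this.items.Dispose();
    }
}
```

The consumer thread sleeps inside GetConsumingEnumerable() while the collection is empty, which replaces the while(true) and Thread.Sleep(100) loop from option 2.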

If you can't afford to waste that Thread, then you can go with something like your #3. But you have to be very careful about making it thread-safe. For example, in your code, if the Task isn't running and AddItem() is called from two threads at the same time, you end up creating two Tasks, which is almost certainly wrong.
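
One possible way to close that race is to guard a "running" flag with a lock and to re-check the queue under the same lock before the task exits. This is only a sketch under that assumption; OnDemandProcesser, the empty Item class, and the process callback are hypothetical names, and other designs (for example based on Interlocked) are equally plausible.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Item type.
public class Item { }

public class OnDemandProcesser
{
    private readonly ConcurrentQueue<Item> items = new ConcurrentQueue<Item>();
    private readonly object gate = new object();
    private readonly Action<Item> process; // hypothetical callback doing the real work
    private bool running;                  // only read or written while holding gate

    public OnDemandProcesser(Action<Item> process)
    {
        this.process = process;
    }

    public void AddItem(Item item)
    {
        this.items.Enqueue(item);
        lock (this.gate)
        {
            if (this.running) return; // a task is already draining the queue
            this.running = true;      // only one caller gets past this point
        }
        Task.Factory.StartNew(() => DequeueItems(),
            CancellationToken.None,
            TaskCreationOptions.None,
            TaskScheduler.Default);
    }

    private void DequeueItems()
    {
        while (true)
        {
            Item item;
            while (this.items.TryDequeue(out item))
            {
                this.process(item);
            }

            lock (this.gate)
            {
                // Re-check under the lock: an item enqueued after the last
                // TryDequeue() must either be seen here or start a new task.
                if (this.items.IsEmpty)
                {
                    this.running = false;
                    return;
                }
            }
        }
    }
}
```

Since AddItem() enqueues before taking the lock, an item is either visible to the IsEmpty check or its caller sees running == false and starts a new task. The sketch does not handle exceptions: if process throws, running stays true and no further task is started, so real code would need a try/finally or similar.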

Another option, if you're on .NET 4.5, is to use ActionBlock from TPL Dataflow. With it, you're not wasting any threads and you don't have to write the difficult thread-safe code yourself.
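
A minimal sketch of the ActionBlock variant could look like this. It assumes the TPL Dataflow library is referenced (it ships as a NuGet package, not in the base framework). DataflowProcesser, the empty Item class, the process callback, and the Shutdown method are hypothetical names used only for this example.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // from the TPL Dataflow NuGet package

// Hypothetical stand-in for the question's Item type.
public class Item { }

public class DataflowProcesser
{
    private readonly ActionBlock<Item> block;

    // process is a hypothetical callback that does the real work.
    public DataflowProcesser(Action<Item> process)
    {
        // By default an ActionBlock handles one item at a time, in the order
        // posted, and only occupies a thread while it has items to process.
        this.block = new ActionBlock<Item>(process);
    }

    public void AddItem(Item item)
    {
        // Post() returns false if the block no longer accepts items,
        // for example after Complete() was called.
        this.block.Post(item);
    }

    public Task Shutdown()
    {
        this.block.Complete();        // stop accepting new items
        return this.block.Completion; // completes once the backlog is processed
    }
}
```

The block keeps its own internal queue, so the explicit ConcurrentQueue and the consumer loop from the question are no longer needed.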

Upvotes: 3
