DarthVader

Consume from Queue with multiple threads/tasks

I have a producer that gets users from a resource and places them into a ConcurrentQueue. I then want to use multiple consumers to process all the users, fetching each user's information from another resource.

  public void Populate(IEnumerable<Users> users){
     // here single threaded
     foreach (var user in users)
        _queue.Enqueue(user); // Enqueue adds a single item
  }

  public void Process(){
     // Here I want the queue to be processed by multiple consumers
     // (say, multiple threads) so that the work finishes sooner.
  }

My question is: should I use Thread, Task, or ThreadPool?

I have seen this question: C# equivalent for Java ExecutorService.newSingleThreadExecutor(), or: how to serialize mulithreaded access to a resource

Answers (1)

Matthew Watson

Since you are using a queuing mechanism already, I suggest you use a BlockingCollection instead of ConcurrentQueue, along with Parallel.Invoke().
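For contrast, here is a minimal sketch of what a plain ConcurrentQueue version might look like, assuming all users are enqueued before processing starts (as Populate() followed by Process() suggests). The names DrainDemo and DrainWithTasks are hypothetical, and strings stand in for users. It only works because the queue is already full: TryDequeue returns false the moment the queue is empty, so a consumer that got ahead of a still-running producer would simply stop. That missing "wait for more" behaviour is what BlockingCollection adds.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class DrainDemo
{
    // Hypothetical helper: drains a queue that was completely filled beforehand,
    // using 'consumerCount' tasks. Returns the number of items dequeued.
    public static int DrainWithTasks(ConcurrentQueue<string> queue, int consumerCount)
    {
        int processed = 0;
        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() =>
            {
                // TryDequeue returns false as soon as the queue is empty;
                // it never waits for a producer to add more items.
                while (queue.TryDequeue(out string user))
                {
                    // Fetch the user's information here.
                    Interlocked.Increment(ref processed);
                }
            }))
            .ToArray();

        Task.WaitAll(consumers);
        return processed;
    }

    static void Main()
    {
        var queue = new ConcurrentQueue<string>(Enumerable.Range(0, 50).Select(n => n.ToString()));
        Console.WriteLine(DrainWithTasks(queue, 4)); // prints 50
    }
}
```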

BlockingCollection has several properties that make it well suited to this:

  1. BlockingCollection lets the consuming threads take items from the collection in a thread-safe and natural manner, using foreach over GetConsumingEnumerable(). (A plain foreach over the collection itself does not remove any items.)
  2. The consuming foreach loop blocks automatically while the queue is empty and resumes when items become available.
  3. BlockingCollection provides an easy way to signal the end of the data. The queue owner simply calls queue.CompleteAdding(), and every consuming foreach loop exits automatically once the queue has been completely emptied.
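The three points above can be seen in a small sketch with one slow producer and one consumer task (the names CompleteAddingDemo and ProduceAndConsume are hypothetical, and ints stand in for users). The consumer waits whenever the queue is empty, and its loop ends only after CompleteAdding() is called. With a single consumer the items also come out in insertion order, because a BlockingCollection created with the parameterless constructor is backed by a ConcurrentQueue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class CompleteAddingDemo
{
    // One slow producer, one consumer task. Returns the items in the order
    // the consumer received them.
    public static List<int> ProduceAndConsume(int itemCount)
    {
        // The parameterless constructor uses a ConcurrentQueue<int> (FIFO) as the backing store.
        using (var queue = new BlockingCollection<int>())
        {
            var received = new List<int>(); // touched only by the consumer until it finishes

            Task consumer = Task.Run(() =>
            {
                // Blocks while the queue is empty; the loop ends only after
                // CompleteAdding() has been called and the queue is drained.
                foreach (int item in queue.GetConsumingEnumerable())
                    received.Add(item);
            });

            for (int i = 0; i < itemCount; i++)
            {
                queue.Add(i);
                Thread.Sleep(10); // Slow producer, so the consumer regularly finds the queue empty.
            }
            queue.CompleteAdding(); // Signal the end of the data.

            consumer.Wait(); // Without CompleteAdding() this would never return.
            return received;
        }
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", ProduceAndConsume(5))); // prints 0, 1, 2, 3, 4
    }
}
```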

You can use Parallel.Invoke() to run several consumers in parallel, each of which uses foreach to take items from the queue. (Parallel.Invoke() accepts an array of Action delegates, runs them in parallel where possible, and returns only when all of them have finished, which makes it quite simple to use.)
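Since the question also asks about tasks: if you would rather start the consumers yourself than go through Parallel.Invoke(), a sketch along these lines starts one task per consumer and waits for them with Task.WaitAll(). The names TaskConsumersDemo and ConsumeWithTasks are hypothetical, ints stand in for users, and the use of TaskCreationOptions.LongRunning is my own assumption, chosen because each consumer spends most of its life blocked on the queue.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;

static class TaskConsumersDemo
{
    // Starts 'consumerCount' consumer tasks on one BlockingCollection, feeds it
    // 'itemCount' items, and returns everything that was processed, sorted.
    public static int[] ConsumeWithTasks(int itemCount, int consumerCount)
    {
        using (var queue = new BlockingCollection<int>())
        {
            var processed = new ConcurrentBag<int>(); // thread-safe sink for results

            // LongRunning hints that each task will block for a long time; the
            // default scheduler then gives it a dedicated thread, not a pool thread.
            Task[] consumers = Enumerable.Range(0, consumerCount)
                .Select(_ => Task.Factory.StartNew(() =>
                {
                    foreach (int item in queue.GetConsumingEnumerable())
                        processed.Add(item); // "Process" the item.
                }, TaskCreationOptions.LongRunning))
                .ToArray();

            for (int i = 0; i < itemCount; i++)
                queue.Add(i);
            queue.CompleteAdding(); // Lets every consumer's foreach loop finish.

            Task.WaitAll(consumers);
            return processed.OrderBy(x => x).ToArray();
        }
    }

    static void Main()
    {
        int[] result = ConsumeWithTasks(20, 4);
        Console.WriteLine(result.Length); // prints 20
    }
}
```

Either way, each item is taken by exactly one consumer; which consumer gets which item is not deterministic.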

This is best illustrated with a sample program:

using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    class Program
    {
        readonly BlockingCollection<User> _queue = new BlockingCollection<User>();

        void run()
        {
            var background = Task.Factory.StartNew(process); // Start the processing threads.

            // Make up 50 sample users.
            var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});

            foreach (var user in users) // Add some sample data.
                _queue.Add(user);

            Console.WriteLine("Press <RETURN> to exit.");
            Console.ReadLine();
            _queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
            background.Wait();
            Console.WriteLine("Exited.");
        }

        void process() // Process the input queue.
        {
            int taskCount = 4;  // Let's use 4 threads.
            var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
            Parallel.Invoke(actions.ToArray());
        }

        void processQueue()
        {
            foreach (User user in _queue.GetConsumingEnumerable())
                processUser(user);
        }

        void processUser(User user)
        {
            Console.WriteLine("Processing user " + user.Name);
            Thread.Sleep(200); // Simulate work.
        }

        static void Main()
        {
            new Program().run();
        }
    }
}

If you don't need to limit the number of concurrent threads and are happy to let .NET decide for you (not a bad idea), you can simplify the code considerably by removing processQueue() altogether and changing process() to:

void process() // Process the input queue.
{
    Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}

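However, that version does more locking than it needs to (by default, Parallel.ForEach's worker threads share a single enumerator over the queue, access it under a lock, and buffer items in chunks), so you're probably better off using the original method, which doesn't suffer from that problem, or the solution described here: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx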
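For reference, the idea in that post, as I understand it, is to hand Parallel.ForEach a custom partitioner whose partitions each call GetConsumingEnumerable() independently, so no enumerator is shared and nothing is buffered. Below is a minimal sketch of that idea; the class names ConsumingPartitioner and PartitionerDemo are hypothetical, and this is a reconstruction under those assumptions rather than the code from the post.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical name. Every worker gets its own consuming enumerator, so workers
// call TryTake on the collection directly instead of sharing one locked enumerator.
class ConsumingPartitioner<T> : Partitioner<T>
{
    private readonly BlockingCollection<T> _collection;

    public ConsumingPartitioner(BlockingCollection<T> collection)
    {
        _collection = collection ?? throw new ArgumentNullException(nameof(collection));
    }

    // Parallel.ForEach requires dynamic partitioning support.
    public override bool SupportsDynamicPartitions => true;

    public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
    {
        IEnumerable<T> dynamicPartitions = GetDynamicPartitions();
        return Enumerable.Range(0, partitionCount)
            .Select(_ => dynamicPartitions.GetEnumerator())
            .ToArray();
    }

    // Each GetEnumerator() call on this sequence yields an independent
    // consuming enumerator over the same collection.
    public override IEnumerable<T> GetDynamicPartitions() => _collection.GetConsumingEnumerable();
}

static class PartitionerDemo
{
    public static int ProcessAll(int itemCount)
    {
        using (var queue = new BlockingCollection<int>())
        {
            int processed = 0;
            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < itemCount; i++)
                    queue.Add(i);
                queue.CompleteAdding(); // Ends every worker's consuming loop.
            });

            Parallel.ForEach(new ConsumingPartitioner<int>(queue),
                             item => Interlocked.Increment(ref processed));

            producer.Wait();
            return processed;
        }
    }

    static void Main() => Console.WriteLine(ProcessAll(100)); // prints 100
}
```

With that class, the simplified process() would become Parallel.ForEach(new ConsumingPartitioner<User>(_queue), processUser);. On .NET 4.5 and later, Partitioner.Create(_queue.GetConsumingEnumerable(), EnumerablePartitionerOptions.NoBuffering) is a built-in middle ground: it turns off the chunk buffering, although the workers still share one enumerator.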
