Reputation: 55132
I have a producer that gets users from a resource and places them into a ConcurrentQueue. What I want to do is use multiple consumers to process all the users and get their information from another resource.
public void Populate(IEnumerable<Users> users){
    // Single-threaded producer: enqueue each user individually.
    foreach (var user in users)
        _queue.Enqueue(user);
}
public void Process(){
    // Here I want the users to be processed by multiple consumers
    // (say, multiple threads) so that I can finish processing them.
}
My question is: should I use Thread, Task, or ThreadPool?
I have seen this question: C# equivalent for Java ExecutorService.newSingleThreadExecutor(), or: how to serialize mulithreaded access to a resource
Upvotes: 0
Views: 4111
Reputation: 109852
Since you are using a queuing mechanism already, I suggest you use a BlockingCollection instead of ConcurrentQueue, along with Parallel.Invoke().
There are some important things about BlockingCollection that make it nice to use:
Consuming threads can take items from it safely and naturally using foreach (over GetConsumingEnumerable()).
The foreach loop blocks automatically when the queue is empty, and continues when items become available.
To signal the end of the data, call queue.CompleteAdding(), and any foreach loops taking items from the queue will exit automatically when the queue becomes completely empty.
You can use Parallel.Invoke() to start a number of threads, each of which uses foreach to iterate over the queue. (Parallel.Invoke() lets you give it an array of actions to run in parallel, which makes it quite simple to use.)
This is best illustrated with a sample program:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Demo
{
    class User
    {
        public string Name;
    }

    class Program
    {
        readonly BlockingCollection<User> _queue = new BlockingCollection<User>();

        void run()
        {
            var background = Task.Factory.StartNew(process); // Start the processing threads.

            // Make up 50 sample users.
            var users = Enumerable.Range(0, 50).Select(n => new User{Name = n.ToString()});

            foreach (var user in users) // Add some sample data.
                _queue.Add(user);

            Console.WriteLine("Press <RETURN> to exit.");
            Console.ReadLine();
            _queue.CompleteAdding(); // Makes all the consuming foreach loops exit.
            background.Wait();
            Console.WriteLine("Exited.");
        }

        void process() // Process the input queue.
        {
            int taskCount = 4; // Let's use 4 threads.
            var actions = Enumerable.Repeat<Action>(processQueue, taskCount);
            Parallel.Invoke(actions.ToArray());
        }

        void processQueue()
        {
            foreach (User user in _queue.GetConsumingEnumerable())
                processUser(user);
        }

        void processUser(User user)
        {
            Console.WriteLine("Processing user " + user.Name);
            Thread.Sleep(200); // Simulate work.
        }

        static void Main()
        {
            new Program().run();
        }
    }
}
If you don't need to limit the number of concurrent threads and are happy to let .NET decide for you (not a bad idea), then you can simplify the code quite a bit by removing processQueue() altogether and changing process() to:
void process() // Process the input queue.
{
    Parallel.ForEach(_queue.GetConsumingEnumerable(), processUser);
}
However, that does more locking than it needs to, so you're probably best off just using the original method (which doesn't suffer from that problem), or using the solution described here: http://blogs.msdn.com/b/pfxteam/archive/2010/04/06/9990420.aspx
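For reference, the approach at that link, as I understand it, wraps the collection in a custom Partitioner<T> so that Parallel.ForEach pulls items straight from GetConsumingEnumerable() instead of going through the default chunking partitioner. Below is a minimal sketch under that assumption. The class names BlockingCollectionPartitioning and ConsumingPartitioner are illustrative and not taken from the linked post, so treat this as a starting point rather than the definitive implementation:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class BlockingCollectionPartitioning
{
    // Hypothetical helper: exposes a BlockingCollection<T> as a Partitioner<T>.
    public static Partitioner<T> GetConsumingPartitioner<T>(this BlockingCollection<T> collection)
    {
        return new ConsumingPartitioner<T>(collection);
    }

    private sealed class ConsumingPartitioner<T> : Partitioner<T>
    {
        private readonly BlockingCollection<T> _collection;

        public ConsumingPartitioner(BlockingCollection<T> collection)
        {
            if (collection == null) throw new ArgumentNullException("collection");
            _collection = collection;
        }

        // Parallel.ForEach requires a partitioner that supports dynamic partitions.
        public override bool SupportsDynamicPartitions { get { return true; } }

        public override IList<IEnumerator<T>> GetPartitions(int partitionCount)
        {
            if (partitionCount < 1) throw new ArgumentOutOfRangeException("partitionCount");
            IEnumerable<T> source = GetDynamicPartitions();
            // Every partition is its own consuming enumerator over the same collection.
            return Enumerable.Range(0, partitionCount)
                             .Select(_ => source.GetEnumerator())
                             .ToArray();
        }

        // The consuming enumerable is already thread-safe, so each worker
        // can take items directly without an extra partitioner-level lock.
        public override IEnumerable<T> GetDynamicPartitions()
        {
            return _collection.GetConsumingEnumerable();
        }
    }
}
```

With a helper like this in place, process() could call Parallel.ForEach(_queue.GetConsumingPartitioner(), processUser); and the loop would still end once CompleteAdding() has been called and the queue is drained.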
Upvotes: 6