Reputation: 387
I have an assignment that I find quite challenging, and I have no idea how to start.
Here is the question:
I have a queue of random products (of types A, B and C), say:
P[A], P[A], P[B], P[C], P[C], P[A], P[B], P[B]....
I am going to send these products to ProcessProduct() concurrently, but two products of the same type cannot be processed at the same time.
Here is the scenario:
The first P[A] gets processed, and the second P[A] has to wait for the first one. The third product, P[B], and the fourth, P[C], get processed right away, and the fifth, P[C], has to wait for the previous P[C], and so on.
I hope my description is not too confusing. I would appreciate any suggestions. Thank you.
Upvotes: 1
Views: 148
Reputation: 67148
Group your product list by type then process each group in parallel.
EDITED
This is an example (if the product list is static):
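    using System.Collections.Generic;
    using System.Linq;
    using System.Threading.Tasks;

    sealed class Product
    {
        public string Type { get; set; }
    }

    sealed class ProductProcessor
    {
        public void StartProcessing(IEnumerable<Product> products)
        {
            // One task per product type: groups run in parallel, while the
            // products inside a group are processed one after another.
            foreach (var group in products.GroupBy(x => x.Type))
            {
                // Copy the loop variable so each task captures its own group
                // (before C# 5 the foreach variable was shared by all closures).
                var productsOfOneType = group;
                Task.Factory.StartNew(() => ProcessProducts(productsOfOneType));
            }
        }

        private void ProcessProducts(IEnumerable<Product> products)
        {
            foreach (Product product in products)
                ProcessProduct(product);
        }

        private void ProcessProduct(Product product)
        {
            // Actual processing of a single product goes here.
        }
    }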
This is another example for a dynamic queue of products to process (I didn't run the code, read it as a kind of pseudo-code):
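    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Threading;

    sealed class ProductProcessor
    {
        public void Process(Product product)
        {
            lock (_queues)
            {
                if (_queues.ContainsKey(product.Type))
                {
                    // A worker for this type is already running; hand it the product.
                    _queues[product.Type].Enqueue(product);
                }
                else
                {
                    ConcurrentQueue<Product> queue = new ConcurrentQueue<Product>();
                    queue.Enqueue(product);
                    _queues.Add(product.Type, queue);

                    // One worker per type drains its queue, so two products
                    // of the same type are never processed at the same time.
                    WaitCallback action = delegate(object state)
                    {
                        while (true)
                        {
                            Product productToProcess;
                            lock (_queues)
                            {
                                // Test for "empty" and remove the queue under the same
                                // lock; otherwise a product enqueued between the two
                                // steps would be lost together with the queue.
                                if (!queue.TryDequeue(out productToProcess))
                                {
                                    _queues.Remove(product.Type);
                                    return;
                                }
                            }
                            ProcessProduct(productToProcess);
                        }
                    };
                    ThreadPool.QueueUserWorkItem(action);
                }
            }
        }

        private Dictionary<string, ConcurrentQueue<Product>> _queues
            = new Dictionary<string, ConcurrentQueue<Product>>();

        private void ProcessProduct(Product product)
        {
            // Actual processing of a single product goes here.
        }
    }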
P.S. Instead of the lock you may use a ConcurrentDictionary.
Upvotes: 1
Reputation: 36327
One way to do this is to group the products into three different queues, one queue per type. This adds some overhead, but it makes things a lot easier.
Here's some pseudo code:
    queueForTypeA = new queue()
    queueForTypeB = new queue()
    queueForTypeC = new queue()
    foreach product in products
        if product is typeof A
            addLastInQueue(queueForTypeA, product)
        if product is typeof B
            addLastInQueue(queueForTypeB, product)
        if product is typeof C
            addLastInQueue(queueForTypeC, product)
Then you can process each queue separately and never have to worry about processing two products of the same type at once.
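For what it's worth, here is a rough C# sketch of the three-queue idea. It is only an illustration under some assumptions: Product.Type is taken to be a string, and PerTypeQueues, ProcessAll and the processProduct callback are names made up for this example, not anything from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's product; only the type matters here.
sealed class Product
{
    public string Type { get; set; }
}

static class PerTypeQueues
{
    // Splits the products into one queue per type, then drains the three
    // queues in parallel. Each queue is drained by exactly one task, so two
    // products of the same type are never processed at the same time.
    public static void ProcessAll(IEnumerable<Product> products, Action<Product> processProduct)
    {
        var queueForTypeA = new Queue<Product>();
        var queueForTypeB = new Queue<Product>();
        var queueForTypeC = new Queue<Product>();

        foreach (Product product in products)
        {
            switch (product.Type)
            {
                case "A": queueForTypeA.Enqueue(product); break;
                case "B": queueForTypeB.Enqueue(product); break;
                case "C": queueForTypeC.Enqueue(product); break;
                default: throw new ArgumentException("Unknown product type: " + product.Type);
            }
        }

        // Blocks until all three queues are empty.
        Task.WaitAll(
            Task.Run(() => Drain(queueForTypeA, processProduct)),
            Task.Run(() => Drain(queueForTypeB, processProduct)),
            Task.Run(() => Drain(queueForTypeC, processProduct)));
    }

    // Processes one queue sequentially, in arrival order.
    private static void Drain(Queue<Product> queue, Action<Product> processProduct)
    {
        while (queue.Count > 0)
            processProduct(queue.Dequeue());
    }
}
```

You would call it as PerTypeQueues.ProcessAll(products, ProcessProduct). The queues are plain Queue<Product> on purpose: they are filled before the tasks start and each one is then touched by a single task, so no locking is needed.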
However, this solution doesn't scale very well: if you add another type of product, you need to change your code, which isn't ideal.
So another option is to keep a list of the types currently being processed (productTypesBeingProcessed). Each time you want a new product to process, revisit the queue starting from the first item; if an item's type is in the productTypesBeingProcessed list, skip it and take the next one.
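A minimal sketch of that second option in C#, in case it helps. The class name SkipInLineProcessor, the workerCount parameter and the processProduct callback are all invented for the example, and Product.Type is again assumed to be a string; treat it as one possible shape rather than a finished implementation.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's product.
sealed class Product
{
    public string Type { get; set; }
}

sealed class SkipInLineProcessor
{
    private readonly object _gate = new object();
    private readonly LinkedList<Product> _pending;
    private readonly HashSet<string> _productTypesBeingProcessed = new HashSet<string>();
    private readonly Action<Product> _processProduct;

    public SkipInLineProcessor(IEnumerable<Product> products, Action<Product> processProduct)
    {
        _pending = new LinkedList<Product>(products);
        _processProduct = processProduct;
    }

    // Runs workerCount workers and blocks until the queue is empty.
    public void Run(int workerCount)
    {
        var workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
            workers[i] = Task.Factory.StartNew(() => Worker(), TaskCreationOptions.LongRunning);
        Task.WaitAll(workers);
    }

    private void Worker()
    {
        while (true)
        {
            Product product = TakeNext();
            if (product == null)
                return; // nothing left to do
            try
            {
                _processProduct(product);
            }
            finally
            {
                lock (_gate)
                {
                    // The type is free again; wake workers that skipped it.
                    _productTypesBeingProcessed.Remove(product.Type);
                    Monitor.PulseAll(_gate);
                }
            }
        }
    }

    // Returns the first queued product whose type is not being processed,
    // or null once the queue is empty.
    private Product TakeNext()
    {
        lock (_gate)
        {
            while (_pending.Count > 0)
            {
                for (var node = _pending.First; node != null; node = node.Next)
                {
                    // HashSet.Add returns false if the type is already busy.
                    if (_productTypesBeingProcessed.Add(node.Value.Type))
                    {
                        _pending.Remove(node);
                        return node.Value;
                    }
                }
                // Every queued type is busy: wait until some worker finishes.
                Monitor.Wait(_gate);
            }
            return null;
        }
    }
}
```

Usage would be something like new SkipInLineProcessor(products, ProcessProduct).Run(4). Unlike the one-queue-per-type version, nothing here has to change when a new product type appears, at the cost of scanning the queue under a lock each time a worker asks for work.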
Upvotes: 2
Reputation: 1189
Well, simply use a queue for each type. Each queue can then only have one job running at a time, so you could use one thread per queue.
I can't even see a problem here. What are you asking for? A code example? Functions?
You seem to be stuck on the design, but most of it is already there. The key is just assigning one thread per queue (or one job, if you don't need to work directly with threads).
Upvotes: 0
Reputation: 24907
Use an array of producer-consumer queues, indexed by product type. Hang only one thread from each queue, so that ProcessProduct() is called sequentially for each product type.
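Something along these lines, as a sketch only. It assumes the product type is an enum (ProductType, made up here so it can serve as an array index) and uses BlockingCollection<T> as the producer-consumer queue; PerTypeDispatcher and the processProduct callback are likewise illustrative names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Assumption: the type is an enum so it can index the array of queues.
enum ProductType { A, B, C }

sealed class Product
{
    public ProductType Type { get; set; }
}

sealed class PerTypeDispatcher : IDisposable
{
    private readonly BlockingCollection<Product>[] _queues;
    private readonly Thread[] _threads;

    public PerTypeDispatcher(Action<Product> processProduct)
    {
        int typeCount = Enum.GetValues(typeof(ProductType)).Length;
        _queues = new BlockingCollection<Product>[typeCount];
        _threads = new Thread[typeCount];

        for (int i = 0; i < typeCount; i++)
        {
            var queue = new BlockingCollection<Product>();
            _queues[i] = queue;

            // Exactly one consumer thread per queue, so products of one
            // type are processed strictly one after another.
            _threads[i] = new Thread(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding().
                foreach (Product product in queue.GetConsumingEnumerable())
                    processProduct(product);
            });
            _threads[i].IsBackground = true;
            _threads[i].Start();
        }
    }

    // Producer side: may be called from any thread at any time.
    public void Enqueue(Product product)
    {
        _queues[(int)product.Type].Add(product);
    }

    // Stops accepting products and waits for the queued ones to finish.
    public void Dispose()
    {
        foreach (var queue in _queues) queue.CompleteAdding();
        foreach (var thread in _threads) thread.Join();
        foreach (var queue in _queues) queue.Dispose();
    }
}
```

Create one dispatcher with new PerTypeDispatcher(ProcessProduct), call Enqueue(product) as products arrive, and Dispose() it once when you are done. The threads sit idle when their queue is empty, which is the price of keeping one dedicated thread per type.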
Upvotes: 1