Reputation: 1429
I have a collection of 1000 input messages to process. I loop over the input collection and start a new task for each message.
//Assume this messages collection contains 1000 items
var messages = new List<string>();
foreach (var msg in messages)
{
Task.Factory.StartNew(() =>
{
Process(msg);
});
}
Can we predict the maximum number of messages that are processed simultaneously (assuming a normal quad-core processor), or can we limit the maximum number of messages processed at a time?
How can we ensure that the messages are processed in the same order as they appear in the collection?
Upvotes: 70
Views: 93338
Reputation: 2018
Channels (System.Threading.Channels) are part of the framework since .NET Core 3.0 and .NET 5.0.
The main benefit of this producer/consumer concurrency pattern is that you can also limit the input data processing to reduce resource impact.
This is especially helpful when processing millions of data records.
Instead of reading the whole dataset at once into memory, you can now consecutively query only chunks of the data and wait for the workers to process it before querying more.
Code sample with a queue capacity of 50 messages and, by default, as many consumer tasks as there are logical processors:
/// <exception cref="System.AggregateException">Thrown on Consumer Task exceptions.</exception>
public static async Task ProcessMessages(
List<string> messages,
int producerCapacity = 50,
int consumerTaskLimit = 0)
{
if (consumerTaskLimit == 0)
consumerTaskLimit = Environment.ProcessorCount;
// by default only uses one processor group
// https://stackoverflow.com/questions/27965962/c-sharp-environment-processorcount-does-not-always-return-the-full-number-of-log
var tokenSource = new CancellationTokenSource();
CancellationToken ct = tokenSource.Token;
var channel = Channel.CreateBounded<string>(producerCapacity);
_ = Task.Run(async () =>
{
try
{
foreach (var msg in messages)
{
// WriteAsync waits asynchronously (without blocking a thread) while the channel is full,
// i.e. until the consumer tasks have read messages from the queue
await channel.Writer.WriteAsync(msg, ct);
ct.ThrowIfCancellationRequested();
}
}
catch (OperationCanceledException) { }
catch (Exception ex)
{
Console.WriteLine("Exception while processing Messages\n" + ex);
tokenSource.Cancel();
}
finally
{
channel.Writer.Complete();
// signaling the end of queue so that
// WaitToReadAsync will return false to stop the consumer tasks
}
});
var consumerTasks = Enumerable
.Range(1, consumerTaskLimit)
.Select(_ => Task.Run(async () =>
{
try
{
while (await channel.Reader.WaitToReadAsync(ct))
{
ct.ThrowIfCancellationRequested();
while (channel.Reader.TryRead(out var message))
{
// placeholder for the actual message processing
await Task.Delay(500);
Console.WriteLine(message);
}
}
}
catch (OperationCanceledException) { }
catch
{
tokenSource.Cancel();
throw;
}
}))
.ToArray();
Task waitForConsumers = Task.WhenAll(consumerTasks);
try { await waitForConsumers; }
catch
{
if (waitForConsumers.IsFaulted && waitForConsumers.Exception is not null)
{
foreach (var e in waitForConsumers.Exception.Flatten().InnerExceptions)
Console.WriteLine(e.ToString());
throw waitForConsumers.Exception.Flatten();
}
else throw;
}
}
As pointed out by Theodor Zoulias: when consumer tasks throw exceptions, the remaining tasks continue to run and have to take over the load of the failed tasks. To avoid this, I added a CancellationToken that stops all remaining tasks, and I handle the exceptions combined in the AggregateException of waitForConsumers.Exception.
Side note:
The Task Parallel Library (TPL) might be good at automatically limiting the tasks based on your local resources. But when you are processing data remotely via RPC, it's necessary to manually limit your RPC calls to avoid filling the network/processing stack!
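For the remote-call case in the side note, a minimal sketch using Parallel.ForEachAsync (available from .NET 6) could look like the following. The names RemoteCallThrottle and CallRemoteAsync are hypothetical stand-ins, the delay simulates the RPC, and the peak counter exists only to make the limit observable.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class RemoteCallThrottle
{
    // Hypothetical stand-in for a remote call; replace with the real RPC.
    private static async Task CallRemoteAsync(string message, CancellationToken ct)
    {
        await Task.Delay(50, ct);
    }

    // Processes all messages with at most maxConcurrency calls in flight
    // and returns the highest number of simultaneous calls that was observed.
    public static async Task<int> ProcessAsync(IEnumerable<string> messages, int maxConcurrency)
    {
        var gate = new object();
        int inFlight = 0, peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxConcurrency };

        await Parallel.ForEachAsync(messages, options, async (msg, ct) =>
        {
            lock (gate)
            {
                inFlight++;
                if (inFlight > peak) peak = inFlight;
            }
            try
            {
                await CallRemoteAsync(msg, ct);
            }
            finally
            {
                lock (gate) { inFlight--; }
            }
        });

        return peak;
    }
}
```

Calling RemoteCallThrottle.ProcessAsync(messages, 4) should never report a peak above 4. Unlike the channel version, this sketch takes the whole input as an IEnumerable, so chunked loading would have to happen inside the enumerator.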
Upvotes: 8
Reputation: 1817
I ran into a similar problem where I wanted to produce 5000 results while calling apis, etc. So, I ran some speed tests.
Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
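// Note: this statement has no effect here; ParallelOptions only applies
// when it is passed as the second argument of Parallel.ForEach
new ParallelOptions { MaxDegreeOfParallelism = 100 };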
GetProductMetaData(productsMetaData, client, id).GetAwaiter().GetResult();
});
produced 100 results in 30 seconds.
Parallel.ForEach(products.Select(x => x.KeyValue).Distinct().Take(100), id =>
{
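// Note: this statement has no effect here; ParallelOptions only applies
// when it is passed as the second argument of Parallel.ForEach
new ParallelOptions { MaxDegreeOfParallelism = 100 };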
GetProductMetaData(productsMetaData, client, id);
});
Moving the GetAwaiter().GetResult() to the individual async api calls inside GetProductMetaData resulted in 14.09 seconds to produce 100 results.
foreach (var id in ids.Take(100))
{
GetProductMetaData(productsMetaData, client, id);
}
Complete non-async programming with the GetAwaiter().GetResult() in api calls resulted in 13.417 seconds.
var tasks = new List<Task>();
while (y < ids.Count())
{
foreach (var id in ids.Skip(y).Take(100))
{
tasks.Add(GetProductMetaData(productsMetaData, client, id));
}
y += 100;
Task.WhenAll(tasks).GetAwaiter().GetResult();
Console.WriteLine($"Finished {y}, {sw.Elapsed}");
}
Forming a task list and working through 100 at a time resulted in a speed of 7.36 seconds.
using (SemaphoreSlim cons = new SemaphoreSlim(10))
{
var tasks = new List<Task>();
foreach (var id in ids.Take(100))
{
cons.Wait();
var t = Task.Factory.StartNew(() =>
{
try
{
GetProductMetaData(productsMetaData, client, id);
}
finally
{
cons.Release();
}
});
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
}
Using SemaphoreSlim resulted in 13.369 seconds, and it also took a moment to start up before the first tasks ran.
var throttler = new SemaphoreSlim(initialCount: take);
foreach (var id in ids)
{
throttler.WaitAsync().GetAwaiter().GetResult();
tasks.Add(Task.Run(async () =>
{
try
{
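// skip is shared between the tasks, so increment it atomically
var started = Interlocked.Increment(ref skip);
await GetProductMetaData(productsMetaData, client, id);
if (started % 100 == 0)
{
Console.WriteLine($"started {started}/{count}, {sw.Elapsed}");
}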
}
finally
{
throttler.Release();
}
}));
}
Using SemaphoreSlim as a throttler for my async tasks took 6.12 seconds.
The answer for me in this specific project was to use a throttler with SemaphoreSlim. Although the while/foreach task list sometimes beat the throttler, the throttler won 4 out of 6 times for 1000 records.
I realize I'm not using the OP's code, but I think this is important and adds to the discussion, because "how" is sometimes not the only question that should be asked, and the answer is sometimes "It depends on what you are trying to do."
Now to answer the specific questions:
Upvotes: 0
Reputation: 11778
If your Process method is async, you can't use Task.Factory.StartNew, as it doesn't play well with an async delegate. There are also some other nuances when using it (see this for example).
The proper way to do it in this case is to use Task.Run. Here's @ClearLogic's answer modified for an async Process method.
static void Main(string[] args)
{
int maxConcurrency = 5;
List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
List<Task> tasks = new List<Task>();
foreach (var msg in messages)
{
concurrencySemaphore.Wait();
var t = Task.Run(async () =>
{
try
{
await Process(msg);
}
finally
{
concurrencySemaphore.Release();
}
});
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
}
Console.WriteLine("Exited using block");
Console.ReadKey();
}
private static async Task Process(string msg)
{
await Task.Delay(2000);
Console.WriteLine(msg);
}
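If the calling code can itself be async, the blocking Wait() and Task.WaitAll calls can be replaced with WaitAsync and Task.WhenAll. The following is a minimal sketch under that assumption; AsyncThrottle, RunAsync and the process delegate are hypothetical names, and the peak counter is only there to show that the limit holds.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class AsyncThrottle
{
    // Runs process(msg) for every message with at most maxConcurrency
    // invocations in flight; returns the peak concurrency that was observed.
    public static async Task<int> RunAsync(
        IEnumerable<string> messages, int maxConcurrency, Func<string, Task> process)
    {
        var gate = new object();
        int running = 0, peak = 0;

        using var semaphore = new SemaphoreSlim(maxConcurrency);
        var tasks = new List<Task>();

        foreach (var msg in messages)
        {
            // Waits for a free slot without blocking a thread.
            await semaphore.WaitAsync();
            tasks.Add(RunOneAsync(msg));
        }

        // Wait for all tasks before the semaphore is disposed.
        await Task.WhenAll(tasks);
        return peak;

        async Task RunOneAsync(string msg)
        {
            try
            {
                lock (gate)
                {
                    running++;
                    if (running > peak) peak = running;
                }
                await process(msg);
            }
            finally
            {
                lock (gate) { running--; }
                semaphore.Release();
            }
        }
    }
}
```

Note that process(msg) runs on the calling thread until its first await, which is usually fine for I/O-bound work; for CPU-bound work, wrapping the call in Task.Run as in the answer above remains the safer choice.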
Upvotes: 6
Reputation: 8740
You can use a BlockingCollection. Once the collection has reached its bounded capacity, the producer blocks until a consumer has removed an item. I find this pattern easier to understand and implement than SemaphoreSlim.
int TasksLimit = 10;
BlockingCollection<Task> tasks = new BlockingCollection<Task>(new ConcurrentBag<Task>(), TasksLimit);
void ProduceAndConsume()
{
var producer = Task.Factory.StartNew(RunProducer);
var consumer = Task.Factory.StartNew(RunConsumer);
try
{
Task.WaitAll(new[] { producer, consumer });
}
catch (AggregateException ae) { }
}
void RunConsumer()
{
foreach (var task in tasks.GetConsumingEnumerable())
{
task.Start();
}
}
void RunProducer()
{
for (int i = 0; i < 1000; i++)
{
tasks.Add(new Task(() => Thread.Sleep(1000), TaskCreationOptions.AttachedToParent));
}
}
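Note that RunProducer and RunConsumer are spawned as two independent tasks. Also note that the capacity limits how many tasks are queued, not how many run at once: task.Start() returns immediately, so the consumer does not wait for a task to finish before starting the next one.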
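A variation on the producer/consumer pattern queues the messages themselves and lets a fixed number of consumers process them, so that the number of consumers is the concurrency limit. This is a minimal sketch without error handling; BoundedQueueDemo, Run and the process delegate are hypothetical names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BoundedQueueDemo
{
    // Processes all messages with exactly consumerCount consumers
    // and returns how many messages were processed.
    public static int Run(IEnumerable<string> messages, int consumerCount, Action<string> process)
    {
        // The default backing store is a ConcurrentQueue, so items are taken in FIFO order.
        using var queue = new BlockingCollection<string>(boundedCapacity: consumerCount * 2);
        int processed = 0;

        var consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                // Blocks while the queue is empty; ends after CompleteAdding
                // has been called and the queue is drained.
                foreach (var msg in queue.GetConsumingEnumerable())
                {
                    process(msg);
                    Interlocked.Increment(ref processed);
                }
            }, TaskCreationOptions.LongRunning))
            .ToArray();

        try
        {
            // Add blocks while the queue is at capacity.
            foreach (var msg in messages) queue.Add(msg);
        }
        finally
        {
            queue.CompleteAdding();
        }

        Task.WaitAll(consumers);
        return processed;
    }
}
```

Messages are dequeued in collection order, but with more than one consumer they can still finish out of order; a single consumer would be needed for strictly sequential processing.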
Upvotes: -1
Reputation: 121
I think it would be better to use Parallel.ForEach:
Parallel.ForEach(messages,
new ParallelOptions { MaxDegreeOfParallelism = 4 },
x => Process(x));
where 4 is the MaxDegreeOfParallelism.
Upvotes: 11
Reputation: 3682
SemaphoreSlim is a very good solution in this case and I highly recommend that the OP try it, but @Manoj's answer has a flaw, as mentioned in the comments: the semaphore should be waited on before spawning the task, like this.
Updated answer: As @Vasyl pointed out, the semaphore may be disposed before the tasks complete, which raises an exception when the Release() method is called. So before exiting the using block, you must wait for the completion of all created tasks.
int maxConcurrency=10;
var messages = new List<string>();
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
List<Task> tasks = new List<Task>();
foreach(var msg in messages)
{
concurrencySemaphore.Wait();
var t = Task.Factory.StartNew(() =>
{
try
{
Process(msg);
}
finally
{
concurrencySemaphore.Release();
}
});
tasks.Add(t);
}
Task.WaitAll(tasks.ToArray());
}
Answer to Comments
For those who want to see how the semaphore can be disposed too early without Task.WaitAll:
Run the code below in a console app and this exception will be raised.
System.ObjectDisposedException: 'The semaphore has been disposed.'
static void Main(string[] args)
{
int maxConcurrency = 5;
List<string> messages = Enumerable.Range(1, 15).Select(e => e.ToString()).ToList();
using (SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
List<Task> tasks = new List<Task>();
foreach (var msg in messages)
{
concurrencySemaphore.Wait();
var t = Task.Factory.StartNew(() =>
{
try
{
Process(msg);
}
finally
{
concurrencySemaphore.Release();
}
});
tasks.Add(t);
}
// Task.WaitAll(tasks.ToArray());
}
Console.WriteLine("Exited using block");
Console.ReadKey();
}
private static void Process(string msg)
{
Thread.Sleep(2000);
Console.WriteLine(msg);
}
Upvotes: 73
Reputation: 9
public static void RunTasks(List<NamedTask> importTaskList)
{
List<NamedTask> runningTasks = new List<NamedTask>();
try
{
foreach (NamedTask currentTask in importTaskList)
{
currentTask.Start();
runningTasks.Add(currentTask);
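// Count every task that has not finished yet (a started task may still be
// WaitingToRun), and wait only on those: WaitAny returns immediately if the
// array contains a task that has already completed
var unfinished = runningTasks.Where(x => !x.IsCompleted).ToArray();
if (unfinished.Length >= MaxCountImportThread)
{
Task.WaitAny(unfinished);
}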
}
Task.WaitAll(runningTasks.ToArray());
}
catch (Exception ex)
{
Log.Fatal("ERROR!", ex);
}
}
Upvotes: -1
Reputation: 19
If you need in-order queuing (processing might still finish in any order), there is no need for a semaphore. Old-fashioned if statements work fine:
const int maxConcurrency = 5;
List<Task> tasks = new List<Task>();
foreach (var arg in args)
{
var t = Task.Run(() => { Process(arg); } );
tasks.Add(t);
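if (tasks.Count >= maxConcurrency)
{
// Wait for one task to finish and remove it from the list; otherwise WaitAny
// would return immediately on every later iteration and nothing would be throttled
int finished = Task.WaitAny(tasks.ToArray());
tasks.RemoveAt(finished);
}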
}
Task.WaitAll(tasks.ToArray());
Upvotes: 0
Reputation: 16956
You could use Parallel.ForEach and rely on MaxDegreeOfParallelism instead.
Parallel.ForEach(messages, new ParallelOptions {MaxDegreeOfParallelism = 10},
msg =>
{
// logic
Process(msg);
});
Upvotes: 85
Reputation: 1201
You can simply set the maximum degree of concurrency like this:
int maxConcurrency=10;
var messages = new List<string>(); // assume this contains 1000 items
using(SemaphoreSlim concurrencySemaphore = new SemaphoreSlim(maxConcurrency))
{
foreach(var msg in messages)
{
Task.Factory.StartNew(() =>
{
concurrencySemaphore.Wait();
try
{
Process(msg);
}
finally
{
concurrencySemaphore.Release();
}
});
}
}
Upvotes: 0
Reputation: 9733
You can create your own TaskScheduler and override QueueTask there.
protected internal abstract void QueueTask(Task task)
Then you can do anything you like.
One example here:
Limited concurrency level task scheduler (with task priority) handling wrapped tasks
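As a rough illustration of the idea, here is a simplified sketch of a scheduler that runs at most a fixed number of tasks at once on the thread pool. The class name LimitedConcurrencyScheduler is hypothetical, task priorities are not handled, and inline execution is disabled to keep the example short.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public sealed class LimitedConcurrencyScheduler : TaskScheduler
{
    private readonly Queue<Task> _queue = new Queue<Task>(); // guarded by lock (_queue)
    private readonly int _maxConcurrency;
    private int _activeWorkers;                               // guarded by lock (_queue)

    public LimitedConcurrencyScheduler(int maxConcurrency)
    {
        if (maxConcurrency < 1) throw new ArgumentOutOfRangeException(nameof(maxConcurrency));
        _maxConcurrency = maxConcurrency;
    }

    public override int MaximumConcurrencyLevel => _maxConcurrency;

    protected override void QueueTask(Task task)
    {
        lock (_queue)
        {
            _queue.Enqueue(task);
            // Enough workers are already draining the queue.
            if (_activeWorkers >= _maxConcurrency) return;
            _activeWorkers++;
        }
        ThreadPool.UnsafeQueueUserWorkItem(_ => RunQueuedTasks(), null);
    }

    // Worker loop: executes queued tasks in FIFO order until the queue is empty.
    private void RunQueuedTasks()
    {
        while (true)
        {
            Task next;
            lock (_queue)
            {
                if (_queue.Count == 0)
                {
                    _activeWorkers--;
                    return;
                }
                next = _queue.Dequeue();
            }
            TryExecuteTask(next);
        }
    }

    // Never run tasks inline on a waiting thread, so the limit always holds.
    protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued) => false;

    protected override IEnumerable<Task> GetScheduledTasks()
    {
        lock (_queue) return _queue.ToArray();
    }
}
```

It can be used through a TaskFactory, for example new TaskFactory(new LimitedConcurrencyScheduler(4)).StartNew(() => Process(msg)). Tasks are started in the order they were queued. The limit applies to synchronous delegates; with async delegates only the code between awaits counts towards it.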
Upvotes: 0