Neilski
Neilski

Reputation: 4415

How to use Threads for Processing Many Tasks

I have a C# requirement for individually processing a 'great many' (perhaps > 100,000) records. Running this process sequentially is proving to be very slow with each record taking a good second or so to complete (with a timeout error set at 5 seconds).

I would like to try running these tasks asynchronously by using a set number of worker 'threads' (I use the term 'thread' here cautiously as I am not sure if I should be looking at a thread, or a task or something else).

I have looked at the ThreadPool, but I can't imagine it could queue the volume of requests required. My ideal pseudo code would look something like this...

public void ProcessRecords() {
    SetMaxNumberOfThreads(20);
    MyRecord rec;
    while ((rec = GetNextRecord()) != null) {
        var task = WaitForNextAvailableThreadFromPool(ProcessRecord(rec));
        task.Start()
    }
}

I will also need a mechanism that the processing method can report back to the parent/calling class.

Can anyone point me in the right direction with perhaps some example code?

Upvotes: 6

Views: 1274

Answers (2)

i3arnon
i3arnon

Reputation: 116636

A possible simple solution would be to use a TPL Dataflow block which is a higher abstraction over the TPL with configurations for degree of parallelism and so forth. You simply create the block (ActionBlock in this case), Post everything to it, wait asynchronously for completion and TPL Dataflow handles all the rest for you:

var block = new ActionBlock<MyRecord>(
    rec => ProcessRecord(rec), 
    new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 20});

MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
     block.Post(rec);
}

block.Complete();
await block.Completion

Another benefit is that the block starts working as soon as the first record arrives and not only when all the records have been received.

If you need to report back on each record you can use a TransformBlock to do the actual processing and link an ActionBlock to it that does the updates:

var transform = new TransfromBlock<MyRecord, Report>(rec =>
{
    ProcessRecord(rec);
    return GenerateReport(rec);
}, new ExecutionDataflowBlockOptions{MaxDegreeOfParallelism = 20});

var reporter = new ActionBlock<Report>(report =>
{
    RaiseEvent(report) // Or any other mechanism...
});

transform.LinkTo(reporter, new DataflowLinkOptions { PropagateCompletion = true });

MyRecord rec;
while ((rec = GetNextRecord()) != null)
{
     transform.Post(rec);
}

transform.Complete();
await transform.Completion

Upvotes: 9

czuroski
czuroski

Reputation: 4332

Have you thought about using parallel processing with Actions? ie, create a method to process a single record, add each record method as an action into a list, and then perform a parrallel.for on the list.

Dim list As New List(Of Action)
list.Add(New Action(Sub() MyMethod(myParameter)))
Parallel.ForEach(list, Sub(t) t.Invoke())

This is in vb.net, but I think you get the gist.

Upvotes: 1

Related Questions