Reputation: 180
I have read the documentation and many tutorials on TPL, but none covers the model I want to achieve.
They always assume a fixed number of iterations for some algorithm.
I need constantly running threads (as many as possible):
while(true)
Additionally, I need a mechanism that can set an alarm clock (e.g. 5 seconds). After five seconds all work must be suspended for a while and then resumed.
Should I use Task.ContinueWith on the same task? I am not processing the result of the previous task launch; instead I update a data structure on the MAIN thread and then decide what the input of the next task iteration will be...
How can I leave it to TPL to decide how many tasks should be created for best efficiency?
Right now I am using BackgroundWorkers, because they have a nice RunWorkerCompleted event. Inside it I am on my main thread, so I can update my MAIN structure, check time constraints and then, if appropriate, call RunWorkerAsync again on the BackgroundWorker that completed. It is nice and clear, but probably very inefficient. I need to make it highly efficient on multi-processor, multi-core servers.
One problem is that the computation is always online and never stops. There is also some networking, which makes it possible to query the current state of the MAIN structure remotely.
The second problem is critical time control (I must have a precise timer; once it fires, no thread may be restarted). Then a special high-priority task runs, and after it ends, all work is resumed.
The third problem is that there is no upper bound on the number of operations to do.
These three constraints, from what I have observed, do not go well with TPL. I can't use something like Parallel.For because the collection is modified by the results of the tasks themselves in real time... I also don't know how to combine:
Can someone give me some clues? I know how to do it the bad, inefficient way. The few requirements I described prevent me from doing this right. I am a little bit confused.
Upvotes: 2
Views: 1169
Reputation: 30454
I think you should read
MSDN: How to implement a producer / consumer dataflow pattern
I had the same problem: one producer produced items, while several consumers consumed them and decided whether to send them on to other consumers. Each consumer worked asynchronously and independently of the other consumers.
Your main task is the producer. It produces the items that your other tasks should process. The class containing the code of your main task has a function:
public async Task ProduceOutputAsync(...)
Your main program starts this Task using:
var producerTask = Task.Run(() => MyProducer.ProduceOutputAsync(...));
Once this is called the producer task starts producing output. Meanwhile your main program can continue doing other things, like for instance start the consumers.
But let's first focus on the Producer task.
The producer task produces items of type T to be processed by other tasks. They are handed over to the other tasks through objects that implement ITargetBlock<T>.
Every time the producer task has finished creating an object of type T, it sends it to the target block using ITargetBlock<T>.Post or, preferably, the asynchronous version, SendAsync:
while (continueProducing())
{
    T product = await CreateProduct(...);
    // SendAsync returns false if the target block declined the item
    bool accepted = await this.TargetBlock.SendAsync(product);
    // process the return value
}
// if here, nothing to produce anymore. Notify the consumers:
this.TargetBlock.Complete();
The producer needs an ITargetBlock<T>. In my application a BufferBlock<T> was enough. Check MSDN for the other possible targets.
Anyway, the data flow block should also implement ISourceBlock<T>. Your receiver waits for input to arrive at the source, fetches it and processes it. Once finished, it can send the result to its own target block, and wait for the next input until there is no input expected anymore. Of course if your consumer doesn't produce output it doesn't have to send anything to a target.
Waiting for input is done as follows:
// BufferBlock<T> implements IReceivableSourceBlock<T>, which extends ISourceBlock<T>
IReceivableSourceBlock<T> mySource = ...;
// OutputAvailableAsync returns false once the source is completed and empty
while (await mySource.OutputAvailableAsync())
{
    // an object of type T was available at the source, but
    // keep in mind that someone else might have fetched your object,
    // so only process it if you've got it.
    if (mySource.TryReceive(out T objectToProcess))
    {
        await ProcessAsync(objectToProcess);
        // if your processing produces output send the output to your target:
        var myOutput = await ProduceOutput(objectToProcess);
        await myTarget.SendAsync(myOutput);
    }
}
// if here, no input expected anymore, notify my consumers:
myTarget.Complete();
Each consumer will stop as soon as it hears that no input is expected anymore. After all tasks have completed, your main function can read the results and return.
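To tie the pieces above together, here is a minimal sketch of one producer feeding several competing consumers through a BufferBlock<int>. The class name ProducerConsumerSketch, its method names and the "square each number" workload are all made up for illustration; only the System.Threading.Tasks.Dataflow calls are real library API. Treat it as one possible arrangement under those assumptions, not as the definitive way to do it.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ProducerConsumerSketch
{
    // Hypothetical producer: sends 1..count, then signals that no more input will come.
    static async Task ProduceAsync(ITargetBlock<int> target, int count)
    {
        for (int i = 1; i <= count; i++)
        {
            // SendAsync yields false if the target declined the item.
            bool accepted = await target.SendAsync(i);
            if (!accepted) break;
        }
        target.Complete();
    }

    // Hypothetical consumer: squares every item it manages to fetch.
    static async Task ConsumeAsync(IReceivableSourceBlock<int> source, ConcurrentBag<int> results)
    {
        // OutputAvailableAsync returns false once the source is completed and empty.
        while (await source.OutputAvailableAsync())
        {
            // Another consumer may have taken the item first, hence TryReceive.
            while (source.TryReceive(out int item))
            {
                results.Add(item * item);
            }
        }
    }

    public static async Task<int[]> RunAsync(int itemCount, int consumerCount)
    {
        var buffer = new BufferBlock<int>();
        var results = new ConcurrentBag<int>();

        Task producer = Task.Run(() => ProduceAsync(buffer, itemCount));
        Task[] consumers = Enumerable.Range(0, consumerCount)
            .Select(_ => Task.Run(() => ConsumeAsync(buffer, results)))
            .ToArray();

        await Task.WhenAll(consumers.Append(producer));
        // Consumers finish in no particular order, so sort before returning.
        return results.OrderBy(x => x).ToArray();
    }
}
```

Calling ProducerConsumerSketch.RunAsync(5, 3) should hand back the squares of 1 through 5 regardless of which consumer processed which item. The consumers share one buffer on purpose: whichever is free takes the next item, which is roughly the "as many workers as possible" behaviour asked for in the question.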
Upvotes: 0
Reputation: 9915
You need to use messaging + actors + a scheduler, IMO. And then you need to use a language that is capable of it. Have a look at this code that asynchronously receives from Azure Service Bus, enqueues in a shared queue and manages runtime state through an actor.
Inline:
Should I use Task.ContinueWith on the same task?
No. ContinueWith will get your program killed because of how exceptions are handled inside each continuation; there's no good way in TPL to marshal a failed state back to the calling side/main thread.
I am not processing the result of the previous task launch; instead I update a data structure on the MAIN thread and then decide what the input of the next task iteration will be...
You need to move beyond threading for this, unless you're willing to spend A LOT of time on the problem.
How can I leave it to TPL to decide how many tasks should be created for best efficiency?
That's handled by the framework that runs your async workflows.
Right now I am using BackgroundWorkers, because they have a nice RunWorkerCompleted event. Inside it I am on my main thread, so I can update my MAIN structure, check time constraints and then, if appropriate, call RunWorkerAsync again on the BackgroundWorker that completed. It is nice and clear, but probably very inefficient. I need to make it highly efficient on multi-processor, multi-core servers.
One problem is that the computation is always online and never stops. There is also some networking, which makes it possible to query the current state of the MAIN structure remotely. The second problem is critical time control (I must have a precise timer; once it fires, no thread may be restarted).
If you run everything asynchronously, you can pass a message to your actor that suspends it. Your scheduling actor is responsible for calling all its subscribers with their scheduled messages; have a look at the paused state in the code linked. If you have outstanding requests you can pass them a cancellation token and handle a 'hard' cancellation/socket abort that way.
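The linked code is not reproduced here, so as a stand-in for the paused state, here is a rough C# sketch of the same idea without an actor library: workers await a shared gate before starting each new unit of work, and a single controller closes and reopens it. PauseGate, PauseDemo and the delays are hypothetical names and values chosen for illustration, and the sketch assumes only one controller ever calls Pause and Resume.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper: an async gate that workers await between iterations.
public sealed class PauseGate
{
    private volatile TaskCompletionSource<bool> _resume = OpenSource();

    private static TaskCompletionSource<bool> OpenSource()
    {
        var tcs = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
        tcs.SetResult(true);
        return tcs;
    }

    public bool IsPaused => !_resume.Task.IsCompleted;

    // Close the gate: workers stop at their next checkpoint (in-flight work still finishes).
    public void Pause()
    {
        if (!IsPaused)
            _resume = new TaskCompletionSource<bool>(TaskCreationOptions.RunContinuationsAsynchronously);
    }

    // Open the gate: every waiting worker continues.
    public void Resume() => _resume.TrySetResult(true);

    // Checkpoint that workers await before starting a new unit of work.
    public Task WaitWhilePausedAsync() => _resume.Task;
}

public static class PauseDemo
{
    public static async Task<int> RunAsync()
    {
        var gate = new PauseGate();
        var cts = new CancellationTokenSource();
        int iterations = 0;

        async Task WorkerAsync()
        {
            while (!cts.IsCancellationRequested)
            {
                await gate.WaitWhilePausedAsync();   // no new work starts while paused
                Interlocked.Increment(ref iterations);
                await Task.Delay(10);                // stand-in for real work
            }
        }

        Task[] workers = { Task.Run(() => WorkerAsync()), Task.Run(() => WorkerAsync()) };

        await Task.Delay(100);   // the "alarm clock" interval, shortened for the demo
        gate.Pause();
        // ... the special high-priority task would run here ...
        gate.Resume();

        await Task.Delay(50);
        cts.Cancel();            // stop the demo; the gate is open, so no worker is stuck
        await Task.WhenAll(workers);
        cts.Dispose();
        return iterations;
    }
}
```

Note that this is a cooperative pause: a worker only notices it at its next checkpoint. If the high-priority task must not overlap with any in-flight work, the controller would additionally have to wait until every worker has reached the gate, and outstanding requests would need the cancellation token mentioned above.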
Then a special high-priority task runs, and after it ends, all work is resumed. These two constraints, from what I have observed, do not go well with TPL. I can't use something like Parallel.For because the collection is modified by the results of the tasks themselves in real time...
You probably need a pattern called pipes-and-filters. You pipe your input into a chain of workers (actors); each worker consumes from the other worker's output. Signalling is done using a control channel (in my case that is the inbox of the actor).
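For readers who want to stay in C#, a pipes-and-filters chain can be approximated with TPL Dataflow blocks instead of actors. This is a swapped-in technique, not the approach from the linked code: PipesAndFiltersSketch, the two toy filters and the choice of Environment.ProcessorCount as the parallelism limit are assumptions made for the example.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class PipesAndFiltersSketch
{
    public static async Task<List<string>> RunAsync(IEnumerable<int> inputs)
    {
        var output = new List<string>();
        // Completion flows down the chain, so finishing the first block finishes the rest.
        var link = new DataflowLinkOptions { PropagateCompletion = true };

        // Filter 1: may run on several cores at once; the limit is an assumed setting.
        var square = new TransformBlock<int, int>(
            x => x * x,
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

        // Filter 2: consumes the output of filter 1.
        var format = new TransformBlock<int, string>(x => $"value={x}");

        // Sink: default parallelism is 1, so a plain List<string> is safe here.
        var collect = new ActionBlock<string>(s => output.Add(s));

        square.LinkTo(format, link);
        format.LinkTo(collect, link);

        foreach (int x in inputs)
        {
            await square.SendAsync(x);
        }
        square.Complete();          // no more input; the signal travels down the pipe
        await collect.Completion;   // wait until the last filter has drained
        return output;
    }
}
```

Each block plays the role of one worker in the chain, and the links are the pipes. A separate control channel, like the actor's inbox described above, is not modelled here; in this sketch the only signal is completion.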
Upvotes: 3