Guillaume P.

Reputation: 163

Using the TPL to run actions in sequential order with ContinueWith

First of all I will explain what I am trying to do.

I have a component A which is using a component B.

For the two to communicate, I need to use events.

One of my requirements is that component B runs asynchronously AND that the event handlers run sequentially, in the order they were called.

In addition, I would like to be able to cancel the pipeline of calls when the user asks for it, so that every event handler that has been queued but not yet executed never runs.

The TPL seems like the right tool for this. I made a proof of concept of what I'm trying to do:

    static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        var t = Task.Factory.StartNew(() => DoSomeWork(token));
                            //.ContinueWith((prevTask) => DoSomeWork(token));

        t.ContinueWith((prevTask) => DoSomeWork(token));

        Task.WaitAll(t);

        Console.WriteLine("Finish");

        Console.ReadKey();
    }

    static int id = 1;
    static void DoSomeWork(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();

        Thread.Sleep(1000);
        
        Console.WriteLine(id++);
    }

Here is the output of this snippet:

1

Finish

2

As you can see, "Finish" is printed before the work is really done: 2 is displayed after Finish.

If I change the previous code to this, it works:

    static void Main(string[] args)
    {
        var tokenSource = new CancellationTokenSource();
        var token = tokenSource.Token;

        var t = Task.Factory.StartNew(() => DoSomeWork(token))
                            .ContinueWith((prevTask) => DoSomeWork(token));

        //t.ContinueWith((prevTask) => DoSomeWork(token));

        Task.WaitAll(t);

        Console.WriteLine("Finish");

        Console.ReadKey();
    }

    static int id = 1;
    static void DoSomeWork(CancellationToken ct)
    {
        ct.ThrowIfCancellationRequested();

        Thread.Sleep(1000);
        
        Console.WriteLine(id++);
    }

Here is the output of this snippet:

1

2

Finish

As you can guess, I don't want to attach the continuation with ContinueWith where the task is declared, but later, each time an event is raised.

Why doesn't Task.WaitAll(t); work in the first sample?

Can anybody help me?

Upvotes: 0

Views: 486

Answers (2)

Jcl

Reputation: 28272

The initial problem is that you are creating two tasks but waiting only on one.

    // t is the "first" task
    var t = Task.Factory.StartNew(() => DoSomeWork(token));
    // the continuation task is not assigned
    t.ContinueWith((prevTask) => DoSomeWork(token));
    Task.WaitAll(t); // <-- wait only on "t", which is the first task
    Console.WriteLine("Finish"); // when the first task finishes, this gets printed
    // now the continuation task is executing, but you are not waiting for it

What happens in the second snippet is that you are waiting on the continuation task, so the call blocks until that task has finished:

    // t is now the continuation task
    var t = Task.Factory.StartNew(() => DoSomeWork(token))
                 .ContinueWith((prevTask) => DoSomeWork(token));
    Task.WaitAll(t); // <-- wait till the continuation task has finished

So the second approach is fine, but if you want finer control, assign the continuation to its own variable and wait on it:

    // t is the "first" task
    var t = Task.Factory.StartNew(() => DoSomeWork(token));
    // The continuation task is assigned to "t2"
    var t2 = t.ContinueWith((prevTask) => DoSomeWork(token));
    Task.WaitAll(new [] { t, t2 } ); // <-- wait for all tasks
    Console.WriteLine("Finish");

Note: I've followed your code in the examples. Task.WaitAll takes a params array of tasks, so passing a single task compiles, but for a single task t.Wait() is the more direct call.
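For the event-driven case in the question, where continuations are attached later rather than at declaration, one possible way to apply the same idea is to always keep a reference to the most recently scheduled task and chain onto it. The sketch below is only an illustration under assumptions: SequentialQueue, Enqueue and Tail are hypothetical names that do not appear in the question, and it assumes .NET 4.5 or later for TaskContinuationOptions.LazyCancellation.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper (not from the original posts): chains each new action
// onto the most recently scheduled task, so actions run one at a time, in order.
public sealed class SequentialQueue
{
    private readonly object _gate = new object();
    private Task _tail = Task.FromResult(0); // already-completed starting point

    // Schedules 'action' to run after everything enqueued before it.
    public Task Enqueue(Action action, CancellationToken token)
    {
        lock (_gate)
        {
            // With the token, a continuation that has not started yet is
            // canceled instead of run. LazyCancellation makes it wait for
            // its antecedent first, so the chain never completes early.
            _tail = _tail.ContinueWith(_ => action(), token,
                TaskContinuationOptions.LazyCancellation, TaskScheduler.Default);
            return _tail;
        }
    }

    // The task to wait on: it completes when the last enqueued action ends.
    public Task Tail
    {
        get { lock (_gate) { return _tail; } }
    }
}

public static class Program
{
    public static void Main()
    {
        var cts = new CancellationTokenSource();
        var queue = new SequentialQueue();

        for (int i = 1; i <= 3; i++)
        {
            int n = i; // capture a copy for the lambda
            queue.Enqueue(() => { Thread.Sleep(100); Console.WriteLine(n); }, cts.Token);
        }

        queue.Tail.Wait(); // waits for the LAST continuation, not the first task
        Console.WriteLine("Finish");
    }
}
```

Run as-is, this should print 1, 2, 3 and then Finish. If cts.Cancel() is called while actions are still pending, waiting on Tail throws an AggregateException wrapping a TaskCanceledException, so a real caller would want to catch that. Note also that a later action still runs if an earlier one throws.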

Upvotes: 1

Aron

Reputation: 15772

The correct way to do asynchronous coding in C# is to use the await keyword.

    public async Task DoLotsOfWork()
    {
        await DoSomeWorkAsync();
        await DoSomeMoreWorkAsync();
        Console.WriteLine("Finish");
    }

You will have some issues running that code from a console app, so I would advise you to use @StephenCleary's Nito.AsyncEx library.

https://www.nuget.org/packages/Nito.AsyncEx/

You use it like this.

    // The entry point must be static
    static void Main()
    {
        AsyncContext.Run(DoLotsOfWork);
    }

Furthermore, there are very few reasons to use Task.Run (or, worse, Task.Factory.StartNew). Both run your method in the background as a work item on the thread pool.

For example

    private static async Task DoSomeWorkAsync(CancellationToken ct)
    {
        await Task.Delay(TimeSpan.FromMilliseconds(1000), ct);
        Console.WriteLine(id++);
    }

While it waits, this does NOT occupy any thread (and so does not block one). Instead, a timer is registered, and its callback resumes the method at the second line after 1000 milliseconds: on the main thread when run under AsyncContext.Run, otherwise on a thread-pool thread.

EDIT: To do this dynamically it is also pretty simple

    public async Task DoLotsOfWork(IEnumerable<Func<Task>> tasks)
    {
        foreach (var task in tasks)
            await task();
        Console.WriteLine("Finished");
    }
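To tie the loop above back to the cancellation requirement in the question, here is a minimal sketch of how a token could be threaded through it. SequentialDemo, RunInOrderAsync and the three sample steps are hypothetical names made up for this illustration. The blocking GetAwaiter().GetResult() call is used only to keep the example free of external packages.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class SequentialDemo
{
    // Hypothetical variant of DoLotsOfWork: the token parameter is an
    // addition for illustration, not part of the original answer.
    public static async Task RunInOrderAsync(
        IEnumerable<Func<CancellationToken, Task>> steps, CancellationToken ct)
    {
        foreach (var step in steps)
        {
            // Steps that have not started yet are skipped once
            // cancellation has been requested.
            ct.ThrowIfCancellationRequested();
            await step(ct);
        }
    }

    public static void Main()
    {
        var cts = new CancellationTokenSource();
        var steps = new List<Func<CancellationToken, Task>>
        {
            async ct => { await Task.Delay(100, ct); Console.WriteLine(1); },
            // The second step simulates the user asking to cancel.
            ct => { Console.WriteLine(2); cts.Cancel(); return Task.FromResult(0); },
            // Never runs, because cancellation was requested before it started.
            async ct => { await Task.Delay(100, ct); Console.WriteLine(3); }
        };

        try
        {
            // Blocking here is acceptable in a console app, which has no
            // synchronization context to deadlock on.
            RunInOrderAsync(steps, cts.Token).GetAwaiter().GetResult();
            Console.WriteLine("Finished");
        }
        catch (OperationCanceledException)
        {
            Console.WriteLine("Canceled");
        }
    }
}
```

This should print 1, 2 and then Canceled: the third step is never started because the token is checked before each step.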

If, however, you are asking about methods that use the awful EAP pattern, I would advise you to use Rx's Observable.FromEventPattern helper function.

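    public async Task SendEmail(MailMessage message)
    {
        using (var smtp = new SmtpClient())
        {
            // Subscribe before sending so the completion event cannot be missed.
            // FirstAsync() completes the sequence after one event; an event
            // stream never completes on its own, so ToTask() alone would hang.
            var completed = Observable
                .FromEventPattern<SendCompletedEventHandler, AsyncCompletedEventArgs>(
                    h => smtp.SendCompleted += h,
                    h => smtp.SendCompleted -= h)
                .FirstAsync()
                .ToTask();
            smtp.SendAsync(message, null); // second argument is the user token
            await completed;
        }
    }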

Further EDIT:

    public class Publisher
    {
        public IObservable<CancellationToken> SomeEvent { get; }
    }

    public abstract class Subscriber
    {
        public abstract IObservable<CancellationToken> Subscribe(IObservable<CancellationToken> observable);
    }

    IEnumerable<Subscriber> subscribers = ...
    Publisher publisher = ...

    IDisposable subscription = subscribers.Aggregate(publisher.SomeEvent, (e, sub) => sub.Subscribe(e)).Subscribe();

    // Dispose the subscription when you want to stop listening.

Upvotes: 1
