whoami

Reputation: 1899

Cancel an operation right away instead of going through long-running operations?

I am using AsParallel combined with WithDegreeOfParallelism and WithCancellation in the following way:

AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2)

This is my understanding of it: only two items of the incoming sequence will be processed at a time, and once one of them completes, more items will be processed. However, if cancellation is requested, the items in the incoming queue that have not been picked up yet will not be processed at all. Based on this understanding, I have created the following code:

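using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks; // TaskEx comes from the Async CTP; on .NET 4.5+ use Task.Run

    class Employee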
    {
        public int ID { get; set;}
        public string FirstName { get; set;}
        public string LastName { get; set;}
    }

    class Program
    {

        private static List<Employee> _Employees;
        static CancellationTokenSource cs = new CancellationTokenSource();
        static Random rand = new Random();

        static void Main(string[] args)
        {
            _Employees = new List<Employee>() 
            {
                new Employee() { ID = 1, FirstName = "John", LastName = "Doe" },
                new Employee() { ID = 2, FirstName = "Peter", LastName = "Saul" },
                new Employee() { ID = 3, FirstName = "Mike", LastName = "Sue" },
                new Employee() { ID = 4, FirstName = "Catherina", LastName = "Desoza" },
                new Employee() { ID = 5, FirstName = "Paul", LastName = "Smith" },
                new Employee() { ID = 6, FirstName = "Paul2", LastName = "Smith" },
                new Employee() { ID = 7, FirstName = "Paul3", LastName = "Smith" },
                new Employee() { ID = 8, FirstName = "Paul4", LastName = "Smith" },
                new Employee() { ID = 9, FirstName = "Paul5", LastName = "Smith" },
                new Employee() { ID = 10, FirstName = "Paul6", LastName = "Smith" },
                new Employee() { ID = 5, FirstName = "Paul7", LastName = "Smith" }
            };

            try
            {
                var tasks = _Employees.AsParallel().WithCancellation(cs.Token).WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token)).ToArray();
                Console.WriteLine("Now waiting");
                Thread.Sleep(1000);
                cs.Cancel();
                Task.WaitAll(tasks);
            }
            catch (AggregateException ae)
            {
                // error handling code
                Console.WriteLine("something bad happened");
            }
            catch (Exception ex)
            {
                // error handling code
                Console.WriteLine("something even worse happened");
            }
            // other stuff
            Console.WriteLine("All Done");
        }

        private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
        {
            if (token.IsCancellationRequested)
            {
                Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
                return;
            }
            int Sleep = rand.Next(800, 2000);
            Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
            await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));

            Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
        }

    }

Here is output when I ran it.

ThreadID = 3 -> Employee 1 -> Sleeping for 1058
ThreadID = 1 -> Employee 7 -> Sleeping for 1187
ThreadID = 1 -> Employee 8 -> Sleeping for 1296
ThreadID = 1 -> Employee 9 -> Sleeping for 1614
ThreadID = 1 -> Employee 10 -> Sleeping for 1607
ThreadID = 1 -> Employee 5 -> Sleeping for 1928
ThreadID = 3 -> Employee 2 -> Sleeping for 1487
ThreadID = 3 -> Employee 3 -> Sleeping for 1535
ThreadID = 3 -> Employee 4 -> Sleeping for 1265
ThreadID = 3 -> Employee 5 -> Sleeping for 1248
ThreadID = 3 -> Employee 6 -> Sleeping for 807
Now waiting
ThreadID = 3 -> Employee 6 finished
ThreadID = 4 -> Employee 1 finished
ThreadID = 5 -> Employee 7 finished
ThreadID = 6 -> Employee 8 finished
ThreadID = 3 -> Employee 5 finished
ThreadID = 4 -> Employee 9 finished
ThreadID = 5 -> Employee 10 finished
ThreadID = 6 -> Employee 5 finished
ThreadID = 3 -> Employee 4 finished
ThreadID = 7 -> Employee 2 finished
ThreadID = 8 -> Employee 3 finished
All Done

Here are my issues (according to my understanding of things):

  1. I was expecting that for some employees ProcessThisEmployee would not be called at all because the operation would be cancelled, but it is called for all employees.
  2. Even when ProcessThisEmployee is called, I expected it to go through the following code path, which is not happening either:

    if ( token.IsCancellationRequested )
    {
        Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
        return;
    }
    

So then I changed ProcessThisEmployee and moved the token.IsCancellationRequested check after the Sleep, as follows:

private static async Task ProcessThisEmployee(Employee x, CancellationToken token)
{

    int Sleep = rand.Next(800, 2000);
    Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Sleeping for {2}", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID, Sleep));
    await TaskEx.Run(() => System.Threading.Thread.Sleep(Sleep));
    if (token.IsCancellationRequested)
    {
        Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", System.Threading.Thread.CurrentThread.ManagedThreadId));
        return;
    }
    Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} finished", System.Threading.Thread.CurrentThread.ManagedThreadId, x.ID));
}

Now I get the following output:

ThreadID = 3 -> Employee 1 -> Sleeping for 1330  
ThreadID = 1 -> Employee 7 -> Sleeping for 1868  
ThreadID = 3 -> Employee 2 -> Sleeping for 903  
ThreadID = 3 -> Employee 3 -> Sleeping for 1241  
ThreadID = 3 -> Employee 4 -> Sleeping for 1367  
ThreadID = 3 -> Employee 5 -> Sleeping for 1007  
ThreadID = 3 -> Employee 6 -> Sleeping for 923  
ThreadID = 1 -> Employee 8 -> Sleeping for 1032  
ThreadID = 1 -> Employee 9 -> Sleeping for 1948  
ThreadID = 1 -> Employee 10 -> Sleeping for 1456  
ThreadID = 1 -> Employee 5 -> Sleeping for 1737  
Now waiting  
ThreadID = 5 -> Employee 2 finished  
ThreadID = 3 -> Employee 6 finished  
something bad happened  
All Done  
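A likely explanation for "something bad happened" in this second run, assuming the code ran exactly as posted: the Cancelled message in the second version passes only the thread ID to a format string that also references {1}, so string.Format throws a FormatException. Inside an async method that exception is stored in the returned Task, and Task.WaitAll rethrows it wrapped in an AggregateException. The sketch below reproduces just the formatting call; employeeId is a hypothetical value for illustration.

```csharp
using System;

int threadId = Environment.CurrentManagedThreadId;
int employeeId = 3; // hypothetical employee ID, for illustration only

// Same call as in the second version: the format string references {1},
// but only one argument (the thread ID) is supplied.
bool threwFormatException = false;
try
{
    Console.WriteLine(string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", threadId));
}
catch (FormatException)
{
    // In ProcessThisEmployee this exception ends up in the returned Task and
    // surfaces from Task.WaitAll inside an AggregateException.
    threwFormatException = true;
}

// Supplying the employee ID as the second argument fixes the message.
string fixedMessage = string.Format("ThreadID = {0} -> Employee {1} -> Cancelled", threadId, employeeId);
Console.WriteLine(fixedMessage);
```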

My question is: what am I misunderstanding about this workflow? I would like to cancel the operation as soon as possible without going through the long-running operation (Sleep is just an example here; it could be something really expensive).

Upvotes: 0

Views: 73

Answers (1)

Dirk

Reputation: 10958

There are a few problems with that code:

1.) ToArray() materializes the sequence, i.e. it only returns after every item of the source sequence has gone through the Select(...).

Since you call cs.Cancel() only after that, the token.IsCancellationRequested check at the start of ProcessThisEmployee is never triggered.
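To see point 1 in isolation, here is a small sketch that uses an int range in place of the employees and a synchronous projection that only counts its calls (both are assumptions for illustration). By the time ToArray() returns, the projection has run for every element, so a Cancel() issued afterwards cannot skip any of them.

```csharp
using System;
using System.Linq;
using System.Threading;

var cts = new CancellationTokenSource();
int calls = 0;

// ToArray() forces the whole query to run before it returns,
// so the projection executes for every element right here.
int[] results = Enumerable.Range(1, 10)
    .AsParallel()
    .WithCancellation(cts.Token)
    .WithDegreeOfParallelism(2)
    .Select(i =>
    {
        Interlocked.Increment(ref calls); // count each invocation of the projection
        return i * 2;
    })
    .ToArray();

// Cancelling now is too late: every element has already been projected.
cts.Cancel();
Console.WriteLine($"Projection ran {calls} times");
```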

2.) WithDegreeOfParallelism(2).Select(x => ProcessThisEmployee(x, cs.Token)) looks good, but it isn't really doing what you want, since ProcessThisEmployee is an async method that returns to its caller as soon as it reaches the first return or the first await on a task that hasn't completed yet.

What you probably wanted was to execute the long-running ProcessThisEmployee method with a degree of parallelism of 2. What you actually do is create a bunch of Tasks with a degree of parallelism of 2; the tasks themselves then all run concurrently.
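A rough way to observe point 2, assuming a hypothetical ProcessAsync stand-in that awaits Task.Delay instead of the original TaskEx.Run/Thread.Sleep: the sketch records how many calls are in progress at the same time. Because each call hands its Task back at the first await, the two PLINQ workers finish the projection almost immediately, and far more than 2 calls end up running concurrently.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

int inFlight = 0;
int maxInFlight = 0;

// Hypothetical stand-in for ProcessThisEmployee.
async Task ProcessAsync(int id)
{
    int now = Interlocked.Increment(ref inFlight);
    // Record the highest number of calls that were running simultaneously.
    int seen;
    while (now > (seen = Volatile.Read(ref maxInFlight)) &&
           Interlocked.CompareExchange(ref maxInFlight, now, seen) != seen) { }

    await Task.Delay(300); // the caller gets the Task back here
    Interlocked.Decrement(ref inFlight);
}

// WithDegreeOfParallelism(2) only limits the threads running the projection,
// and the projection merely starts tasks, so it completes almost at once.
Task[] tasks = Enumerable.Range(1, 10)
    .AsParallel()
    .WithDegreeOfParallelism(2)
    .Select(id => ProcessAsync(id))
    .ToArray();

await Task.WhenAll(tasks);
Console.WriteLine($"Max concurrent calls: {maxInFlight}");
```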

I don't know how to fix this for your particular case, as I don't know the context. But maybe this already helps you a bit.


Update to reply to your comment: "I am doing ToArray and ProcessThisEmployee is an async method because this code will become part of a library and could be used from a WPF application. The end user may want to provide updates on the UI, so I don't want to block until the operation completes." (john smith)

Don't write async wrappers for things that aren't asynchronous by nature; work that is truly asynchronous is mostly file, network or database access. If a developer using your library wants to call something in an async context, they can still do an await Task.Run(...). For more information, take a look at this article about whether you should expose asynchronous wrappers for synchronous methods.
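A minimal sketch of that advice, under stated assumptions: the hypothetical library method ProcessAll stays synchronous, takes a CancellationToken and checks it before each item, so no new expensive work starts once cancellation is requested. The caller (for example a WPF event handler) decides to run it off the UI thread with await Task.Run(...). The 100 ms Thread.Sleep only stands in for the expensive per-item work.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

int processed = 0;

// Hypothetical synchronous library method: checks the token between items.
void ProcessAll(IReadOnlyList<int> ids, CancellationToken token)
{
    foreach (int id in ids)
    {
        token.ThrowIfCancellationRequested(); // don't start new work after cancellation
        Thread.Sleep(100);                    // stand-in for the expensive work on this item
        Interlocked.Increment(ref processed);
    }
}

var cts = new CancellationTokenSource();
cts.CancelAfter(250); // e.g. the user clicks "Cancel" shortly after starting

bool cancelled = false;
try
{
    // The caller, not the library, chooses to run the work on the thread pool.
    await Task.Run(() => ProcessAll(new[] { 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 }, cts.Token), cts.Token);
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"Cancelled: {cancelled}, processed {processed} of 10 items");
```

Because the token is checked between items, the item already in progress is allowed to finish, but the remaining ones are never started.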

In my eyes PLINQ is mostly useful if you already have a working LINQ query, and want to speed it up because that query would be suited for parallel processing.
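For comparison, a sketch of the kind of query PLINQ is suited for, assuming synchronous, CPU-bound work in the projection (Thread.Sleep is only a stand-in). When the token passed to WithCancellation is cancelled, ToArray() throws OperationCanceledException. PLINQ only checks the token periodically, so for expensive delegates the sketch also checks it inside the delegate before starting each element.

```csharp
using System;
using System.Linq;
using System.Threading;

var cts = new CancellationTokenSource();
int started = 0;
bool cancelled = false;

// Cancel from a timer while the query is still running.
cts.CancelAfter(300);

try
{
    int[] results = Enumerable.Range(1, 20)
        .AsParallel()
        .WithCancellation(cts.Token)
        .WithDegreeOfParallelism(2)
        .Select(i =>
        {
            // Check before starting the expensive work for this element.
            cts.Token.ThrowIfCancellationRequested();
            Interlocked.Increment(ref started);
            Thread.Sleep(100); // synchronous stand-in for the expensive work
            return i * i;
        })
        .ToArray();
    Console.WriteLine($"Completed {results.Length} items");
}
catch (OperationCanceledException)
{
    cancelled = true;
}

Console.WriteLine($"Cancelled: {cancelled}, items started: {started} of 20");
```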

The easiest approach in your case might be a work queue served by 2 threads. I'm pretty sure there are examples of these on the web.
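One possible shape of such a work queue, sketched with BlockingCollection<T> and two dedicated threads; the int items, the 200 ms Thread.Sleep and the Worker function are illustrative assumptions, not part of the original code. Each thread takes one item at a time, so at most two items are in progress, and after Cancel() the next take from GetConsumingEnumerable throws OperationCanceledException, so items still in the queue are never started.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

var cts = new CancellationTokenSource();
var queue = new BlockingCollection<int>();
for (int n = 1; n <= 11; n++) queue.Add(n);
queue.CompleteAdding(); // no more items will be added

int processed = 0;

// Hypothetical worker: takes one item at a time until the queue is empty or cancelled.
void Worker()
{
    try
    {
        foreach (int id in queue.GetConsumingEnumerable(cts.Token))
        {
            Thread.Sleep(200); // stand-in for the expensive per-item work
            Interlocked.Increment(ref processed);
            Console.WriteLine($"Thread {Environment.CurrentManagedThreadId} finished item {id}");
        }
    }
    catch (OperationCanceledException)
    {
        // Thrown when the token is cancelled before the next item is taken.
    }
}

var workers = new[] { new Thread(Worker), new Thread(Worker) };
foreach (var t in workers) t.Start();

Thread.Sleep(500);
cts.Cancel();
foreach (var t in workers) t.Join();
Console.WriteLine($"Processed {processed} of 11 items");
```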

Upvotes: 2
