Clock

Reputation: 984

Improving processing speed by using tasks

I have the following code:

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

class Program
{
    class ProcessedEven
    {
        public int ProcessedInt { get; set; }

        public DateTime ProcessedValue { get; set; }
    }

    class ProcessedOdd
    {
        public int ProcessedInt { get; set; }

        public string ProcessedValue { get; set; }
    }

    static void Main(string[] args)
    {
        Stopwatch stopwatch = new Stopwatch();

        IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
        Dictionary<int, ProcessedOdd> processedOddValuesDictionary = new Dictionary<int, ProcessedOdd>();
        Dictionary<int, ProcessedEven> processedEvenValuesDictionary = new Dictionary<int, ProcessedEven>();

        stopwatch.Start();

        while (enumerator.MoveNext())
        {
            int currentNumber = enumerator.Current;

            if (currentNumber % 2 == 0)
            {
                Task.Run(async () =>
                {
                    ProcessedEven processedEven =
                        new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                    await Task.Delay(100);

                    processedEvenValuesDictionary.Add(currentNumber, processedEven);
                });
            }
            else
            {
                Task.Run(async () =>
                {
                    ProcessedOdd processedOdd =
                        new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                    await Task.Delay(100);

                    processedOddValuesDictionary.Add(currentNumber, processedOdd);
                });
            }
        }

        stopwatch.Stop();

        Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

        Console.ReadKey();
    }
}

So basically I have to iterate over an enumerator that is always synchronous.

Once the current value is taken from the iterator, it gets processed in a way that takes a long time. After it is processed, it is added to one of the dictionaries depending on its value. So in the end the dictionaries have to be populated with the right values.

In order to improve the speed, I thought that introducing some parallelism could help, but after adding the "Task.Run" calls, some

"System.NullReferenceException: 'Object reference not set to an instance of an object.'"

exceptions occurred. Also, the execution time increased compared to the "synchronous" version of this code (the one without the "Task.Run" calls).

I do not understand why these exceptions are being raised, since nothing seems to be null.

Is there a way to improve the speed in this scenario (the original code is the one without the "Task.Run" calls) by using multithreading?

Should the addition of the processed elements to the dictionaries be done inside a lock statement, since the dictionaries seem to be shared between the tasks?

Upvotes: 3

Views: 505

Answers (3)

Theodor Zoulias

Reputation: 43384

The specific reason you are getting the NullReferenceException is that the internal state of a Dictionary became corrupted. Probably two threads tried to resize the two internal arrays of a Dictionary in parallel, or something else equally nasty happened. Actually, you are lucky that you get these exceptions, because a far worse outcome would be a working program that produces incorrect results.

The more general reason for this issue is that you have allowed parallel, unsynchronized access to thread-unsafe objects. The Dictionary class, like most built-in .NET classes, is not thread-safe. It is implemented with the assumption that it will be accessed by a single thread (or at least by one thread at a time). It contains no internal synchronization. The reason is that adding synchronization to a class incurs API complexity and performance overhead, and there is no point paying this overhead every time you use the class when it is only needed in a few special cases.

There are many solutions to your problem. One is to keep using the thread-unsafe Dictionary, but ensure that it is accessed exclusively by using locks. This is the most flexible solution, but you need to be very careful not to leave even a single unprotected code path to the object. Every access to a property or method, whether it reads or writes, must happen inside a lock. So this approach is flexible but fragile, and it may become a performance bottleneck in case of heavy contention (i.e. too many threads requesting the exclusive lock concurrently and being forced to wait in line).
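
For example, a minimal sketch of this approach on top of the question's code (the _locker field name is my own choice, not something from the question):

private static readonly object _locker = new object();

// ...and then inside each Task.Run delegate, after the slow processing:
lock (_locker)
{
    processedEvenValuesDictionary.Add(currentNumber, processedEven);
}

The odd dictionary needs the same treatment, and any later reads of either dictionary must also happen inside a lock.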

Another solution is to use a thread-safe container like the ConcurrentDictionary. This class ensures that its internal state will never become corrupted when it is accessed by multiple threads in parallel. Unfortunately, it guarantees nothing about the rest of your program's state, so it is only suitable for simple cases where the dictionary is the only shared state. In those cases it offers better performance than a single coarse lock, because it is implemented with granular internal locking (multiple locks, one for each segment of data).
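
For example, the change to the question's code is small (ConcurrentDictionary lives in the System.Collections.Concurrent namespace, and TryAdd returns false instead of throwing if the key is already present):

ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();

// ...inside the Task.Run delegate:
processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);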

The best solution is to remove the need for thread synchronization altogether by eliminating the shared state. Just let each thread work with its own isolated subset of the data, and only merge these subsets after all threads have completed. This usually offers the best performance, at the cost of having to partition the initial workload and then write the final merging code. There are libraries that follow this strategy and deal with all this boilerplate for you, allowing you to write as little code as possible. One of the best is the TPL Dataflow library, which is built into the .NET Core platform. For the .NET Framework you need to install a package to use it.
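
As a rough sketch of that idea with TPL Dataflow (it needs the System.Threading.Tasks.Dataflow namespace; the degree of parallelism and the common object result type below are my own choices, not something prescribed by the question): a TransformBlock does the slow processing in parallel, and an ActionBlock with the default degree of parallelism of 1 is the only code that touches the plain dictionaries, so no explicit synchronization is needed.

// Process in parallel, but consume the results on a single thread.
var processBlock = new TransformBlock<int, object>(async currentNumber =>
{
    await Task.Delay(100); // stands in for the slow processing

    if (currentNumber % 2 == 0)
        return new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };

    return new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
}, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });

// Default MaxDegreeOfParallelism is 1, so only one thread ever touches the dictionaries.
var storeBlock = new ActionBlock<object>(result =>
{
    if (result is ProcessedEven even)
        processedEvenValuesDictionary.Add(even.ProcessedInt, even);
    else if (result is ProcessedOdd odd)
        processedOddValuesDictionary.Add(odd.ProcessedInt, odd);
});

processBlock.LinkTo(storeBlock, new DataflowLinkOptions { PropagateCompletion = true });

foreach (int number in Enumerable.Range(0, 100000))
    processBlock.Post(number);

processBlock.Complete();
storeBlock.Completion.Wait();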

Upvotes: 2

Artur

Reputation: 5522

You are creating a lot of small tasks and exhausting your thread pool by calling Task.Run. You would get better performance by using Parallel.ForEach. And, as @user1672994 said, you should use the thread-safe version of Dictionary: ConcurrentDictionary.

static void Main(string[] args)
{
    Stopwatch stopwatch = new Stopwatch();

    IEnumerable<int> enumerable = Enumerable.Range(0, 100000);
    ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
    ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();

    stopwatch.Start();

    Parallel.ForEach(enumerable,
        currentNumber =>
            {
                if (currentNumber % 2 == 0)
                {
                    ProcessedEven processedEven =
                        new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                    // Task.Delay(100);

                    processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
                }
                else
                {
                    ProcessedOdd processedOdd =
                        new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                    // Task.Delay(100);

                    processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
                }
            });

    stopwatch.Stop();

    Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

    Console.ReadKey();
}

I also don't understand why you need Task.Delay(100) in your code. In any case, it is an asynchronous operation that, without the await operator, would do something you probably don't expect. Either use await or use the synchronous equivalent, Thread.Sleep(100).
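
For example, if the 100 ms pause has to stay as a stand-in for slow work, a sketch of the two options (reusing the names from the code above):

// Inside the Parallel.ForEach body the delegate is synchronous, so block the current thread:
Thread.Sleep(100);

// Or keep the original Task.Run approach, but make the delegate async and await the delay:
Task.Run(async () =>
{
    await Task.Delay(100);
    processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
});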

Upvotes: 4

user1672994

Reputation: 10839

You should use ConcurrentDictionary, which is a thread-safe collection of key/value pairs that can be accessed by multiple threads concurrently.

ConcurrentDictionary is designed for multithreaded scenarios. You do not have to use locks in your code to add or remove items from the collection. However, it is always possible for one thread to retrieve a value, and another thread to immediately update the collection by giving the same key a new value.

When I ran your code after changing Dictionary to ConcurrentDictionary, it ran without the NullReferenceException and finished in ~1.37 seconds.

Full code:

    using System;
    using System.Collections.Concurrent;
    using System.Collections.Generic;
    using System.Diagnostics;
    using System.Linq;
    using System.Threading.Tasks;

    class Program
    {
        class ProcessedEven
        {
            public int ProcessedInt { get; set; }

            public DateTime ProcessedValue { get; set; }
        }

        class ProcessedOdd
        {
            public int ProcessedInt { get; set; }

            public string ProcessedValue { get; set; }
        }

        static void Main(string[] args)
        {
            Stopwatch stopwatch = new Stopwatch();

            IEnumerator<int> enumerator = Enumerable.Range(0, 100000).GetEnumerator();
            ConcurrentDictionary<int, ProcessedOdd> processedOddValuesDictionary = new ConcurrentDictionary<int, ProcessedOdd>();
            ConcurrentDictionary<int, ProcessedEven> processedEvenValuesDictionary = new ConcurrentDictionary<int, ProcessedEven>();

            stopwatch.Start();

            while (enumerator.MoveNext())
            {
                int currentNumber = enumerator.Current;

                if (currentNumber % 2 == 0)
                {
                    Task.Run(() =>
                    {
                        ProcessedEven processedEven =
                            new ProcessedEven { ProcessedInt = currentNumber, ProcessedValue = DateTime.Now.AddMinutes(currentNumber) };
                        Task.Delay(100); // not awaited; the returned Task is discarded

                        processedEvenValuesDictionary.TryAdd(currentNumber, processedEven);
                    });
                }
                else
                {
                    Task.Run(() =>
                    {
                        ProcessedOdd processedOdd =
                            new ProcessedOdd { ProcessedInt = currentNumber, ProcessedValue = Math.Pow(currentNumber, 4).ToString() };
                        Task.Delay(100); // not awaited; the returned Task is discarded

                        processedOddValuesDictionary.TryAdd(currentNumber, processedOdd);
                    });
                }
            }

            stopwatch.Stop();

            Console.WriteLine(stopwatch.Elapsed.TotalSeconds);

            Console.ReadKey();
        }
    }


Upvotes: 3
