AaA
AaA

Reputation: 3694

Thread Safe decrement counter

Problem

In a project case I need to create multiple threads that are picking tasks from a queue and running them. Some of these tasks cannot run if a group of other tasks are still running. Consider something like file copy and defrag (runs when system is idle) in windows .

Solution

To implement this, I created a class based on System.Threading.CountdownEvent.

Whenever a thread picks a blocking task from queue, they will Increment the CounterEvent and after they finished their job they will Decrement the CounterEvent.

If a thread picks a low priority task, it will Wait until CounterEvent is zero then starts running.

A low priority taks can immediately start with Reset of CounterEvent

Main thread or a parallel thread can monitor the status of lock by querying the CurrentCount.

Here is the Code:

using System;
using System.Diagnostics.Contracts;
using System.Threading;


public class CounterEvent : IDisposable {

    private volatile int m_currentCount;
    private volatile bool m_disposed;
    private ManualResetEventSlim m_event;

    // Gets the number of remaining signals required to set the event.
    public int CurrentCount {
        get {
            return m_currentCount;
        }
    }

    // Allocate a thin event, Create a latch in signaled state.
    public CounterEvent() {
        m_currentCount = 0;

        m_event = new ManualResetEventSlim();
        m_event.Set(); // 
    }

    // Decrements the counter. if counter is zero signals other threads to continue
    public void Decrement() {
        ThrowIfDisposed();
        Contract.Assert(m_event != null);
        int newCount = 0;
        if (m_currentCount >= 0) {
            #pragma warning disable 0420
            newCount = Interlocked.Decrement(ref m_currentCount);
            #pragma warning restore 0420
        }
        if (newCount == 0) {
            m_event.Set();
        }
    }

    // increments the current count by one.
    public void Increment() {
        ThrowIfDisposed();
        #pragma warning disable 0420
        Interlocked.Increment(ref m_currentCount);
        #pragma warning restore 0420
    }

    // Resets the CurrentCount to the value of InitialCount.
    public void Reset() {
        ThrowIfDisposed();
        m_currentCount = 0;
        m_event.Set();
    }

    // Blocks the current thread until the System.Threading.CounterEvent is set.
    public void Wait() {
        ThrowIfDisposed();
        m_event.Wait();
    }

    /// <summary>
    /// Throws an exception if the latch has been disposed.
    /// </summary>
    private void ThrowIfDisposed() {
        if (m_disposed) {
            throw new ObjectDisposedException("CounterEvent");
        }
    }

    // According to MSDN this is not thread safe
    public void Dispose() {
        Dispose(true);
        GC.SuppressFinalize(this);
    }

    // According to MSDN Dispose() is not thread-safe.
    protected virtual void Dispose(bool disposing) {
        if (disposing) {
            m_event.Dispose();
            m_disposed = true;
        }
    }
}

Question

will this code work as expected? Any flaws that I didn't see in it? Is there any better option doing this?

Note

Application is written with System.Threading.Thread and cost of converting it for me is very high, however a great replacement solution always worth working on for future.

Upvotes: 1

Views: 878

Answers (1)

Muraad Nofal
Muraad Nofal

Reputation: 802

This should be one atomic operation and it is not threadsafe if you do it like this

if (m_currentCount >= 0) 
{
    newCount = Interlocked.Decrement(ref m_currentCount);
}

It may happen that m_currentCount is changed between the if and the Interlocked.Decrement You should rewrite your logic to use Interlocked.CompareExchange I would also use Interlocked.Exchange in every place where you assign to m_currentCount then you don´t need volatile and the pragma You should also be aware of that under very heavy load it can happen that a reset event Set is getting lost

Upvotes: 2

Related Questions