Jesse de Wit

Reputation: 4177

Monitor.Pulse() with a condition

I have a class that should be thread-safe. Since all methods alter the object's state variables, I would prefer to manage thread safety with a single synchronization object and avoid complicated multi-lock schemes. So I wrap the method bodies in a lock statement on that object. There are scenarios where the lock needs to be released for a while so that another thread can update the state. So far so good: just use Monitor.Wait() and Monitor.Pulse(). However, I would like to 'Pulse' with a condition. In the code below, NotifySent() should send a 'Pulse' only to a thread waiting in the 'Send()' method, and EnqueueReceived() should send a 'Pulse' only to a thread waiting in the 'Receive()' method.

So summarizing:

I've tried many things, including Monitor, Semaphore and WaitHandle combinations, queues with WaitHandles and more creative options. Also, I've been playing with multiple synchronization objects. But in each scenario I only get parts of the functionality to work.

The code below is the closest I've gotten. The TODO comments show what is wrong with it.

public class Socket
{
    public class Item { }

    private object sync = new object();
    private ManualResetEvent receiveAvailable = new ManualResetEvent(false);

    private Queue<Item> receiveQueue = new Queue<Item>();

    // used by client, from any thread
    public void Send(Item item, CancellationToken token)
    {
        lock (this.sync)
        {
            // sends the message somewhere and should await confirmation.
            // note that the confirmation order matters.

            // TODO: Should only continue on notification from 'NotifySent()', and respect the cancellation token
            Monitor.Wait(this.sync); 
        }
    }

    // used by client, from any thread
    public Item Receive(CancellationToken token)
    {
        lock (this.sync)
        {
            if (!this.receiveAvailable.WaitOne(0))
            {
                // TODO: Should only be notified by 'EnqueueReceived()' method, and respect the cancellation token.
                Monitor.Wait(this.sync);
            }

            var item = this.receiveQueue.Dequeue();
            if (this.receiveQueue.Count == 0)
            {
                this.receiveAvailable.Reset();
            }

            return item;
        }
    }

    // used by internal worker thread
    internal void NotifySent()
    {
        lock (this.sync)
        {
            // Should only notify the Send() method.
            Monitor.Pulse(this.sync);
        }
    }

    // used by internal worker thread
    internal void EnqueueReceived(Item item)
    {
        lock (this.sync)
        {
            this.receiveQueue.Enqueue(item);
            this.receiveAvailable.Set();

            // TODO: Should only notify the 'Receive()' method.
            Monitor.Pulse(this.sync);
        }
    }
}

SIDENOTE: In Python, my requirement can be met with a threading.Condition (ignoring the CancellationToken). Perhaps a similar construct is available in C#?

import collections
import threading

class Socket(object):
    def __init__(self):
        self.sync = threading.RLock()
        self.receive_queue = collections.deque()
        self.send_ready = threading.Condition(self.sync)
        self.receive_ready = threading.Condition(self.sync)

    def send(self, item):
        with self.send_ready:
            # send the message
            self.send_ready.wait()

    def receive(self):
        with self.receive_ready:
            try:
                return self.receive_queue.popleft()
            except IndexError:
                self.receive_ready.wait()
            return self.receive_queue.popleft()

    def notify_sent(self):
        with self.sync:
            self.send_ready.notify()

    def enqueue_received(self, item):
        with self.sync:
            self.receive_queue.append(item)
            self.receive_ready.notify()
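
One caveat with the Python sketch above: a bare wait() misses a notify() that arrives before the wait starts, and it does not re-check anything after waking up. Below is a minimal variant, assuming a hypothetical 'confirmations' counter as the state that send() waits on and an optional 'timeout' parameter standing in for the cancellation token (neither is part of the original sketch). It keeps two conditions on one lock but lets wait_for() re-check explicit state. The counter does not match a confirmation to a particular send, so it only illustrates the waiting pattern.

```python
import collections
import threading


class Socket:
    """Two conditions on one lock; every wait re-checks explicit state."""

    def __init__(self):
        self.sync = threading.RLock()
        self.receive_queue = collections.deque()
        self.confirmations = 0  # hypothetical: confirmations not yet claimed
        self.send_ready = threading.Condition(self.sync)
        self.receive_ready = threading.Condition(self.sync)

    def send(self, item, timeout=None):
        with self.send_ready:
            # (the message would be handed to the transport here)
            # wait_for() loops until the predicate holds, so a notify that
            # arrives before the wait starts is not lost.
            confirmed = self.send_ready.wait_for(
                lambda: self.confirmations > 0, timeout)
            if not confirmed:
                raise TimeoutError("no confirmation received")
            self.confirmations -= 1

    def receive(self, timeout=None):
        with self.receive_ready:
            if not self.receive_ready.wait_for(
                    lambda: self.receive_queue, timeout):
                raise TimeoutError("nothing received")
            return self.receive_queue.popleft()

    def notify_sent(self):
        with self.sync:
            self.confirmations += 1
            self.send_ready.notify()  # wakes only a thread waiting in send()

    def enqueue_received(self, item):
        with self.sync:
            self.receive_queue.append(item)
            self.receive_ready.notify()  # wakes only a thread in receive()
```

Because both conditions share the same RLock, the state is still guarded by a single synchronization object; only the wait sets are separate.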

Upvotes: 3

Views: 461

Answers (2)

Stephen Cleary

Reputation: 456477

What you're looking for are Condition Variables, which are not directly exposed in any .NET API. Monitor is the closest built-in type to what you're looking for: it is a mutex combined with a single Condition Variable.

The standard way of solving this in .NET is to always re-check the condition (on the waiting side) before continuing. This is also necessary to handle spurious wakeups, which can happen for all Condition Variable-based solutions.

Thus:

// Note: 'while', not 'if'
while (!this.receiveAvailable.WaitOne(0))
{
  Monitor.Wait(this.sync);
}

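The same applies to the wait in Send(): wait in a loop on whatever state tells it that its confirmation has arrived.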

Because .NET has no separate Condition Variables, all waiters share the Monitor's single condition, so you will see more wakeups for a condition that does not hold than you would with designated conditions. Even with designated conditions, though, spurious wakeups can happen.
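
To make the pattern concrete in the Python terms used in the question, here is a minimal sketch of the single-condition approach: one lock, one condition, and every waiter re-checks its own state in a loop. The class name and the 'confirmations' counter are assumptions made for illustration. In .NET terms, cond.wait() plays the role of Monitor.Wait() and cond.notify_all() the role of Monitor.PulseAll(). Waking all waiters matters here: with a single condition, waking just one thread might pick a waiter whose condition is still false, which goes back to waiting while the intended thread never wakes.

```python
import collections
import threading


class MonitorStyleSocket:
    """One lock and one condition, as with Monitor; waiters re-check state."""

    def __init__(self):
        self.cond = threading.Condition()
        self.receive_queue = collections.deque()
        self.confirmations = 0  # hypothetical: confirmations not yet claimed

    def send(self, item):
        with self.cond:
            # (the message would be handed to the transport here)
            while self.confirmations == 0:  # 'while', not 'if'
                self.cond.wait()
            self.confirmations -= 1

    def receive(self):
        with self.cond:
            while not self.receive_queue:  # 'while', not 'if'
                self.cond.wait()
            return self.receive_queue.popleft()

    def notify_sent(self):
        with self.cond:
            self.confirmations += 1
            # Wake every waiter; receivers see an empty queue and wait again.
            self.cond.notify_all()

    def enqueue_received(self, item):
        with self.cond:
            self.receive_queue.append(item)
            # Wake every waiter; senders see no confirmation and wait again.
            self.cond.notify_all()
```

The cost is the extra wakeups described above: each notification wakes threads that only re-check their condition and go back to waiting.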

Upvotes: 1

Jesse de Wit

Reputation: 4177

Thanks to your comments, I believe I have found a solution to my problem. I decided to move the state variables into an external class, which makes locking in the socket and managing thread safety on the client side easier. This way I can manage the state variables myself on a single thread (in a separate class, not shown in the code below).

Here's the combined solution I've come up with:

public class Socket
{
    public class Item { }

    private class PendingSend
    {
        public ManualResetEventSlim ManualResetEvent { get; set; }
        public bool Success { get; set; }
        public string Message { get; set; }
        public Exception InnerException { get; set; }
    }

    private readonly object sendLock = new object();
    private readonly object receiveLock = new object();
    private readonly ManualResetEventSlim receiveAvailable
        = new ManualResetEventSlim(false);
    private readonly SemaphoreSlim receiveSemaphore 
        = new SemaphoreSlim(1, 1);

    private readonly ConcurrentQueue<Item> sendQueue
        = new ConcurrentQueue<Item>();
    private readonly ConcurrentQueue<PendingSend> pendingSendQueue
        = new ConcurrentQueue<PendingSend>();
    private readonly ConcurrentQueue<Item> receiveQueue
        = new ConcurrentQueue<Item>();

    // Called from any client thread.
    public void Send(Item item, CancellationToken token)
    {
        // initialize handle to wait for.
        using (var handle = new ManualResetEventSlim(false))
        {
            var pendingSend = new PendingSend
            {
                ManualResetEvent = handle
            };

            // Make sure the item and pendingSend are put in the same order.
            lock (this.sendLock)
            {
                this.sendQueue.Enqueue(item);
                this.pendingSendQueue.Enqueue(pendingSend);
            }

            // Wait for the just created send handle to notify.
            // May throw operation cancelled, in which case the message is
            // still enqueued... Maybe fix that later.
            handle.Wait(token);

            if (!pendingSend.Success)
            {
                // Now we actually have information why the send 
                // failed. Pretty cool.
                throw new CommunicationException(
                    pendingSend.Message, 
                    pendingSend.InnerException);
            }
        }
    }

    // Called by internal worker thread.
    internal Item DequeueForSend()
    {
        this.sendQueue.TryDequeue(out Item result);

        // May return null, that's fine
        return result;
    }

    // Called by internal worker thread, in the same order items are dequeued.
    internal void SendNotification(
        bool success,
        string message,
        Exception inner)
    {
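        if (!this.pendingSendQueue.TryDequeue(out PendingSend result))
        {
            // TODO: Notify a horrible bug has occurred.
            // Return here: 'result' is null, so the assignments below
            // would throw a NullReferenceException.
            return;
        }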

        result.Success = success;
        result.Message = message;
        result.InnerException = inner;

        // Releases that waithandle in the Send() method.
        // The 'PendingSend' instance now contains information about the send.
        result.ManualResetEvent.Set();
    }

    // Called by any client thread.
    public Item Receive(CancellationToken token)
    {
        // This makes sure clients fall through one by one.
        this.receiveSemaphore.Wait(token);

        try
        {
            // This makes sure a message is available.
            this.receiveAvailable.Wait(token);

            if (!this.receiveQueue.TryDequeue(out Item result))
            {
                // TODO: Log a horrible bug has occurred.
            }

            // Make sure the count check and the reset happen in a single go.
            lock (this.receiveLock)
            {
                if (this.receiveQueue.Count == 0)
                {
                    this.receiveAvailable.Reset();
                }
            }

            return result;
        }
        finally
        {
            // make space for the next receive
            this.receiveSemaphore.Release();
        }
    }

    // Called by internal worker thread.
    internal void EnqueueReceived(Item item)
    {
        this.receiveQueue.Enqueue(item);

        // Make sure the set and reset don't intertwine
        lock (this.receiveLock)
        {
            this.receiveAvailable.Set();
        }
    }
}
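
For comparison with the Python sketch in the question, the send half of this design can be outlined in Python as well. This is only a sketch under assumptions: the names SendChannel and PendingSend, the 'timeout' parameter (standing in for the cancellation token) and the use of RuntimeError in place of CommunicationException are all made up for illustration. The idea it shows is the same: every send() call owns its own wait handle, and the worker completes the handles in the order the items were queued, so a notification can only wake the caller it belongs to.

```python
import collections
import threading


class PendingSend:
    """Per-call wait handle plus the outcome filled in by the worker."""

    def __init__(self):
        self.done = threading.Event()
        self.success = False
        self.message = None


class SendChannel:
    def __init__(self):
        self.lock = threading.Lock()
        self.send_queue = collections.deque()
        self.pending = collections.deque()

    # Called from any client thread.
    def send(self, item, timeout=None):
        pending = PendingSend()
        with self.lock:  # keep both queues in the same order
            self.send_queue.append(item)
            self.pending.append(pending)
        # As in the C# version, a timed-out item stays enqueued.
        if not pending.done.wait(timeout):
            raise TimeoutError("send was not confirmed in time")
        if not pending.success:
            raise RuntimeError(pending.message)

    # Called by the worker thread; returns None when nothing is queued.
    def dequeue_for_send(self):
        with self.lock:
            return self.send_queue.popleft() if self.send_queue else None

    # Called by the worker thread, in the order items were dequeued.
    def send_notification(self, success, message=None):
        with self.lock:
            pending = self.pending.popleft()
        pending.success = success
        pending.message = message
        pending.done.set()  # wakes exactly the caller that owns this handle
```

Since each caller waits on its own event, no shared condition is involved on the send side and there is nothing for an unrelated thread to be woken by.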

Upvotes: 0
