Reputation: 4177
I've got a class, which should be thread safe. I preferably want to manage thread safety with a single synchronization object in order to avoid complicated mindbreakers, since all methods alter object state variables. So I wrap method bodies with a lock statement on that object.
There are scenarios where the lock needs to be released for a while in order to allow another thread to update the state. So far so good, just use Monitor.Wait() and Monitor.Pulse().
However, I would like to 'Pulse' with a condition. In the code below, I want to send a 'Pulse' only to a thread waiting in the 'Send()' method. And similarly, send a 'Pulse' only to a thread waiting in the 'Receive()' method.
So, summarizing: NotifySent() should wake only a thread waiting in Send(), EnqueueReceived() should wake only a thread waiting in Receive(), and I'd like to use a CancellationToken as well, to cancel the wait.
I've tried many things, including Monitor, Semaphore and WaitHandle combinations, queues with WaitHandles and more creative options. I've also been playing with multiple synchronization objects. But in each scenario I only get parts of the functionality to work.
The code below is the closest I've gotten. The TODO comments show what is wrong with the code.
    public class Socket
    {
        public class Item { }

        private object sync = new object();
        private ManualResetEvent receiveAvailable = new ManualResetEvent(false);
        private Queue<Item> receiveQueue = new Queue<Item>();

        // used by client, from any thread
        public void Send(Item item, CancellationToken token)
        {
            lock (this.sync)
            {
                // sends the message somewhere and should await confirmation.
                // note that the confirmation order matters.
                // TODO: Should only continue on notification from 'NotifySent()', and respect the cancellation token
                Monitor.Wait(this.sync);
            }
        }

        // used by client, from any thread
        public Item Receive(CancellationToken token)
        {
            lock (this.sync)
            {
                if (!this.receiveAvailable.WaitOne(0))
                {
                    // TODO: Should only be notified by 'EnqueueReceived()' method, and respect the cancellation token.
                    Monitor.Wait(this.sync);
                }
                var item = this.receiveQueue.Dequeue();
                if (this.receiveQueue.Count == 0)
                {
                    this.receiveAvailable.Reset();
                }
                return item;
            }
        }

        // used by internal worker thread
        internal void NotifySent()
        {
            lock (this.sync)
            {
                // Should only notify the Send() method.
                Monitor.Pulse(this.sync);
            }
        }

        // used by internal worker thread
        internal void EnqueueReceived(Item item)
        {
            lock (this.sync)
            {
                this.receiveQueue.Enqueue(item);
                this.receiveAvailable.Set();
                // TODO: Should only notify the 'Receive()' method.
                Monitor.Pulse(this.sync);
            }
        }
    }
SIDENOTE:
In Python, my requirement is possible using a threading.Condition (ignoring the CancellationToken). Perhaps a similar construct is available in C#?
    class Socket(object):
        def __init__(self):
            self.sync = threading.RLock()
            self.receive_queue = collections.deque()
            self.send_ready = threading.Condition(self.sync)
            self.receive_ready = threading.Condition(self.sync)

        def send(self, item):
            with self.send_ready:
                # send the message
                self.send_ready.wait()

        def receive(self):
            with self.receive_ready:
                try:
                    return self.receive_queue.popleft()
                except IndexError:
                    self.receive_ready.wait()
                    return self.receive_queue.popleft()

        def notify_sent(self):
            with self.sync:
                self.send_ready.notify()

        def enqueue_received(self, item):
            with self.sync:
                self.receive_queue.append(item)
                self.receive_ready.notify()
Upvotes: 3
Views: 461
Reputation: 456477
What you're looking for is Condition Variables, which are not directly exposed in any .NET APIs. The Monitor is the closest built-in type to what you're looking for, which is a Mutex combined with a single Condition Variable.
The standard way of solving this in .NET is to always re-check the condition (on the waiting side) before continuing. This is also necessary to handle spurious wakeups, which can happen for all Condition Variable-based solutions.
Thus:
    // Note: 'while', not 'if'
    while (!this.receiveAvailable.WaitOne(0))
    {
        Monitor.Wait(this.sync);
    }
Etc.
In .NET, since you don't have Condition Variables, you will have more spurious wakeups than if you had designated conditions, but even in the designated conditions scenario, spurious wakeups can happen.
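To make the "re-check in a loop" advice concrete, here is a minimal sketch of the whole class built on a single Monitor. It is not the asker's code: `MonitorSocket`, the `sendTickets`/`sentConfirmed` counters and the 50 ms poll interval are hypothetical, and `string` stands in for `Item`. Two choices are assumptions on my part. First, it uses `Monitor.PulseAll` instead of `Monitor.Pulse`, because with a single shared condition a lone `Pulse` could wake a thread of the wrong kind and the signal would be lost. Second, it uses a bounded `Monitor.Wait` so the loop gets a chance to observe the cancellation token.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Sketch only: one lock, one implicit condition, one predicate per kind of waiter.
public class MonitorSocket
{
    private readonly object sync = new object();
    private readonly Queue<string> receiveQueue = new Queue<string>();
    private long sendTickets;   // hypothetical: tickets handed out to Send() callers
    private long sentConfirmed; // hypothetical: confirmations reported so far

    public void Send(string item, CancellationToken token)
    {
        lock (sync)
        {
            long ticket = ++sendTickets; // tickets keep confirmations in order
            // ... hand 'item' to the worker here ...
            while (sentConfirmed < ticket) // 'while', not 'if'
            {
                token.ThrowIfCancellationRequested();
                Monitor.Wait(sync, 50); // bounded wait so cancellation is noticed
            }
        }
    }

    public string Receive(CancellationToken token)
    {
        lock (sync)
        {
            while (receiveQueue.Count == 0)
            {
                token.ThrowIfCancellationRequested();
                Monitor.Wait(sync, 50);
            }
            return receiveQueue.Dequeue();
        }
    }

    internal void NotifySent()
    {
        lock (sync)
        {
            sentConfirmed++;
            Monitor.PulseAll(sync); // wake all waiters; each re-checks its own predicate
        }
    }

    internal void EnqueueReceived(string item)
    {
        lock (sync)
        {
            receiveQueue.Enqueue(item);
            Monitor.PulseAll(sync);
        }
    }
}
```

A receiver woken by `NotifySent()` simply finds its queue still empty and goes back to waiting, which is exactly the extra wakeup cost mentioned above. Note that a cancelled `Send()` still consumes its ticket in this sketch, so a real implementation would need to decide what a late confirmation means.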
Upvotes: 1
Reputation: 4177
I believe I have found a solution to my problem thanks to your comments. I decided to separate the state variables into an external class, so locking in the socket and managing thread safety on the client side becomes easier. This way I can manage the state variables myself in a single thread (in a separate class, not shown in the code below).
Here's the combined solution I've come up with:
    public class Socket
    {
        public class Item { }

        private class PendingSend
        {
            public ManualResetEventSlim ManualResetEvent { get; set; }
            public bool Success { get; set; }
            public string Message { get; set; }
            public Exception InnerException { get; set; }
        }

        private readonly object sendLock = new object();
        private readonly object receiveLock = new object();
        private readonly ManualResetEventSlim receiveAvailable
            = new ManualResetEventSlim(false);
        private readonly SemaphoreSlim receiveSemaphore
            = new SemaphoreSlim(1, 1);
        private readonly ConcurrentQueue<Item> sendQueue
            = new ConcurrentQueue<Item>();
        private readonly ConcurrentQueue<PendingSend> pendingSendQueue
            = new ConcurrentQueue<PendingSend>();
        private readonly ConcurrentQueue<Item> receiveQueue
            = new ConcurrentQueue<Item>();

        // Called from any client thread.
        public void Send(Item item, CancellationToken token)
        {
            // initialize handle to wait for.
            using (var handle = new ManualResetEventSlim(false))
            {
                var pendingSend = new PendingSend
                {
                    ManualResetEvent = handle
                };

                // Make sure the item and pendingSend are put in the same order.
                lock (this.sendLock)
                {
                    this.sendQueue.Enqueue(item);
                    this.pendingSendQueue.Enqueue(pendingSend);
                }

                // Wait for the just created send handle to notify.
                // May throw operation cancelled, in which case the message is
                // still enqueued... Maybe fix that later.
                handle.Wait(token);

                if (!pendingSend.Success)
                {
                    // Now we actually have information why the send
                    // failed. Pretty cool.
                    throw new CommunicationException(
                        pendingSend.Message,
                        pendingSend.InnerException);
                }
            }
        }

        // Called by internal worker thread.
        internal Item DequeueForSend()
        {
            this.sendQueue.TryDequeue(out Item result);
            // May return null, that's fine
            return result;
        }

        // Called by internal worker thread, in the same order items are dequeued.
        internal void SendNotification(
            bool success,
            string message,
            Exception inner)
        {
            if (!this.pendingSendQueue.TryDequeue(out PendingSend result))
            {
                // TODO: Notify a horrible bug has occurred.
            }

            result.Success = success;
            result.Message = message;
            result.InnerException = inner;

            // Releases that waithandle in the Send() method.
            // The 'PendingSend' instance now contains information about the send.
            result.ManualResetEvent.Set();
        }

        // Called by any client thread.
        public Item Receive(CancellationToken token)
        {
            // This makes sure clients fall through one by one.
            this.receiveSemaphore.Wait(token);
            try
            {
                // This makes sure a message is available.
                this.receiveAvailable.Wait(token);

                if (!this.receiveQueue.TryDequeue(out Item result))
                {
                    // TODO: Log a horrible bug has occurred.
                }

                // Make sure the count check and the reset happen in a single go.
                lock (this.receiveLock)
                {
                    if (this.receiveQueue.Count == 0)
                    {
                        this.receiveAvailable.Reset();
                    }
                }

                return result;
            }
            finally
            {
                // make space for the next receive
                this.receiveSemaphore.Release();
            }
        }

        // Called by internal worker thread.
        internal void EnqueueReceived(Item item)
        {
            this.receiveQueue.Enqueue(item);

            // Make sure the set and reset don't intertwine
            lock (this.receiveLock)
            {
                this.receiveAvailable.Set();
            }
        }
    }
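As an aside, the receive half above (a queue, a "message available" event and cancellable blocking) looks close to what `BlockingCollection<T>` already provides. The following is only a sketch under that assumption, not a drop-in replacement: `ReceiveBuffer<T>` is a hypothetical name, and it does not reproduce the one-at-a-time semaphore from the code above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

// Sketch only: a blocking, cancellable FIFO buffer for received items.
public class ReceiveBuffer<T>
{
    // Backed by a ConcurrentQueue so items come out in arrival order.
    private readonly BlockingCollection<T> items =
        new BlockingCollection<T>(new ConcurrentQueue<T>());

    // Called by any client thread. Blocks until an item is available;
    // throws OperationCanceledException if the token is cancelled first.
    public T Receive(CancellationToken token) => items.Take(token);

    // Called by the internal worker thread.
    public void EnqueueReceived(T item) => items.Add(item);
}
```

With this shape there is no separate event to set and reset, so the `receiveLock` bookkeeping is no longer needed for the receive path.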
Upvotes: 0