Reputation: 101150
I have the following code (which doesn't work very well in a multi-threaded environment):
// Serializes socket write jobs: at most one job is meant to be "current" at a time,
// and further jobs wait in _writeQueue until the current one completes.
// NOTE: as the question states, this version is NOT safe when Send and
// HandleWriteCompleted run on different threads; the unsynchronized accesses to
// _currentJob are marked below.
public class SomeClass
{
// Jobs waiting for the current job to finish.
private readonly ConcurrentQueue<ISocketWriterJob> _writeQueue = new ConcurrentQueue<ISocketWriterJob>();
// The job currently being written, or null when no write is in progress.
private ISocketWriterJob _currentJob;
// Starts the job immediately if no job is current; otherwise queues it.
public void Send(ISocketWriterJob job)
{
// Race: this check and the assignment below are not atomic. Two callers can both
// observe null and both start a write. Likewise, HandleWriteCompleted can find the
// queue empty and clear _currentJob between this check and the Enqueue, which
// leaves the queued job with nothing to start it.
if (_currentJob != null)
{
_writeQueue.Enqueue(job);
return;
}
_currentJob = job;
_currentJob.Write(_writeArgs);
// The job is invoked asynchronously here
}
// Completion callback for a write: continues the current job or moves on to the next queued one.
private void HandleWriteCompleted(SocketError error, int bytesTransferred)
{
// error checks etc removed for this sample.
// Presumably WriteCompleted returns true once the whole job has been sent - confirm
// against ISocketWriterJob. When it returns false the same job is written again below.
if (_currentJob.WriteCompleted(bytesTransferred))
{
_currentJob.Dispose();
// Race: _currentJob is overwritten here without synchronizing with the check in Send.
if (!_writeQueue.TryDequeue(out _currentJob))
{
_currentJob = null;
return;
}
}
_currentJob.Write(_writeArgs);
// The job is invoked asynchronously here.
}
}
The Send method should invoke the job asynchronously if there isn't a current job being executed. It should enqueue the job if there is.
Putting a lock around the _currentJob
assignment/check would make everything work just fine. But is there a lock-free way to solve it?
Update
I'm using a socket and its SendAsync
method to send the information, which means that I do not know whether there is a write/job pending when the Send()
method is invoked.
Upvotes: 4
Views: 910
Reputation: 11657
I don't think you will gain anything from using lock-free techniques. Even with simple locks you'll be able to stay in user mode, because Monitor.Enter
/Monitor.Exit
spin first, and only transition into kernel mode if the wait lasts longer.
This means that a lock-based technique will perform as well as any lock-free technique, because you only need to hold the lock while storing a job in the queue and taking it back out, and you'll have much clearer and more robust code that every developer can understand:
/// <summary>
/// Lock-based variant: a single monitor guards both the pending-job queue and the
/// current-job field, while the actual socket writes are issued outside the lock.
/// </summary>
public class SomeClass
{
    // A plain Queue<T> is enough because every access happens under _syncRoot.
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private readonly object _syncRoot = new object();
    private ISocketWriterJob _currentJob;

    /// <summary>
    /// Starts <paramref name="job"/> right away when nothing is being written,
    /// otherwise parks it in the queue.
    /// </summary>
    public void Send(ISocketWriterJob job)
    {
        bool becameCurrent;

        lock (_syncRoot)
        {
            becameCurrent = _currentJob == null;
            if (becameCurrent)
            {
                _currentJob = job;
            }
            else
            {
                _writeQueue.Enqueue(job);
            }
        }

        // Work with the local reference rather than the shared field once the lock is released.
        if (becameCurrent)
        {
            StartJob(job);
        }
    }

    /// <summary>Issues the (asynchronous) write for the given job.</summary>
    private void StartJob(ISocketWriterJob job)
    {
        // The job is invoked asynchronously here
        job.Write(_writeArgs);
    }

    /// <summary>
    /// Write-completion callback: keeps writing the current job, or switches to the
    /// next queued job when the current one reports completion.
    /// </summary>
    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.
        ISocketWriterJob jobToRun;

        lock (_syncRoot)
        {
            // WriteCompleted and Dispose are assumed to be cheap enough to run under the lock.
            bool finished = _currentJob.WriteCompleted(bytesTransferred);
            if (finished)
            {
                _currentJob.Dispose();

                // TryDequeue may need to be supplied as an extension method on Queue<T>.
                // On failure the out parameter leaves _currentJob as null, so no
                // explicit reset is needed before returning.
                bool hasNext = _writeQueue.TryDequeue(out _currentJob);
                if (!hasNext)
                {
                    return;
                }
            }

            // Snapshot the job to run while still holding the lock.
            jobToRun = _currentJob;
        }

        StartJob(jobToRun);
    }
}
Lock-free is an optimization, and like any other optimization you should measure performance first to make sure that you actually have an issue with your simple lock-based implementation; only if that is true should you use lower-level techniques such as lock-free code. Performance versus maintainability is a classic tradeoff, and you should choose very carefully.
Upvotes: 0
Reputation: 13223
Consider using CompareExchange
with a hypothesis about the intended state transitions. There is no need to use ConcurrentQueue, since we are now in control of our own synchronization.
Updated to use state machine
Updated again to remove unneeded Interlocked.Exchange
(for state assignment).
/// <summary>
/// Lock-free variant: a small state machine, advanced with
/// <see cref="Interlocked.CompareExchange(ref int, int, int)"/>, decides which thread
/// may touch the queue. Only the thread that moved the state to Enqueue or Dequeue
/// accesses <c>_writeQueue</c>, so a plain <see cref="Queue{T}"/> is sufficient.
/// </summary>
public class SomeClass
{
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;

    private enum State { Idle, Active, Enqueue, Dequeue };

    // Stored as int: Interlocked.CompareExchange has historically had no overload for
    // enum types, so the enum values are cast at the point of use.
    private int _state = (int)State.Idle;

    /// <summary>
    /// Starts <paramref name="job"/> immediately when the writer is idle, otherwise
    /// appends it to the queue of pending jobs.
    /// </summary>
    public void Send(ISocketWriterJob job)
    {
        while (true)
        {
            switch ((State)Volatile.Read(ref _state))
            {
                case State.Idle:
                    if (TryTransition(State.Idle, State.Active))
                    {
                        // This thread now owns the writer; the completion callback cannot
                        // run before Write has been issued.
                        _currentJob = job;
                        _currentJob.Write(_writeArgs);
                        // The job is invoked asynchronously here
                        return;
                    }
                    // else consider new state
                    break;

                case State.Active:
                    if (TryTransition(State.Active, State.Enqueue))
                    {
                        // The Enqueue state gives this thread exclusive access to the queue.
                        _writeQueue.Enqueue(job);
                        Volatile.Write(ref _state, (int)State.Active);
                        return;
                    }
                    // else consider new state
                    break;

                case State.Enqueue:
                case State.Dequeue:
                    // Another thread is touching the queue; wait for it to finish.
                    Thread.Yield();
                    break;
            }
        }
    }

    /// <summary>
    /// Write-completion callback: continues the current job, or switches to the next
    /// queued job (going idle when there is none) once the current job is complete.
    /// </summary>
    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.
        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();
            if (!TryAdvanceToNextJob())
            {
                // Queue was empty; the writer is idle now.
                return;
            }
        }
        _logger.Debug(_writeArgs.GetHashCode() + ": writing more ");
        _currentJob.Write(_writeArgs);
        // The job is invoked asynchronously here.
    }

    /// <summary>
    /// Replaces <c>_currentJob</c> with the next queued job.
    /// </summary>
    /// <returns>
    /// <c>true</c> when a job was dequeued (state returns to Active); <c>false</c> when
    /// the queue was empty (state becomes Idle and <c>_currentJob</c> is cleared).
    /// </returns>
    /// <exception cref="System.InvalidOperationException">
    /// The state machine is in a state that cannot occur while a write is completing.
    /// </exception>
    private bool TryAdvanceToNextJob()
    {
        while (true)
        {
            State observed = (State)Volatile.Read(ref _state);
            switch (observed)
            {
                case State.Active:
                    if (TryTransition(State.Active, State.Dequeue))
                    {
                        // The Dequeue state gives this thread exclusive access to the queue.
                        if (_writeQueue.Count == 0)
                        {
                            _currentJob = null;
                            Volatile.Write(ref _state, (int)State.Idle);
                            return false;
                        }

                        _currentJob = _writeQueue.Dequeue();
                        Volatile.Write(ref _state, (int)State.Active);
                        // Exactly one job per completion: leave the loop here. Looping again
                        // would dequeue and overwrite further jobs without writing them.
                        return true;
                    }
                    // else consider new state
                    break;

                case State.Enqueue:
                    // A Send call is appending to the queue; wait for it to finish.
                    Thread.Yield();
                    break;

                default:
                    // Idle or Dequeue while a write is completing means the state machine
                    // is corrupted; fail loudly instead of spinning forever.
                    throw new System.InvalidOperationException(
                        "Unexpected writer state " + observed + " while completing a write.");
            }
        }
    }

    /// <summary>
    /// Atomically moves the state from <paramref name="from"/> to <paramref name="to"/>.
    /// </summary>
    /// <returns><c>true</c> when this call performed the transition.</returns>
    private bool TryTransition(State from, State to)
    {
        return Interlocked.CompareExchange(ref _state, (int)to, (int)from) == (int)from;
    }
}
Upvotes: 4
Reputation: 50114
At the moment the split between your producer and consumer is a little fuzzy; you have "produce a job into a queue or consume it immediately" and "consume a job from the queue or quit if there isn't one"; it would be clearer as "produce a job into a queue", "consume a job from the queue (initially)" and "consume a job from the queue (once a job finishes)".
The trick here is to use a BlockingCollection
so you can wait for a job to appear:
// Queue of pending write jobs; a BlockingCollection lets the consumer wait for a job to appear.
BlockingCollection<ISocketWriterJob> _writeQueue =
new BlockingCollection<ISocketWriterJob>();
Let threads calling Send
literally just queue a job:
/// <summary>Producer side: hands the job to the queue and returns; it never starts a write itself.</summary>
public void Send(ISocketWriterJob job) => _writeQueue.Add(job);
Then have another thread that just consumes jobs.
// Consumer entry point: picks up the first job so that writing can begin.
// NOTE(review): Take() blocks the calling thread until a job has been queued, so this
// is presumably meant to run on a dedicated consumer thread - confirm with the caller.
public void StartConsumingJobs()
{
// Get the first job or wait for one to be queued.
_currentJob = _writeQueue.Take();
// Start job (placeholder in this sample: the code that starts the write is not shown here).
}
/// <summary>
/// Write-completion callback: when the current job reports completion it is disposed
/// and replaced by the next queued job; the (possibly new) current job is then written.
/// </summary>
private void HandleWriteCompleted(SocketError error, int bytesTransferred)
{
    bool jobFinished = _currentJob.WriteCompleted(bytesTransferred);

    if (jobFinished)
    {
        _currentJob.Dispose();

        // Take() waits here until another job has been queued.
        ISocketWriterJob next = _writeQueue.Take();
        _currentJob = next;
    }

    // Start/continue job as before
    _currentJob.Write(_writeArgs);
}
Upvotes: 1