jgauffin

Reputation: 101150

Multiple producers / single consumer, processing async without locks?

I have the following code, which doesn't work very well in a multi-threaded environment:

public class SomeClass
{
    private readonly ConcurrentQueue<ISocketWriterJob> _writeQueue = new ConcurrentQueue<ISocketWriterJob>();
    private SocketAsyncEventArgs _writeArgs; // (assumed) the SocketAsyncEventArgs passed to Socket.SendAsync
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        if (_currentJob != null)
        {
            _writeQueue.Enqueue(job);
            return;
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();
            if (!_writeQueue.TryDequeue(out _currentJob))
            {
                _currentJob = null;
                return;
            }
        }

        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here.
    }
}

The Send method should invoke the job asynchronously if there isn't a current job being executed. It should enqueue the job if there is.

Putting a lock around the _currentJob assignment/check would make everything work just fine. But is there a lock-free way to solve it?

Update

I'm using a socket and its SendAsync method to send the information, which means that I do not know whether a write/job is pending when the Send() method is invoked.
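
For context, a minimal sketch of how such an asynchronous write is typically wired up with Socket.SendAsync and a SocketAsyncEventArgs (the _socket field and this wiring are assumptions for illustration, not code from the question):

// Wire-up done once, e.g. in the constructor:
_writeArgs.Completed += (s, e) => HandleWriteCompleted(e.SocketError, e.BytesTransferred);

// Issuing a write. SendAsync returns false when the operation completed
// synchronously; in that case the Completed event is not raised, so the
// handler has to be called directly.
if (!_socket.SendAsync(_writeArgs))
    HandleWriteCompleted(_writeArgs.SocketError, _writeArgs.BytesTransferred);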

Upvotes: 4

Views: 910

Answers (3)

Sergey Teplyakov

Reputation: 11657

I don't think you will gain anything from using lock-free techniques. Even with simple locks you'll mostly stay in user mode, because Monitor.Enter/Monitor.Exit spin first and only transition into kernel mode if the wait takes longer.

This means that a lock-based technique will perform as well as any lock-free technique, because you only take the lock to store a job in the queue and to get it back out, and you'll have much clearer and more robust code that every developer can understand:

public class SomeClass
{
    // We don't have to use Concurrent collections
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private readonly object _syncRoot = new object();
    private ISocketWriterJob _currentJob;

    public void Send(ISocketWriterJob job)
    {
        lock(_syncRoot)
        {
            if (_currentJob != null)
            {
                _writeQueue.Enqueue(job);
                return;
            }
            _currentJob = job;
        }

        // Use job instead of shared state
        StartJob(job);
    }

    private void StartJob(ISocketWriterJob job)
    {
       job.Write(_writeArgs);
       // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        ISocketWriterJob currentJob = null;

        // error checks etc removed for this sample.
        lock(_syncRoot)
        {
            // I assume this operation is fairly fast, as is Dispose
            if (_currentJob.WriteCompleted(bytesTransferred))
            {
                _currentJob.Dispose();

                // There is no TryDequeue method on Queue<T>,
                // but we can easily add one as an extension method
                // (a sketch follows this snippet).
                if (!_writeQueue.TryDequeue(out _currentJob))
                {
                    // No need to set _currentJob to null explicitly:
                    // the out parameter has already left it null.
                    return;
                }
            }

            // Store the current job so we can start it outside the lock
            currentJob = _currentJob;
        }

        StartJob(currentJob);
    }
}
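
A minimal sketch of such a TryDequeue extension method (the QueueExtensions class name is an assumption, not part of the original answer):

public static class QueueExtensions
{
    // Returns false and leaves item at its default value when the queue is empty.
    public static bool TryDequeue<T>(this Queue<T> queue, out T item)
    {
        if (queue.Count > 0)
        {
            item = queue.Dequeue();
            return true;
        }

        item = default(T);
        return false;
    }
}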

Lock-free is an optimization, and like any other optimization you should measure performance first to make sure your simple lock-based implementation really is a problem; only then should you reach for lower-level techniques such as lock-free code. Performance versus maintainability is a classic tradeoff, and you should choose very carefully.

Upvotes: 0

ony

Reputation: 13223

Consider using CompareExchange together with a hypothesis about the intended state transitions. There is no need for ConcurrentQueue, since we are now in control of our own synchronization.

Updated to use a state machine.
Updated again to remove the unneeded Interlocked.Exchange for the state assignments.

public class SomeClass
{
    private readonly Queue<ISocketWriterJob> _writeQueue = new Queue<ISocketWriterJob>();
    private ISocketWriterJob _currentJob;
    private enum State { Idle, Active, Enqueue, Dequeue };
    // Stored as an int because Interlocked.CompareExchange has no overload for enums.
    private int _state = (int)State.Idle;

    public void Send(ISocketWriterJob job)
    {
        bool spin = true;
        while (spin)
        {
            switch ((State)_state)
            {
            case State.Idle:
                if (Interlocked.CompareExchange(ref _state, (int)State.Active, (int)State.Idle) == (int)State.Idle)
                {
                    spin = false;
                }
                // else consider new state
                break;
            case State.Active:
                if (Interlocked.CompareExchange(ref _state, (int)State.Enqueue, (int)State.Active) == (int)State.Active)
                {
                    _writeQueue.Enqueue(job);
                    _state = (int)State.Active;
                    return;
                }
                // else consider new state
                break;
            case State.Enqueue:
            case State.Dequeue:
                // spin to wait for new state
                Thread.Yield();
                break;
            }
        }

        _currentJob = job;
        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here
    }

    private void HandleWriteCompleted(SocketError error, int bytesTransferred)
    {
        // error checks etc removed for this sample.

        if (_currentJob.WriteCompleted(bytesTransferred))
        {
            _currentJob.Dispose();

            bool spin = true;
            while (spin)
            {
                switch ((State)_state)
                {
                case State.Active:
                    if (Interlocked.CompareExchange(ref _state, (int)State.Dequeue, (int)State.Active) == (int)State.Active)
                    {
                        // Queue<T>.TryDequeue only exists on newer frameworks;
                        // otherwise use an extension method like the one sketched above.
                        if (!_writeQueue.TryDequeue(out _currentJob))
                        {
                            // Nothing queued: _currentJob is already null via the out parameter.
                            _state = (int)State.Idle;
                            return;
                        }
                        else
                        {
                            _state = (int)State.Active;
                            spin = false; // a job was dequeued; exit the loop and write it
                        }
                    }
                    // else consider new state
                    break;

                case State.Enqueue:
                    // spin to wait for new state
                    Thread.Yield();
                    break;

                // impossible states
                case State.Idle:
                case State.Dequeue:
                    break;
                }
            }
        }

        _currentJob.Write(_writeArgs);

        // The job is invoked asynchronously here.
    }
}

Upvotes: 4

Rawling

Reputation: 50114

At the moment the split between your producer and consumer is a little fuzzy; you have "produce a job into a queue or consume it immediately" and "consume a job from the queue, or quit if there isn't one". It would be clearer as "produce a job into a queue", "consume a job from the queue (initially)" and "consume a job from the queue (once a job finishes)".

The trick here is to use a BlockingCollection so you can wait for a job to appear:

BlockingCollection<ISocketWriterJob> _writeQueue =
    new BlockingCollection<ISocketWriterJob>();

Let threads calling Send literally just queue a job:

public void Send(ISocketWriterJob job)
{
    _writeQueue.Add(job);
}

Then have another thread that just consumes jobs.

public void StartConsumingJobs()
{
    // Get the first job or wait for one to be queued.
    _currentJob = _writeQueue.Take();

    // Start job
}

private void HandleWriteCompleted(SocketError error, int bytesTransferred)
{
    if (_currentJob.WriteCompleted(bytesTransferred))
    {
        _currentJob.Dispose();

        // Get next job, or wait for one to be queued.
        _currentJob = _writeQueue.Take();
    }

    _currentJob.Write(_writeArgs);

    // Start/continue job as before
}
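
One possible way to start the consumer so that the blocking Take() never runs on the threads calling Send (the sender and someJob names and the Task.Run call are illustrative assumptions, not part of the original answer):

var sender = new SomeClass();

// Run the consumer on the thread pool so the blocking Take() inside
// StartConsumingJobs never stalls the producers calling Send().
Task.Run(() => sender.StartConsumingJobs());

// Producers can now queue jobs from any thread:
sender.Send(someJob);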

Upvotes: 1
