Chris

Reputation: 1283

Queue calls to asynchronous method BeginSend of a socket

I need to queue BeginSend calls to a socket, and they must be executed in chronological order. To do this I used a semaphore to signal when the callback function is clear to proceed.
Most of the time it works, because each asynchronous callback is executed on a separate thread. Occasionally, however, the thread running the current callback is also used for the newly started asynchronous call. When this happens, that thread blocks waiting for the semaphore to be released, but since it is the same thread that was supposed to release the semaphore, it stays blocked forever.

To illustrate the problem, here is some test code:

static Semaphore semaphore = new Semaphore(1, 1);
static IList<byte[]> buffer = new List<byte[]>();
static void Main()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
    socket.Connect(new IPEndPoint(new IPAddress(new byte[] { 192, 168, 1, 8 }), 123));
    while (true) // data feed
    {
        lock (buffer)
        {
            buffer.Add(new byte[1460]);
            if (buffer.Count == 1)
                socket.BeginSend(buffer[0], 0, 1460, 0, new AsyncCallback(SendCallback), socket); // calls BeginSend if the buffer was empty before 
        }
    }
}

static void SendCallback(IAsyncResult ar)
{
    Console.WriteLine("in " + Thread.CurrentThread.ManagedThreadId);
    semaphore.WaitOne();
    Socket socket = (Socket)ar.AsyncState;
    lock(buffer)
    {
        buffer.RemoveAt(0); // removes data that was sent
        if (buffer.Count > 0) // if there is more data to send calls BeginSend again
            socket.BeginSend(buffer[0], 0, 1460, 0, new AsyncCallback(SendCallback), socket);
    }
    semaphore.Release();
    Console.WriteLine("out " + Thread.CurrentThread.ManagedThreadId);
}

And here is the output:
[screenshot of the console output; image not available]
Because thread 10 was reused for a new callback before the previous callback had a chance to exit and release the semaphore, the thread is blocked forever.
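
The hang can be reproduced without any socket. The sketch below is only an illustration, assuming (as described above) that the nested callback runs on the thread that still holds the semaphore. A Semaphore has no notion of an owning thread, so a second WaitOne from the same thread cannot succeed, whereas lock (Monitor) is re-entrant. ReentrancyDemo and gate are made-up names, and the 100 ms timeouts exist only so the demo terminates instead of hanging.

```csharp
using System;
using System.Threading;

static class ReentrancyDemo
{
    static void Main()
    {
        // Same configuration as in the test code: one slot, initially free.
        var semaphore = new Semaphore(1, 1);

        semaphore.WaitOne();                       // first acquire succeeds
        // Second acquire on the SAME thread: the slot is taken, so this
        // times out. Without the timeout it would block forever.
        bool reacquired = semaphore.WaitOne(100);
        Console.WriteLine(reacquired);             // False
        semaphore.Release();

        // A Monitor (lock) is owned by a thread and may be re-entered by it.
        var gate = new object();
        lock (gate)
        {
            bool entered = Monitor.TryEnter(gate, 100);
            Console.WriteLine(entered);            // True
            if (entered) Monitor.Exit(gate);
        }
    }
}
```

This also suggests why the lock (buffer) in the test code never hangs by itself while semaphore.WaitOne() can.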

How should I approach this problem?

Upvotes: 2

Views: 3367

Answers (1)

lboshuizen

Reputation: 2786

Switch to tasks:

There is a nice MSDN article on this: Tasks and the APM Pattern

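static Task<int> SendAsync(Socket socket, byte[] buffer, int offset, int size, SocketFlags flags)
{
   // Start the APM operation; the no-op callback is fine because FromAsync observes the IAsyncResult itself.
   var result = socket.BeginSend(buffer, offset, size, flags, _ => { }, socket);
   // Wrap the pending operation in a Task that completes with the byte count returned by EndSend.
   return Task.Factory.FromAsync(result, r => socket.EndSend(r));
}

Things get a little easier now:

Use a default BlockingCollection<> as a concurrent queue. It is thread-safe and removes the need for the explicit lock on the list.

static BlockingCollection<byte[]> buffer = new BlockingCollection<byte[]>();

static async Task Main() // an async Main entry point requires C# 7.1 or later
{
   Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
   socket.Connect(new IPEndPoint(new IPAddress(new byte[] { 192, 168, 1, 8 }), 123));
   // GetConsumingEnumerable blocks until an item is available and ends cleanly
   // once CompleteAdding() has been called and the queue is drained.
   foreach (var data in buffer.GetConsumingEnumerable())
   {
      // Awaiting each send before taking the next item keeps the sends in order.
      await SendAsync(socket, data, 0, data.Length, SocketFlags.None);
   }
   Console.ReadLine();
}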

The send is non-blocking and the order is maintained, without needing the semaphore.
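
The producer side is not shown above, so here is a rough sketch of how the data feed could fill the queue and signal completion. It makes several assumptions: FakeSendAsync is a hypothetical stand-in for the socket send so the example runs without a network, QueueDemo and sentOrder are made-up names, and the consumer uses GetConsumingEnumerable so that the loop ends once CompleteAdding has been called.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class QueueDemo
{
    static readonly BlockingCollection<byte[]> buffer = new BlockingCollection<byte[]>();
    static readonly List<int> sentOrder = new List<int>();

    // Hypothetical stand-in for the socket send: waits briefly, then records the packet.
    static async Task<int> FakeSendAsync(byte[] data)
    {
        await Task.Delay(10);
        sentOrder.Add(data[0]);
        return data.Length;
    }

    static async Task Main() // async Main requires C# 7.1 or later
    {
        // Data feed: adds five one-byte packets, then marks the queue as complete.
        var producer = Task.Run(() =>
        {
            for (byte i = 0; i < 5; i++)
                buffer.Add(new byte[] { i });
            buffer.CompleteAdding();
        });

        // Single consumer: each send is awaited before the next item is taken,
        // so packets go out in the order they were queued.
        foreach (var data in buffer.GetConsumingEnumerable())
            await FakeSendAsync(data);

        await producer;
        Console.WriteLine(string.Join(",", sentOrder)); // 0,1,2,3,4
    }
}
```

Because there is only one consumer and it awaits every send, no semaphore or lock is needed to preserve ordering. The default BlockingCollection<> is backed by a ConcurrentQueue<>, which is first-in, first-out.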

Upvotes: 3
