alex98101

Reputation: 1

SocketAsyncEventArgs Send/Receive Order

I've been using SocketAsyncEventArgs for a project recently, and I've run into an issue where ReceiveAsync occasionally appears to get data in a different order from the order in which it was sent via SendAsync. Each block of data passed to SendAsync arrives intact, but the blocks are not necessarily in the right order. Maybe I have an incorrect understanding of the SendAsync method, but I thought that using SocketType.Stream and ProtocolType.Tcp in particular would ensure that order is maintained. I understand that the underlying transport will inevitably break the message up and that ReceiveAsync will commonly read less than the buffer allocation. But I assumed that the send and receive streams would maintain order.

I carved out a test console program which shows the issue. It tries to run about 20 times, using a different pair of sockets and a different port each time. On my laptop, it usually makes it through one run and then fails on the second, usually receiving a later block when it's expecting the second one. From other testing, I know that the expected block does eventually arrive, just out of sequence.

One caveat is that I was able to test it on a Windows 2008 remote server and had no issues. However, it has never come close to completing on my laptop. In fact, when I've let the debug execution sit at the exception break for a while, it has completely frozen my laptop more than once and I've had to do a hard reboot. This is my work laptop running Windows 7, using VS2017. I'm not sure if it could be a factor, but the laptop is running Symantec Endpoint Protection, though I haven't found anything in its logs.

So my question is, do I have an incorrect view of how SocketAsyncEventArgs operates? Or is my code a disaster (perhaps both)? Is it somehow unique to my laptop? (This last one makes me feel like I'm setting up for embarrassment, like when you're new to programming and you think there must be something wrong with the compiler.)

using System;
using System.IO;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

static class DumTest
{

    static void Main(string[] args)
    {
        for (int i = 9177; i < 9199; i++)
        {
            RunDum(i);
            //Thread.Sleep(350);
        }

        Console.WriteLine("all done.");
        Console.ReadLine();
    }

    static void RunDum(int port)
    {
        var dr = new DumReceiver(port);
        var ds = new DumSender(port);

        dr.Acception.Wait();

        ds.Connection.Wait();

        dr.Completion.Wait();

        ds.Completion.Wait();

        Console.WriteLine($"Completed {port}. " +
            $"sent: {ds.SegmentsSent} segments, received: {dr.SegmentsRead} segments");
    }
}

class DumReceiver
{
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
    private readonly TaskCompletionSource<object> tcsAcc = new TaskCompletionSource<object>();

    private TaskCompletionSource<object> tcsRcv;
    private Socket socket;

    internal DumReceiver(int port)
    {
        this.eva.Completed += this.Received;

        var lstSock = new Socket(
            AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

        var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
            .First(i => i.AddressFamily == AddressFamily.InterNetwork);

        lstSock.Bind(new IPEndPoint(localIP, port));
        lstSock.Listen(1);

        var saea = new SocketAsyncEventArgs();
        saea.Completed += this.AcceptCompleted;
        lstSock.AcceptAsync(saea);
    }

    internal Task Acception => this.tcsAcc.Task;

    internal Task Completion { get; private set; }

    internal int SegmentsRead { get; private set; }

    private void AcceptCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            this.socket = e.AcceptSocket;
            e.Dispose();
            try
            {
                this.Completion = this.ReceiveLupeAsync();
            }
            finally
            {
                this.tcsAcc.SetResult(null);
            }
        }
        else
        {
            this.tcsAcc.SetException(new SocketException((int)e.SocketError));
        }
    }

    private async Task ReceiveLupeAsync()
    {
        var buf = new byte[8196];
        byte bufSeg = 1;
        int pos = 0;

        while (true)
        {
            this.tcsRcv = new TaskCompletionSource<object>();
            this.eva.SetBuffer(buf, pos, 8196 - pos);
            if (this.socket.ReceiveAsync(this.eva))
            {
                await this.tcsRcv.Task.ConfigureAwait(false);
            }

            if (this.eva.SocketError != SocketError.Success)
            {
                throw new SocketException((int)eva.SocketError);
            }

            if (this.eva.BytesTransferred == 0)
            {
                if (pos != 0)
                {
                    throw new EndOfStreamException();
                }

                break;
            }

            pos += this.eva.BytesTransferred;
            if (pos == 8196)
            {
                pos = 0;
                for (int i = 0; i < 8196; i++)
                {
                    if (buf[i] != bufSeg)
                    {
                        var msg = $"Expected {bufSeg} but read {buf[i]} ({i} of 8196). " +
                            $"Last read: {this.eva.BytesTransferred}.";
                        Console.WriteLine(msg);
                        throw new Exception(msg);
                    }
                }

                this.SegmentsRead++;
                bufSeg = (byte)(this.SegmentsRead + 1);
            }
        }
    }

    private void Received(object s, SocketAsyncEventArgs e) => this.tcsRcv.SetResult(null);
}

class DumSender
{
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
    private readonly Socket socket = new Socket(
        AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    private readonly TaskCompletionSource<object> tcsCon = new TaskCompletionSource<object>();
    private TaskCompletionSource<object> tcsSnd;

    internal DumSender(int port)
    {
        this.eva.Completed += this.Sent;

        var saea = new SocketAsyncEventArgs();
        var localIP = Dns.GetHostEntry(Dns.GetHostName()).AddressList
            .First(i => i.AddressFamily == AddressFamily.InterNetwork);

        saea.RemoteEndPoint = new IPEndPoint(localIP, port);
        saea.Completed += this.ConnectionCompleted;
        this.socket.ConnectAsync(saea);
    }

    internal Task Connection => this.tcsCon.Task;

    internal Task Completion { get; private set; }

    internal int SegmentsSent { get; private set; }

    private void ConnectionCompleted(object sender, SocketAsyncEventArgs e)
    {
        if (e.SocketError == SocketError.Success)
        {
            e.Dispose();

            try
            {
                this.Completion = this.SendLupeAsync();
            }
            finally
            {
                this.tcsCon.SetResult(null);
            }
        }
        else
        {
            this.tcsCon.SetException(new SocketException((int)e.SocketError));
        }
    }

    private async Task SendLupeAsync()
    {
        var buf = new byte[8196];
        byte bufSeg = 1;

        while (true)
        {
            for (int i = 0; i < 8196; i++)
            {
                buf[i] = bufSeg;
            }

            this.tcsSnd = new TaskCompletionSource<object>();
            this.eva.SetBuffer(buf, 0, 8196);
            if (this.socket.SendAsync(this.eva))
            {
                await this.tcsSnd.Task.ConfigureAwait(false);
            }

            if (this.eva.SocketError != SocketError.Success)
            {
                throw new SocketException((int)this.eva.SocketError);
            }

            if (this.eva.BytesTransferred != 8196)
            {
                throw new SocketException();
            }

            if (++this.SegmentsSent == 299)
            {
                break;
            }

            bufSeg = (byte)(this.SegmentsSent + 1);
        }

        this.socket.Shutdown(SocketShutdown.Both);
    }

    private void Sent(object s, SocketAsyncEventArgs e) => this.tcsSnd.SetResult(null);
}

Upvotes: 0

Views: 1359

Answers (1)

acelent

Reputation: 8135

I believe the problem is in your code.

You must check the return value of Socket's *Async methods that use SocketAsyncEventArgs. If they return false, then the SocketAsyncEventArgs.Completed event won't be raised, and you must handle the result synchronously.

Reference documentation: SocketAsyncEventArgs Class. Search for willRaiseEvent.

In DumReceiver's constructor, you don't check AcceptAsync's result and you don't handle the case when it completes synchronously.

In DumSender's constructor, you don't check ConnectAsync's result and you don't handle the case when it completes synchronously.
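
As a rough illustration of handling both outcomes, here is a minimal sketch for the accept side. The names AcceptSketch, AcceptOnceAsync and Finish are made up for this example and are not part of your code or of the framework. The same shape should apply to ConnectAsync.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading.Tasks;

static class AcceptSketch
{
    // Hypothetical helper: starts one accept and returns a task for the accepted socket.
    static Task<Socket> AcceptOnceAsync(Socket listener)
    {
        var tcs = new TaskCompletionSource<Socket>();
        var saea = new SocketAsyncEventArgs();
        saea.Completed += (s, e) => Finish(e, tcs);

        // A false return means the operation completed synchronously,
        // so Completed will not be raised and we finish right here.
        if (!listener.AcceptAsync(saea))
        {
            Finish(saea, tcs);
        }

        return tcs.Task;
    }

    // Shared by the synchronous and the event-driven completion paths.
    static void Finish(SocketAsyncEventArgs e, TaskCompletionSource<Socket> tcs)
    {
        SocketError error = e.SocketError;
        Socket accepted = e.AcceptSocket;
        e.Dispose();

        if (error == SocketError.Success)
        {
            tcs.TrySetResult(accepted);
        }
        else
        {
            tcs.TrySetException(new SocketException((int)error));
        }
    }

    static void Main()
    {
        var listener = new Socket(
            AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Loopback, 0)); // port 0: let the OS pick
        listener.Listen(1);

        Task<Socket> pending = AcceptOnceAsync(listener);

        var client = new Socket(
            AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        client.Connect(listener.LocalEndPoint);

        using (Socket server = pending.Result)
        {
            Console.WriteLine($"Accepted, connected: {server.Connected}");
        }

        client.Dispose();
        listener.Dispose();
    }
}
```

Routing both paths through one method means the success and error handling can't drift apart between the synchronous and asynchronous cases.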

On top of this, the SocketAsyncEventArgs.Completed event may be raised in some other thread, most probably an I/O thread from the ThreadPool.

Each time you assign to DumReceiver.tcsRcv and DumSender.tcsSnd without proper synchronization, you can't be sure that DumReceiver.Received and DumSender.Sent are using the latest TaskCompletionSource.

Actually, you could get a NullReferenceException on the first iteration.

You lack synchronization in:

  • DumReceiver, the fields tcsRcv and socket and the properties Completion and SegmentsRead

  • DumSender, the field tcsSnd and the properties Completion and SegmentsSent

I suggest you consider using a single SemaphoreSlim instead of creating a new TaskCompletionSource each time you invoke ReceiveAsync and SendAsync. You'd initialize the semaphore to 0 in the constructor. If the *Async operation is pending, you'd await WaitAsync on the semaphore, and the Completed event handler would Release the semaphore.
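
A minimal sketch of that idea for the receive side might look like the following. SemaphoreReceiver and SemaphoreSketch are hypothetical names for this example only, and it assumes only one receive is ever in flight on the wrapper at a time.

```csharp
using System;
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: one SocketAsyncEventArgs and one semaphore, reused for every receive.
sealed class SemaphoreReceiver : IDisposable
{
    private readonly SocketAsyncEventArgs eva = new SocketAsyncEventArgs();
    private readonly SemaphoreSlim done = new SemaphoreSlim(0); // starts with no permits
    private readonly Socket socket;

    public SemaphoreReceiver(Socket socket)
    {
        this.socket = socket;
        // Completed is raised only when the operation was pending.
        this.eva.Completed += (s, e) => this.done.Release();
    }

    public async Task<int> ReceiveAsync(byte[] buffer, int offset, int count)
    {
        this.eva.SetBuffer(buffer, offset, count);
        if (this.socket.ReceiveAsync(this.eva))
        {
            // Pending: wait for the Completed handler to release the semaphore.
            await this.done.WaitAsync().ConfigureAwait(false);
        }

        if (this.eva.SocketError != SocketError.Success)
        {
            throw new SocketException((int)this.eva.SocketError);
        }

        return this.eva.BytesTransferred;
    }

    public void Dispose()
    {
        this.eva.Dispose();
        this.done.Dispose();
    }
}

static class SemaphoreSketch
{
    static void Main()
    {
        var listener = new Socket(
            AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        listener.Bind(new IPEndPoint(IPAddress.Loopback, 0));
        listener.Listen(1);

        var client = new Socket(
            AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
        client.Connect(listener.LocalEndPoint);
        Socket server = listener.Accept();

        client.Send(new byte[] { 1, 2, 3 });
        client.Shutdown(SocketShutdown.Send); // lets the receiver see end of stream

        using (var receiver = new SemaphoreReceiver(server))
        {
            var buf = new byte[16];
            int total = 0;
            int read;
            // A read of 0 bytes signals that the peer has shut down its sending side.
            while ((read = receiver.ReceiveAsync(buf, total, buf.Length - total).Result) > 0)
            {
                total += read;
            }

            Console.WriteLine($"Received {total} bytes");
        }

        server.Dispose();
        client.Dispose();
        listener.Dispose();
    }
}
```

The handler never touches a field that is reassigned per iteration, so there is no question of which completion source it sees.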

This should be enough to get rid of the race conditions in the TaskCompletionSource fields. You'd still need proper synchronization on the other fields and properties. For instance, there's no reason why Completion can't be created in the constructors, and SegmentsRead and SegmentsSent could be read-only and refer to a field which would be accessed internally with one or more of the Interlocked methods (e.g. Interlocked.Increment or Interlocked.Add).
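
For the counters, something along these lines is what I have in mind. SegmentCounter is a made-up name, and using Volatile.Read for the getter is my own choice here rather than a requirement.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical counter: writes go through Interlocked, the property is read-only.
sealed class SegmentCounter
{
    private int segments;

    // Read-only view of the field for other threads.
    public int Segments => Volatile.Read(ref this.segments);

    // Atomic increment, safe to call from any thread.
    public int Increment() => Interlocked.Increment(ref this.segments);
}

static class CounterSketch
{
    static void Main()
    {
        var counter = new SegmentCounter();

        // Many concurrent increments; none are lost because each one is atomic.
        Parallel.For(0, 1000, i => { counter.Increment(); });

        Console.WriteLine(counter.Segments); // prints 1000
    }
}
```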

Upvotes: 0
