Dai
Dai

Reputation: 155628

How do I correctly read and write with a Socket/NetworkStream using C# async/await in a single thread?

I'm attempting to write a client in C# for a proprietary TCP protocol connection (that sends/receives key+value pair messages). I'm wanting to use the async/await features of NetworkStream so my program can read and write to the socket with a single thread (a la JavaScript) however I'm having problems with the way NetworkStream.ReadAsync works.

Here's my program in outline:

public static async Task Main(String[] args)
{
    using( TcpClient tcp = new TcpClient() )
    {
        await tcp.ConnectAsync( ... );
        using( NetworkStream ns = tcp.GetStream() )
        {
            while( true )
            {
                await RunInnerAsync( ns );
            }
        }
    }
}

private static readonly ConcurrentQueue<NameValueCollection> pendingMessagesToSend = new ConcurrentQueue<NameValueCollection>();

private static async Task RunInnerAsync( NetworkStream ns )
{
    // 1. Send any pending messages.
    // 2. Read any newly received messages.

    // 1:
    while( !pendingMessagesToSend.IsEmpty )
    {
        // ( foreach Message, send down the NetworkStream here )
    }

    // 2:
    Byte[] buffer = new Byte[1024];
    while( ns.DataAvailable )
    {
        Int32 bytesRead = await ns.ReadAsync( buffer, 0, buffer.Length );
        if( bytesRead == 0 ) break;
        // ( process contents of `buffer` here )
    } 
}

There's a problem here: if there is no data in the NetworkStream ns to be read (DataAvailable == false) then the while( true ) loop in Main constantly runs and the CPU never idles - this is bad.

So if I change the code to remove the DataAvailable check and simply always call ReadAsync then the call effectively "blocks" until data is available - so if no data arrives then this client will never send any messages to the remote host. So I thought about adding a timeout of 500ms or so:

// 2:
Byte[] buffer = new Byte[1024];
while( ns.DataAvailable )
{
    CancellationTokenSource cts = new CancellationTokenSource( 500 );
    Task<Int32> readTask = ns.ReadAsync( buffer, 0, buffer.Length, cts.Token );
    await readTask;
    if( readTask.IsCancelled ) break;
    // ( process contents of `buffer` here )
}

However, this does not work! Apparently the NetworkStream.ReadAsync overload that accepts a CancellationToken does not abort or stop when a cancellation is actually requested, it always ignores it (how is this not a bug?).

The QA I linked to suggests a workaround of simply closing the Socket/NetworkStream - which is inappropriate for me because I need to keep the connection alive, but only take a break from waiting for data to arrive and send some data instead.

One of the other answers suggests co-awaiting a Task.Delay, like so:

// 2:
Byte[] buffer = new Byte[1024];
while( ns.DataAvailable )
{
    Task maxReadTime = Task.Delay( 500 );
    Task readTask = ns.ReadAsync( buffer, 0, buffer.Length );
    await Task.WhenAny( maxReadTime, readTask );
    if( maxReadTime.IsCompleted )
    {
        // what do I do here to cancel the still-pending ReadAsync operation?
    }
}

...however while this does stop the program from waiting for an indefinite network read operation, it doesn't stop the read operation itself - so when my program finishes sending any pending messages it will call into ReadAsync a second time while it's still waiting for data to arrive - and that means dealing with overlapped-IO and is not what I want at all.

I know when working with Socket directly and its BeginReceive / EndReceive methods you simply only ever call BeginReceive from within EndReceive - but how does one safely call BeginReceive for the first time, especially in a loop - and how should those calls be modified when using the async/await API instead?

Upvotes: 1

Views: 1396

Answers (1)

Dai
Dai

Reputation: 155628

I got it working by getting the Task from NetworkStream.ReadAsync and storing it in a mutable local variable that is set to null if there is no pending ReadAsync operation, otherwise it's an valid Task instance.

Unfortunately as async methods cannot have ref parameters I needed to move my logic into Main from RunInnerAsync.

Here's my solution:

private static readonly ConcurrentQueue<NameValueCollection> pendingMessagesToSend = new ConcurrentQueue<NameValueCollection>();
private static TaskCompletionSource<Object> queueTcs;

public static async Task Main(String[] args)
{
    using( TcpClient tcp = new TcpClient() )
    {
        await tcp.ConnectAsync( ... );
        using( NetworkStream ns = tcp.GetStream() )
        {
            Task<Int32> nsReadTask = null; // <-- this!
            Byte[] buffer = new Byte[1024];

            while( true )
            {
                if( nsReadTask == null )
                {
                    nsReadTask = ns.ReadAsync( buffer, 0, buffer.Length );
                }

                if( queueTcs == null ) queueTcs = new TaskCompletionSource<Object>();

                Task completedTask = await Task.WhenAny( nsReadTask, queueTcs.Task );
                if( completedTask == nsReadTask )
                {
                    while( ns.DataAvailable )
                    {
                        Int32 bytesRead = await ns.ReadAsync( buffer, 0, buffer.Length );
                        if( bytesRead == 0 ) break;
                        // ( process contents of `buffer` here )
                    } 
                }
                else if( completedTask == queueTcs )
                {
                    while( !pendingMessagesToSend.IsEmpty )
                    {
                        // ( foreach Message, send down the NetworkStream here )
                    }
                }
            }
        }
    }
}

And whenever pendingMessagesToSend is modified, the queueTcs is instantiated if null and has SetResult(null) called to un-await the Task completedTask = await Task.WhenAny( nsReadTask, queueTcs.Task ); line.

Since building this solution I don't feel as though I'm using TaskCompletionSource appropriately, see this QA: Is it acceptable to use TaskCompletionSource as a WaitHandle substitute?

Upvotes: 1

Related Questions