Reputation: 155628
I'm attempting to write a client in C# for a proprietary TCP protocol (one that sends and receives key/value pair messages). I want to use the async/await features of NetworkStream so my program can read from and write to the socket with a single thread (a la JavaScript), but I'm having problems with the way NetworkStream.ReadAsync works.
Here's my program in outline:
    public static async Task Main(String[] args)
    {
        using( TcpClient tcp = new TcpClient() )
        {
            await tcp.ConnectAsync( ... );
            using( NetworkStream ns = tcp.GetStream() )
            {
                while( true )
                {
                    await RunInnerAsync( ns );
                }
            }
        }
    }

    private static readonly ConcurrentQueue<NameValueCollection> pendingMessagesToSend = new ConcurrentQueue<NameValueCollection>();

    private static async Task RunInnerAsync( NetworkStream ns )
    {
        // 1. Send any pending messages.
        // 2. Read any newly received messages.

        // 1:
        while( !pendingMessagesToSend.IsEmpty )
        {
            // ( foreach Message, send down the NetworkStream here )
        }

        // 2:
        Byte[] buffer = new Byte[1024];
        while( ns.DataAvailable )
        {
            Int32 bytesRead = await ns.ReadAsync( buffer, 0, buffer.Length );
            if( bytesRead == 0 ) break;
            // ( process contents of `buffer` here )
        }
    }
There's a problem here: if there is no data in the NetworkStream ns to be read (DataAvailable == false) then the while( true ) loop in Main spins constantly and the CPU never idles, which is bad.
If I change the code to remove the DataAvailable check and simply always call ReadAsync, then the call effectively "blocks" (the await does not complete) until data is available, so if no data arrives this client will never send any messages to the remote host. So I thought about adding a timeout of 500ms or so:
    // 2:
    Byte[] buffer = new Byte[1024];
    while( ns.DataAvailable )
    {
        CancellationTokenSource cts = new CancellationTokenSource( 500 );
        Task<Int32> readTask = ns.ReadAsync( buffer, 0, buffer.Length, cts.Token );
        await readTask;
        if( readTask.IsCanceled ) break;
        // ( process contents of `buffer` here )
    }
However, this does not work! Apparently the NetworkStream.ReadAsync overload that accepts a CancellationToken does not abort or stop when a cancellation is actually requested; it always ignores it (how is this not a bug?).
The QA I linked to suggests a workaround of simply closing the Socket/NetworkStream, which is inappropriate for me because I need to keep the connection alive and only take a break from waiting for data to arrive so that I can send some data instead.
One of the other answers suggests co-awaiting a Task.Delay, like so:
    // 2:
    Byte[] buffer = new Byte[1024];
    while( ns.DataAvailable )
    {
        Task maxReadTime = Task.Delay( 500 );
        Task readTask = ns.ReadAsync( buffer, 0, buffer.Length );
        await Task.WhenAny( maxReadTime, readTask );
        if( maxReadTime.IsCompleted )
        {
            // what do I do here to cancel the still-pending ReadAsync operation?
        }
    }
...however, while this does stop the program from waiting indefinitely for a network read, it doesn't stop the read operation itself. So when my program finishes sending any pending messages it will call ReadAsync a second time while the first call is still waiting for data to arrive, and that means dealing with overlapped IO, which is not what I want at all.
I know that when working with Socket directly and its BeginReceive/EndReceive methods you only ever call BeginReceive from within the callback that calls EndReceive, but how does one safely call BeginReceive for the first time, especially in a loop, and how should those calls be modified when using the async/await API instead?
Upvotes: 1
Views: 1396
Reputation: 155628
I got it working by taking the Task returned by NetworkStream.ReadAsync and storing it in a mutable local variable that is null if there is no pending ReadAsync operation and otherwise holds a valid Task instance.
Unfortunately, as async methods cannot have ref parameters, I needed to move my logic from RunInnerAsync into Main.
Here's my solution:
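    private static readonly ConcurrentQueue<NameValueCollection> pendingMessagesToSend = new ConcurrentQueue<NameValueCollection>();
    private static TaskCompletionSource<Object> queueTcs;

    public static async Task Main(String[] args)
    {
        using( TcpClient tcp = new TcpClient() )
        {
            await tcp.ConnectAsync( ... );
            using( NetworkStream ns = tcp.GetStream() )
            {
                Task<Int32> nsReadTask = null; // <-- this!
                Byte[] buffer = new Byte[1024];
                while( true )
                {
                    // Start a new read only if no read is already pending.
                    if( nsReadTask == null )
                    {
                        nsReadTask = ns.ReadAsync( buffer, 0, buffer.Length );
                    }
                    if( queueTcs == null ) queueTcs = new TaskCompletionSource<Object>();

                    Task completedTask = await Task.WhenAny( nsReadTask, queueTcs.Task );
                    if( completedTask == nsReadTask )
                    {
                        // Consume the result of the completed read, then clear the
                        // variable so the next iteration starts a fresh read.
                        Int32 bytesRead = await nsReadTask;
                        nsReadTask = null;
                        if( bytesRead == 0 ) break; // remote host closed the connection
                        // ( process the first `bytesRead` bytes of `buffer` here )

                        while( ns.DataAvailable )
                        {
                            bytesRead = await ns.ReadAsync( buffer, 0, buffer.Length );
                            if( bytesRead == 0 ) break;
                            // ( process contents of `buffer` here )
                        }
                    }
                    else if( completedTask == queueTcs.Task ) // compare against the Task, not the TaskCompletionSource
                    {
                        // Clear the completed signal so a new one is created next iteration
                        // (this hand-off is not synchronized with the producer as written).
                        queueTcs = null;
                        while( pendingMessagesToSend.TryDequeue( out NameValueCollection message ) )
                        {
                            // ( send `message` down the NetworkStream here )
                        }
                    }
                }
            }
        }
    }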
Whenever pendingMessagesToSend is modified, queueTcs is instantiated if it is null and SetResult(null) is called on it, which completes the await on the Task completedTask = await Task.WhenAny( nsReadTask, queueTcs.Task ); line.
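The producer side described above is not shown in the answer, so here is a minimal sketch of what it might look like. The class name SendQueue, the method EnqueueMessage, and the PendingCount and QueueSignal accessors are hypothetical names introduced for illustration only. The sketch also swaps SetResult for TrySetResult and uses Interlocked.CompareExchange, which are assumptions on my part rather than what the original code necessarily does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Specialized;
using System.Threading;
using System.Threading.Tasks;

public static class SendQueue // hypothetical wrapper, not part of the original code
{
    private static readonly ConcurrentQueue<NameValueCollection> pendingMessagesToSend = new ConcurrentQueue<NameValueCollection>();
    private static TaskCompletionSource<Object> queueTcs;

    // Hypothetical accessors, present only so the sketch can be inspected.
    public static Int32 PendingCount => pendingMessagesToSend.Count;
    public static Task QueueSignal => queueTcs?.Task;

    // Enqueue a message, then complete the signal task that the read/send loop awaits.
    public static void EnqueueMessage( NameValueCollection message )
    {
        pendingMessagesToSend.Enqueue( message );

        // Install a new TaskCompletionSource only if none exists yet.
        // CompareExchange returns the previous value, which is null when ours was installed.
        TaskCompletionSource<Object> fresh = new TaskCompletionSource<Object>( TaskCreationOptions.RunContinuationsAsynchronously );
        TaskCompletionSource<Object> tcs = Interlocked.CompareExchange( ref queueTcs, fresh, null ) ?? fresh;

        // Unlike SetResult, TrySetResult does not throw if the task has already completed.
        tcs.TrySetResult( null );
    }
}
```

A caller would use it as SendQueue.EnqueueMessage( new NameValueCollection { { "key", "value" } } ). The awaiting loop still has to replace or clear queueTcs once it has observed the completed task, otherwise every later Task.WhenAny call returns immediately.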
Since building this solution I don't feel as though I'm using TaskCompletionSource appropriately; see this QA: Is it acceptable to use TaskCompletionSource as a WaitHandle substitute?
Upvotes: 1