Reputation: 2830
I have a class handling my network connection with the following methods:
SendNetworkMessageAsync: This method should send a message on the network stream and, if the respective argument is set, wait for a response until a timeout occurs. The classes NetworkMessageModel and NetworkCommandModel are helper classes that handle conversions, message length, and so on. When these methods are called, the corresponding socket is already created and connected to the server.
    public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
    {
        var timeoutTask = Task.Delay(timeout);
        _ = networkStream.WriteAsync(msg.EncodedMessageHeader, 0, msg.EncodedMessageHeader.Length);
        _ = networkStream.WriteAsync(msg.EncodedMessageBody, 0, msg.EncodedMessageBody.Length);
        if (!awaitAnswer) return null;
        var taskReseiveResponse = ReceiveNetworkMessageAsync();
        if (await Task.WhenAny(taskReseiveResponse, timeoutTask) == taskReseiveResponse)
        {
            return NetworkCommandModel.FromStream(taskReseiveResponse.Result.EncodedMessageBody);
        }
        else
        {
            return new NetworkCommandModel(Util.NetworkCommandTypeEnum.COMMANDFAILED, "");
        }
    }
ReceiveNetworkMessageAsync and ReadAsync should handle everything concerning receiving messages from the network stream.
    private async Task<NetworkMessageModel> ReceiveNetworkMessageAsync()
    {
        var headerbytes = await ReadAsync(4);
        int bodyLength = IPAddress.NetworkToHostOrder(BitConverter.ToInt32(headerbytes, 0));
        var bodybytes = await ReadAsync(bodyLength);
        return new NetworkMessageModel(bodybytes, headerbytes);
    }

    private async Task<byte[]> ReadAsync(int bytesToRead)
    {
        var buffer = new byte[bytesToRead];
        var bytesRead = 0;
        while (bytesRead < bytesToRead)
        {
            var bytesReceived = await networkStream.ReadAsync(buffer, bytesRead, (bytesToRead - bytesRead)).ConfigureAwait(false);
            if (bytesReceived == 0)
            {
                throw new Exception("Socket Closed"); //TODO: Handle unexpected Socket closing error
            }
            bytesRead += bytesReceived;
        }
        return buffer;
    }
This works fine for the most part. However, the whole setup fails if these methods are called simultaneously from different tasks.
For example: I have implemented a "connection check", a task that periodically sends a message to the server and awaits a response within a timeout. If I am receiving a long message from the server and my connection check task calls ReceiveNetworkMessageAsync() at the same time, the response will be a mix of the two expected answers.
Can I make sure that the calls to the NetworkStream are made one after the other?
Upvotes: 1
Views: 901
Reputation: 456607
I'm assuming that the TCP/IP protocol does not have a command/response identifier that you can use to distinguish the responses. If the protocol does have an identifier, then the normal solution is to have a dictionary of outstanding commands and look up the associated command by identifier when a response is received. But if the protocol doesn't have the identifier, your code can only send one command at a time until a response is received. I'm assuming the protocol does not have this identifier for the remainder of this answer.
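For reference, the identifier-based dictionary mentioned above could be sketched roughly as follows. This is only an illustration: `PendingResponses`, `Register`, `Dispatch`, and `Abandon` are hypothetical names, the identifier is assumed to be an `int`, and the single read loop that would call `Dispatch` is not shown.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical helper: tracks commands that are still waiting for a response.
public sealed class PendingResponses
{
    private readonly ConcurrentDictionary<int, TaskCompletionSource<byte[]>> _pending =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();

    // Call before sending a command; the returned task completes when a
    // response carrying the same identifier is dispatched.
    public Task<byte[]> Register(int commandId)
    {
        var tcs = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
        if (!_pending.TryAdd(commandId, tcs))
            throw new InvalidOperationException("Command id " + commandId + " is already outstanding.");
        return tcs.Task;
    }

    // Call from the single read loop for every received message.
    // Returns false if nobody is waiting for this identifier.
    public bool Dispatch(int commandId, byte[] body)
    {
        if (!_pending.TryRemove(commandId, out var tcs)) return false;
        return tcs.TrySetResult(body);
    }

    // Call when a caller gives up (for example on timeout) so the entry does not leak.
    public bool Abandon(int commandId)
    {
        return _pending.TryRemove(commandId, out _);
    }
}

public static class PendingResponsesDemo
{
    // Responses arrive in the opposite order of the requests and are still matched by id.
    public static async Task<string> RunAsync()
    {
        var pending = new PendingResponses();
        Task<byte[]> first = pending.Register(1);
        Task<byte[]> second = pending.Register(2);

        pending.Dispatch(2, new byte[] { 20 });
        pending.Dispatch(1, new byte[] { 10 });

        byte[] a = await first;
        byte[] b = await second;
        return a[0] + "," + b[0];
    }
}
```

With this shape, any number of commands can be outstanding at once, because only one loop ever reads from the stream and it hands each response to the matching waiter.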
There's a couple of ways to handle this. The more robust and complex approach is to create a queue of waiters. Each call to the higher-level API would enqueue a waiter into that queue. The waiter will include a TaskCompletionSource<T> that is eventually used to complete the task returned from the high-level API. Then you'd have a separate "runner" task that just processes that queue, doing a write followed by a read followed by completing that TaskCompletionSource<T>.
Doable, but kind of involved.
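A minimal sketch of that queue-of-waiters idea, under some assumptions: it uses `System.Threading.Channels` (in-box on .NET Core 3.0 and later, a NuGet package on older frameworks), and the names `RequestQueue`, `Waiter`, and `roundTrip` are hypothetical. The `roundTrip` delegate stands in for "write the message, then read one response" so the example runs without a socket.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Hypothetical sketch: one runner task performs all round trips, in FIFO order.
public sealed class RequestQueue
{
    // A queued request plus the TaskCompletionSource that completes the caller's task.
    private sealed class Waiter
    {
        public Waiter(byte[] request)
        {
            Request = request;
            Completion = new TaskCompletionSource<byte[]>(TaskCreationOptions.RunContinuationsAsynchronously);
        }
        public byte[] Request { get; }
        public TaskCompletionSource<byte[]> Completion { get; }
    }

    private readonly Channel<Waiter> _queue = Channel.CreateUnbounded<Waiter>(
        new UnboundedChannelOptions { SingleReader = true });
    private readonly Func<byte[], Task<byte[]>> _roundTrip;

    public RequestQueue(Func<byte[], Task<byte[]>> roundTrip)
    {
        _roundTrip = roundTrip;
        Runner = Task.Run(() => RunAsync());
    }

    // Completes after Complete() has been called and the queue has drained.
    public Task Runner { get; }

    // High-level API: enqueue a waiter and hand its task back to the caller.
    public Task<byte[]> SendAsync(byte[] request)
    {
        var waiter = new Waiter(request);
        if (!_queue.Writer.TryWrite(waiter))
            throw new InvalidOperationException("The queue has been completed.");
        return waiter.Completion.Task;
    }

    public void Complete() => _queue.Writer.Complete();

    // The runner: take one waiter, do the round trip, complete its task, repeat.
    private async Task RunAsync()
    {
        var reader = _queue.Reader;
        while (await reader.WaitToReadAsync())
        {
            while (reader.TryRead(out var waiter))
            {
                try
                {
                    byte[] response = await _roundTrip(waiter.Request);
                    waiter.Completion.TrySetResult(response);
                }
                catch (Exception ex)
                {
                    waiter.Completion.TrySetException(ex);
                }
            }
        }
    }
}

public static class RequestQueueDemo
{
    public static async Task<string> RunAsync()
    {
        var processed = new List<int>();
        var queue = new RequestQueue(async request =>
        {
            processed.Add(request[0]);   // only the runner calls this, one request at a time
            await Task.Delay(10);        // pretend to wait for the server
            return new byte[] { (byte)(request[0] + 100) };
        });

        Task<byte[]> t1 = queue.SendAsync(new byte[] { 1 });
        Task<byte[]> t2 = queue.SendAsync(new byte[] { 2 });
        Task<byte[]> t3 = queue.SendAsync(new byte[] { 3 });
        byte[][] responses = await Task.WhenAll(t1, t2, t3);

        queue.Complete();
        await queue.Runner;
        return string.Join(",", processed) + " -> " +
               responses[0][0] + "," + responses[1][0] + "," + responses[2][0];
    }
}
```

Because only the runner touches the transport, requests are processed strictly in the order they were enqueued. A per-request timeout would still need a decision about what happens to a late response, which this sketch leaves out.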
It's also possible that a simpler approach may work for what you need. Each of your higher-level API calls just needs to lock the socket until its response arrives. Then additional commands will queue up when the socket is already busy. SemaphoreSlim is an asynchronous-compatible mutual exclusion primitive that would work for this. The implementation is simpler than building your own queue, but you also lose some nice behavior like FIFO (or prioritized requests, if desired).
The simpler mutual-exclusion approach looks like:
    private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);

    public async Task<NetworkCommandModel> SendNetworkMessageAsync(NetworkMessageModel msg, bool awaitAnswer, int timeout = 500)
    {
        await _mutex.WaitAsync();
        try
        {
            ... // existing code
        }
        finally
        {
            _mutex.Release();
        }
    }
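To see the effect of the mutex in isolation, here is a small self-contained check. It is a sketch with stand-ins, not the real networking code: `SerializedClient` and `SendAsync` are hypothetical names, and `Task.Delay` replaces the actual write and read on the stream.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the connection class: counts overlapping round trips.
public sealed class SerializedClient
{
    private readonly SemaphoreSlim _mutex = new SemaphoreSlim(1);
    private int _inFlight;                        // round trips currently running

    public int MaxInFlight { get; private set; }  // highest overlap ever observed

    public async Task<int> SendAsync(int request)
    {
        await _mutex.WaitAsync();
        try
        {
            // Stand-in for "write the message, then read the response".
            int now = Interlocked.Increment(ref _inFlight);
            MaxInFlight = Math.Max(MaxInFlight, now);
            await Task.Delay(10);
            Interlocked.Decrement(ref _inFlight);
            return request * 2;
        }
        finally
        {
            _mutex.Release();
        }
    }
}

public static class SerializedClientDemo
{
    public static async Task<string> RunAsync()
    {
        var client = new SerializedClient();
        // Start three calls at once; the mutex lets only one into the body at a time.
        int[] results = await Task.WhenAll(
            client.SendAsync(1), client.SendAsync(2), client.SendAsync(3));
        return client.MaxInFlight + ":" + string.Join(",", results);
    }
}
```

`MaxInFlight` stays at 1 even though the three calls are started together. Without the `WaitAsync`/`Release` pair, the simulated round trips would overlap.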
Other notes:
The writes should be awaited. At the very least, this lets you see socket errors.
Upvotes: 4