Reputation: 155608
My code handles a TCP connection to a remote host, with a ConcurrentQueue
to store outgoing messages. It's intended to run in a single thread. The lifetime of the connection is contained within RunAsync
while a separate object contains the "public state" of the connection:
class PublicState
{
internal readonly ConcurrentQueue<Message> OutgoingMessageQueue = new ConcurrentQueue<Message>();
internal TaskCompletionSource<Object> OutgoingMessageTcs = null;
internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();
public void EnqueueMessages(IEnumerable<Message> messages)
{
foreach( Message m in messages ) this.OutgoingMessageQueue.Enqueue( m);
if( this.OutgoingMessageTcs == null ) this.OutgoingMessageTcs = new TaskCompletionSource<Object>();
this.OutgoingMessageTcs.SetResult( null );
}
}
static async Task RunAsync(IPEndPoint endPoint, PublicState state)
{
using( TcpClient tcp = new TcpClient() )
{
await tcp.ConnectAsync( endPoint.Address, endPoint.Port ).ConfigureAwait(false);
Byte[] reusableBuffer = new Byte[ 4096 ];
using( NetworkStream ns = tcp.GetStream() )
{
state.ConnectedTcs.SetResult( null );
Task<Int32> nsReadTask = null;
while( tcp.Connected )
{
if( !state.writeQueue.IsEmpty )
{
await WriteMessagesAsync( ... ).ConfigureAwait( false );
}
if( ns.DataAvailable )
{
await ReadMessagesAsync( ... ).ConfigureAwait( false );
}
// Wait for new data to arrive from remote host or for new messages to send:
if( state.OutgoingMessageTcs == null ) state.OutgoingMessageTcs = new TaskCompletionSource<Object>();
if( nsReadTask == null ) nsReadTask = ns.ReadAsync( reusableBuffer, 0, 0 ).ConfigureAwait( false );
Task c = await Task.WhenAny( state.OutgoingMessageTcs, nsReadTask ).ConfigureAwait( false );
if( c == state.OutgoingMessageTcs.Task ) state.OutgoingMessageTcs = null;
else if( c == nsReadTask ) nsReadTask = null;
}
}
}
}
Used like this:
public async Task Main(String[] args)
{
PublicState state = new PublicState();
Task clientTask = Client.RunAsync( new IPEndPoint(args[0]), state );
await state.ConnectedTcs.Task; // awaits until TCP connection is established
state.EnqueueMessage( new Message("foo") );
state.EnqueueMessage( new Message("bar") );
state.EnqueueMessage( new Message("baz") );
await clientTask; // awaits until the TCP connection is closed
}
This code works, but I don't like it: it feels like I'm using TaskCompletionSource
which is meant to represent an actual Task or some kind of background operation, whereas I'm really using TaskCompletionSource
as a kind of cheap EventWaitHandle
. I'm not using EventWaitHandle
because it's IDisposable
(I don't want to risk leaking native resources) and it lacks a WaitAsync
or WaitOneAsync
method. I could use SemaphoreSlim
(which is awaitable, but wraps an EventWaitHandle
) but my code doesn't really represent a good use of a semaphore.
Is my use of TaskCompletionSource<T>
acceptable, or is there a better way to "un-await" execution in RunAsync
when an item is added to OutgoingMessageQueue
?
Another reason I feel it's "wrong" is that TaskCompletionSource<T>
can only be used once, then it needs replacing. I'm keen to avoid extraneous allocations.
Upvotes: 5
Views: 1295
Reputation: 111
To back up what others have mentioned, it does look like Microsoft's documentation mentions and even encourages developing a Semaphore class which is written on top of the Task objects here:
You can also build an asynchronous semaphore that does not rely on wait handles and instead works completely with tasks. To do this, you can use techniques such as those discussed in Consuming the Task-based Asynchronous Pattern for building data structures on top of Task.
This does make me wonder why such a prepackaged class does not already exist, but it certainly shows that this is fine.
Upvotes: 1
Reputation: 101613
If I understood you correctly - TPL BufferBlock
might be what you need. Analog of current Enqueue
is Post
, and you can receive next value via ReceiveAsync
extension method.
So with BufferBlock
your code becomes something like this:
class PublicState {
internal readonly BufferBlock<Message> OutgoingMessageQueue = new BufferBlock<Message>();
internal readonly TaskCompletionSource<Object> ConnectedTcs = new TaskCompletionSource<Object>();
public void EnqueueMessage(Message message) {
this.OutgoingMessageQueue.Post(message);
}
}
static async Task RunAsync(IPEndPoint endPoint, PublicState state) {
using (TcpClient tcp = new TcpClient()) {
await tcp.ConnectAsync(endPoint.Address, endPoint.Port).ConfigureAwait(false);
Byte[] reusableBuffer = new Byte[4096];
using (NetworkStream ns = tcp.GetStream()) {
state.ConnectedTcs.SetResult(null);
Task<Int32> nsReadTask = null;
Task<Message> newMessageTask = null;
while (tcp.Connected) {
// Wait for new data to arrive from remote host or for new messages to send:
if (nsReadTask == null)
nsReadTask = ns.ReadAsync(reusableBuffer, 0, 0);
if (newMessageTask == null)
newMessageTask = state.OutgoingMessageQueue.ReceiveAsync();
var completed = await Task.WhenAny(nsReadTask, newMessageTask).ConfigureAwait(false);
if (completed == newMessageTask) {
var result = await newMessageTask;
// do stuff
newMessageTask = null;
}
else {
var bytesRead = await nsReadTask;
nsReadTask = null;
}
}
}
}
}
As a bonus, this version is (I think) thread-safe, while your current version is not, because you are doing non-thread-safe things with OutgoingMessageTcs
from potentially multiple threads (thread of RunAsync
and thread of EnqueueMessages
caller).
If for some reason you don't like BufferBlock
- you can use AsyncCollection
from Nito.AsyncEx
nuget package in exactly the same way. Initialization becomes:
internal readonly AsyncCollection<Message> OutgoingMessageQueue = new AsyncCollection<Message>(new ConcurrentQueue<Message>());
And fetching:
if (newMessageTask == null)
newMessageTask = state.OutgoingMessageQueue.TakeAsync();
Upvotes: 3