user2635088
user2635088

Reputation: 1618

TPL FromAsync Task keeps getting cancelled

I'm sending 10 messages from a client to a server in a loop, and then responses back from the server to the client using TPL's FromAsync (targeting .NET 4.0)

After the last message is sent from the client, I'm trying to measure the total time taken until the final response is received. To do this, I call Task.WaitAll but I get an AggregateException who's inner exception says "A task was cancelled" for all the tasks. The server is receiving the requests.

Code:

private static double MeasureAsyncTPL(List<string> stringList)
    {
        var stopwatch = new Stopwatch();
        stopwatch.Start();

        var tasks = new Task[stringList.Count];
        for (int i = 0; i < stringList.Count; i++)
        {
            string source = stringList[i];
            tasks[i] = SendMessageToServerAsync(source);
            //SendMessageToServerAsync(source);
        }
        Task.WaitAll(tasks);


        stopwatch.Stop();            
        return stopwatch.Elapsed.TotalMilliseconds; 
    }

        static Task SendMessageToServerAsync(string message)
    {
        Task task;
        var byteArray = MessageToByteArray(message, Encoding.UTF8);
        using (var tcpClient = new TcpClient())
        {
            tcpClient.Connect("127.0.0.1", 5000);
            using (var networkStream = tcpClient.GetStream())
            {
                task = Write(networkStream,byteArray,0);
                Task continuation = task.ContinueWith(ant => Read(ant, networkStream), TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.AttachedToParent);

                return continuation;
            }
        }
    }

        private static Task Write(NetworkStream stream, byte[] buffer, int offset)
    {
        return Task.Factory.FromAsync(stream.BeginWrite, stream.EndWrite, buffer, offset, buffer.Length, null);
    }

    private static Task Read(Task write, NetworkStream stream)
    {

        byte[] data = new byte[50];
        return Task<int>.Factory.FromAsync(stream.BeginRead, stream.EndRead, data, 0, data.Length, null);

    }

The exception is being picked up inside MeasureAsyncTPL on the line Task.WaitAll(tasks). How can I fix this exception?

Second question - is FromAsync inside my Read method guaranteed to return the entire message from the server? Or do I need to call it multiple times while keeping track of a the size of the byte array that the server sent?

Upvotes: 0

Views: 249

Answers (1)

Yacoub Massad
Yacoub Massad

Reputation: 27871

You are disposing of the networkStream and tcpClient before the write and read operations start or have time to work.

With .NET 4.0, you need to do something as complex as this (this is something to start with, it needs more work):

var byteArray = MessageToByteArray(message, Encoding.UTF8);

var tcpClient = new TcpClient();

NetworkStream networkStream = null;

try
{
    tcpClient.Connect("127.0.0.1", 5000);

    networkStream = tcpClient.GetStream();
}
catch
{
    ((IDisposable)tcpClient).Dispose();
    throw;
}

byte[] read_buffer = new byte[50];

Action dispose_action = () =>
{
    ((IDisposable)tcpClient).Dispose();
    networkStream.Dispose();
};

var write_task = networkStream.WriteAsync(byteArray, 0, byteArray.Length);

write_task
    .ContinueWith(wt =>
    {
        var read_task = networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);


        read_task.ContinueWith(rt =>
        {
            //See if rt task is successful and if so, process read data here

            dispose_action();
        });

    },
    TaskContinuationOptions.OnlyOnRanToCompletion);

write_task.ContinueWith(wt => dispose_action, TaskContinuationOptions.NotOnRanToCompletion);

The main idea of the code above is that you don't dispose of the TCP client and the network stream object until the read/write operations are done.

Imagine how complex this code would become if you want to read multiple times in some kind of loop.

With .NET 4.5, you can use async and await to do something (simpler) like this:

static async Task SendMessageToServerAsync(string message)
{
    var byteArray = MessageToByteArray(message, Encoding.UTF8);
    using (var tcpClient = new TcpClient())
    {
        tcpClient.Connect("127.0.0.1", 5000);
        using (var networkStream = tcpClient.GetStream())
        {
            await networkStream.WriteAsync(byteArray, 0, byteArray.Length);

            byte[] read_buffer = new byte[50];

            int read = await networkStream.ReadAsync(read_buffer, 0, read_buffer.Length);

            // do something with the read_buffer
        }
    }
}

Upvotes: 2

Related Questions