Zdeněk

Reputation: 969

TPL DataFlow not processing all messages

I'm implementing a simple data loader over HTTP, following the tips from my previous question C# .NET Parallel I/O operation (with throttling), which was answered with Throttling asynchronous tasks.

I split loading and deserialization into separate blocks, assuming that one may be slower or faster than the other. I also want to throttle the downloading, but not the deserialization. Therefore I'm using two blocks and one buffer.

Unfortunately I'm facing a problem where this pipeline sometimes processes fewer messages than it consumes (I know from the target server that I made exactly n requests, but I end up with fewer responses).

My method looks like this (error handling omitted):

public async Task<IEnumerable<DummyData>> LoadAsync(IEnumerable<Uri> uris)
{
    IList<DummyData> result;
    using (var client = new HttpClient())
    {
        var buffer = new BufferBlock<DummyData>();

        var downloader = new TransformBlock<Uri, string>(
            async u => await client.GetStringAsync(u),
            new ExecutionDataflowBlockOptions
            { MaxDegreeOfParallelism = _maxParallelism });

        var deserializer =
            new TransformBlock<string, DummyData>(
                s => JsonConvert.DeserializeObject<DummyData>(s),
                new ExecutionDataflowBlockOptions
                { MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });

        var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

        downloader.LinkTo(deserializer, linkOptions);
        deserializer.LinkTo(buffer, linkOptions);

        foreach (Uri uri in uris)
        {
            await downloader.SendAsync(uri);
        }

        downloader.Complete();
        await downloader.Completion;

        buffer.TryReceiveAll(out result);
    }

    return result;
}

So to be more specific: I have 100 URLs to load, but I get back only 90-99 responses. No errors, and the server handled all 100 requests. This happens randomly; most of the time the code behaves correctly.

Upvotes: 1

Views: 445

Answers (1)

Theodor Zoulias

Reputation: 43643

There are three issues with your code:

  1. Awaiting the completion of the first block of the pipeline (downloader) instead of the last (buffer).

  2. Using the TryReceiveAll method to retrieve the messages of the buffer block. The correct way to retrieve all messages from an unlinked block without introducing race conditions is to call the methods OutputAvailableAsync and TryReceive in a nested loop. You can find examples here and here.

  3. On timeout, the HttpClient throws an unexpected TaskCanceledException instead of a TimeoutException, and TPL Dataflow blocks happen to ignore exceptions of this type. The combination of these two unfortunate realities means that, by default, any timeout occurrence will remain unobserved. To fix this problem you could change your code like this:

var downloader = new TransformBlock<Uri, string>(async url =>
{
    try
    {
        return await client.GetStringAsync(url);
    }
    catch (OperationCanceledException) { throw new TimeoutException(); }
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxParallelism });
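
Fixes 1 and 2 can be sketched together in a minimal, self-contained pipeline (a trivial doubler block stands in for the HTTP downloader, so the example runs without a server): await the completion of the *last* block, and drain the buffer with OutputAvailableAsync and TryReceive in a nested loop instead of TryReceiveAll.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

class DrainDemo
{
    static async Task<List<int>> RunAsync()
    {
        // A trivial stand-in for the downloader/deserializer stages.
        var doubler = new TransformBlock<int, int>(n => n * 2);
        var buffer = new BufferBlock<int>();
        doubler.LinkTo(buffer, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < 100; i++)
            await doubler.SendAsync(i);
        doubler.Complete();

        var results = new List<int>();
        // Drain while the pipeline is still running. OutputAvailableAsync
        // returns false only after the buffer has completed AND is empty,
        // so no message can slip through at the end.
        while (await buffer.OutputAvailableAsync())
            while (buffer.TryReceive(null, out int item))
                results.Add(item);

        await buffer.Completion; // await the LAST block; surfaces pipeline exceptions
        return results;
    }

    static async Task Main()
    {
        var results = await RunAsync();
        Console.WriteLine(results.Count);
    }
}
```

With the drain loop in place the program reliably collects all 100 messages; in the original code the equivalent pattern would be to consume the buffer this way and then await buffer.Completion instead of downloader.Completion.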

A fourth unrelated issue is the use of the MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded option for the deserializer block. In the (hopefully unlikely) case that the deserializer is slower than the downloader, the deserializer will start queuing more and more work on the ThreadPool, keeping it permanently starved. This will not be good for the performance and the responsiveness of your application, or for the health of the system as a whole. In practice there is rarely a reason to configure a CPU-bound block with a MaxDegreeOfParallelism larger than Environment.ProcessorCount.
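
A bounded configuration for a CPU-bound block can be sketched like this (a self-contained toy block stands in for the JSON deserializer):

```csharp
using System;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

class DopDemo
{
    static void Main()
    {
        // Cap the parallelism of a CPU-bound stage at the number of logical
        // cores instead of Unbounded, so a slow stage cannot flood the
        // ThreadPool with an ever-growing backlog of queued work items.
        var options = new ExecutionDataflowBlockOptions
        {
            MaxDegreeOfParallelism = Environment.ProcessorCount
        };

        var worker = new TransformBlock<string, int>(s => s.Length, options);

        Console.WriteLine(options.MaxDegreeOfParallelism == Environment.ProcessorCount);
    }
}
```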

Upvotes: 2
