Reputation: 969
I'm implementing simple data loader over HTTP, following tips from my previous question C# .NET Parallel I/O operation (with throttling), answered by Throttling asynchronous tasks.
I split loading and deserialization, assuming that one may be slower/faster than other. Also I want to throttle downloading, but don't want to throttle deserialization. Therefore I'm using two blocks and one buffer.
Unfortunately I'm facing problem that this pipeline sometimes processes less messages than consumed (I know from target server that I did exactly n
requests, but I end up with less responses).
My method looks like this (no error handling):
public async Task<IEnumerable<DummyData>> LoadAsync(IEnumerable<Uri> uris)
{
IList<DummyData> result;
using (var client = new HttpClient())
{
var buffer = new BufferBlock<DummyData>();
var downloader = new TransformBlock<Uri, string>(
async u => await client.GetStringAsync(u),
new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = _maxParallelism });
var deserializer =
new TransformBlock<string, DummyData>(
s => JsonConvert.DeserializeObject<DummyData>(s),
new ExecutionDataflowBlockOptions
{ MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded });
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
downloader.LinkTo(deserializer, linkOptions);
deserializer.LinkTo(buffer, linkOptions);
foreach (Uri uri in uris)
{
await downloader.SendAsync(uri);
}
downloader.Complete();
await downloader.Completion;
buffer.TryReceiveAll(out result);
}
return result;
}
So to be more specific, I have 100 URLs to load, but I get 90-99 responses. No error & server handled 100 requests. This happens randomly, most of the time code behaves correctly.
Upvotes: 1
Views: 445
Reputation: 43643
There are three issues with your code:
Awaiting for the completion of the first block of the pipeline (downloader
) instead of the last (buffer
).
Using the TryReceiveAll
method for retrieving the messages of the buffer
block. The correct way to retrieve all messages from an unlinked block without introducing race conditions is to use the methods OutputAvailableAsync
and TryReceive
in a nested loop. You can find examples here and here.
In case of timeout the HttpClient
throws an unexpected TaskCanceledException
, and the TPL Dataflow blocks happens to ignore exceptions of this type. The combination of these two unfortunate realities means that by default any timeout occurrence will remain unobserved. To fix this problem you could change your code like this:
var downloader = new TransformBlock<Uri, string>(async url =>
{
try
{
return await client.GetStringAsync(url);
}
catch (OperationCanceledException) { throw new TimeoutException(); }
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = _maxParallelism });
A fourth unrelated issue is the use of the MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
option for the deserializer
block. In the (hopefully unlikely) case that the deserializer
is slower than the downloader
, the deserializer
will start queuing more and more work on the ThreadPool
, keeping it permanently starved. This will not be good for the performance and the responsiveness of your application, or for the health of the system as a whole. In practice there is rarely a reason to configure a CPU-bound block with a MaxDegreeOfParallelism
larger than Environment.ProcessorCount
.
Upvotes: 2