Reputation: 9906
I am building a generic URI retrieval system. Essentially there's a generic class Retriever<T>
and it maintains a queue of URIs to be retrieved. It has a separate thread that handles that queue as fast as it can. An example of a type of URI, as indicated in the question title, is HTTP type URIs.
The problem is, when I get down to requesting that the resource be retrieved, via an abstract method T RetrieveResource(Uri location)
, it slow down due to a lack of asynchrony.
Changing the return type of RetrieveResource
to Task<T>
was my first thought. However, that seems to make tasks pile up and cause lots of problems when we have thousands of outstanding tasks. It appears to create many actual threads instead of utilizing the thread pool. I imagine this just slows everything down because there are too many things going on at once, so nothing individually is making significant progress.
It's expected that we will have a large number of queued items to retrieve and that they cannot be handled as fast as they are enqueued. There is an opportunity for the system to catch up, over time; but it's definitely not quick.
I've also thought about instead of maintaining a queue and a thread to handle it... to just queue a work item on the ThreadPool
. However, I'm not sure that this is ideal if say I need to shut the system down before all work items are handled or later want to allow for prioritization or something.
We also know that retrieving a resource is a time consuming process (0.250 - 5 seconds), but not necessarily a resource intense process. We are fine parallelizing this out to hundreds of requests.
Our requirements are:
BlockingCollection
is useful here).Is there a good way to parallelize this without introducing unnecessary complexity?
Below is some existing code we have, as an example.
public abstract class Retriever<T> : IRetriever<T>, IDisposable
{
private readonly Thread worker;
private readonly BlockingCollection<Uri> pending;
private volatile int isStarted;
private volatile int isDisposing;
public event EventHandler<RetrievalEventArgs<T>> Retrieved;
protected Retriever()
{
this.worker = new Thread(this.RetrieveResources);
this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
this.isStarted = 0;
this.isDisposing = 0;
}
~Retriever()
{
this.Dispose(false);
}
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
// This is what needs to be concurrently done.
// In this example, it's synchronous, but just on a separate thread.
T result = this.RetrieveResource(location);
// At this point, we would fire our event with the retrieved data
}
}
protected abstract T RetrieveResource(Uri location);
protected void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
{
return;
}
if (disposing)
{
this.pending.CompleteAdding();
this.worker.Join();
}
}
public void Add(Uri uri)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
public void AddRange(IEnumerable<Uri> uris)
{
foreach (Uri uri in uris)
{
try
{
this.pending.Add(uri);
}
catch (InvalidOperationException)
{
return;
}
}
}
public void Start()
{
if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
{
throw new InvalidOperationException("The retriever is already started.");
}
if (this.worker.ThreadState == ThreadState.Unstarted)
{
this.worker.Start();
}
Monitor.Pulse(this.pending);
}
public void Stop()
{
if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
{
throw new InvalidOperationException("The retriever is already stopped.");
}
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
}
To build on the example above... a solution to this that I think adds too much complexity or rather, weird code... would be this.
private void RetrieveResources()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.pending);
}
Uri location = this.pending.Take();
Task<T> task = new Task<T>((state) =>
{
return this.RetrieveResource(state as Uri);
}, location);
task.ContinueWith((t) =>
{
T result = t.Result;
RetrievalEventArgs<T> args = new RetrievalEventArgs<T>(location, result);
EventHandler<RetrievalEventArgs<T>> callback = this.Retrieved;
if (!Object.ReferenceEquals(callback, null))
{
callback(this, args);
}
});
task.Start();
}
}
Upvotes: 2
Views: 262
Reputation: 9906
I've come up with a pretty good solution I think. I abstracted both the method a resource is retrieved and the result's representation. This allows support for retrieval of arbitrary URIs with arbitrary results; kind of like some URI driven "ORM".
It supports variable concurrency levels. The other day when I posted the question, I was forgetting that asynchrony and concurrency are quite different and that all I was achieving with tasks was asynchrony and jamming up the task scheduler because what I really wanted was concurrency.
I added in cancellation because it seemed like a good idea to have start/stop capabilities.
public abstract class Retriever<T> : IRetriever<T>
{
private readonly object locker;
private readonly BlockingCollection<Uri> pending;
private readonly Thread[] threads;
private CancellationTokenSource cancellation;
private volatile int isStarted;
private volatile int isDisposing;
public event EventHandler<RetrieverEventArgs<T>> Retrieved;
protected Retriever(int concurrency)
{
if (concurrency <= 0)
{
throw new ArgumentOutOfRangeException("concurrency", "The specified concurrency level must be greater than zero.");
}
this.locker = new object();
this.pending = new BlockingCollection<Uri>(new ConcurrentQueue<Uri>());
this.threads = new Thread[concurrency];
this.cancellation = new CancellationTokenSource();
this.isStarted = 0;
this.isDisposing = 0;
this.InitializeThreads();
}
~Retriever()
{
this.Dispose(false);
}
private void InitializeThreads()
{
for (int i = 0; i < this.threads.Length; i++)
{
Thread thread = new Thread(this.ProcessQueue)
{
IsBackground = true
};
this.threads[i] = thread;
}
}
private void StartThreads()
{
foreach (Thread thread in this.threads)
{
if (thread.ThreadState == ThreadState.Unstarted)
{
thread.Start();
}
}
}
private void CancelOperations(bool reset)
{
this.cancellation.Cancel();
this.cancellation.Dispose();
if (reset)
{
this.cancellation = new CancellationTokenSource();
}
}
private void WaitForThreadsToExit()
{
foreach (Thread thread in this.threads)
{
thread.Join();
}
}
private void ProcessQueue()
{
while (this.isDisposing == 0)
{
while (this.isStarted == 0)
{
Monitor.Wait(this.locker);
}
Uri location;
try
{
location = this.pending.Take(this.cancellation.Token);
}
catch (OperationCanceledException)
{
continue;
}
T data;
try
{
data = this.Retrieve(location, this.cancellation.Token);
}
catch (OperationCanceledException)
{
continue;
}
RetrieverEventArgs<T> args = new RetrieverEventArgs<T>(location, data);
EventHandler<RetrieverEventArgs<T>> callback = this.Retrieved;
if (!Object.ReferenceEquals(callback, null))
{
callback(this, args);
}
}
}
private void ThowIfDisposed()
{
if (this.isDisposing == 1)
{
throw new ObjectDisposedException("Retriever");
}
}
protected abstract T Retrieve(Uri location, CancellationToken token);
protected virtual void Dispose(bool disposing)
{
if (Interlocked.CompareExchange(ref this.isDisposing, 1, 0) == 1)
{
return;
}
if (disposing)
{
this.CancelOperations(false);
this.WaitForThreadsToExit();
this.pending.Dispose();
}
}
public void Start()
{
this.ThowIfDisposed();
if (Interlocked.CompareExchange(ref this.isStarted, 1, 0) == 1)
{
throw new InvalidOperationException("The retriever is already started.");
}
Monitor.PulseAll(this.locker);
this.StartThreads();
}
public void Add(Uri location)
{
this.pending.Add(location);
}
public void Stop()
{
this.ThowIfDisposed();
if (Interlocked.CompareExchange(ref this.isStarted, 0, 1) == 0)
{
throw new InvalidOperationException("The retriever is already stopped.");
}
this.CancelOperations(true);
}
public void Dispose()
{
this.Dispose(true);
GC.SuppressFinalize(this);
}
}
Upvotes: 2