Reputation: 35265
I am interested in getting some ideas from you about what would be a good (or better) threading architecture that respects the rules described below:
A thread must be running for the life of the application, sitting in a sleep/wait state whenever there is no work in the queue to be performed.
A thread must be of BelowNormal priority (this eliminates the possibility of using the ThreadPool).
The thread must give its feedback to the main thread upon completion of a task.
The thread will monitor a Queue<T> to get more jobs to perform.
I am using .NET Framework 4.0.
Let me know what you think :)
Upvotes: 3
Views: 1357
Reputation: 48949
This situation screams BlockingCollection loud and clear. Create a dedicated thread that watches the queue with its priority set appropriately. The BlockingCollection.Take method will block automatically when there are no items in the queue.
using System;
using System.Collections.Concurrent;
using System.Threading;

public class Example
{
    private BlockingCollection<WorkItem> m_Queue = new BlockingCollection<WorkItem>();

    public event EventHandler<WorkItemEventArgs> WorkItemCompleted;

    public Example()
    {
        var thread = new Thread(
            () =>
            {
                while (true)
                {
                    WorkItem item = m_Queue.Take();
                    // Add code to process the work item here.
                    if (WorkItemCompleted != null)
                    {
                        WorkItemCompleted(this, new WorkItemEventArgs(item));
                    }
                }
            });
        thread.IsBackground = true;
        thread.Priority = ThreadPriority.BelowNormal;
        thread.Start();
    }

    public void Add(WorkItem item)
    {
        m_Queue.Add(item);
    }
}
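Usage from the main thread might look roughly like this (WorkItem and WorkItemEventArgs are the placeholder types from the snippet above; note that the event is raised on the worker thread, so marshal back to the UI with Control.Invoke or a SynchronizationContext if you need to touch UI state):

var example = new Example();
example.WorkItemCompleted += (sender, e) =>
{
    // Raised on the worker thread; marshal to the main thread here if necessary.
    Console.WriteLine("Work item completed.");
};
example.Add(new WorkItem());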
Upvotes: 1
Reputation: 131112
Personally, I usually roll my own, because I like having much tighter control.
I use this in Media Browser:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Diagnostics;
using MediaBrowser.Library.Logging;

namespace MediaBrowser.Library.Threading {

    public static class Async {

        public const string STARTUP_QUEUE = "Startup Queue";

        class ThreadPool {

            List<Action> actions = new List<Action>();
            List<Thread> threads = new List<Thread>();
            string name;
            volatile int maxThreads = 1;

            public ThreadPool(string name) {
                Debug.Assert(name != null);
                if (name == null) {
                    throw new ArgumentException("name should not be null");
                }
                this.name = name;
            }

            public void SetMaxThreads(int maxThreads) {
                Debug.Assert(maxThreads > 0);
                if (maxThreads < 1) {
                    throw new ArgumentException("maxThreads should be larger than 0");
                }
                this.maxThreads = maxThreads;
            }

            public void Queue(Action action, bool urgent) {
                Queue(action, urgent, 0);
            }

            public void Queue(Action action, bool urgent, int delay) {

                if (delay > 0) {
                    Timer t = null;
                    t = new Timer(_ =>
                    {
                        Queue(action, urgent, 0);
                        t.Dispose();
                    }, null, delay, Timeout.Infinite);
                    return;
                }

                lock (threads) {
                    // we are spinning up too many threads
                    // should be fixed
                    if (maxThreads > threads.Count) {
                        Thread t = new Thread(new ThreadStart(ThreadProc));
                        t.IsBackground = true;
                        // dont affect the UI.
                        t.Priority = ThreadPriority.Lowest;
                        t.Name = "Worker thread for " + name;
                        t.Start();
                        threads.Add(t);
                    }
                }

                lock (actions) {
                    if (urgent) {
                        actions.Insert(0, action);
                    } else {
                        actions.Add(action);
                    }
                    Monitor.Pulse(actions);
                }
            }

            private void ThreadProc() {
                while (true) {

                    lock (threads) {
                        if (maxThreads < threads.Count) {
                            threads.Remove(Thread.CurrentThread);
                            break;
                        }
                    }

                    List<Action> copy;
                    lock (actions) {
                        while (actions.Count == 0) {
                            Monitor.Wait(actions);
                        }
                        copy = new List<Action>(actions);
                        actions.Clear();
                    }

                    foreach (var action in copy) {
                        action();
                    }
                }
            }
        }

        static Dictionary<string, ThreadPool> threadPool = new Dictionary<string, ThreadPool>();

        public static Timer Every(int milliseconds, Action action) {
            Timer timer = new Timer(_ => action(), null, 0, milliseconds);
            return timer;
        }

        public static void SetMaxThreads(string uniqueId, int threads) {
            GetThreadPool(uniqueId).SetMaxThreads(threads);
        }

        public static void Queue(string uniqueId, Action action) {
            Queue(uniqueId, action, null);
        }

        public static void Queue(string uniqueId, Action action, int delay) {
            Queue(uniqueId, action, null, false, delay);
        }

        public static void Queue(string uniqueId, Action action, Action done) {
            Queue(uniqueId, action, done, false);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent) {
            Queue(uniqueId, action, done, urgent, 0);
        }

        public static void Queue(string uniqueId, Action action, Action done, bool urgent, int delay) {
            Debug.Assert(uniqueId != null);
            Debug.Assert(action != null);

            Action workItem = () =>
            {
                try {
                    action();
                } catch (ThreadAbortException) { /* dont report on this, its normal */ } catch (Exception ex) {
                    Debug.Assert(false, "Async thread crashed! This must be fixed. " + ex.ToString());
                    Logger.ReportException("Async thread crashed! This must be fixed. ", ex);
                }
                if (done != null) done();
            };

            GetThreadPool(uniqueId).Queue(workItem, urgent, delay);
        }

        private static ThreadPool GetThreadPool(string uniqueId) {
            ThreadPool currentPool;
            lock (threadPool) {
                if (!threadPool.TryGetValue(uniqueId, out currentPool)) {
                    currentPool = new ThreadPool(uniqueId);
                    threadPool[uniqueId] = currentPool;
                }
            }
            return currentPool;
        }
    }
}
It has a fairly elegant API; the only feature I would like to add one day is scavenging empty thread pools.
Usage:
// Set the threads for custom thread pool
Async.SetMaxThreads("Queue Name", 10);
// Perform an action on the custom thread pool named "Queue Name"; when it is done, call ImDone
Async.Queue("Queue Name", () => DoSomeThing(foo), () => ImDone(foo));
This has a few handy overloads that allow you to queue delayed actions, and another that pushes in urgent jobs that skip to the front of the queue.
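For example (based on the overloads shown above; DoSomeThing and ImDone are just stand-ins):

// Queue an action to run after a 5 second delay.
Async.Queue("Queue Name", () => DoSomeThing(foo), 5000);

// Push an urgent job to the front of the queue, with a completion callback.
Async.Queue("Queue Name", () => DoSomeThing(foo), () => ImDone(foo), true);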
Upvotes: 1
Reputation: 34407
When I need to implement my own multi-threaded processing, I usually use something like this:
using System;
using System.Collections.Generic;
using System.Threading;

public abstract class MyWorker<T> : IDisposable
{
    private readonly Queue<T> _taskQueue; // task queue
    private readonly object _threadLock = new object();
    private Thread _thread; // worker thread
    private ManualResetEvent _evExit;
    private AutoResetEvent _evNewData;

    /// <summary>Override this to process data.</summary>
    protected abstract void ProcessData(T data);

    /// <summary>Override this to set other thread priority.</summary>
    protected virtual ThreadPriority ThreadPriority
    {
        get { return ThreadPriority.BelowNormal; }
    }

    protected MyWorker()
    {
        _taskQueue = new Queue<T>();
        _evExit = new ManualResetEvent(false);
        _evNewData = new AutoResetEvent(false);
    }

    ~MyWorker()
    {
        Dispose(false);
    }

    private void ThreadProc()
    {
        try
        {
            var wh = new WaitHandle[] { _evExit, _evNewData };
            while(true)
            {
                T data = default(T);
                bool gotData = false;
                lock(_taskQueue) // sync
                {
                    if(_taskQueue.Count != 0) // have data?
                    {
                        data = _taskQueue.Dequeue();
                        gotData = true;
                    }
                }
                if(!gotData)
                {
                    if(WaitHandle.WaitAny(wh) == 0) return; // demanded stop
                    continue; // we have data now, grab it
                }
                ProcessData(data);
                if(_evExit.WaitOne(0)) return;
            }
        }
        catch(ThreadInterruptedException)
        {
            // log warning - this is not normal
        }
        catch(ThreadAbortException)
        {
            // log warning - this is not normal
        }
    }

    public void Start()
    {
        lock(_threadLock)
        {
            if(_thread != null)
                throw new InvalidOperationException("Already running.");
            _thread = new Thread(ThreadProc)
            {
                Name = "Worker Thread",
                IsBackground = true,
                Priority = ThreadPriority,
            };
            _thread.Start();
        }
    }

    public void Stop()
    {
        lock(_threadLock)
        {
            if(_thread == null)
                throw new InvalidOperationException("Is not running.");
            _evExit.Set();
            if(!_thread.Join(1000))
                _thread.Abort();
            _thread = null;
        }
    }

    /// <summary>Enqueue data for processing.</summary>
    public void EnqueueData(T data)
    {
        lock(_taskQueue)
        {
            _taskQueue.Enqueue(data);
            _evNewData.Set(); // wake thread if it is sleeping
        }
    }

    /// <summary>Clear all pending data processing requests.</summary>
    public void ClearData()
    {
        lock(_taskQueue)
        {
            _taskQueue.Clear();
            _evNewData.Reset();
        }
    }

    protected virtual void Dispose(bool disposing)
    {
        lock(_threadLock)
        {
            if(_thread != null)
            {
                _evExit.Set();
                if(!_thread.Join(1000))
                    _thread.Abort();
                _thread = null;
            }
        }
        _evExit.Close();
        _evNewData.Close();
        if(disposing)
            _taskQueue.Clear();
    }

    public void Dispose()
    {
        Dispose(true);
        GC.SuppressFinalize(this);
    }
}
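A minimal sketch of how a concrete worker might be derived and used (the names here are made up):

// Concrete worker: processes strings on a BelowNormal-priority background thread.
public class StringWorker : MyWorker<string>
{
    protected override void ProcessData(string data)
    {
        Console.WriteLine("Processing: " + data);
    }
}

// ...
var worker = new StringWorker();
worker.Start();
worker.EnqueueData("job 1");
worker.EnqueueData("job 2");
// when shutting down:
worker.Stop();
worker.Dispose();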
Upvotes: 4
Reputation: 273244
- A thread must be of BelowNormal priority (this eliminates the possibility of using the ThreadPool).
This seems to be the main stumbling block for using the TPL and ThreadPool. Are you sure you're not over-estimating the usefulness of a lower priority?
You will have to put in a lot of work to come up with something that will always be much less powerful (and much less tested/reliable) than the TPL.
I would reconsider this.
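If you do relax that requirement, a rough sketch of the same consumer pattern on the TPL (available in .NET 4.0; needs System.Collections.Concurrent and System.Threading.Tasks) could look like this. The queue of Action delegates is just illustrative:

var jobs = new BlockingCollection<Action>();

// LongRunning hints the scheduler to use a dedicated thread for this loop,
// so it can live for the life of the application without tying up a pool thread.
Task.Factory.StartNew(() =>
{
    foreach (var job in jobs.GetConsumingEnumerable())
    {
        job(); // report completion back to the main thread (e.g. via SynchronizationContext)
    }
}, TaskCreationOptions.LongRunning);

// Producer side:
jobs.Add(() => Console.WriteLine("job done"));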
Upvotes: 3
Reputation: 17556
Reading the above conditions, I have some questions:
1. Is there another thread that will populate jobs in the Queue<T>?
If the answer is yes, then the Producer/Consumer design pattern can be used here. I am not familiar with .NET 4.0, but this design can be implemented in .NET 3.5.
See here for an example.
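For reference, a minimal .NET 3.5-compatible sketch of that producer/consumer pattern (Job is a placeholder type here, and a single consumer thread is assumed; needs System.Collections.Generic and System.Threading):

class JobProcessor
{
    private readonly Queue<Job> _jobs = new Queue<Job>();

    public void Produce(Job job)
    {
        lock (_jobs)
        {
            _jobs.Enqueue(job);
            Monitor.Pulse(_jobs); // wake the consumer if it is waiting
        }
    }

    // Run this on a dedicated consumer thread.
    public void Consume()
    {
        while (true)
        {
            Job job;
            lock (_jobs)
            {
                while (_jobs.Count == 0)
                    Monitor.Wait(_jobs); // sleep until a producer pulses
                job = _jobs.Dequeue();
            }
            Process(job); // process outside the lock
        }
    }

    private void Process(Job job) { /* do the actual work */ }
}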
Upvotes: 1
Reputation: 283634
A thread pool sounds like just the thing. In fact, you can change the priority of .NET's own thread pool by lowering the process priority. Bump the process priority down a notch and your UI thread's priority up a notch, and you should end up with a UI at effectively normal priority and a thread pool at lower priority.
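A rough sketch of that idea (this assumes it runs on the UI thread at startup; adjust to taste):

using System.Diagnostics;
using System.Threading;

// Drop the whole process a notch, so ThreadPool worker threads end up below normal...
Process.GetCurrentProcess().PriorityClass = ProcessPriorityClass.BelowNormal;

// ...then bump the UI thread back up so it still runs at roughly normal priority.
Thread.CurrentThread.Priority = ThreadPriority.AboveNormal;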
Upvotes: 0