Reputation: 7318
This is my first attempt at writing a Windows service.
The service has to process two Windows message queues.
Each message queue should have its own thread, but I can't seem to get the architecture in place.
I followed the answer to "Windows Service to run constantly", which allowed me to create one thread in which I process one queue.
This is my service class:
protected override void OnStart(string[] args)
{
_thread = new Thread(WorkerThreadFunc) { Name = "Address Calculator Thread", IsBackground = true };
_thread.Start();
}
private void WorkerThreadFunc()
{
_addressCalculator = new GACAddressCalculator();
while (!_shutdownEvent.WaitOne(0))
{
_addressCalculator.StartAddressCalculation();
}
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{ // give the thread 5 seconds to stop
_thread.Abort();
}
}
In GACAddressCalculator.StartAddressCalculation() I create a queue processor object, which looks like this:
public void StartAddressCalculation()
{
try
{
var googleQueue = new GISGoogleQueue("VehMonLogGISGoogle", 1, _gacLogger, 1);
googleQueue.ProccessMessageQueue();
}
catch (Exception ex)
{
}
}
And this is GISGoogleQueue:
public class GISGoogleQueue : BaseMessageQueue
{
public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
: base(queueName, threadCount, logger, messagesPerThread)
{
}
public override void ProccessMessageQueue()
{
if (!MessageQueue.Exists(base.QueueName))
{
_logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
return;
}
var messageQueue = new MessageQueue(QueueName);
var myVehMonLog = new VehMonLog();
var o = new Object();
var arrTypes = new Type[2];
arrTypes[0] = myVehMonLog.GetType();
arrTypes[1] = o.GetType();
messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
using (var pool = new Pool(ThreadCount))
{
// Infinite loop to process all messages in Queue
for (; ; )
{
for (var i = 0; i < MessagesPerThread; i++)
{
try
{
while (pool.TaskCount() >= MessagesPerThread) ; // Stop execution until Tasks in pool have been executed
var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0)); // TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
if (message != null) // Check if message is not Null
{
var monLog = (VehMonLog)message.Body;
pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
}
}
catch (Exception ex)
{
}
}
}
}
}
}
This works fine for one message queue, but a second message queue never gets processed, because there is an infinite loop in the ProccessMessageQueue method.
I want to process each queue in a separate thread.
I think I am making a mistake in WorkerThreadFunc(); I have to somehow start two threads from there or from OnStart().
Any tips on how to improve this service would also be great.
By the way, I am using the Pool class from this answer, https://stackoverflow.com/a/436552/1910735, for the thread pool inside ProccessMessageQueue.
Upvotes: 1
Views: 2308
Reputation: 46034
I would suggest changing your service class as follows (comments below):
protected override void OnStart(string[] args)
{
_thread = new Thread(WorkerThreadFunc)
{
Name = "Run Constantly Thread",
IsBackground = true
};
_thread.Start();
}
GISGoogleQueue _googleQueue1;
GISGoogleQueue _googleQueue2;
private void WorkerThreadFunc()
{
// This thread is exclusively used to keep the service running.
// As such, there's no real need for a while loop here. Create
// the necessary objects, start them, wait for shutdown, and
// cleanup.
_googleQueue1 = new GISGoogleQueue(...);
_googleQueue1.Start();
_googleQueue2 = new GISGoogleQueue(...);
_googleQueue2.Start();
_shutdownEvent.WaitOne(); // infinite wait
_googleQueue1.Shutdown();
_googleQueue2.Shutdown();
}
protected override void OnStop()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{
// give the thread 5 seconds to stop
_thread.Abort();
}
}
I'm ignoring your GACAddressCalculator. From what you showed, it appears to be a thin wrapper around GISGoogleQueue. Obviously, if it actually does something that you didn't show, it'll need to be factored back in.
Notice that two GISGoogleQueue objects are created in WorkerThreadFunc(). So let's look next at how to write that class to achieve the appropriate threading model.
public class GISGoogleQueue : BaseMessageQueue
{
System.Threading.Thread _thread;
System.Threading.ManualResetEvent _shutdownEvent;
public GISGoogleQueue(string queueName, int threadCount, GACLogger logger, int messagesPerThread)
: base(queueName, threadCount, logger, messagesPerThread)
{
// Let this class wrap a thread object. Create it here.
_thread = new Thread(RunMessageQueueFunc)
{
Name = "Run Message Queue Thread " + Guid.NewGuid().ToString(),
IsBackground = true
};
_shutdownEvent = new ManualResetEvent(false);
}
public void Start()
{
_thread.Start();
}
public void Shutdown()
{
_shutdownEvent.Set();
if (!_thread.Join(5000))
{
// give the thread 5 seconds to stop
_thread.Abort();
}
}
private void RunMessageQueueFunc()
{
if (!MessageQueue.Exists(base.QueueName))
{
_logger.LogMessage(MessageType.Information, string.Format("Queue '{0}' doesn't exist", this.QueueName));
return;
}
var messageQueue = new MessageQueue(QueueName);
var myVehMonLog = new VehMonLog();
var o = new Object();
var arrTypes = new Type[2];
arrTypes[0] = myVehMonLog.GetType();
arrTypes[1] = o.GetType();
messageQueue.Formatter = new XmlMessageFormatter(arrTypes);
using (var pool = new Pool(ThreadCount))
{
// Here's where we'll wait for the shutdown event to occur.
while (!_shutdownEvent.WaitOne(0))
{
for (var i = 0; i < MessagesPerThread; i++)
{
try
{
// Stop execution until Tasks in pool have been executed
while (pool.TaskCount() >= MessagesPerThread) ;
// TimeOut for message reading from Queue, set to 5 minutes, Will throw exception after 5 mins
var message = messageQueue.Receive(new TimeSpan(0, 0, 5, 0));
if (message != null) // Check if message is not Null
{
var monLog = (VehMonLog)message.Body;
pool.QueueTask(() => ProcessMessageFromQueue(monLog)); // Add to Tasks list in Pool
}
}
catch (Exception ex)
{
}
}
}
}
}
}
This approach centers on a Thread object wrapped by the GISGoogleQueue class. For each GISGoogleQueue object you create, you get a wrapped thread that does the work once Start() is called on that object.
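The wrapping idea can be shown in isolation. This is only a minimal sketch, not your actual class: QueueWorker and its processOnce delegate are hypothetical names standing in for GISGoogleQueue and its message handling, so that the example runs without MSMQ.

```csharp
using System;
using System.Threading;

// Hypothetical stand-in for GISGoogleQueue: owns one thread and one shutdown event.
public sealed class QueueWorker
{
    private readonly Thread _thread;
    private readonly ManualResetEvent _shutdownEvent = new ManualResetEvent(false);
    private readonly Action _processOnce; // one unit of work, e.g. handling one message

    public QueueWorker(string name, Action processOnce)
    {
        _processOnce = processOnce;
        // Pass the method group (no parentheses) so the thread runs it later.
        _thread = new Thread(Run) { Name = name, IsBackground = true };
    }

    public void Start()
    {
        _thread.Start();
    }

    // Signals the loop to stop and reports whether the thread exited within the timeout.
    public bool Shutdown(TimeSpan timeout)
    {
        _shutdownEvent.Set();
        return _thread.Join(timeout);
    }

    private void Run()
    {
        // WaitOne(0) polls the event without blocking.
        while (!_shutdownEvent.WaitOne(0))
        {
            _processOnce();
        }
    }
}
```

Creating two QueueWorker instances and calling Start() on each gives you two independent loops, one per thread. Here Shutdown() returns false instead of aborting, so the caller can decide what to do with a thread that refuses to stop.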
A couple of points. First, in RunMessageQueueFunc(), you're checking whether the named queue exists. If it doesn't, the function returns, and the thread exits with it. You may wish to do that check earlier in the process. Just a thought.
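One way to do the check earlier, sketched under assumptions: validate all queue names before any thread is started, for example in OnStart(). QueueChecks and FindMissing are hypothetical names. The existence test is passed in as a delegate so the sketch runs without MSMQ; in the service you could pass MessageQueue.Exists.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class QueueChecks
{
    // Returns the names for which 'exists' reports false.
    // 'exists' is injected; with MSMQ it could be MessageQueue.Exists.
    public static List<string> FindMissing(IEnumerable<string> queueNames, Func<string, bool> exists)
    {
        return queueNames.Where(name => !exists(name)).ToList();
    }
}
```

If the returned list is not empty, the service can log the missing names and refuse to start, rather than quietly running with one worker thread fewer.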
Second, note that your infinite loop has been replaced by a check against the _shutdownEvent object. That way, the loop stops when the service shuts down. For timeliness, you'll need to make sure that a complete pass through the loop doesn't take too long; as written, the Receive() call alone can block for up to 5 minutes. Otherwise, you may end up aborting the thread 5 seconds after shutdown. The abort is only there to make sure things are torn down, and should be avoided if possible.
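To keep each pass short, the receive timeout can be set well below the 5 seconds that Shutdown() waits. The sketch below uses BlockingCollection<string> as a stand-in for the MSMQ queue, purely so that it is self-contained; ReceiveLoop and its parameters are hypothetical names. With the real MessageQueue.Receive(TimeSpan), an expired timeout surfaces as a MessageQueueException whose error code is IOTimeout, so you would catch that case and continue the loop instead of testing a return value.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;

public static class ReceiveLoop
{
    // Handles items from 'queue' until 'shutdownEvent' is set; returns the number handled.
    // BlockingCollection<T> stands in for the MSMQ queue in this sketch.
    public static int Run(BlockingCollection<string> queue, ManualResetEvent shutdownEvent,
                          TimeSpan receiveTimeout, Action<string> handle)
    {
        int handled = 0;
        while (!shutdownEvent.WaitOne(0))
        {
            string item;
            // Wait at most receiveTimeout, so the shutdown check runs at least that often.
            if (queue.TryTake(out item, receiveTimeout))
            {
                handle(item);
                handled++;
            }
        }
        return handled;
    }
}
```

With a timeout of, say, one second, the loop notices the shutdown event roughly within a second plus the time needed to handle one message, which leaves room inside the 5-second Join().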
I know a lot of people prefer using the Task class for things like this. It appears that you are doing something along those lines inside RunMessageQueueFunc(). But for threads that run for the duration of the process, I think the Task class is the wrong choice because it ties up threads in the thread pool. To me, that's what the Thread class is built for.
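For completeness, if you do want a Task for a loop that lives as long as the service, TaskCreationOptions.LongRunning is the usual hint to the scheduler that the work should not occupy an ordinary pool worker. This is a sketch of that alternative, not a recommendation over the Thread approach above; LongRunningWorker is a hypothetical name.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LongRunningWorker
{
    // Starts 'loop' as a long-running task; the loop is expected to return
    // once it observes cancellation on the token.
    public static Task Start(Action<CancellationToken> loop, CancellationToken token)
    {
        return Task.Factory.StartNew(
            () => loop(token),
            token,
            TaskCreationOptions.LongRunning, // hint: long-lived work
            TaskScheduler.Default);
    }
}
```

Stopping then means cancelling the CancellationTokenSource and waiting on the returned Task with a timeout, which plays the role of _shutdownEvent.Set() followed by Join().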
HTH
Upvotes: 3
Reputation: 2143
You can use Parallel.ForEach like this:
Parallel.ForEach(queueItems, ProcessQueue); // processes the queue items in parallel on thread-pool threads (not necessarily one thread per item)
private void ProcessQueue(QueueItem queue)
{
//your processing logic
}
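A self-contained sketch of that call, with assumptions: ParallelDemo, ProcessAll and the result strings are hypothetical placeholders for your real per-queue processing.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ParallelDemo
{
    // Processes every name, possibly in parallel, and returns the results once all are done.
    public static ICollection<string> ProcessAll(IEnumerable<string> queueNames)
    {
        var processed = new ConcurrentBag<string>(); // safe to add to from several threads
        // Parallel.ForEach blocks the caller until every item has been processed.
        Parallel.ForEach(queueNames, name => processed.Add("processed " + name));
        return processed.ToArray();
    }
}
```

Keep in mind that Parallel.ForEach only returns when all iterations have finished, and that it runs them on thread-pool threads. If ProcessQueue contains a loop that runs until the service stops, the call blocks for the lifetime of the service and holds on to those pool threads, so it would have to be made from a dedicated worker thread rather than from OnStart().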
Upvotes: 0