Pure.Krome

Reputation: 86947

MSMQ and polling to receive messages?

I've got a Windows service that does some image conversion. It fires whenever any file in a particular folder is renamed (i.e. a rename file watcher). It works great until a massive number of images gets dumped (and renamed) in that folder. Then the CPU redlines, etc.

So, I was going to change my code to use MSMQ to queue all the files that need to be converted. Fine. Every time a file is renamed and the file watcher fires, I then add a new message to the queue. Kewl.

Problem is this -> how do I grab one message at a time from the queue?

Do I need to make a timer object that polls the queue every xxx seconds? Or is there a way to constantly keep peeking at the first item in the queue? Once a message exists, extract it, process it, then continue (which means keep peeking until the world blows up).

I've wondered if I just need to put a while loop around the Receive method. Pseudo code is below (in Edit #2)...

Anyone have any experience with this and have some suggestions?

Thanks kindly!

EDIT:

If WCF is the way to go, can someone provide some sample code, etc instead?

EDIT 2:

Here's some pseudo code I was thinking of....

// Windows service start method.
protected override void OnStart(string[] args)
{
   // some initialisation stuff...

   // Start polling the queue.
   StartPollingMSMQ();

   // ....
}

private static void StartPollingMSMQ()
{
    // NOTE: This code should check if the queue exists, instead of just assuming it does.
    //       Left out for brevity.
    MessageQueue messageQueue = new MessageQueue(".\\Foo");

    while (true)
    {
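        Message message;
        try
        {
            // This blocks for up to one second waiting for a message.
            message = messageQueue.Receive(new TimeSpan(0, 0, 1));
        }
        catch (MessageQueueException e)
        {
            // Receive throws when the timeout expires with nothing queued; just try again.
            if (e.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout) continue;
            throw;
        }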

        // Woot! we have something.. now process it...
        DoStuffWithMessage(message);

        // Now repeat for eva and eva and boomski...
    }
}

Upvotes: 8

Views: 15420

Answers (5)

dvr

Reputation: 11

In ViktorJ's example, get your message from the event args rather than casting sender to a new MessageQueue. Surely this is more efficient? Then use the static field mq to call mq.BeginReceive, otherwise you're burning memory.
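
A minimal sketch of that suggestion, assuming the .NET Framework System.Messaging API. The class name QueueListener and its Start method are hypothetical and not part of any answer here; the handler reads the message from the Message property of ReceiveCompletedEventArgs and re-arms the receive on the static queue field. The null check is defensive, in case the receive did not complete successfully.

```csharp
using System;
using System.Messaging;

// Hypothetical listener class, for illustration only.
public static class QueueListener
{
    // Single shared queue instance, reused for every BeginReceive call.
    private static MessageQueue mq;

    public static void Start(string queuePath)
    {
        mq = new MessageQueue(queuePath);
        mq.ReceiveCompleted += OnReceiveCompleted;
        // Returns immediately; the handler runs when a message arrives.
        mq.BeginReceive();
    }

    private static void OnReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        // Take the message straight from the event args instead of
        // casting sender and calling EndReceive on it.
        Message msg = e.Message;
        if (msg != null)
        {
            // Placeholder for real processing.
            Console.WriteLine("Received message " + msg.Id);
        }

        // Ask for the next message using the shared static field.
        mq.BeginReceive();
    }
}
```

Calling QueueListener.Start(@".\Private$\test1") from OnStart would return straight away, so the service start does not block.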

Upvotes: 1

Alex Waddell

Reputation: 196

Be aware that the service example will block at OnStart(). Instead, start a worker thread:

    protected override void OnStart(string[] args)
    {
        IntPtr handle = this.ServiceHandle;
        myServiceStatus.currentState = (int)State.SERVICE_START_PENDING;
        SetServiceStatus(handle, ref myServiceStatus);

        // Start a separate thread that does the actual work.

        if ((workerThread == null) ||
            ((workerThread.ThreadState &
             (System.Threading.ThreadState.Unstarted | System.Threading.ThreadState.Stopped)) != 0))
        {
            workerThread = new Thread(new ThreadStart(ServiceWorkerMethod));
            workerThread.Start();
        }

    }

Then call BeginReceive() from the worker.

Upvotes: 0

Jonathan Parker

Reputation: 6795

Sounds like you need to look into WCF.

Queues in Windows Communication Foundation

Load leveling. Sending applications can overwhelm receiving applications with messages. Queues can manage mismatched message production and consumption rates so that a receiver is not overwhelmed.

Here's an example using WCF and MSMQ

Upvotes: 0

ViktorJ

Reputation:

If you use a local queue, you don't need WCF.

This is what my sample service looks like (a service class from a Windows service project):

using System.Messaging;
using System.ServiceProcess;
public partial class MQProcessTest1 : ServiceBase
{
    //a name of the queue
    private const string MqName = @".\Private$\test1";
    //define static local private queue
    private static MessageQueue _mq;
    //lazy local property through which we access queue (it initializes queue when needed)
    private static MessageQueue mq
    {
        get
        {
            if (_mq == null)
            {
                if (!MessageQueue.Exists(MqName))
                    MessageQueue.Create(MqName);
                _mq = new MessageQueue(MqName, QueueAccessMode.ReceiveAndAdmin);
                _mq.Formatter = new BinaryMessageFormatter();
            }
            return _mq;
        }
    }

    //constructor
    public MQProcessTest1()
    {
        InitializeComponent();
        //event to process received message 
        mq.ReceiveCompleted += new ReceiveCompletedEventHandler(mq_ReceiveCompleted);
    }

    //method to process message
    private void mq_ReceiveCompleted(object sender, ReceiveCompletedEventArgs e)
    {
        //the queue that received the message
        MessageQueue cmq = (MessageQueue)sender;
        try
        {
            //the message we have received (it is already removed from the queue)
            Message msg = cmq.EndReceive(e.AsyncResult);
            //here you can process the message
        }
        catch
        {
            //errors are silently swallowed here; log them in real code
        }
        //refresh the queue just in case any changes occurred (optional)
        cmq.Refresh();
        //tell the MessageQueue to receive the next message when it arrives
        cmq.BeginReceive();
    }

    protected override void OnStart(string[] args)
    {
        //start receiving a message (the handler fires when one arrives)
        if (mq != null)
            mq.BeginReceive();
        //you can do any additional logic if mq == null
    }

    protected override void OnStop()
    {
        //close MessageQueue on service stop
        if (mq != null)
            mq.Close();
        return;
    }
}

Upvotes: 26

paxdiablo

Reputation: 881263

I was under the impression that MSMQ was built to be compatible with IBM's MQ product. If that is the case, you can call MQGET with a timeout and not worry about polling at all.

Just get a message off the queue with a two-second timeout (for example). If there was one there, process it. Then either exit the service if desired or go back to the MQGET with wait.

This means your service will not suck up CPU time unnecessarily, but it will still be able to exit in a timely fashion if signaled.

Generally, you would have something like:

Set up all queue stuff.
while true:
    Read from queue with 10-second timeout.
    If message was read:
        Process message
    If signaled to exit:
        break
Tear down queue stuff.
Exit.
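
In C# with System.Messaging, that loop might look roughly like the sketch below. The QueueWorker class, its Start/Stop methods and the Process placeholder are hypothetical names for illustration. It assumes MessageQueue.Receive(TimeSpan) signals an expired timeout by throwing a MessageQueueException with error code IOTimeout, and it runs on its own thread so a service's OnStart would not block.

```csharp
using System;
using System.Messaging;
using System.Threading;

// Hypothetical worker, for illustration only.
public sealed class QueueWorker
{
    private readonly ManualResetEvent stopSignal = new ManualResetEvent(false);
    private Thread thread;

    public void Start(string queuePath)
    {
        thread = new Thread(() => Run(queuePath));
        thread.IsBackground = true;
        thread.Start();
    }

    public void Stop()
    {
        // Signal the loop to exit, then wait for it (at most one timeout period).
        stopSignal.Set();
        if (thread != null) thread.Join();
    }

    private void Run(string queuePath)
    {
        // Set up all queue stuff.
        using (MessageQueue queue = new MessageQueue(queuePath))
        {
            // WaitOne(0) returns true once Stop() has signaled.
            while (!stopSignal.WaitOne(0))
            {
                try
                {
                    // Read from the queue with a 10-second timeout.
                    Message message = queue.Receive(TimeSpan.FromSeconds(10));
                    Process(message);
                }
                catch (MessageQueueException ex)
                {
                    // A timeout just means the queue was empty; anything else is a real error.
                    if (ex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout) throw;
                }
            }
        } // Disposing the queue tears it down.
    }

    private static void Process(Message message)
    {
        // Placeholder for the real work.
        Console.WriteLine("Processing message " + message.Id);
    }
}
```

With this shape, OnStart would call Start and OnStop would call Stop; the thread sleeps inside Receive rather than spinning, and a stop request is noticed within one timeout period.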

Upvotes: 1
