Nate Sauber

Reputation: 1138

How to programmatically check if NServiceBus has finished processing all messages

As part of an effort to automate starting/stopping some of our NServiceBus services, I'd like to know when a service has finished processing all the messages in its input queue.

The problem is that, while the NServiceBus service is running, my C# code is reporting one less message than is actually there. So it thinks that the queue is empty when there is still one message left. If the service is stopped, it reports the "correct" number of messages. This is confusing because, when I inspect the queues myself using the Private Queues view in the Computer Management application, it displays the "correct" number.

I'm using a variant of the following C# code to find the message count:

var queue = new MessageQueue(path);
return queue.GetAllMessages().Length;

I know this will perform horribly when there are many messages. The queues I'm inspecting should only ever have a handful of messages at a time.

I have looked at other related questions, but haven't found the help I need.

Any insight or suggestions would be appreciated!

Update: I should have mentioned that this service is behind a Distributor, which is shut down before trying to shut down this service. So I have confidence that new messages will not be added to the service's input queue.

Upvotes: 5

Views: 1767

Answers (2)

Nate Sauber

Reputation: 1138

WMI was the answer! Here's a first pass at the code. It could doubtless be improved.

using System;
using System.Management; // add a reference to System.Management.dll

public int GetMessageCount(string queuePath)
{
    const string queryText = "select * from Win32_PerfRawData_MSMQ_MSMQQueue";
    var query = new WqlObjectQuery(queryText);
    var searcher = new ManagementObjectSearcher(query);
    var queues = searcher.Get();

    foreach (ManagementObject queue in queues)
    {
        var name = queue["Name"].ToString();
        if (AreTheSameQueue(queuePath, name))
        {
            // Depending on the machine (32/64-bit), this value comes back as a different type.
            // Casting directly to UInt64 or UInt32 only works on the matching CPU architecture.
            // To work around this run-time unknown, convert to string and then parse to int.
            var countAsString = queue["MessagesInQueue"].ToString();
            var messageCount = int.Parse(countAsString);
            return messageCount;
        }
    }

    return 0;
}

private static bool AreTheSameQueue(string path1, string path2)
{
    // Tests whether two queue paths are equivalent, accounting for differences
    // in case and length (if one path was truncated, for example by WMI).

    string sanitizedPath1 = Sanitize(path1);
    string sanitizedPath2 = Sanitize(path2);

    if (sanitizedPath1.Length > sanitizedPath2.Length)
    {
        return sanitizedPath1.StartsWith(sanitizedPath2);
    }

    if (sanitizedPath1.Length < sanitizedPath2.Length)
    {
        return sanitizedPath2.StartsWith(sanitizedPath1);
    }

    return sanitizedPath1 == sanitizedPath2;
}

private static string Sanitize(string queueName)
{
    var machineName = Environment.MachineName.ToLowerInvariant();
    return queueName.ToLowerInvariant().Replace(machineName, ".");
}

Upvotes: 1

Udi Dahan

Reputation: 12057

The thing is that it's not actually "one less message", but rather dependent on the number of messages currently being processed by the endpoint which, in a multi-threaded process, can be as high as the number of threads.

There's also the issue of client processes that continue to send messages to that same queue.

Probably the only "sure" way of handling this is to count the messages multiple times with a delay in between; if the count stays at zero over a certain number of attempts, you can assume the queue is empty.
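
A minimal sketch of that repeated-count idea, for illustration only. The class and method names (QueueDrainCheck, AppearsDrained) and all parameters are hypothetical, not part of NServiceBus or MSMQ. The count itself is passed in as a delegate, so any counting method (the GetAllMessages() call from the question, or a WMI lookup) could be plugged in. It only reports that the queue looked empty on several consecutive reads; it cannot prove that no message is still in flight.

```csharp
using System;
using System.Threading;

public static class QueueDrainCheck
{
    // Hypothetical helper: returns true once countMessages() has reported zero
    // on requiredZeroReads consecutive polls. Any non-zero read resets the streak.
    // Gives up and returns false after maxPolls polls.
    public static bool AppearsDrained(
        Func<int> countMessages,
        int requiredZeroReads,
        TimeSpan delay,
        int maxPolls)
    {
        if (countMessages == null) throw new ArgumentNullException("countMessages");
        if (requiredZeroReads < 1) throw new ArgumentOutOfRangeException("requiredZeroReads");

        int consecutiveZeros = 0;

        for (int poll = 0; poll < maxPolls; poll++)
        {
            if (countMessages() == 0)
            {
                consecutiveZeros++;
                if (consecutiveZeros >= requiredZeroReads)
                {
                    return true;
                }
            }
            else
            {
                // A message showed up (or became visible again), so start over.
                consecutiveZeros = 0;
            }

            // Wait between reads, but not after the final one.
            if (poll < maxPolls - 1)
            {
                Thread.Sleep(delay);
            }
        }

        return false;
    }
}
```

Usage might look like QueueDrainCheck.AppearsDrained(() => GetMessageCount(path), 5, TimeSpan.FromSeconds(1), 60), where GetMessageCount stands for whichever counting method is in use. The delay should be longer than the slowest message handler, otherwise a message that is being processed could be missed on every read.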

Upvotes: 2
