user3649344
user3649344

Reputation:

Why it is very slow to read the message from the queue in parallel?

I've developed a windows service that is writing 1500 messages in every second in a MSMQ.

In another service, I am reading that messages from queue in parallel and process them. But reading from queue is so slow and I think a bottleneck is the queue.Receive(TimeSpan.Zero).

I dont know why reading is slow?

My service is running on a server with good processing powers.

This is my code.

static Task Main()
{
    GetFromQueueAsync();
}


private static Task GetFromQueueAsync()
{
    string queueName = ConfigurationManager.AppSettings["QueueName"].ToString();

    while (true)
    {
        var blockArray = Enumerable.Range(0, 30).ToArray();

        Parallel.ForEach(blockArray, (i) =>
        {
            MessageQueue queue = new MessageQueue(queueName);

            try
            {                 
                var message = queue.Receive(TimeSpan.Zero);
                
                message.Formatter = new BinaryMessageFormatter();
                var labelParts = message.Label.Split('_');

                var isValidMessageAddress=  Validate(labelParts);

                if (isValidMessageAddress)
                {
                    //call my sysnc method
                }
        }
        catch (MessageQueueException mqex)
        {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
            {
                return;
            }
            else throw;
        }
    });
}

Upvotes: 0

Views: 1037

Answers (2)

Ventsyslav Raikov
Ventsyslav Raikov

Reputation: 7202

That may not improve your time to read the messages but will save you some worker threads that you create with the Parallel.ForEach which blocks, waiting for messages. You can basically subscribe for new messages and dispatch the handling you have on the thread pool and eventually have higher number of simultaneous subscriptions that the Parallel.ForEach will do.

    MessageQueue queue = new MessageQueue(queueName);
    private void pollQueue(string queueName, int numberOfSimultaneousRequests)
    {
        for (int i = 0; i < numberOfSimultaneousRequests; i++)
        {
            queue.BeginReceive(TimeSpan.MaxValue, null, ProcessMessage);
        }
    }

    private void ProcessMessage(IAsyncResult asyncResult)
    {
        try
        {
            var message = queue.EndReceive(asyncResult);
            //do something with the message here
            //OR
            ThreadPool.QueueUserWorkItem(o =>
            {
                //do something with the message in another threadpool thread
            }, message);
        }
        catch (Exception ex)
        {
            //handle error and log

        }
        finally
        {
            //subscribe again for another message
            queue.BeginReceive(TimeSpan.MaxValue, null, ProcessMessage);
        }
    }

Upvotes: 0

user3666197
user3666197

Reputation: 1

Q: I don't know why reading is slow?

Let's first agree on that every process implementation has some performance ceiling, which cannot be broken, even when using infinitely more power.

Next, let's notice, that the concept of any Queue is by nature SERIAL, not CONCURRENT ( concurrency may get added to the pure-SERIAL-one-after-another-after-another message retrieval, but at an additional cost, both performance-wise ( throughput is obviously decreased, so as to become able to receive + signalize + handle and control altogether the many (now) CONCURRENT-read requests with the internally still pure-SERIAL Queue-head-end for the safe de-Queue-ing process and delivery ) and latency-wise ( latency is obviously increased, often many times a few order of magnitudes higher, if compared to the original, incomplicated pure-SERIAL, monopolist's exclusive, low-latency message access mechanics ).

Having this said, the true PARALLEL process scheduling ( all-read-all-messages-at-the-same-moment ( all-served-at-once, synchronously, one may add ) ) simply never happens on a single Queue-instance head-end, never.

Solution:

For faster throughput, the best way is to keep a pure-SERIAL, low-latency head-end processing on the Queue-instance and dispatch each and every message to some other Worker-thread for processing the off-loaded message, independently of the other messages still waiting inside the ( yes, still and always will be ) pure-SERIAL message-queue.

There are many ways, how to dispatch an already off-loaded message content for further processing and such choice is up to your architecture and design decisions. Both inproc:// or ipc:// ( protocol-stack-less ) inter-thread or inter-process communication means are well equipped for harnessing pools of many hundreds processing threads or sufficiently enough ( collocated or even widely re-distributed ) message-content processing processes, if your application needs to grow and scale-up performance, yet keep the lowest possible latency.

Beware of all the add-on costs
still left inside the while(true){...}-infinite loops,
any such, the more the repetitive ones, degenerate the Amdahl's Law computed PARALLEL-code Speedup:

The as-is code ( as reported by @TheodorZoulias ) repeats and repeats all the instantiation - there is indeed no reason to reinstantiate and throw away and reinstantiate all the 30-as-proposed Queue-head-ends, per each loop-run. This seems the worst sequence of steps and poorest resources management ever possible:

        while (true) //--------------------------------- INFINITE LOOP
        {            //----- NEW ARRAY                CREATED PER-LOOP
            var blockArray = Enumerable.Range(0, 30).ToArray();
                     //----- NEW .ForEach(){...}      CREATED PER-LOOP                
            // |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
            Parallel.ForEach(blockArray, (i) =>    // ||||||||||||||||PAR||| BLOCK
            {        //----- NEW QUEUE                CREATED PER-LOOP x PER-BLOCK
                MessageQueue queue = new MessageQueue(queueName);
                try
                {   var message = queue.Receive(TimeSpan.Zero);
                    message.Formatter = new BinaryMessageFormatter();
                    var labelParts = message.Label.Split('_');
                    var isValidMessageAddress=  Validate(labelParts);
                    if (isValidMessageAddress)
                    {
                     //--------------------- PER MESSAGE PAYLOAD PROCESSING START
                     // call my sysnc method
                     //--------------------- PER MESSAGE PAYLOAD PROCESSING END
                    }
                }
                catch (MessageQueueException mqex)
                {   if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                    {   return; //-- <<<< ???
                      //------------ EACH IOTimeout KILLS +ONE POOL-MEMBER
                      //------------ 30th IOTimeout LEAVES THE POOL EMPTY
                    }
                    else throw;
                    //-------------- NO ERROR HANDLING
                }
            });
            // |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
        } //-------------------------------------------- INFINITE-LOOP

Might be rather re-formulated ( plus variables' re-use / pre allocations may help even more ):

         // |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
            Parallel.ForEach(blockArray, (i) =>    // |||||||||||||||:PAR||| BLOCK
            {        //--- A QUEUE HEAD-END           CREATED ONLY ONCE! PER-BLOCK
                MessageQueue queue = new MessageQueue(queueName);  //:PAR||| BLOCK
                while (true) //----PER-AGENT's [SERIAL]READING LOOP//:PAR||| BLOCK
                {                                                  //:PAR||| BLOCK
                    try      //----PER-AGENT's TRY{}               //:PAR||| BLOCK
                    {   var            message = queue.Receive( TimeSpan.Zero );
                                       message.Formatter = new BinaryMessageFormatter();
                        if ( Validate( message.Label.Split( '_' )))//:PAR||| BLOCK
                        {  //------PER-AGENT's---- PER MESSAGE     //:PAR||| BLOCK
                           // call my sysnc method                 //:PAR||| BLOCK
                           //------PER-AGENT's---- PER MESSAGE     //:PAR||| BLOCK
                        }                                          //:PAR||| BLOCK
                    }                                              //:PAR||| BLOCK
                    catch ...                                      //:PAR||| BLOCK
                    //-------------PER-AGENT's PER EXCEPTION HANDLING:PAR||| BLOCK
                    //                         WITHOUT ANY POOL AGENT:PAR||| BLOCK
                    //                         CANNIBALISATION       :PAR||| BLOCK
                } // --------------PER-AGENT's [SERIAL]READING LOOP  :PAR||| BLOCK
            } //|||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK
         // |||||||||||||||||||||||||||||||||||||||||||||||||||||||||:PAR||| BLOCK

Let more threads for moving more messages - performance scaling

Once the code-design concept is clear and sound, and if the Queue-head-end management is the problem, high performance messaging frameworks permit to increase a number of Queue-dedicated I/O-threads. Achievable performance envelopes are way above a few k[msg/s] up to some ~ 10.000 ~ 100.000 ~ 1.000.000 messages per second, so having small thousands messages en-Queued per second to get de-Queued is definitely not a problem, if proper engineering is put in place ( assuming the Queue is not moving some indeed extremely large BLOB-s, where additionally some Zero-Copy / pointer-moving tricks will have to take place, so as to keep the pace )

Upvotes: 1

Related Questions