I've developed a Windows service that writes 1500 messages per second to an MSMQ queue.
In another service, I read those messages from the queue in parallel and process them. But reading from the queue is very slow, and I think the bottleneck is queue.Receive(TimeSpan.Zero).
I don't know why reading is slow.
My service runs on a server with plenty of processing power.
This is my code:
static Task Main()
{
    GetFromQueueAsync();
}

private static Task GetFromQueueAsync()
{
    string queueName = ConfigurationManager.AppSettings["QueueName"].ToString();
    while (true)
    {
        var blockArray = Enumerable.Range(0, 30).ToArray();
        Parallel.ForEach(blockArray, (i) =>
        {
            MessageQueue queue = new MessageQueue(queueName);
            try
            {
                var message = queue.Receive(TimeSpan.Zero);
                message.Formatter = new BinaryMessageFormatter();
                var labelParts = message.Label.Split('_');
                var isValidMessageAddress = Validate(labelParts);
                if (isValidMessageAddress)
                {
                    //call my sysnc method
                }
            }
            catch (MessageQueueException mqex)
            {
                if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
                {
                    return;
                }
                else throw;
            }
        });
    }
}
Upvotes: 0
Views: 1037
This may not reduce the time it takes to read the messages, but it will save the worker threads that Parallel.ForEach creates and that then block while waiting for messages. Instead, you can subscribe for new messages and dispatch the handling to the thread pool, which also lets you keep more simultaneous subscriptions open than Parallel.ForEach would.
private MessageQueue queue;

private void pollQueue(string queueName, int numberOfSimultaneousRequests)
{
    // One shared queue instance serves all outstanding receive requests.
    queue = new MessageQueue(queueName);
    for (int i = 0; i < numberOfSimultaneousRequests; i++)
    {
        // MessageQueue.InfiniteTimeout waits indefinitely; TimeSpan.MaxValue
        // appears to fall outside the timeout range that MessageQueue accepts.
        queue.BeginReceive(MessageQueue.InfiniteTimeout, null, ProcessMessage);
    }
}

private void ProcessMessage(IAsyncResult asyncResult)
{
    try
    {
        var message = queue.EndReceive(asyncResult);
        //do something with the message here
        //OR
        ThreadPool.QueueUserWorkItem(o =>
        {
            //do something with the message in another thread-pool thread
        }, message);
    }
    catch (Exception ex)
    {
        //handle error and log
    }
    finally
    {
        //subscribe again for another message
        queue.BeginReceive(MessageQueue.InfiniteTimeout, null, ProcessMessage);
    }
}
Upvotes: 0
Q: I don't know why reading is slow?
Let's first agree that every process implementation has some performance ceiling that cannot be broken, even with infinitely more power.
Next, notice that any Queue is by nature SERIAL, not CONCURRENT. Concurrency may be added on top of the pure-SERIAL, one-after-another message retrieval, but at an additional cost. Throughput decreases, because the many (now) CONCURRENT read requests have to be received, signalled, handled and coordinated against the internally still pure-SERIAL Queue head-end, so that de-queueing and delivery stay safe. Latency increases as well, often by orders of magnitude, compared to the original, uncomplicated pure-SERIAL, exclusive, low-latency message access.
That said, true PARALLEL process scheduling (all readers reading all messages at the same moment, all served at once, synchronously) simply never happens on the head-end of a single Queue instance.
For faster throughput, the best way is to keep pure-SERIAL, low-latency head-end processing on the Queue instance and dispatch each message to another worker thread, which processes the off-loaded message independently of the other messages still waiting inside the (still, and always) pure-SERIAL message queue.
There are many ways to dispatch an already off-loaded message for further processing, and the choice is up to your architecture and design decisions. Both inproc:// and ipc:// (protocol-stack-less) inter-thread or inter-process transports are well suited to feeding pools of many hundreds of processing threads, or a sufficient number of (co-located or even widely distributed) message-processing processes, if your application needs to scale up performance while keeping the lowest possible latency.
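The "one serial reader, many workers" idea can be sketched in a few lines. This is only an illustration under stated assumptions, not the asker's service: a BlockingCollection<string> stands in for the MSMQ queue so the sketch runs without System.Messaging, and SerialReaderSketch, DrainAndProcess and Process are hypothetical names for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class SerialReaderSketch
{
    // Hypothetical stand-in for the real per-message work.
    private static string Process(string label) => label.ToUpperInvariant();

    // A single serial reader drains the source and off-loads each message
    // to the thread pool, so the queue head-end is never read concurrently.
    // Assumption: BlockingCollection<string> replaces the MSMQ queue here.
    public static List<string> DrainAndProcess(BlockingCollection<string> source)
    {
        var pending = new List<Task<string>>();

        // The only code that touches the queue: a plain one-after-another loop.
        // It ends once CompleteAdding() was called and the collection is empty.
        foreach (string label in source.GetConsumingEnumerable())
        {
            pending.Add(Task.Run(() => Process(label)));
        }

        // Wait for the off-loaded work; results are returned in dispatch order.
        return Task.WhenAll(pending).GetAwaiter().GetResult().ToList();
    }
}
```

Calling DrainAndProcess on a collection that holds "a_1", "b_2" and "c_3" and has been marked complete returns the three processed labels in the order they were read. A real service would probably also bound the number of in-flight work items rather than start one task per message without limit.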
On while(true){...} infinite loops: the as-is code (as noted by @TheodorZoulias) repeats all the instantiation on every pass. There is no reason to instantiate, throw away and re-instantiate all 30 Queue head-ends on each loop run. This is about the worst sequence of steps and the poorest resource management possible:
while (true) //--------------------------------- INFINITE LOOP
{   //----- NEW ARRAY CREATED PER-LOOP
    var blockArray = Enumerable.Range(0, 30).ToArray();
    //----- NEW .ForEach(){...} CREATED PER-LOOP
    // |||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
    Parallel.ForEach(blockArray, (i) =>
    {   //----- NEW QUEUE CREATED PER-LOOP x PER-BLOCK
        MessageQueue queue = new MessageQueue(queueName);
        try
        {
            var message = queue.Receive(TimeSpan.Zero);
            message.Formatter = new BinaryMessageFormatter();
            var labelParts = message.Label.Split('_');
            var isValidMessageAddress = Validate(labelParts);
            if (isValidMessageAddress)
            {
                //--------------------- PER MESSAGE PAYLOAD PROCESSING START
                // call my sysnc method
                //--------------------- PER MESSAGE PAYLOAD PROCESSING END
            }
        }
        catch (MessageQueueException mqex)
        {
            if (mqex.MessageQueueErrorCode == MessageQueueErrorCode.IOTimeout)
            {
                return; //-- <<<< ???
                //------------ EACH IOTimeout ENDS ONE OF THE 30 ITERATIONS;
                //------------ THE OUTER LOOP THEN RE-CREATES EVERYTHING AGAIN
            }
            else throw;
            //-------------- NO ERROR HANDLING
        }
    });
    // |||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
} //-------------------------------------------- INFINITE LOOP
It might rather be reformulated as follows (re-using variables and pre-allocating may help even more):
var blockArray = Enumerable.Range(0, 30).ToArray();
// |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
Parallel.ForEach(blockArray, (i) =>
{   //--- A QUEUE HEAD-END CREATED ONLY ONCE! PER-BLOCK
    MessageQueue queue = new MessageQueue(queueName);
    while (true) //---- PER-AGENT's [SERIAL] READING LOOP
    {
        try //---- PER-AGENT's TRY{}
        {
            var message = queue.Receive(TimeSpan.Zero);
            message.Formatter = new BinaryMessageFormatter();
            if (Validate(message.Label.Split('_')))
            {   //------ PER-AGENT's ---- PER MESSAGE
                // call my sysnc method
                //------ PER-AGENT's ---- PER MESSAGE
            }
        }
        catch (MessageQueueException mqex)
        {
            //------------- PER-AGENT's PER EXCEPTION HANDLING
            //              WITHOUT ANY POOL AGENT CANNIBALISATION:
            //              an IOTimeout (empty queue) just loops again,
            //              anything else is re-thrown as in the original
            if (mqex.MessageQueueErrorCode != MessageQueueErrorCode.IOTimeout)
                throw;
        }
    } // -------------- PER-AGENT's [SERIAL] READING LOOP
});
// |||||||||||||||||||||||||||||||||||||||||||||||||||||||PAR||| BLOCK
Once the code-design concept is clear and sound, and if Queue head-end management is still the problem, high-performance messaging frameworks allow increasing the number of Queue-dedicated I/O threads. Achievable performance envelopes are well above a few thousand msg/s, reaching roughly 10,000 to 100,000 to 1,000,000 messages per second. A few thousand messages enqueued and de-queued per second is therefore definitely not a problem if proper engineering is in place (assuming the Queue is not moving extremely large BLOBs, where zero-copy / pointer-moving tricks would additionally be needed to keep pace).
Upvotes: 1