Reputation: 1439
I am trying to have multiple threads (Tasks) in a Windows service connect to the same queue as subscribers so that I can process my messages faster. When I connect to the queue, only the first subscribed thread receives messages and the rest sit idle. This is the code I am using to subscribe and receive messages:
private MQQueueManager _queueManager;
private MQQueue _queue;
private MQTopic _topic;
public bool isSubscribed = false;
public void Subscribe()
{
    var queueManagerName = "myQueueManager";
    var properties = new Hashtable();
    // Set all the properties here
    _queueManager = new MQQueueManager(queueManagerName, properties);
    // Connect to the queue
    _queue = _queueManager.AccessQueue("devQueue", MQC.MQOO_INPUT_AS_Q_DEF);
    isSubscribed = true;
    while (isSubscribed)
    {
        if (cancellationToken.IsCancellationRequested)
        {
            isSubscribed = false;
            cancellationToken.ThrowIfCancellationRequested();
        }
        try
        {
            Receive(onMessageReceived);
        }
        catch (Exception ex)
        {
            Console.WriteLine("Exception: {0}", ex);
        }
    }
}
public override void Receive<T>(Action<T> onMessageReceived)
{
    try
    {
        var dataReceived = new MQMessage();
        _queue.Get(dataReceived);
        T message;
        message = (T)(object)dataReceived;
        onMessageReceived(message);
        _queueManager.Commit();
    }
    catch (Exception)
    {
        // Rethrow with "throw;" so the original stack trace is preserved
        throw;
    }
}
I also tried opening the queue with MQC.MQOO_INPUT_SHARED, which did not work either. Am I missing something?
Upvotes: 0
Views: 1408
Reputation: 51
The code you've quoted attempts to open and receive messages from a queue named "devQueue". If that code runs under multiple threads, then the messages delivered to each thread will depend, in part, on how quickly the messages are processed and on which version of MQ you are connecting to.
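To illustrate the multi-threaded case, here is a minimal sketch, not a definitive implementation, of several Tasks reading from the same queue. It assumes each worker creates its own MQQueueManager connection and its own queue handle, because as far as I know the IBM MQ classes for .NET synchronise calls made through a single shared MQQueueManager object, so threads sharing one connection can end up waiting on each other. The class name SharedQueueConsumers, the helper names, the worker count and the 5 second wait interval are hypothetical choices for illustration. The queue manager and queue names are taken from the question.

```csharp
using System;
using System.Collections;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using IBM.WMQ;

public static class SharedQueueConsumers
{
    // Hypothetical helper: one worker, owning its own connection and queue handle.
    private static void Consume(int workerId, Hashtable properties, CancellationToken token)
    {
        MQQueueManager queueManager = null;
        MQQueue queue = null;
        try
        {
            // Assumption: a separate connection per worker, not a shared field.
            queueManager = new MQQueueManager("myQueueManager", properties);
            queue = queueManager.AccessQueue(
                "devQueue", MQC.MQOO_INPUT_SHARED | MQC.MQOO_FAIL_IF_QUIESCING);

            var options = new MQGetMessageOptions
            {
                Options = MQC.MQGMO_WAIT | MQC.MQGMO_SYNCPOINT | MQC.MQGMO_FAIL_IF_QUIESCING,
                WaitInterval = 5000 // milliseconds; arbitrary value for this sketch
            };

            while (!token.IsCancellationRequested)
            {
                try
                {
                    var message = new MQMessage();
                    queue.Get(message, options);
                    Console.WriteLine("Worker {0} received {1} bytes",
                        workerId, message.MessageLength);
                    queueManager.Commit();
                }
                catch (MQException ex) when (ex.ReasonCode == MQC.MQRC_NO_MSG_AVAILABLE)
                {
                    // The wait interval expired with no message; loop and re-check the token.
                }
            }
        }
        finally
        {
            queue?.Close();
            queueManager?.Disconnect();
        }
    }

    // Starts workerCount long-running consumers and returns a Task that completes when all stop.
    public static Task Start(int workerCount, Hashtable properties, CancellationToken token)
    {
        Task[] workers = Enumerable.Range(1, workerCount)
            .Select(id => Task.Factory.StartNew(
                () => Consume(id, properties, token),
                token, TaskCreationOptions.LongRunning, TaskScheduler.Default))
            .ToArray();
        return Task.WhenAll(workers);
    }
}
```

Calling SharedQueueConsumers.Start(4, properties, cancellationToken) would start four consumers. With a timed wait on the Get, each worker can also notice cancellation rather than blocking indefinitely on an empty queue. If only one worker still receives messages, that would suggest the messages are arriving somewhere other than "devQueue".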
However, you mention subscribers in the question (and the code you quote declares a topic as well as the queue). Are you creating a subscription to the topic elsewhere in the app and then using this code to receive messages sent to that subscription? If so, then it's possible that your "queue receiving" threads are simply opening up a different queue to the one that is holding the subscription messages. How is the initial subscription being created?
Upvotes: 1