Reputation: 797
Background:
I have a standard producer-consumer queue; the consumers are slow while the producers are fast. The expectation is that whenever a consumer completes the requested message, it acknowledges the message, and the producer then assumes the task associated with the message is done. Since producers are fast, I don't want the producing threads to wait; instead, a callback should be invoked whenever the message is acknowledged. Since JMS is limited on this front, I have used the ActiveMQ classes, such as ActiveMQMessageProducer, directly as much as possible.
Problem:
The messages are getting auto-acknowledged: the registered async callback is being invoked even if the consumer hasn't started yet. The send method in question is:
public void send(Destination destination,
Message message,
AsyncCallback onComplete)
Producer
public static boolean setup() {
    Producer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    // Create a Connection
    Producer.connection = (ActiveMQConnection) connectionFactory.createConnection();
    connection.setAlwaysSessionAsync(true);
    connection.start();
    return true; // the method is declared boolean, so it must return a value
}
public Producer() {
    session = (ActiveMQSession) connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
    producer = (ActiveMQMessageProducer) session.createProducer(destination);
    producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
}
...
public void run() {
    long id = messageID.getAndIncrement();
    String text = "Hello world!";
    Message message = session.createTextMessage(text);
    // The callback fires when the send completes, not when a consumer acknowledges
    producer.send(message, new MessageCompletion(id, this.messageRundown));
}
Consumer
public static boolean setup() {
    Consumer.connectionFactory = new ActiveMQConnectionFactory("tcp://127.0.0.1:61616");
    Consumer.connection = (ActiveMQConnection) connectionFactory.createConnection();
    connection.setAlwaysSessionAsync(true);
    return true;
}
public Consumer() {
    session = (ActiveMQSession) connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
    destination = (ActiveMQDestination) session.createQueue("TEST.FOO");
    consumer = (ActiveMQMessageConsumer) session.createConsumer(destination);
    consumer.setMessageListener(this);
    connection.start();
}
// implements MessageListener
@Override
public void onMessage(Message message) {
    messageQueue.add(message);
}
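public void run() {
    try {
        while (true) {
            Message message = messageQueue.poll();
            while (message != null) {
                // do some work
                message.acknowledge(); // may throw JMSException
                message = messageQueue.poll();
            }
            Thread.sleep(10000); // wait 10 seconds before polling again
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt(); // restore the interrupt flag and exit
    } catch (JMSException e) {
        // acknowledge() failed; handle or log as appropriate
    }
}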
Although the consumer is not needed, I have added it for reference. Parts have been removed for brevity; this is taken from working code.
Upvotes: 0
Views: 1351
Reputation: 18356
Your understanding of how acknowledgement works is wrong. The async callback on the sender only tells you that the broker has received the message. If it were a persistent send, the callback would also indicate that the message had been written to disk.
There is no coupling of producer and consumer in JMS or in most other messaging brokers. The producer places a message on a queue, and the consumer can come along at any point and consume from that queue. A producer therefore can't wait for a consumer before going on to produce the next message, and a consumer's acknowledgement goes to the broker, not back to the producer.
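To make the decoupling concrete, here is a minimal in-memory sketch. It is not the ActiveMQ API: `ToyBroker` and `DecouplingDemo` are hypothetical names invented for illustration, and the "broker" is just a queue. It only shows the ordering: the send callback runs as soon as the broker has stored the message, while no consumer exists yet.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy in-memory stand-in for a broker queue; NOT the ActiveMQ API (assumption).
class ToyBroker {
    private final Queue<String> queue = new ArrayDeque<>();

    // The callback runs as soon as the broker has stored the message.
    void send(String body, Runnable onBrokerAccepted) {
        queue.add(body);
        onBrokerAccepted.run();
    }

    // A consumer may call this at any later time; returns null if the queue is empty.
    String receive() {
        return queue.poll();
    }

    int depth() {
        return queue.size();
    }
}

class DecouplingDemo {
    // Sends one message with no consumer attached and records the order of events.
    static List<String> run() {
        List<String> events = new ArrayList<>();
        ToyBroker broker = new ToyBroker();
        broker.send("Hello world!", () -> events.add("send callback fired"));
        events.add("queue depth = " + broker.depth());
        String body = broker.receive(); // the consumer only shows up now
        events.add("consumed: " + body);
        return events;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this model the callback has already fired while the message is still sitting in the queue, which matches the behaviour described in the question.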
If you want to know when particular messages have been processed so that you can throttle work, look into JMS request/response style messaging patterns.
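As a rough sketch of the request/response idea, the following in-memory model pairs each request with a reply through a correlation id. Everything here (`ToyMessage`, `RequestReplyDemo`, the two queues) is hypothetical and uses only the Java standard library; in real JMS the same roles would typically be played by the `JMSReplyTo` and `JMSCorrelationID` message headers and a reply queue that the producer listens on.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

// Illustrative message type; not a JMS class (assumption).
class ToyMessage {
    final String correlationId;
    final String body;

    ToyMessage(String correlationId, String body) {
        this.correlationId = correlationId;
        this.body = body;
    }
}

class RequestReplyDemo {
    final BlockingQueue<ToyMessage> requests = new LinkedBlockingQueue<>();
    final BlockingQueue<ToyMessage> replies = new LinkedBlockingQueue<>();
    final Map<String, Consumer<String>> pending = new ConcurrentHashMap<>();
    final AtomicLong nextId = new AtomicLong();

    // Producer side: enqueue and return at once; the callback waits for a reply.
    String send(String body, Consumer<String> onProcessed) {
        String id = Long.toString(nextId.getAndIncrement());
        pending.put(id, onProcessed);
        requests.add(new ToyMessage(id, body));
        return id;
    }

    // Consumer side: process one request, then publish a reply carrying the same id.
    boolean consumeOne() {
        ToyMessage request = requests.poll();
        if (request == null) {
            return false;
        }
        // do some work here, then report completion
        replies.add(new ToyMessage(request.correlationId, "done: " + request.body));
        return true;
    }

    // Producer side: match replies to pending callbacks by correlation id.
    int drainReplies() {
        int completed = 0;
        ToyMessage reply;
        while ((reply = replies.poll()) != null) {
            Consumer<String> callback = pending.remove(reply.correlationId);
            if (callback != null) {
                callback.accept(reply.body);
                completed++;
            }
        }
        return completed;
    }
}
```

Here the producer's callback only runs after the consumer has explicitly sent a reply, so the number of entries in `pending` could also serve as a simple measure of outstanding work for throttling.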
Upvotes: 1