Reputation: 9134
I have used ActiveMQ to Send messages and Receive them Asynchronously.
There, I'm having a problem with deciding the best way to waiting in the for messages. Sleeping thread in a loop is one option. But it feels doesn't look good for me.
Can anyone suggest a better way for this.
AsyncReceiver.java
public class AsyncReceiver implements MessageListener, ExceptionListener{
public static void main(String[] args) throws Exception{
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
env.put("queue.queueSampleQueue","MyNewQueue");
InitialContext ctx = new InitialContext(env);
Queue queue = (Queue) ctx.lookup("queueSampleQueue");
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection queueConn = connFactory.createQueueConnection();
QueueSession queueSession = queueConn.createQueueSession(false,Session.AUTO_ACKNOWLEDGE);
QueueReceiver queueReceiver = queueSession.createReceiver(queue);
AsyncReceiver asyncReceiver = new AsyncReceiver();
queueReceiver.setMessageListener(asyncReceiver);
queueConn.setExceptionListener(asyncReceiver);
queueConn.start();
// Waiting for messages
System.out.print("waiting for messages");
for (int i = 0; i < 10; i++) {
Thread.sleep(1000);
}
queueConn.close();
}
public void onMessage(Message message){
TextMessage msg = (TextMessage) message;
try {
System.out.println("received: " + msg.getText());
} catch (JMSException ex) {
ex.printStackTrace();
}
}
public void onException(JMSException exception){
System.err.println("an error occurred: " + exception);
}
}
Sender.java
public class Sender{
public static void main(String[] args) throws Exception{
Properties env = new Properties();
env.put(Context.INITIAL_CONTEXT_FACTORY,"org.apache.activemq.jndi.ActiveMQInitialContextFactory");
env.put(Context.PROVIDER_URL, "tcp://localhost:61616");
env.put("queue.queueSampleQueue", "MyNewQueue");
InitialContext ctx = new InitialContext(env);
Queue queue = (Queue) ctx.lookup("queueSampleQueue");
QueueConnectionFactory connFactory = (QueueConnectionFactory) ctx.lookup("QueueConnectionFactory");
QueueConnection queueConn = connFactory.createQueueConnection();
QueueSession queueSession = queueConn.createQueueSession(false,Session.DUPS_OK_ACKNOWLEDGE);
QueueSender queueSender = queueSession.createSender(queue);
queueSender.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
TextMessage message = queueSession.createTextMessage("Hello");
queueSender.send(message);
System.out.println("sent: " + message.getText());
queueConn.close();
}
}
Upvotes: 1
Views: 3520
Reputation: 108
There are two ways to process/consume messages in the Queue.
Periodically check the queue for new messages - This is suitable if you run your program periodically. You can do this by implementing a loop with some thread sleeps. Ex. twice a day, once a day etc.
Register consumers (use MessageListener) with the queue. You can do this as below example.
Consumer.java
javax.jms.Connection connection = null;
Session session = null;
Destination destination = null;
MessageConsumer consumer = null;
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(URL);
connection = connectionFactory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue(queueName);
consumer = session.createConsumer(destination);
consumer.setMessageListener(new YourClass());
YourClass.java
public class YourClass implements MessageListener {
@Override
public void onMessage(Message message) {
TextMessage textMessage = (TextMessage) message;
inputJsonString = textMessage.getText();
//do what ever you want with inputJsonString
message.acknowledge();
}
}
Upvotes: 3