Reputation: 1795
I am new to ActiveMQ. I have tried to implement a producer-consumer (sender-receiver) setup in ActiveMQ. With my code, I can easily send and receive messages from a single producer to a single consumer via ActiveMQ. The problem is that I can't send a message from the same producer to multiple consumers.
Here is my producer & consumer class.
MsgProducer.java
package jms_service;
import javax.jms.JMSException;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MsgProducer {
private static String url = "failover://tcp://localhost:61616";
public static javax.jms.ConnectionFactory connFactory;
public static javax.jms.Connection connection;
public static javax.jms.Session mqSession;
public static javax.jms.Topic topic;
public static javax.jms.MessageProducer producer;
public static void main(String[] args) throws JMSException {
connFactory = new ActiveMQConnectionFactory(url);
connection = connFactory.createConnection("system","manager");
connection.start();
mqSession = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
topic = mqSession.createTopic("RealTimeData");
producer = mqSession.createProducer(topic);
producer.setTimeToLive(30000);
TextMessage message = mqSession.createTextMessage();
int seq_id =1;
while(true)
{
message.setText("Hello world | " +"seq_id #"+seq_id);
producer.send(message);
seq_id++;
System.out.println("sent_msg =>> "+ message.getText());
// if(seq_id>100000) break;
try {
Thread.sleep(1000);
}
catch (InterruptedException e) { e.printStackTrace();}
}
}
}
MsgConsumer.java
package jms_service;
import java.text.SimpleDateFormat;
import java.util.Calendar;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
public class MsgConsumer {
private static String url = "failover://tcp://localhost:61616";
public static javax.jms.ConnectionFactory connFactory;
public static javax.jms.Connection connection;
public static javax.jms.Session mqSession;
public static javax.jms.Topic topic;
public static javax.jms.MessageConsumer consumer;
public static void main(String[] args) throws JMSException, InterruptedException {
connFactory = new ActiveMQConnectionFactory(url);
connection = connFactory.createConnection("system", "manager");
connection.setClientID("0002");
//connection.start();
mqSession = connection.createSession(true, Session.CLIENT_ACKNOWLEDGE);
topic = mqSession.createTopic("RealTimeData");
consumer = mqSession.createDurableSubscriber(topic, "SUBS01");
connection.start();
MessageListener listner = new MessageListener() {
public void onMessage(Message message) {
try {
if (message instanceof TextMessage) {
TextMessage txtmsg = (TextMessage) message;
Calendar cal = Calendar.getInstance();
//cal.getTime();
SimpleDateFormat sdf = new SimpleDateFormat("HH:mm:ss");
String time = sdf.format(cal.getTime());
String msg="received_message =>> "+ txtmsg.getText() + " | received_at :: "+time;
System.out.println(msg);
//consumer.sendData(msg);
}
} catch (JMSException e) {
System.out.println("Caught:" + e);
e.printStackTrace();
}
}
};
consumer.setMessageListener(listner);
}
}
Can anyone help me figure out how to send a message to multiple consumers? Thanks in advance.
Upvotes: 2
Views: 5933
Reputation: 822
Topics are the best route: one producer to many consumers, or one publisher to many subscribers. With queues you have to write a loop over all the possible consumers and send the message to a separate destination for each of them. What you are trying to achieve also determines whether to use queues or topics.
Upvotes: 3
Reputation: 5155
Queue semantics deliver a message once-and-only-once across all consumers. This is per the JMS spec (a great read to understand the basics).
Topic semantics deliver a message to every consumer. So, a Topic may be the answer to your needs.
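To make the difference concrete, here is a minimal in-memory sketch of the two delivery models. It is plain Java, not the JMS or ActiveMQ API, and the class and method names (DeliverySemanticsSketch, deliverAsQueue, deliverAsTopic) are made up for illustration. The round-robin choice for the queue is also an assumption; a real broker may pick the receiving consumer differently.

```java
import java.util.ArrayList;
import java.util.List;

/** In-memory model only; this is NOT the JMS or ActiveMQ API. */
final class DeliverySemanticsSketch {

    /** Queue-style delivery: each message goes to exactly one consumer (round-robin is an assumption). */
    static List<List<String>> deliverAsQueue(List<String> messages, int consumerCount) {
        List<List<String>> inboxes = emptyInboxes(consumerCount);
        for (int i = 0; i < messages.size(); i++) {
            inboxes.get(i % consumerCount).add(messages.get(i));
        }
        return inboxes;
    }

    /** Topic-style delivery: every subscriber receives its own copy of each message. */
    static List<List<String>> deliverAsTopic(List<String> messages, int subscriberCount) {
        List<List<String>> inboxes = emptyInboxes(subscriberCount);
        for (String message : messages) {
            for (List<String> inbox : inboxes) {
                inbox.add(message);
            }
        }
        return inboxes;
    }

    /** Creates one empty inbox per consumer. */
    private static List<List<String>> emptyInboxes(int count) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < count; i++) {
            inboxes.add(new ArrayList<>());
        }
        return inboxes;
    }

    public static void main(String[] args) {
        List<String> messages = List.of("seq_id #1", "seq_id #2", "seq_id #3", "seq_id #4");
        System.out.println("queue: " + deliverAsQueue(messages, 2));
        System.out.println("topic: " + deliverAsTopic(messages, 2));
    }
}
```

With two consumers, the queue model splits the four messages between them, while the topic model gives both consumers all four. One thing to check in the question's consumer: it creates a durable subscriber on a connection with the hard-coded client ID "0002". JMS allows only one active connection per client ID, so a second copy of that program would most likely be rejected unless it uses its own client ID.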
Upvotes: 11
Reputation: 840
Assuming your question is
Can anyone help to figure out the way for sending message to multiple consumers
and without reading through your complete code, an approach could be to put your clients in a collection
static Vector<consumer> vecConsumer;
Add every new client to it, so that you keep a reference to all existing clients.
Broadcasting is then just like sending to a single client, wrapped in a loop, for example a foreach loop:
for(consumer cons : vecConsumer)
{
//send stuff or put in sending queue
}
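A runnable version of that idea might look like the sketch below. It assumes the "clients" are in-process callbacks rather than JMS consumers, and BroadcastSketch, register and broadcast are hypothetical names, not part of any library. CopyOnWriteArrayList is used instead of Vector so that clients can register while a broadcast is iterating.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

/** Hypothetical in-process broadcaster; the clients are plain callbacks, not JMS consumers. */
final class BroadcastSketch {

    // Thread-safe list, so registering a client during a broadcast does not break the iteration.
    private final List<Consumer<String>> clients = new CopyOnWriteArrayList<>();

    /** Keeps a reference to a new client. */
    void register(Consumer<String> client) {
        clients.add(client);
    }

    /** Sends the same message to every registered client and returns how many received it. */
    int broadcast(String message) {
        int delivered = 0;
        for (Consumer<String> client : clients) {
            client.accept(message);
            delivered++;
        }
        return delivered;
    }

    public static void main(String[] args) {
        BroadcastSketch broadcaster = new BroadcastSketch();
        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        broadcaster.register(first::add);
        broadcaster.register(second::add);

        int delivered = broadcaster.broadcast("Hello world | seq_id #1");
        System.out.println("delivered to " + delivered + " clients: " + first + " " + second);
    }
}
```

This only covers clients living in the same JVM. For separate consumer processes, the broker has to do the fan-out, which is what a topic provides.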
Upvotes: 4