Reputation: 8942
I use an ActiveMQServer as a broker.
Server.java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.core.config.Configuration;
import org.apache.activemq.core.config.impl.ConfigurationImpl;
import org.apache.activemq.core.remoting.impl.netty.NettyAcceptorFactory;
import org.apache.activemq.core.server.ActiveMQServer;
import org.apache.activemq.core.server.ActiveMQServers;
/**
 * Launches an embedded ActiveMQ broker that accepts Netty connections on
 * localhost:61616, with persistence and security both switched off.
 */
public class Server {

    /**
     * Builds the broker configuration and starts the server.
     *
     * @param arg command-line arguments (unused)
     * @throws Exception any failure while configuring or starting the broker;
     *                   the stack trace is printed before the exception is rethrown
     */
    public static void main(final String arg[]) throws Exception {
        try {
            // Step 1. Create the Configuration, and set the properties accordingly
            Configuration brokerConfig = buildConfiguration();

            // Step 2. Create and start the server
            ActiveMQServer broker = ActiveMQServers.newActiveMQServer(brokerConfig);
            broker.start();
        } catch (Exception failure) {
            failure.printStackTrace();
            throw failure;
        }
    }

    /**
     * Assembles a non-persistent, non-secured configuration with a single
     * Netty acceptor.
     *
     * @return the configuration handed to the server factory
     * @throws Exception if the configuration cannot be assembled
     */
    private static Configuration buildConfiguration() throws Exception {
        Configuration brokerConfig = new ConfigurationImpl();

        // The journal directory is only needed for the server lock file.
        brokerConfig.setJournalDirectory("target/data/journal");
        brokerConfig.setPersistenceEnabled(false);
        brokerConfig.setSecurityEnabled(false);

        // These are the default acceptor values, spelled out explicitly. When
        // running the broker on another host, change them here and in the
        // client's connector settings as well.
        Map<String, Object> acceptorParams = new HashMap<String, Object>();
        acceptorParams.put("host", "localhost");
        acceptorParams.put("port", 61616);

        HashSet<TransportConfiguration> acceptors = new HashSet<TransportConfiguration>();
        acceptors.add(new TransportConfiguration(NettyAcceptorFactory.class.getName(), acceptorParams));
        brokerConfig.setAcceptorConfigurations(acceptors);

        return brokerConfig;
    }
}
I also have a Client.java class, in which I create a queue, a message producer, and a message consumer.
Client.java
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
import org.apache.activemq.api.core.ActiveMQException;
import org.apache.activemq.api.core.SimpleString;
import org.apache.activemq.api.core.TransportConfiguration;
import org.apache.activemq.api.core.client.ActiveMQClient;
import org.apache.activemq.api.core.client.ClientConsumer;
import org.apache.activemq.api.core.client.ClientMessage;
import org.apache.activemq.api.core.client.ClientProducer;
import org.apache.activemq.api.core.client.ClientSession;
import org.apache.activemq.api.core.client.ClientSessionFactory;
import org.apache.activemq.api.core.client.MessageHandler;
import org.apache.activemq.api.core.client.ServerLocator;
import org.apache.activemq.core.remoting.impl.netty.NettyConnectorFactory;
public class Client
{
private static final String queueName = "queue.exampleQueue";
private static final String propName = "myprop";
ClientSessionFactory sf = null;
ClientSession session = null;
ClientProducer producer = null;
ClientMessage message = null;
ClientConsumer consumer = null;
String name;
public Client(String name){
this.name = name;
}
public void initializeComponents(){
try
{
// Step 3. As we are not using a JNDI environment we instantiate the objects directly
/**
* this map with configuration values is not necessary (it configures the default values).
* If you modify it to run the example in two different hosts, remember to also modify the
* server's Acceptor at {@link EmbeddedServer}
*/
Map<String,Object> map = new HashMap<String,Object>();
map.put("host", "localhost");
map.put("port", 61616);
// -------------------------------------------------------
ServerLocator serverLocator = ActiveMQClient.createServerLocatorWithoutHA(new TransportConfiguration(NettyConnectorFactory.class.getName(), map));
sf = serverLocator.createSessionFactory();
// Step 4. Create a core queue
ClientSession coreSession = sf.createSession(false, false, false);
if (!coreSession.queueQuery(new SimpleString(queueName)).isExists())
coreSession.createTemporaryQueue(queueName, queueName);
coreSession.close();
// Step 5. Create the session, and producer
session = sf.createSession();
producer = session.createProducer(queueName); //
// Step 7. Create the message consumer and start the connection
consumer = session.createConsumer(queueName);
session.start();
// Step 8. Receive the message.
consumer.setMessageHandler(new MessageHandler(){
public void onMessage(ClientMessage message)
{
System.out.println("client " + name + " received message " + message.getStringProperty(propName));
}
});
}
catch (Exception e)
{
e.printStackTrace();
}
}
public void sendMessage(String messageText){
// Step 6. Create and send a message
message = session.createMessage(false);
message.putStringProperty(propName, messageText);
try {
System.out.println("Producer is going to send a message");
producer.send(message);
} catch (ActiveMQException e) {
e.printStackTrace();
}
}
public void cleanUpConnection(){
if (sf != null)
{
sf.close();
}
}
}
In this main method, I create one queue, two producers, and two consumers.
/**
 * Demo driver: starts two clients on the same queue and lets the first one
 * send ten timestamped messages, two seconds apart.
 *
 * <p>Both clients are cleaned up even if sending fails. If the thread is
 * interrupted while waiting, the interrupt flag is restored and sending stops.
 *
 * @param args command-line arguments (unused)
 */
public static void main(final String[] args)
{
    Client cl1 = new Client("cl1");
    Client cl2 = new Client("cl2");
    try {
        cl1.initializeComponents();
        cl2.initializeComponents();

        // One formatter is enough; this loop runs on a single thread.
        SimpleDateFormat timestampFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss");
        for (int i = 0; i < 10; i++) {
            String formattedDate = timestampFormat.format(new Date());
            cl1.sendMessage(formattedDate + " number of iteration " + i);
            try {
                Thread.sleep(2000);
            } catch (InterruptedException e) {
                // Preserve the interrupt for the caller and stop sending.
                Thread.currentThread().interrupt();
                break;
            }
        }
    } finally {
        // Release both connections regardless of how the loop ended.
        try {
            cl1.cleanUpConnection();
        } finally {
            cl2.cleanUpConnection();
        }
    }
}
and this is the output of main:
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:22 number of iteration 0
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:24 number of iteration 1
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:26 number of iteration 2
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:28 number of iteration 3
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:30 number of iteration 4
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:32 number of iteration 5
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:34 number of iteration 6
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:36 number of iteration 7
Producer is going to send a message
client cl1 received message 05/06/2015 16:56:38 number of iteration 8
Producer is going to send a message
client cl2 received message 05/06/2015 16:56:40 number of iteration 9
client cl2 received message 05/06/2015 16:56:22 number of iteration 0
client cl2 received message 05/06/2015 16:56:26 number of iteration 2
client cl2 received message 05/06/2015 16:56:30 number of iteration 4
client cl2 received message 05/06/2015 16:56:34 number of iteration 6
client cl2 received message 05/06/2015 16:56:38 number of iteration 8
I would like to ask how I can make every consumer receive every message, so that all consumers process each message concurrently.
I mean:
client cl1 received ... message of iteration 0
client cl2 received ... message of iteration 0
client cl1 received ... message of iteration 1
client cl2 received ... message of iteration 1
I found the prefetch limit settings, but I don't know how to use the ActiveMQConnectionFactory and ActiveMQConnection classes without refactoring the Client.java class. Are there any other options for making message consumption concurrent for all consumers?
Upvotes: 1
Views: 1818
Reputation: 1036
You are only sending a single message per iteration, thus only a single client will receive a message. If you want each client to receive the same messages, use a topic instead of a queue.
Upvotes: 1