Reputation: 7335
I'm trying to connect to an ActiveMQ broker using AMQP 1.0, but I want to use JMS within my application code. I'm interested in JMS primarily because I want developers to be able to use APIs that they are already familiar with.
I have ActiveMQ 5.14.0 running on localhost and the following code:
public static void main(String[] args) throws JMSException, InterruptedException {
    Connection connection = null;
    try {
        // Producer
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("amqp://localhost:5672");
        connection = connectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic("customerTopic");
        // Publish
        MessageProducer producer = session.createProducer(topic);
        for (int i = 0; i < 10; i++) {
            Message msg = session.createTextMessage("Task : " + i);
            producer.send(msg);
        }
        session.close();
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
}
The code always fails in the same way, with the following root cause in the stack trace:
Caused by: org.apache.activemq.transport.InactivityIOException: Channel was inactive for too (>30000) long: tcp://127.0.0.1:5672
This happens on the connection.start() method call.
If I run the same code against the ActiveMQ tcp endpoint then it executes as expected.
My pom file dependencies are as follows (I suspect that this is the root of my problem, as I find the documentation of the dependencies extremely hard to follow):
<dependencies>
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<version>0.32</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-amqp</artifactId>
<version>5.14.0</version>
</dependency>
</dependencies>
My immediate question is "Why doesn't this work?".
My supplementary (opinion-based) question is "Is it worthwhile trying to use the JMS abstraction above AMQP 1.0, or should I just resign myself to learning the provider-specific APIs?"
Upvotes: 1
Views: 1954
Reputation: 3913
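ActiveMQConnectionFactory is the OpenWire JMS client, so it cannot talk to the broker's AMQP port. Use the Qpid JMS client instead; it is better to configure it through JNDI: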
public static void main(String[] args) throws JMSException, InterruptedException, NamingException {
    Connection connection = null;
    try {
        // JNDI environment for the Qpid JMS client
        Properties props = new Properties();
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY, "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        // connectionfactory.<lookup name> = broker URI
        props.setProperty("connectionfactory.myFactoryLookup", "amqp://localhost:5672");
        // topic.<lookup name> = physical topic name
        props.setProperty("topic.MyTOPIC", "customerTopic");
        InitialContext ic = new InitialContext(props);
        ConnectionFactory cf1 = (ConnectionFactory) ic.lookup("myFactoryLookup");
        Topic topic = (Topic) ic.lookup("MyTOPIC");
        connection = cf1.createConnection();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageProducer producer = session.createProducer(topic);
        connection.start();
        // Publish
        for (int i = 0; i < 10; i++) {
            Message msg = session.createTextMessage("Task : " + i);
            producer.send(msg);
        }
        session.close();
    } finally {
        if (connection != null) {
            connection.close();
        }
    }
}
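The JNDI keys used above follow a naming convention: connectionfactory.<lookup name> maps to the broker URI, and topic.<lookup name> maps to the physical topic name. As a rough sketch, the properties could be built in one place by a small helper. The class name QpidJndiConfig and its build method are hypothetical (they are not part of Qpid JMS), and the helper only uses JDK classes:

```java
import java.util.Properties;
import javax.naming.Context;

// Hypothetical helper, not part of Qpid JMS: builds the JNDI environment
// that org.apache.qpid.jms.jndi.JmsInitialContextFactory reads.
final class QpidJndiConfig {

    private QpidJndiConfig() {
    }

    static Properties build(String brokerUri, String factoryLookup,
                            String topicLookup, String topicName) {
        Properties props = new Properties();
        // Tell JNDI to use the Qpid JMS initial context factory.
        props.setProperty(Context.INITIAL_CONTEXT_FACTORY,
                "org.apache.qpid.jms.jndi.JmsInitialContextFactory");
        // connectionfactory.<lookup name> = broker URI
        props.setProperty("connectionfactory." + factoryLookup, brokerUri);
        // topic.<lookup name> = physical topic name on the broker
        props.setProperty("topic." + topicLookup, topicName);
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("amqp://localhost:5672",
                "myFactoryLookup", "MyTOPIC", "customerTopic");
        System.out.println(props.getProperty("connectionfactory.myFactoryLookup"));
        System.out.println(props.getProperty("topic.MyTOPIC"));
    }
}
```

The returned Properties object can be passed to new InitialContext(props), after which the lookups for "myFactoryLookup" and "MyTOPIC" work as in the code above.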
In your pom, replace
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-amqp-1-0-client-jms</artifactId>
<version>0.32</version>
</dependency>
with
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>qpid-jms-client</artifactId>
<version>0.9.0</version>
</dependency>
On the broker side, you need to add the AMQP transport connector (in activemq.xml):
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?transport.transformer=jms"/>
Reference: http://activemq.apache.org/amqp.html
Upvotes: 2