Reputation: 153
My Producer:
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * Minimal JMS producer: connects to an ActiveMQ broker, sends one text
 * message to a queue and closes the connection again.
 *
 * <p>Each step is announced on standard output before and after it runs, so
 * the console shows which step a failure occurred in.
 */
public class Producer {

    /** Broker endpoint; 127.0.0.1 is the loopback address of the local machine. */
    private static final String BROKER_URL = "http://127.0.0.1:61616";

    /** Name of the queue the message is sent to. */
    private static final String QUEUE_NAME = "DemoQueue4";

    /**
     * Connects to the broker, sends one message and always closes the connection.
     *
     * @param args command-line arguments; not used
     * @throws JMSException if closing the connection fails; any exception
     *     raised earlier is caught and its stack trace is printed instead
     */
    public static void main(String[] args) throws JMSException {
        Connection connection = null;
        try {
            System.out.println("Please wait connecting...");
            ConnectionFactory factory = new ActiveMQConnectionFactory(BROKER_URL);
            connection = factory.createConnection();
            System.out.println("Successfully Connected \n");

            sendGreeting(connection);
            System.out.println("Success");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /**
     * Creates a session, the queue and a producer on the given connection,
     * starts the connection and sends a single text message.
     *
     * @param connection an open connection that has not been started yet
     * @throws JMSException if any JMS call fails
     */
    private static void sendGreeting(Connection connection) throws JMSException {
        System.out.println("Please wait creating session...");
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        System.out.println("Session created \n");

        System.out.println("Please wait Creating Queue...");
        Destination queue = session.createQueue(QUEUE_NAME);
        System.out.println("Queue created \n");

        System.out.println("Please wait Creating Producer...");
        MessageProducer sender = session.createProducer(queue);
        System.out.println("Producer created \n");

        System.out.println("Please wait Connection Starting...");
        connection.start();
        System.out.println("Connection Started \n");

        System.out.println("Please wait Creating TextMessage..");
        TextMessage greeting = session.createTextMessage("Hi How are you!");
        System.out.println("TextMessage Created \n");

        System.out.println("Please wait TextMessage Sending...");
        sender.send(greeting);
        System.out.println("TextMessage Sent '" + greeting.getText() + "'");
    }
}
/** I am trying to send the text message to the queue, but the message is never enqueued. **/
My Broker Config - activemq.xml:
/** This is my activemq.xml configuration file. **/
<beans
  xmlns="http://www.springframework.org/schema/beans"
  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd">

    <!-- Allows us to use system properties as variables in this configuration file -->
    <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
        <property name="locations">
            <value>file:${activemq.conf}/credentials.properties</value>
        </property>
    </bean>

    <!-- Allows log searching in hawtio console -->
    <bean id="logQuery" class="org.fusesource.insight.log.log4j.Log4jLogQuery"
          lazy-init="false" scope="singleton"
          init-method="start" destroy-method="stop">
    </bean>

    <!--
        The <broker> element is used to configure the ActiveMQ broker.
    -->
    <broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}">

        <destinationPolicy>
            <policyMap>
                <policyEntries>
                    <policyEntry topic=">" >
                        <!-- The constantPendingMessageLimitStrategy is used to prevent
                             slow topic consumers to block producers and affect other consumers
                             by limiting the number of messages that are retained
                             For more information, see:
                             http://activemq.apache.org/slow-consumer-handling.html
                        -->
                        <pendingMessageLimitStrategy>
                            <constantPendingMessageLimitStrategy limit="1000"/>
                        </pendingMessageLimitStrategy>
                    </policyEntry>
                </policyEntries>
            </policyMap>
        </destinationPolicy>

        <!--
            The managementContext is used to configure how ActiveMQ is exposed in
            JMX. By default, ActiveMQ uses the MBean server that is started by
            the JVM. For more information, see:
            http://activemq.apache.org/jmx.html
        -->
        <managementContext>
            <managementContext createConnector="false"/>
        </managementContext>

        <!--
            Configure message persistence for the broker. The default persistence
            mechanism is the KahaDB store (identified by the kahaDB tag).
            For more information, see:
            http://activemq.apache.org/persistence.html
        -->
        <persistenceAdapter>
            <kahaDB directory="${activemq.data}/kahadb"/>
        </persistenceAdapter>

        <!--
            The systemUsage controls the maximum amount of space the broker will
            use before disabling caching and/or slowing down producers. For more information, see:
            http://activemq.apache.org/producer-flow-control.html
        -->
        <systemUsage>
            <systemUsage>
                <memoryUsage>
                    <memoryUsage percentOfJvmHeap="70" />
                </memoryUsage>
                <storeUsage>
                    <storeUsage limit="100 gb"/>
                </storeUsage>
                <tempUsage>
                    <tempUsage limit="50 gb"/>
                </tempUsage>
            </systemUsage>
        </systemUsage>

        <!--
            The transport connectors expose ActiveMQ over a given protocol to
            clients and other brokers. For more information, see:
            http://activemq.apache.org/configuring-transports.html

            Query parameters in a connector URI must be separated with the
            escaped form of the ampersand (&amp;); a raw ampersand inside an
            attribute value is not well-formed XML.
        -->
        <transportConnectors>
            <!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
            <!-- Note: this connector keeps the name "openwire" but its URI selects the http transport. -->
            <transportConnector name="openwire" uri="http://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <!--
            <transportConnector name="http" uri="http://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
            <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/> -->
        </transportConnectors>

        <!-- destroy the spring context on shutdown to stop jetty -->
        <shutdownHooks>
            <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.hooks.SpringContextHook" />
        </shutdownHooks>

    </broker>

    <!--
        Enable web consoles, REST and Ajax APIs and demos
        The web consoles requires by default login, you can disable this in the jetty.xml file
        Take a look at ${ACTIVEMQ_HOME}/conf/jetty.xml for more details
    -->
    <import resource="jetty.xml"/>

</beans>
Activemq version 5.9
jdk 1.8
eclipseIDE
jars:
activemq-all.jar
httpclient.jar
httpcore.jar
commonslogging-1.2.jar
com.thoughtworks.xstream.jar
xmlpull-xpp3-1.1.4c.jar
While running it, I get an error at mp.send(message).
It's a simple, general example that you can copy and paste into your own environment.
Please wait connecting...
log4j:WARN No appenders could be found for logger (org.apache.http.impl.conn.PoolingClientConnectionManager).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Successfully Connected
Please wait creating session...
Session created
Please wait Creating Queue...
Queue created
Please wait Creating Producer...
Producer created
Please wait Connection Starting...
Connection Started
Please wait Creating TextMessage..
TextMessage Created
Please wait TextMessage Sending...
javax.jms.JMSException: Could not post command: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:WSTIN0128-50902-1472648010012-2:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WSTIN0128-50902-1472648010012-2:1:1:1, destination = queue://DemoQueue4, transactionId = null, expiration = 0, timestamp = 1472648011069, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@67d48005, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Diyotta-http!} due to: java.net.SocketTimeoutException: Read timed out
at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:72)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1423)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1333)
at org.apache.activemq.ActiveMQSession.send(ActiveMQSession.java:1811)
at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:289)
at org.apache.activemq.ActiveMQMessageProducer.send(ActiveMQMessageProducer.java:224)
at org.apache.activemq.ActiveMQMessageProducerSupport.send(ActiveMQMessageProducerSupport.java:241)
at com.kumar.jmsproducer.Producer.main(Producer.java:57)
Caused by: java.io.IOException: Could not post command: ActiveMQTextMessage {commandId = 5, responseRequired = true, messageId = ID:WSTIN0128-50902-1472648010012-2:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:WSTIN0128-50902-1472648010012-2:1:1:1, destination = queue://DemoQueue4, transactionId = null, expiration = 0, timestamp = 1472648011069, arrival = 0, brokerInTime = 0, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@67d48005, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = true, readOnlyBody = true, droppable = false, jmsXGroupFirstForConsumer = false, text = Diyotta-http!} due to: java.net.SocketTimeoutException: Read timed out
at org.apache.activemq.util.IOExceptionSupport.create(IOExceptionSupport.java:33)
at org.apache.activemq.transport.http.HttpClientTransport.oneway(HttpClientTransport.java:138)
at org.apache.activemq.transport.AbstractInactivityMonitor.doOnewaySend(AbstractInactivityMonitor.java:304)
at org.apache.activemq.transport.AbstractInactivityMonitor.oneway(AbstractInactivityMonitor.java:286)
at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:68)
at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:81)
at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:86)
at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1394)
... 6 more
Caused by: java.net.SocketTimeoutException: Read timed out
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at java.net.SocketInputStream.read(Unknown Source)
at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160)
at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84)
at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273)
at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140)
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57)
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260)
at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283)
at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251)
at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197)
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271)
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123)
at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:685)
at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:487)
at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:106)
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57)
at org.apache.activemq.transport.http.HttpClientTransport.oneway(HttpClientTransport.java:125)
... 12 more
The HTTP and HTTPS transports are used to tunnel over HTTP or HTTPS using XML payloads. This allows the ActiveMQ client and broker to tunnel over HTTP avoiding any firewall issues.
If the client is not JMS you might want to look at REST or Ajax support instead.
Note that the HTTP Transport is located in the activemq-optional
jar.
ActiveMQ uses a combination of Jetty's Server and SslSocketConnector
objects to communicate via the HTTPS transport. When using HTTPS, improper configuration of the corresponding SSL certificates and/or keys may very well lead to the Jetty infinite loop problem described in this nabble thread. A good reference on creating and configuring keys and certificates can be found here.
BrokerStartup.java
package com.kumar.httpsprotocol;
import java.net.URI;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.broker.TransportConnector;
/**
 * Starts an embedded ActiveMQ broker that listens on a single HTTPS
 * transport connector.
 */
public class BrokerStartup {

    /** Key store file handed to JSSE through {@code javax.net.ssl.keyStore}. */
    private static final String KEYSTORE = "i:/apache-activemq-5.9.0/conf/broker.ks";

    /** Trust store file handed to JSSE through {@code javax.net.ssl.trustStore}. */
    private static final String TRUST_KEYSTORE = "i:/apache-activemq-5.9.0/conf/broker.ts";

    /** Password used for both the key store and the trust store. */
    private static final String PASSWORD = "123456";

    /** The broker instance configured and started by {@link #startBorker(String, String)}. */
    static BrokerService broker = new BrokerService();

    /**
     * Sets the {@code javax.net.ssl.*} key/trust store system properties, adds
     * an {@code https://host:port} transport connector to the broker and
     * starts it. The broker name is printed once the broker has started.
     *
     * <p>The method name is misspelled ("Borker"); it is kept unchanged so
     * existing callers keep compiling.
     *
     * @param host host name or address the HTTPS connector binds to
     * @param port port the HTTPS connector listens on
     * @throws Exception if the connector URI is invalid or the broker fails to start
     */
    public static void startBorker(String host, String port) throws Exception {
        System.setProperty("javax.net.ssl.keyStore", KEYSTORE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStore", TRUST_KEYSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);

        TransportConnector connector = new TransportConnector();
        connector.setUri(new URI("https://" + host + ":" + port));
        broker.addConnector(connector);
        broker.start();
        System.out.println("Broker Started.." + broker.getBrokerName());
    }

    /**
     * Starts the broker on the HTTPS endpoint used by this example.
     *
     * @param args command-line arguments; not used
     */
    public static void main(String args[]) {
        try {
            // Port 61617 matches the endpoint the example client connects to
            // (https://HostName:61617) and the https connector in activemq.xml;
            // the previous value 61616 left the client pointing at a port this
            // broker never opened.
            startBorker("HostName", "61617");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
--------------------------------------------------------------------------------
Client Code: (Producer.java)
package com.kumar.httpsprotocol;
import javax.jms.Connection;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnectionFactory;
/**
 * JMS producer that sends a single text message to an ActiveMQ broker over
 * the HTTPS transport.
 */
public class Producer {

    /** Key store file handed to JSSE through {@code javax.net.ssl.keyStore}. */
    private static final String KEYSTORE = "i:/apache-activemq-5.9.0/conf/broker.ks";

    /** Trust store file handed to JSSE through {@code javax.net.ssl.trustStore}. */
    private static final String TRUSTSTORE = "i:/apache-activemq-5.9.0/conf/client.ts";

    /** Password used for both the key store and the trust store. */
    private static final String PASSWORD = "123456";

    /**
     * Configures the SSL stores, connects to the broker, sends one message to
     * {@code DemoQueue7} and always closes the connection.
     *
     * @param args command-line arguments; not used
     * @throws JMSException if closing the connection fails; any exception
     *     raised earlier is caught and its stack trace is printed instead
     */
    public static void main(String[] args) throws JMSException {
        configureSslStores();

        Connection connection = null;
        try {
            System.out.println("Please wait connecting...");
            ActiveMQConnectionFactory factory =
                    new ActiveMQConnectionFactory("https://HostName:61617");
            connection = factory.createConnection();
            // The trust store is supplied through the javax.net.ssl system
            // properties above, not through setters on the factory.
            System.out.println("Successfully Connected \n");

            // Non-transacted session with automatic acknowledgement.
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            System.out.println("Session created \n");

            Destination queue = session.createQueue("DemoQueue7");
            MessageProducer sender = session.createProducer(queue);
            connection.start();

            System.out.println("Please wait Creating TextMessage..");
            TextMessage payload = session.createTextMessage("Recieved from Kumar HTTPS Protocol!");
            System.out.println("TextMessage Created \n");

            System.out.println("Please wait TextMessage Sending...");
            sender.send(payload);
            System.out.println("TextMessage Sent '" + payload.getText() + "'");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            if (connection != null) {
                connection.close();
            }
        }
    }

    /** Publishes the key store and trust store locations and passwords as system properties. */
    private static void configureSslStores() {
        System.setProperty("javax.net.ssl.keyStore", KEYSTORE);
        System.setProperty("javax.net.ssl.keyStorePassword", PASSWORD);
        System.setProperty("javax.net.ssl.trustStore", TRUSTSTORE);
        System.setProperty("javax.net.ssl.trustStorePassword", PASSWORD);
    }
}
activemq.xml
<!-- Query parameters must be separated with the escaped ampersand (&amp;) inside an XML attribute value. -->
<transportConnector name="openwire" uri="https://HostName:61617?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
Exception in thread "main" java.lang.NoSuchMethodError: org.eclipse.jetty.server.ssl.SslConnector.getSslContextFactory()Lorg/eclipse/jetty/util/ssl/SslContextFactory;
at org.apache.activemq.transport.SecureSocketConnectorFactory.createConnector(SecureSocketConnectorFactory.java:65)
at org.apache.activemq.transport.https.HttpsTransportServer.doStart(HttpsTransportServer.java:36)
at org.apache.activemq.util.ServiceSupport.start(ServiceSupport.java:55)
at org.apache.activemq.broker.TransportConnector.start(TransportConnector.java:243)
at org.apache.activemq.broker.BrokerService.startTransportConnector(BrokerService.java:2501)
at org.apache.activemq.broker.BrokerService.startAllConnectors(BrokerService.java:2415)
at org.apache.activemq.broker.BrokerService.doStartBroker(BrokerService.java:666)
at org.apache.activemq.broker.BrokerService.startBroker(BrokerService.java:632)
at org.apache.activemq.broker.BrokerService.start(BrokerService.java:568)
at com.kumar.httpsprotocol.BrokerStartup.startBorker(BrokerStartup.java:26)
at com.kumar.httpsprotocol.BrokerStartup.main(BrokerStartup.java:33)
Upvotes: 2
Views: 5942
Reputation: 3913
Your code looks fine, and so does the broker config. It seems to be a library version problem. Can you verify that the JAR versions are the same on the client and broker sides, especially for these: httpclient-4.2.5.jar, httpcore-4.2.4.jar, xstream-1.4.4.jar, xpp3-1.1.4c.jar?
Upvotes: 1