Reputation: 3
Broker A.
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mqtt_1" dataDirectory="${activemq.data}" persistent="true" schedulePeriodForDestinationPurge="13000" offlineDurableSubscriberTimeout="90000" offlineDurableSubscriberTaskSchedule="20000" useJmx="true">
</broker>
Broker B.
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mqtt_2" dataDirectory="${activemq.data}" persistent="true" schedulePeriodForDestinationPurge="13000" offlineDurableSubscriberTimeout="90000" offlineDurableSubscriberTaskSchedule="20000" useJmx="true">
<networkConnectors>
<networkConnector
name="mqtt_2"
uri="static:(tcp://BrokerA IP:61616)"
conduitSubscriptions="false"
dynamicOnly="true"
prefetchSize="1"
networkTTL="1"
messageTTL="1"
consumerTTL="1"
/>
</networkConnectors> </broker>
Subscriber was connected Broker A. I publish a message to Broker B.
String url = "tcp://Broker B IP:61616";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
TopicConnection tpConnection = connectionFactory.createTopicConnection();
tpConnection.start();
TopicSession tpSession = tpConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = tpSession.createTopic("a/b/c/e");
TopicPublisher topicPublisher = tpSession.createPublisher(topic);
topicPublisher.setTimeToLive(5000);
MapMessage map = tpSession.createMapMessage();
map.setString("test", "value");
topicPublisher.publish(map);
tpConnection.stop();
topicPublisher.close();
tpSession.close();
tpConnection.close();
But, as soon as publish a message to Broker B, Message was expired
2015-01-23 16:51:41,930 | DEBUG | Message expired ActiveMQMapMessage {commandId = 6, responseRequired = true, messageId = ID:test.local-50762-1421999415422-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:test.local-50762-1421999415422-1:1:1:1, destination = topic://a.b.c.e, transactionId = null, expiration = 1421999420843, timestamp = 1421999415843, arrival = 0, brokerInTime = 1421999501929, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@457096, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} } | org.apache.activemq.broker.region.RegionBroker | ActiveMQ Transport: tcp:///xxx.xxx.xxx.xxx:50763@61616
After this code('topicPublisher.setTimeToLive(5000);') was removed, success. Why is this code is the problem?
Upvotes: 0
Views: 419
Reputation: 3
It solved the problem.
activemq.xml
<plugins>
<timeStampingBrokerPlugin/>
</plugins>
Upvotes: 0
Reputation: 5155
topicPublisher.setTimeToLive(5000) causes the message to expire after 5 seconds.
Upvotes: 0