iamssin

Reputation: 3

Topic message expires immediately when an ActiveMQ networkConnector is configured

Broker A.

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mqtt_1" dataDirectory="${activemq.data}" persistent="true" schedulePeriodForDestinationPurge="13000" offlineDurableSubscriberTimeout="90000" offlineDurableSubscriberTaskSchedule="20000" useJmx="true">
</broker>

Broker B.

<broker xmlns="http://activemq.apache.org/schema/core" brokerName="mqtt_2" dataDirectory="${activemq.data}" persistent="true" schedulePeriodForDestinationPurge="13000" offlineDurableSubscriberTimeout="90000" offlineDurableSubscriberTaskSchedule="20000" useJmx="true">
    <networkConnectors>
        <networkConnector
            name="mqtt_2"
            uri="static:(tcp://BrokerA IP:61616)"
            conduitSubscriptions="false"
            dynamicOnly="true"
            prefetchSize="1"
            networkTTL="1"
            messageTTL="1"
            consumerTTL="1"
            />
    </networkConnectors>
</broker>

The subscriber is connected to Broker A. I publish a message to Broker B:

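import javax.jms.MapMessage;
import javax.jms.Session;
import javax.jms.Topic;
import javax.jms.TopicConnection;
import javax.jms.TopicPublisher;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnectionFactory;

// Connect to Broker B, which forwards to Broker A over the network connector.
String url = "tcp://Broker B IP:61616";
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
TopicConnection tpConnection = connectionFactory.createTopicConnection();
tpConnection.start();
TopicSession tpSession = tpConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = tpSession.createTopic("a/b/c/e");
TopicPublisher topicPublisher = tpSession.createPublisher(topic);
// Expiration is computed as the client's send time plus 5000 ms.
topicPublisher.setTimeToLive(5000);

MapMessage map = tpSession.createMapMessage();
map.setString("test", "value");
topicPublisher.publish(map);

// Release resources from the innermost object outwards.
topicPublisher.close();
tpSession.close();
tpConnection.close();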

But as soon as I publish a message to Broker B, the message expires:

2015-01-23 16:51:41,930 | DEBUG | Message expired ActiveMQMapMessage {commandId = 6, responseRequired = true, messageId = ID:test.local-50762-1421999415422-1:1:1:1:1, originalDestination = null, originalTransactionId = null, producerId = ID:test.local-50762-1421999415422-1:1:1:1, destination = topic://a.b.c.e, transactionId = null, expiration = 1421999420843, timestamp = 1421999415843, arrival = 0, brokerInTime = 1421999501929, brokerOutTime = 0, correlationId = null, replyTo = null, persistent = true, type = null, priority = 4, groupID = null, groupSequence = 0, targetConsumerId = null, compressed = false, userID = null, content = org.apache.activemq.util.ByteSequence@457096, marshalledProperties = null, dataStructure = null, redeliveryCounter = 0, size = 0, properties = null, readOnlyProperties = false, readOnlyBody = false, droppable = false, jmsXGroupFirstForConsumer = false} ActiveMQMapMessage{ theTable = {} } | org.apache.activemq.broker.region.RegionBroker | ActiveMQ Transport: tcp:///xxx.xxx.xxx.xxx:50763@61616

After I removed the line 'topicPublisher.setTimeToLive(5000);', the message was delivered successfully. Why does this line cause the problem?

Upvotes: 0

Views: 419

Answers (2)

iamssin

Reputation: 3

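Adding the timeStampingBrokerPlugin to the broker configuration solved the problem. The plugin re-stamps each incoming message with the broker's clock and adjusts the expiration to match, so a clock difference between the publishing client and the broker no longer makes the message look expired on arrival.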

activemq.xml

<plugins>
    <timeStampingBrokerPlugin/>
</plugins>
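
The log line in the question hints at why the plugin helps. Below is a small sketch that redoes the arithmetic on the three values copied from that log (timestamp, expiration, brokerInTime). The class and method names are made up for illustration, and it assumes that timestamp and expiration were set from the client's clock while brokerInTime was set from the broker's clock, so the difference is clock skew plus transit time.

```java
public class ExpiryCheck {

    /** Broker arrival time minus client send time: clock skew plus transit time. */
    static long apparentSkewMillis(long timestamp, long brokerInTime) {
        return brokerInTime - timestamp;
    }

    /** True when the arrival time on the broker is already at or past the expiration.
     *  An expiration of 0 means the message never expires. */
    static boolean expiredOnArrival(long expiration, long brokerInTime) {
        return expiration > 0 && brokerInTime >= expiration;
    }

    public static void main(String[] args) {
        // Values copied from the "Message expired" log line in the question.
        long timestamp    = 1421999415843L; // set on send (assumed: client clock)
        long expiration   = 1421999420843L; // timestamp + time to live
        long brokerInTime = 1421999501929L; // set on arrival (assumed: broker clock)

        System.out.println("TTL ms: " + (expiration - timestamp));                            // 5000
        System.out.println("Apparent skew ms: " + apparentSkewMillis(timestamp, brokerInTime)); // 86086
        System.out.println("Expired on arrival: " + expiredOnArrival(expiration, brokerInTime)); // true
    }
}
```

By these numbers the broker saw the message about 86 seconds after the client stamped it, far beyond the 5 second time to live, which is consistent with the two machines' clocks being out of sync. Synchronizing the clocks (for example with NTP) would likely address the same symptom without the plugin.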

Upvotes: 0

ash

Reputation: 5155

topicPublisher.setTimeToLive(5000) causes the message to expire after 5 seconds.

Upvotes: 0
