Reputation: 27
We're using Artemis 2.19.0 and distributing XML messages.Recently, we found that some messages might get lost when sending them to a multicast address which has two durable multicast queues bound, those two durable queues both have a XPATH filter.
E.g.:
Somehow, IN.QUEUE1.FOO or IN.QUEUE2.FOO or both won't receive 1000 messages eventually.
We've tired to remove filter from one of them, then everything works fine, both queue will receive 1000 messages.
So, my question is:
If you have anything unclear, please ask. Thanks
UPDATE 1:
Versions:
Java: 1.8
Spring-Integration: 5.5.11
Spring-jms: 5.3.19
Artemis: 2.19.0
XML File:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
xmlns:int-file="http://www.springframework.org/schema/integration/file"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:jms="http://www.springframework.org/schema/integration/jms"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-4.3.xsd
http://www.springframework.org/schema/integration/jms
http://www.springframework.org/schema/integration/jms/spring-integration-jms-4.3.xsd
http://www.springframework.org/schema/integration/xml
http://www.springframework.org/schema/integration/xml/spring-integration-xml-4.3.xsd
http://www.springframework.org/schema/integration/file
http://www.springframework.org/schema/integration/file/spring-integration-file-4.3.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task-4.3.xsd">
<!-- Multicast address -->
<beans:bean id="topic" class="org.apache.activemq.artemis.jms.client.ActiveMQTopic">
<beans:constructor-arg value="IN.ADDRESS.FOO"/>
</beans:bean>
<!-- Anycast queue -->
<beans:bean id="queue" class="org.apache.activemq.artemis.jms.client.ActiveMQQueue">
<beans:constructor-arg value="AQ.QUEUE.FOO"/>
</beans:bean>
<channel id="topicChannel">
</channel>
<task:executor id="executor" pool-size="2"/>
<publish-subscribe-channel id="outChannel" task-executor="executor"/>
<filter id="consumer1" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>
<filter id="consumer2" input-channel="outChannel" output-channel="topicChannel" expression="payload.length() > 0"/>
<jms:message-driven-channel-adapter id="queueAdapter" destination="queue" channel="outChannel"
acknowledge="auto" connection-factory="ConnectionFactory"/>
<jms:outbound-channel-adapter id="topicAdapter" destination="topic" channel="topicChannel"
connection-factory="ConnectionFactory"/>
</beans:beans>
ConnectionFactory Bean:
@Bean(name = "ConnectionFactory")
public SingleConnectionFactory ibConnectionFactory(
@Value("${artemis.broker-url}") String brokerUrl,
@Value("${artemis.user}") String username,
@Value("${artemis.password}") String password) throws JMSException {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL(brokerUrl);
factory.setUser(username);
factory.setPassword(password);
return new SingleConnectionFactory(factory);
}
Sender program:
try(ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password)) {
Connection conn = fac.createConnection();
Session session = conn.createSession();
MessageProducer producer = session.createProducer(new ActiveMQQueue("AQ.QUEUE.FOO"));
Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
int count = 0;
while (count < 1000) {
System.out.println(count);
producer.send(msg);
count ++;
}
}
XPATH Filter:
XPATH '/Root/Data/PrimaryKey/Key/DetailedIdentity/ATCode[text()="AK"]'
Message Sample:
<?xml version="1.0" encoding="UTF-8"?><Root xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>
Either remove the task-executor attribute on publish-subscribe-channel or remove one of the queue's filter fix the problem.
UPDATE 2:
A minimal example with 10 concurrent tasks sending 1000 messages in total, if all the queues under the same address both have the XPATH filter then it would not receive 1000 messages, but with removing one of the filter it works.
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.jms.client.ActiveMQTopic;
import javax.jms.*;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class TestRunner {
public static void main(String[] args) throws Exception {
String brokerUrl = "(tcp://server1:61616,tcp://server2:61616)?ha=true&reconnectAttempts=-1&retryInterval=100&retryIntervalMultiplier=1.5&maxRetryInterval=6000";
String user = "admin";
String password = "admin";
ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(brokerUrl, user, password);
ExecutorService ser = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
ser.submit(() -> {
Connection conn = null;
try {
conn = fac.createConnection();
Session session = conn.createSession();
MessageProducer producer = session.createProducer(new ActiveMQTopic("IN.ADDRESS.FOO"));
Message msg = session.createTextMessage("<?xml version=\"1.0\" encoding=\"UTF-8\"?><Root xmlns:xsi=\"http://www.w3.org/2001/XMLSchema-instance\"><Data><PrimaryKey><Key><DetailedIdentity><ATCode>AK</ATCode></DetailedIdentity></Key></PrimaryKey></Data></Root>");
int count = 0;
while (count < 100) {
System.out.println(count);
msg.setStringProperty("MessageId", String.valueOf(count));
producer.send(msg);
count++;
}
session.close();
} catch (JMSException e) {
e.printStackTrace();
}
});
}
}
}
Upvotes: 0
Views: 104
Reputation: 35122
Thanks for the test-case. I was able to use it to reproduce the error you were seeing. I opened ARTEMIS-4687 and sent a PR to fix the issue. This will be fixed in 2.33.0 release which is due in the next few weeks.
Upvotes: 1