Lukyanov Mikhail

Messages are written to the queue only after the route has completely finished and the producer has stopped

I have run into behavior of a broker (ActiveMQ Artemis, version 2.17.0) that is unusual for me.

With a large number of messages being sent quickly by the producer, some of the messages reach the queue only after the route has completed and the producer has stopped. This is especially noticeable when the disk is a conventional hard drive rather than an SSD.

As an example, I use the following Apache Camel 2.25.3 route to send messages:

        <route autoStartup="false" factor:name="TCP SJMS2"
            factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
            <from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&amp;repeatCount=200000">
            </from>
            <setBody factor:component="SetBodyEndpoint"
                factor:custom-name="Set message body"
                factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
                <simple>&lt;?xml version="1.0" encoding="utf-8"?&gt;
&lt;env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope"&gt;
  2 kB message body
&lt;/env:Envelope&gt;</simple>
            </setBody>
            <to id="endpoint-546af4a0-ebe5-4479-91f0-f6b6609264cc" uri="local2amq://TCP.IN?connectionFactory=%23tcpArtemisCF">
            </to>
        </route>

    <bean class="org.apache.camel.component.sjms2.Sjms2Component" id="local2amq">
        <property name="connectionFactory" ref="artemisConnectionFactory"/>
        <property name="connectionCount" value="5"/>
    </bean>

    <bean
     class="org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory"
        factor:bean-type="ARTEMIS_CONNECTION_FACTORY" id="tcpArtemisCF" name="tcpArtemisCF">
        <property name="brokerURL" value="(tcp://localhost:61717)?blockOnDurableSend=false"/>
    </bean>

This route completes quickly, sending 200,000 messages at roughly 6,000 msg/s.

But if I look at the broker's queues after the route has completed, there are only about 80,000 messages in the queue; the rest trickle in gradually at a rate of 200–2,000 msg/s.


I have not seen this behavior with classic ActiveMQ: there, after the route completes, all of the messages are already in the queue.

Main questions:

  1. Is this behavior normal and expected? Which parameters control it?

  2. How can I see the number of messages that have been sent but are not yet in the queue?

  3. How can I ensure that all messages are written to the queue by the time the route terminates?

Broker config

<?xml version='1.0'?>
<configuration xmlns="urn:activemq"
               xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
               xmlns:xi="http://www.w3.org/2001/XInclude"
               xsi:schemaLocation="urn:activemq /schema/artemis-configuration.xsd">

   <core xmlns="urn:activemq:core" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="urn:activemq:core ">

      <name>0.0.0.0</name>


      <persistence-enabled>true</persistence-enabled>

      <journal-type>NIO</journal-type>

      <paging-directory>data/paging</paging-directory>

      <bindings-directory>data/bindings</bindings-directory>

      <journal-directory>data/journal</journal-directory>

      <large-messages-directory>data/large-messages</large-messages-directory>

      <journal-datasync>true</journal-datasync>

      <journal-min-files>2</journal-min-files>

      <journal-pool-files>10</journal-pool-files>

      <journal-device-block-size>4096</journal-device-block-size>

      <journal-file-size>10M</journal-file-size>
      
      <journal-buffer-timeout>836000</journal-buffer-timeout>

      <journal-max-io>1</journal-max-io>

      <disk-scan-period>5000</disk-scan-period>

      <max-disk-usage>100</max-disk-usage>

      <critical-analyzer>true</critical-analyzer>

      <critical-analyzer-timeout>120000</critical-analyzer-timeout>

      <critical-analyzer-check-period>60000</critical-analyzer-check-period>

      <critical-analyzer-policy>HALT</critical-analyzer-policy>

      
      <page-sync-timeout>836000</page-sync-timeout>


      <acceptors>
         <acceptor name="artemis">tcp://0.0.0.0:61717</acceptor>
      </acceptors>


      <security-settings>
         <security-setting match="#">
            <permission type="createNonDurableQueue" roles="amq"/>
            <permission type="deleteNonDurableQueue" roles="amq"/>
            <permission type="createDurableQueue" roles="amq"/>
            <permission type="deleteDurableQueue" roles="amq"/>
            <permission type="createAddress" roles="amq"/>
            <permission type="deleteAddress" roles="amq"/>
            <permission type="consume" roles="amq"/>
            <permission type="browse" roles="amq"/>
            <permission type="send" roles="amq"/>
            <!-- we need this otherwise ./artemis data imp wouldn't work -->
            <permission type="manage" roles="amq"/>
         </security-setting>
      </security-settings>

      <address-settings>
         <!-- if you define auto-create on certain queues, management has to be auto-create -->
         <address-setting match="activemq.management#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
         <!--default for catch all-->
         <address-setting match="#">
            <dead-letter-address>DLQ</dead-letter-address>
            <expiry-address>ExpiryQueue</expiry-address>
            <redelivery-delay>0</redelivery-delay>
            <!-- with -1 only the global-max-size is in use for limiting -->
            <max-size-bytes>-1</max-size-bytes>
            <message-counter-history-day-limit>10</message-counter-history-day-limit>
            <address-full-policy>PAGE</address-full-policy>
            <auto-create-queues>true</auto-create-queues>
            <auto-create-addresses>true</auto-create-addresses>
            <auto-create-jms-queues>true</auto-create-jms-queues>
            <auto-create-jms-topics>true</auto-create-jms-topics>
         </address-setting>
      </address-settings>

      <addresses>
         <address name="DLQ">
            <anycast>
               <queue name="DLQ" />
            </anycast>
         </address>
         <address name="ExpiryQueue">
            <anycast>
               <queue name="ExpiryQueue" />
            </anycast>
         </address>

      </addresses>
   </core>
</configuration>

Broker logs

17:58:19,887 INFO  [org.apache.activemq.artemis.integration.bootstrap] AMQ101000: Starting ActiveMQ Artemis Server
2021-04-05 17:58:19,926 INFO  [org.apache.activemq.artemis.core.server] AMQ221000: live Message Broker is starting with configuration Broker Configuration (clustered=false,journalDirectory=data/journal,bindingsDirectory=data/bindings,largeMessagesDirectory=data/large-messages,pagingDirectory=data/paging)
2021-04-05 17:58:19,958 INFO  [org.apache.activemq.artemis.core.server] AMQ221013: Using NIO Journal
2021-04-05 17:58:20,038 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-server]. Adding protocol support for: CORE
2021-04-05 17:58:20,039 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-amqp-protocol]. Adding protocol support for: AMQP
2021-04-05 17:58:20,041 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-hornetq-protocol]. Adding protocol support for: HORNETQ
2021-04-05 17:58:20,047 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-mqtt-protocol]. Adding protocol support for: MQTT
2021-04-05 17:58:20,047 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-openwire-protocol]. Adding protocol support for: OPENWIRE
2021-04-05 17:58:20,048 INFO  [org.apache.activemq.artemis.core.server] AMQ221043: Protocol module found: [artemis-stomp-protocol]. Adding protocol support for: STOMP
2021-04-05 17:58:20,163 INFO  [org.apache.activemq.artemis.core.server] AMQ221034: Waiting indefinitely to obtain live lock
2021-04-05 17:58:20,163 INFO  [org.apache.activemq.artemis.core.server] AMQ221035: Live Server Obtained live lock
2021-04-05 17:58:21,867 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address DLQ supporting [ANYCAST]
2021-04-05 17:58:21,869 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue DLQ on address DLQ
2021-04-05 17:58:21,876 INFO  [org.apache.activemq.artemis.core.server] AMQ221080: Deploying address ExpiryQueue supporting [ANYCAST]
2021-04-05 17:58:21,877 INFO  [org.apache.activemq.artemis.core.server] AMQ221003: Deploying ANYCAST queue ExpiryQueue on address ExpiryQueue
2021-04-05 17:58:22,686 INFO  [org.apache.activemq.artemis.core.server] AMQ221020: Started NIO Acceptor at 0.0.0.0:61717 for protocols [CORE,MQTT,AMQP,HORNETQ,STOMP,OPENWIRE]
2021-04-05 17:58:22,797 INFO  [org.apache.activemq.artemis.core.server] AMQ221007: Server is now live
2021-04-05 17:58:22,798 INFO  [org.apache.activemq.artemis.core.server] AMQ221001: Apache ActiveMQ Artemis Message Broker version 2.17.0 [0.0.0.0, nodeID=024cff0e-8ff2-11eb-8968-c0b6f9f8ba29]
2021-04-05 17:58:23,113 INFO  [org.apache.activemq.hawtio.branding.PluginContextListener] Initialized activemq-branding plugin
2021-04-05 17:58:23,250 INFO  [org.apache.activemq.hawtio.plugin.PluginContextListener] Initialized artemis-plugin plugin
2021-04-05 17:58:24,336 INFO  [io.hawt.HawtioContextListener] Initialising hawtio services
2021-04-05 17:58:24,349 INFO  [io.hawt.system.ConfigManager] Configuration will be discovered via system properties
2021-04-05 17:58:24,352 INFO  [io.hawt.jmx.JmxTreeWatcher] Welcome to Hawtio 2.11.0
2021-04-05 17:58:24,359 INFO  [io.hawt.web.auth.AuthenticationConfiguration] Starting hawtio authentication filter, JAAS realm: "activemq" authorized role(s): "amq" role principal classes: "org.apache.activemq.artemis.spi.core.security.jaas.RolePrincipal"
2021-04-05 17:58:24,378 INFO  [io.hawt.web.proxy.ProxyServlet] Proxy servlet is disabled
2021-04-05 17:58:24,385 INFO  [io.hawt.web.servlets.JolokiaConfiguredAgentServlet] Jolokia overridden property: [key=policyLocation, value=file:/D:/Documents/apache-artemis-2.17.0/bin/emptyNew/etc/\jolokia-access.xml]
2021-04-05 17:58:24,712 INFO  [org.apache.activemq.artemis] AMQ241001: HTTP Server started at http://localhost:8161
2021-04-05 17:58:24,713 INFO  [org.apache.activemq.artemis] AMQ241002: Artemis Jolokia REST API available at http://localhost:8161/console/jolokia
2021-04-05 17:58:24,714 INFO  [org.apache.activemq.artemis] AMQ241004: Artemis Console available at http://localhost:8161/console
2021-04-05 17:59:08,763 INFO  [io.hawt.web.auth.LoginServlet] Hawtio login is using 1800 sec. HttpSession timeout
2021-04-05 17:59:10,512 INFO  [io.hawt.web.auth.LoginServlet] Logging in user: root
2021-04-05 17:59:11,206 INFO  [io.hawt.web.auth.keycloak.KeycloakServlet] Keycloak integration is disabled

UPDATE

Data for classic ActiveMQ, version 5.15.11

Camel route

        <route autoStartup="false" factor:name="TCP SJMS2"
            factor:trace="false" id="route-e098a2c8-efd4-41dd-9c1d-57937663cfbe">
            <from id="endpoint-cef2b9db-e359-4fb0-aa4d-4afda4f79c10" uri="timer://init?delay=-1&amp;repeatCount=200000">
            </from>
            <setBody factor:component="SetBodyEndpoint"
                factor:custom-name="Set message body"
                factor:guid="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5" id="endpoint-361ea09a-9e8a-4f44-a428-05e27dbdf3b5">
                <simple>&lt;?xml version="1.0" encoding="utf-8"?&gt;
&lt;env:Envelope xmlns:env="http://www.w3.org/2003/05/soap-envelope"&gt;
  2 kB message body
&lt;/env:Envelope&gt;</simple>
            </setBody>
            <to id="endpoint-f697cad9-90db-47b9-877f-4189febdd010" uri="localmq://PERF.IN?connectionFactory=%23tcpActiveMQCF">
            </to>
        </route>

    <bean class="org.apache.activemq.camel.component.ActiveMQComponent" id="localmq">
        <property name="configuration" ref="jmsConfig"/>
    </bean>

     <bean class="org.apache.camel.component.jms.JmsConfiguration" id="jmsConfig">
        <property name="asyncStartListener" value="true"/>
        <property name="cacheLevelName" value="CACHE_CONSUMER"/> 
        <property name="preserveMessageQos" value="true"/>  
    </bean> 
    <bean class="org.apache.activemq.pool.PooledConnectionFactory"
        destroy-method="stop" factor:bean-type="AMQ_CONNECTION_FACTORY"
        id="tcpActiveMQCF" init-method="start" name="tcpActiveMQCF">
        <property name="maxConnections" value="1"/>
        <property name="maximumActiveSessionPerConnection" value="15"/>
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory"
                id="tcpActiveMQCF_connection" name="tcpActiveMQCF_connection">
                <property name="brokerURL" value="tcp://localhost:61616?jms.useAsyncSend=false"/>
            </bean>
        </property>
    </bean>

For this route I get a rate of about 2,500 msg/s and the messages are written to the queue immediately; changing the jms.useAsyncSend parameter has practically no effect on performance in my case.


Answers (1)

Justin Bertram

This kind of behavior is expected when sending non-durable messages because non-durable messages are sent in a non-blocking manner. It's not clear whether or not you're sending non-durable messages, but you've also set blockOnDurableSend=false on your client's URL so even durable messages will be sent non-blocking.

From the broker's perspective the messages haven't actually arrived yet, so there's no way to see the number of messages that have been sent but are not yet in the queue.

If you want to ensure that when the Camel route terminates all messages are written to the queue then you should send durable messages and set blockOnDurableSend=true (which is the default value).
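In your XML that would mean changing the brokerURL in the tcpArtemisCF bean to (tcp://localhost:61717)?blockOnDurableSend=true, or simply removing the parameter since true is the default. As a minimal sketch in plain JMS (the queue name and message body are just placeholders, not taken from your route), a blocking durable send looks like this:

    import javax.jms.Connection;
    import javax.jms.DeliveryMode;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

    public class BlockingSendSketch {
        public static void main(String[] args) throws Exception {
            // blockOnDurableSend=true is the default; shown explicitly for emphasis
            ActiveMQConnectionFactory cf =
                    new ActiveMQConnectionFactory("(tcp://localhost:61717)?blockOnDurableSend=true");
            try (Connection connection = cf.createConnection()) {
                Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                MessageProducer producer = session.createProducer(session.createQueue("TCP.IN"));
                producer.setDeliveryMode(DeliveryMode.PERSISTENT);
                // With a durable (PERSISTENT) message and blockOnDurableSend=true,
                // send() returns only after the broker has journaled the message to disk.
                producer.send(session.createTextMessage("test"));
            }
        }
    }

Once every send() has returned, the messages are guaranteed to be in the queue, so nothing arrives after the producer stops.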

Keep in mind that blocking will reduce performance (potentially substantially) depending on the speed of your hard disk. This is because the client will have to wait for a response from the broker for every message it sends, and for every message the broker receives it will have to persist that message to disk and wait for the hard disk to sync before it sends a response back to the client. Therefore, if your hard disk can't sync quickly the client will have to wait a relatively long time.

One of the configuration parameters that influences this behavior is journal-buffer-timeout. This value is calculated automatically and set when the broker instance is first created. You'll see evidence of this logged, e.g.:

    Auto tuning journal ...
    done! Your system can make 250 writes per millisecond, your journal-buffer-timeout will be 4000

In your case the journal-buffer-timeout has been set to 836000, which is quite slow (the higher the timeout, the slower the disk). It means that your disk can only make around 1.2 writes per millisecond. If you think this value is in error you can run the artemis perf-journal command to calculate it again and update the <journal-buffer-timeout> element in broker.xml accordingly.

To give you a comparison, my journal-buffer-timeout is 4000, and running the artemis producer --protocol amqp command against ActiveMQ Artemis sends 1,000 durable messages in less than 700 milliseconds after a few runs. If I use the --non-persistent flag that duration drops to around 200 milliseconds.
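For reference, those invocations look something like this from the broker's bin directory (if I recall correctly, the producer defaults to sending 1,000 messages to a queue named TEST on localhost, but check artemis help producer for your version):

    ./artemis producer --protocol amqp
    ./artemis producer --protocol amqp --non-persistent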

If I perform the same test on a default installation of ActiveMQ 5.16.0 it takes around 900 and 200 milliseconds respectively, which is not terribly surprising given the nature of the test.

It's worth noting that ActiveMQ 5.x has the jms.useAsyncSend parameter that is functionally equivalent to blockOnDurableSend and blockOnNonDurableSend in Artemis. However, you're unlikely to see as much of a difference if you use it because the 5.x broker has a lot of inherent internal blocking whereas Artemis was written from the ground up to be completely non-blocking. The potential performance ceiling of Artemis is therefore much higher than 5.x, and that's one of the main reasons that Artemis exists in the first place.

Remember that in choosing whether or not to block you're really just trading between reliability and speed. By not blocking you're telling the client to "fire and forget." The client, by definition, will have no knowledge of whether or not a message was actually received by the broker. In other words, sending messages in a non-blocking way is inherently unreliable from the client's perspective. That's the fundamental trade-off you make for speed. JMS 2 added javax.jms.CompletionListener to help mitigate this a bit, but it's unlikely that any Camel JMS component makes intelligent use of it.
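For completeness, here is a minimal sketch of that pattern in plain JMS 2 outside of Camel (the queue name, URL, and message count are placeholders): the sends stay non-blocking, but the client counts broker confirmations and can wait for all of them before shutting down.

    import java.util.concurrent.CountDownLatch;
    import javax.jms.CompletionListener;
    import javax.jms.JMSContext;
    import javax.jms.JMSProducer;
    import javax.jms.Message;
    import javax.jms.Queue;
    import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

    public class AsyncConfirmedSendSketch {
        public static void main(String[] args) throws Exception {
            int total = 200_000;
            CountDownLatch confirmed = new CountDownLatch(total);
            ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory("tcp://localhost:61717");
            try (JMSContext context = cf.createContext()) {
                Queue queue = context.createQueue("TCP.IN");
                JMSProducer producer = context.createProducer().setAsync(new CompletionListener() {
                    @Override
                    public void onCompletion(Message message) {
                        confirmed.countDown(); // broker has accepted this message
                    }

                    @Override
                    public void onException(Message message, Exception e) {
                        confirmed.countDown(); // send failed; log or retry in real code
                    }
                });
                for (int i = 0; i < total; i++) {
                    producer.send(queue, "2 kB message body");
                }
                confirmed.await(); // returns once the broker has confirmed every send
            }
        }
    }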

