David Murphy
David Murphy

Reputation: 13

Does ActiveMQ Artemis support updating scheduled messages in a last value queue?

In my testing and review of the Artemis LastValueQueue code, it looks like the scheduling delay for a message takes precedence over its evaluation of the "last-value-key". In other words, if you schedule a message, it is only evaluated for replacing the last-value in the queue at the time it is prepared for delivery.

My question is whether I have correctly understood the code, and if so, if there's a workaround or a feature of ActiveMQ / Artemis that might help meet our requirements.

Our requirements are as follows:

  1. Generate a message, and delay processing of that message to a point in the future (usually 30 seconds out).
  2. If an updated version of the message is generated due to a new external event, replace any existing scheduled message with the new version of the message - the scheduled delivery time should also be updated, in addition to the message payload.

Some other notes:

ArtemisConfig.java:

@Configuration
@EnableJms
public class ArtemisConfig {

    @Bean
    public org.apache.activemq.artemis.core.config.Configuration configuration() throws Exception {
        org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();
        config.addAcceptorConfiguration("in-vm", "vm://0");
        config.setPersistenceEnabled(true);
        config.setSecurityEnabled(false);
        config.setJournalType(JournalType.ASYNCIO);
        config.setCreateJournalDir(true);
        config.setJournalDirectory("/var/mq/journal");
        config.setBindingsDirectory("/var/mq/bindings");
        config.setLargeMessagesDirectory("/var/mq/large-messages");
        config.setJMXManagementEnabled(true);

        QueueConfiguration queueConfiguration = new QueueConfiguration("MYLASTVALUEQUEUE");
        queueConfiguration.setAddress("MYLASTVALUEQUEUE");
        queueConfiguration.setLastValueKey("uniqueJobId");
        queueConfiguration.setDurable(true);
        queueConfiguration.setEnabled(true);
        queueConfiguration.setRoutingType(RoutingType.ANYCAST);

        CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
        coreAddressConfiguration.addQueueConfiguration(queueConfiguration);

        config.addAddressConfiguration(coreAddressConfiguration);

        return config;
    }

    @Bean
    public EmbeddedActiveMQ artemisServer() throws Exception {
        EmbeddedActiveMQ server = new EmbeddedActiveMQ();
        server.setConfiguration(configuration());
        server.start();

        return server;
    }

    @PreDestroy
    public void preDestroy() throws Exception {
        artemisServer().stop();
    }

    @Bean
    public ConnectionFactory activeMqConnectionFactory() throws Exception {
        return ActiveMQJMSClient.createConnectionFactory("vm://0", "artemis-client");
    }


    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() throws Exception {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMqConnectionFactory());
        factory.setSessionTransacted(true);
        factory.setConcurrency("8");
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    public JmsTemplate jmsTemplate() throws Exception {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMqConnectionFactory());
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setDeliveryPersistent(true);

        return jmsTemplate;
    }

    @Bean
    QueueMessageService queueMessageService() {
        return new QueueMessageService();
    }
}

QueueMessageService.java

public class QueueMessageService {
    @Resource
    private JmsTemplate jmsTemplate;

    public void queueJobRequest(
            final String queue,
            final int priority,
            final long deliveryDelayInSeconds,
            final MyMessage message) {

        jmsTemplate.convertAndSend(queue, jobRequest, message -> {
            message.setJMSPriority(priority);
            if (deliveryDelayInSeconds > 0 && deliveryDelayInSeconds <= 86400) {
                message.setLongProperty(
                        Message.HDR_SCHEDULED_DELIVERY_TIME.toString(),
                        Instant.now().plus(deliveryDelayInSeconds, ChronoUnit.SECONDS).toEpochMilli()
                );
            }
            message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "uniqueJobId");
            message.setStringProperty("uniqueJobId", jobRequest.getUniqueJobId().toString());
            return message;
        });
    }
}

Upvotes: 1

Views: 818

Answers (1)

Justin Bertram
Justin Bertram

Reputation: 35008

Your understanding about the semantics of scheduled messages with a last-value queue is correct. When a message is scheduled it is not technically on the queue yet. It is not put onto the queue until the scheduled time arrives at which point last-value queue semantics are enforced.

Short of implementing a new feature I don't see how you can implement your desired behavior in any kind of automatic way. My recommendation at this point would be to use the management API (i.e. QueueControl) to manually remove the "old" scheduled message before you send the "new" scheduled message. You can use one of the removeMessage methods for this as they will work on scheduled messages and non-scheduled messages alike.

Upvotes: 2

Related Questions