Reputation: 13
In my testing and review of the Artemis LastValueQueue code, it looks like a message's scheduling delay takes precedence over the evaluation of its "last-value-key". In other words, a scheduled message is only considered as a replacement for the last value in the queue at the moment it is prepared for delivery, not when it is sent.
My question is whether I have understood the code correctly and, if so, whether there is a workaround or a feature of ActiveMQ / Artemis that might help meet our requirements.
Our requirements are as follows:
Some other notes:
ArtemisConfig.java:
@Configuration
@EnableJms
public class ArtemisConfig {

    @Bean
    public org.apache.activemq.artemis.core.config.Configuration configuration() throws Exception {
        org.apache.activemq.artemis.core.config.Configuration config = new ConfigurationImpl();
        config.addAcceptorConfiguration("in-vm", "vm://0");
        config.setPersistenceEnabled(true);
        config.setSecurityEnabled(false);
        config.setJournalType(JournalType.ASYNCIO);
        config.setCreateJournalDir(true);
        config.setJournalDirectory("/var/mq/journal");
        config.setBindingsDirectory("/var/mq/bindings");
        config.setLargeMessagesDirectory("/var/mq/large-messages");
        config.setJMXManagementEnabled(true);

        QueueConfiguration queueConfiguration = new QueueConfiguration("MYLASTVALUEQUEUE");
        queueConfiguration.setAddress("MYLASTVALUEQUEUE");
        queueConfiguration.setLastValueKey("uniqueJobId");
        queueConfiguration.setDurable(true);
        queueConfiguration.setEnabled(true);
        queueConfiguration.setRoutingType(RoutingType.ANYCAST);

        CoreAddressConfiguration coreAddressConfiguration = new CoreAddressConfiguration();
        coreAddressConfiguration.addQueueConfiguration(queueConfiguration);
        config.addAddressConfiguration(coreAddressConfiguration);
        return config;
    }

    @Bean
    public EmbeddedActiveMQ artemisServer() throws Exception {
        EmbeddedActiveMQ server = new EmbeddedActiveMQ();
        server.setConfiguration(configuration());
        server.start();
        return server;
    }

    @PreDestroy
    public void preDestroy() throws Exception {
        artemisServer().stop();
    }

    @Bean
    public ConnectionFactory activeMqConnectionFactory() throws Exception {
        return ActiveMQJMSClient.createConnectionFactory("vm://0", "artemis-client");
    }

    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerFactory() throws Exception {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(activeMqConnectionFactory());
        factory.setSessionTransacted(true);
        factory.setConcurrency("8");
        factory.setMessageConverter(jacksonJmsMessageConverter());
        return factory;
    }

    @Bean
    public MessageConverter jacksonJmsMessageConverter() {
        MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
        converter.setTargetType(MessageType.TEXT);
        converter.setTypeIdPropertyName("_type");
        return converter;
    }

    @Bean
    public JmsTemplate jmsTemplate() throws Exception {
        JmsTemplate jmsTemplate = new JmsTemplate(activeMqConnectionFactory());
        jmsTemplate.setMessageConverter(jacksonJmsMessageConverter());
        jmsTemplate.setDeliveryPersistent(true);
        return jmsTemplate;
    }

    @Bean
    QueueMessageService queueMessageService() {
        return new QueueMessageService();
    }
}
QueueMessageService.java:
public class QueueMessageService {

    @Resource
    private JmsTemplate jmsTemplate;

    public void queueJobRequest(
            final String queue,
            final int priority,
            final long deliveryDelayInSeconds,
            final MyMessage jobRequest) {
        // "Message" below is org.apache.activemq.artemis.api.core.Message (source of the HDR_* constants);
        // the lambda parameter "message" is the JMS message produced by the converter.
        jmsTemplate.convertAndSend(queue, jobRequest, message -> {
            message.setJMSPriority(priority);
            // Schedule delivery only for delays of up to one day (86400 seconds).
            if (deliveryDelayInSeconds > 0 && deliveryDelayInSeconds <= 86400) {
                message.setLongProperty(
                        Message.HDR_SCHEDULED_DELIVERY_TIME.toString(),
                        Instant.now().plus(deliveryDelayInSeconds, ChronoUnit.SECONDS).toEpochMilli()
                );
            }
            message.setStringProperty(Message.HDR_LAST_VALUE_NAME.toString(), "uniqueJobId");
            // The queue is configured with last-value-key "uniqueJobId", so this property identifies the job.
            message.setStringProperty("uniqueJobId", jobRequest.getUniqueJobId().toString());
            return message;
        });
    }
}
Upvotes: 1
Views: 818
Reputation: 35008
Your understanding about the semantics of scheduled messages with a last-value queue is correct. When a message is scheduled it is not technically on the queue yet. It is not put onto the queue until the scheduled time arrives at which point last-value queue semantics are enforced.
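To make the ordering concrete, here is a toy model of the behavior described above. It is not Artemis code: the class `ToyLastValueQueue` and all of its methods are invented for illustration, and time is just a `long` counter. It only shows that scheduled messages sit in a separate holding area where no last-value replacement happens, and that replacement takes place when each message becomes due.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model only (hypothetical); it does not use or mirror any Artemis class. */
final class ToyLastValueQueue {

    /** A message parked until its delivery time arrives. */
    private static final class Scheduled {
        final long deliverAt;
        final String key;
        final String body;

        Scheduled(long deliverAt, String key, String body) {
            this.deliverAt = deliverAt;
            this.key = key;
            this.body = body;
        }
    }

    // Messages that are actually on the queue: at most one body per last-value key.
    private final Map<String, String> onQueue = new LinkedHashMap<>();
    // Scheduled messages wait here; nothing in this list replaces anything else.
    private final List<Scheduled> scheduled = new ArrayList<>();

    /** Sends a message; a delivery time in the future parks it instead of queueing it. */
    void send(String key, String body, long deliverAt, long now) {
        if (deliverAt > now) {
            scheduled.add(new Scheduled(deliverAt, key, body));
        } else {
            onQueue.put(key, body); // last-value semantics: replaces any earlier body
        }
    }

    /** Moves every due message onto the queue, earliest delivery time first. */
    void tick(long now) {
        scheduled.sort(Comparator.comparingLong((Scheduled s) -> s.deliverAt));
        Iterator<Scheduled> it = scheduled.iterator();
        while (it.hasNext()) {
            Scheduled s = it.next();
            if (s.deliverAt > now) {
                break; // list is sorted, so nothing later is due either
            }
            onQueue.put(s.key, s.body); // replacement happens only at this point
            it.remove();
        }
    }

    int scheduledCount() {
        return scheduled.size();
    }

    String current(String key) {
        return onQueue.get(key);
    }
}
```

In this model, sending two scheduled messages with the same key leaves both parked; the second one replaces the first only once both have become due and the first has not been consumed in between.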
Short of implementing a new feature, I don't see how you can implement your desired behavior in any kind of automatic way. My recommendation at this point would be to use the management API (i.e. QueueControl) to manually remove the "old" scheduled message before you send the "new" scheduled message. You can use one of the removeMessage methods for this, as they will work on scheduled messages and non-scheduled messages alike.
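A minimal sketch of that remove-then-send sequence follows, under stated assumptions. `MessageRemover` and `ScheduledJobReplacer` are hypothetical names introduced here so the example compiles on its own; `MessageRemover` stands in for the one `QueueControl` operation it needs, `removeMessages(String filter)`, so a real control could be passed as `queueControl::removeMessages`. The filter assumes the `uniqueJobId` property from the question and the usual selector-style quoting, where a single quote is doubled. For the embedded broker in the question, the control can probably be looked up through the server's management service (for example `getManagementService().getResource(ResourceNames.QUEUE + "MYLASTVALUEQUEUE")`), but treat that lookup as an assumption to check against your Artemis version.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the single QueueControl operation used in this sketch. */
interface MessageRemover {
    /** Removes all messages matching the filter and returns how many were removed. */
    int removeMessages(String filter) throws Exception;
}

/** Hypothetical helper: removes pending messages for a job, then sends the new one. */
final class ScheduledJobReplacer {

    private final MessageRemover remover;

    ScheduledJobReplacer(MessageRemover remover) {
        this.remover = remover;
    }

    /** Builds a filter matching one job id; single quotes in the id are doubled. */
    static String filterFor(String uniqueJobId) {
        return "uniqueJobId = '" + uniqueJobId.replace("'", "''") + "'";
    }

    /**
     * Removes any message already held for the job, then runs the send action.
     * Returns the number of messages removed.
     */
    int replace(String uniqueJobId, Runnable send) throws Exception {
        int removed = remover.removeMessages(filterFor(uniqueJobId));
        send.run(); // e.g. a call to QueueMessageService.queueJobRequest(...)
        return removed;
    }
}
```

The two steps are not atomic: another producer could send a message for the same job between the removal and the new send, so some external coordination (for example a per-job lock in the application) may still be needed if producers run concurrently.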
Upvotes: 2