Georgii Lvov
Georgii Lvov

Reputation: 2752

Sequential processing of logically grouped messages in Oracle AQ by multiple consumers

I have a spring-boot service using Oracle AQ. The service receives http-requests, enqueues request bodies as message into Oracle AQ, then dequeues it and sends to the destination services via http, so queue is like a buffer here.

Each message contains a field by which we can understand that they are related to each other. And the goal is that the order in which the messages are enqueued to the queue should be preserved within the group whereas messages of different groups can be processed in parallel.

So, e.g. we have 5 messages in the queue: A1-A2-A3-A4-A5. A1 and A3 are related to each other, so we have to ensure that message A1 will be processed before A3, (not only be listened, but processed). So, to preserve the order within the group for single instance of the app, I am using selector:

Producer:

import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {

    private final JmsTemplate jmsTemplate;

    public MessageProducer(JmsTemplate jmsTemplate) {
        this.jmsTemplate = jmsTemplate;
    }

    public void sendMessage(RequestDto requestDto) {
        jmsTemplate.convertAndSend(
                "test_queue",
                requestDto,
                message -> {
                    message.setStringProperty("requestDtoGroup", requestDto.getGroup());
                    return message;
                }
        );
    }
}

Consumer:


import jakarta.jms.Message;
import lombok.extern.log4j.Log4j2;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;

@Component
@Log4j2
public class MessageConsumer {

    @JmsListener(
            destination = "test_queue",
            selector = "requestDtoGroup = 'group_1'"
    )
    public void listenGroup1(Message message) {
       log.info("Message with group_1" + message);
       // further processing
    }

    @JmsListener(
            destination = "test_queue",
            selector = "requestDtoGroup = 'group_2'"
    )
    public void listenGroup2(Message message) {
       log.info("Message with group_2" + message);
       // further processing
    }
}

And that is my config for JMS:

import jakarta.jms.ConnectionFactory;
import jakarta.jms.JMSException;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;

import java.time.Duration;
import java.util.Properties;

@Configuration
@EnableJms
public class JmsConfiguration {

    @Bean
    public ConnectionFactory connectionFactory() throws JMSException {
        Properties properties = new Properties();
        properties.setProperty("user", "AQ_USER");
        properties.setProperty("password", "your_password");

        return AQjmsFactory.getQueueConnectionFactory(
                "jdbc:oracle:thin:@localhost:1521/ORCLPDB1",
                properties
        );
    }

    @Bean
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(SomeHandler someHandler) throws JMSException {
        var containerFactory = new DefaultJmsListenerContainerFactory();

        containerFactory.setConnectionFactory(connectionFactory());

        containerFactory.setSessionAcknowledgeMode(Session.SESSION_TRANSACTED);
        containerFactory.setSessionTransacted(true);

        return containerFactory;
    }
}

For different selectors messages are processed in parallel and it works as expected.

But I cannot achieve it if 2 or more instances of the app are running.

If I raise up an another instance of the application in parallel (namely I start another application locally with the copy-pasted MessageConsumer class), then the messages with the same requestDtoGroup (A1 and A3) are processed in parallel, which I want to avoid.

How can I achieve that messages with the same requestDtoGroup are processed sequentially even with 2 or more instances of the app?

What I tried so far:

  1. In some answers on SO I came across advice to look at message grouping, but as far as I understood it doesn't suit to me, because Oracle AQ groups messages within the same transaction, and my messages are grouped logically.

  2. But still I tried to set up the requestDtoGroup value into correlation ID or JMSXGroupID by sending message via jmsTemplate.convertAndSend, but it didn't affect anything.

  3. Also I have found this question on SO, where is recommended to use dequeue options to search by dequeuing the most old enqueue-time for the message, but I don't know if it is possible to set it somehow to listeners.

Actually I achieved the desired behaviour with Kafka, where I use separate partitions to group messages inside the topic, but I must use Oracle AQ. Probably it makes sense to look at Oracle Transactional Event Queues, they say that it is similar to Kafka, but I would try to do it at first using simple AQ.

Upvotes: 0

Views: 190

Answers (1)

Parthiv Pradhan
Parthiv Pradhan

Reputation: 1

Honestly, I think you should not go for such a solution.

If scalability is your concern then you must configure your application instances to process different groups of messages. Rather, trying to distribute message processing within a group.

However, if you want to do it:

Option 1

you must write a custom component that tracks the message processing in the group with the help of a cache. The consumer proceeds with the processing, only if there's no message in the group currently processed. If the consumer picks the message or completes the processing of the messages, it has to update the cache with the processing state of the group.

Not sure, how scalable this will be because of the overhead of tracking the messages.

Option 2

In the producer, try further partitioning the messages based on some message attribute or logic. Then, configure the consumers dynamically with different selectors on the consumer instances. But again here also you have to do some tracking :( like in Option 1.

You could achieve the same with Kafka because you applied additional partitioning. However, in AQ you probably have to do the same at the program level.

Upvotes: 0

Related Questions