Bilbo
Bilbo

Reputation: 91

Kafka consumer. Delayed event processing is not working as expected

The code does not work with the latest Spring Boot version. My Spring Boot version is 3.4.3 and my Kafka version is 3.3.3. It throws the following exception: java.lang.ClassCastException: class org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter cannot be cast to class org.springframework.kafka.listener.MessageListener (org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter and org.springframework.kafka.listener.MessageListener are in unnamed module of loader 'app')

The DelayedMessageListenerAdapter is:

/**
 * Listener adapter that holds a record back until a configured delay has elapsed since
 * the record's timestamp, and only then forwards it to the wrapped delegate listener.
 *
 * <p>A delay can be configured per topic via {@link #setDelayForTopic(String, Duration)};
 * topics without an explicit entry use the default delay (initially {@link Duration#ZERO}).
 * Records that are not yet due are handed to the {@link KafkaConsumerBackoffManager}.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<Object>
        implements AcknowledgingConsumerAwareMessageListener<K, V>, AcknowledgingConsumerAwareBatchMessageListener<K, V> {

    /** Topic-specific delays; topics not present here fall back to {@link #defaultDelay}. */
    private final Map<String, Duration> delaysPerTopic = new HashMap<>();

    /** Delay applied to every topic that has no entry in {@link #delaysPerTopic}. */
    private Duration defaultDelay = Duration.ZERO;

    /** Backoff manager that receives the context for records that are not yet due. */
    private final KafkaConsumerBackoffManager backoffManager;

    /** Identifier of the listener, passed to the backoff manager when creating a context. */
    private final String listenerId;

    /**
     * Creates the adapter.
     *
     * @param delegate the listener that receives records once their delay has elapsed
     * @param backoffManager the manager used to back off for records that are not yet due
     * @param listenerId the listener identifier passed to the backoff manager
     */
    public DelayedMessageListenerAdapter(Object delegate, KafkaConsumerBackoffManager backoffManager, String listenerId) {
        super(delegate);
        this.backoffManager = backoffManager;
        this.listenerId = listenerId;
    }

    /**
     * Sets the delay for a single topic, replacing any previous value for that topic.
     *
     * @param topic the topic name
     * @param delay the delay to apply to records of that topic
     * @throws NullPointerException if {@code delay} is {@code null}
     */
    public void setDelayForTopic(String topic, Duration delay) {
        // Fail at configuration time instead of on the first record of this topic.
        this.delaysPerTopic.put(topic, java.util.Objects.requireNonNull(delay, "delay"));
    }

    /**
     * Sets the delay used for topics without a topic-specific delay.
     *
     * @param delay the default delay
     * @throws NullPointerException if {@code delay} is {@code null}
     */
    public void setDefaultDelay(Duration delay) {
        this.defaultDelay = java.util.Objects.requireNonNull(delay, "delay");
    }

    /**
     * Handles a single record: applies the delay check, then forwards to the delegate.
     */
    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        processMessage(consumerRecord, acknowledgment, consumer);
    }

    /**
     * Handles a batch by applying the delay check to, and forwarding, each record
     * individually in list order.
     *
     * <p>NOTE(review): the same {@code acknowledgment} instance is passed along with every
     * record of the batch - confirm the delegate tolerates being invoked once per record.
     */
    @Override
    public void onMessage(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
            processMessage(consumerRecord, acknowledgment, consumer);
        }
    }

    /**
     * Backs off if the record is not yet due, then forwards it to the delegate.
     *
     * @throws IllegalStateException if the delegate is not one of the supported listener types
     */
    private void processMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        Duration delay = this.delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay);
        long delayMillis = delay.toMillis();
        long recordTimestamp = consumerRecord.timestamp();

        if (System.currentTimeMillis() - recordTimestamp < delayMillis) {
            Context context = backoffManager.createContext(
                    recordTimestamp + delayMillis, // timestamp at which the record becomes due
                    this.listenerId,
                    new org.apache.kafka.common.TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                    consumer
            );

            // The manager is expected to interrupt processing (by throwing) while the record
            // is not yet due. If it returns normally - e.g. because the due time passed between
            // the check above and the manager's own check - the record must still be delivered,
            // so there is deliberately no early return here.
            backoffManager.backOffIfNecessary(context);
        }

        deliver(consumerRecord, acknowledgment, consumer);
    }

    /**
     * Forwards one record to the delegate, as a single record or as a one-element batch
     * depending on the delegate's listener type.
     */
    private void deliver(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        Object delegate = getDelegate();
        if (delegate instanceof AcknowledgingConsumerAwareMessageListener) {
            @SuppressWarnings("unchecked") // K/V are erased; the delegate is trusted to match this adapter
            AcknowledgingConsumerAwareMessageListener<K, V> recordListener =
                    (AcknowledgingConsumerAwareMessageListener<K, V>) delegate;
            recordListener.onMessage(consumerRecord, acknowledgment, consumer);
        } else if (delegate instanceof AcknowledgingConsumerAwareBatchMessageListener) {
            @SuppressWarnings("unchecked") // K/V are erased; the delegate is trusted to match this adapter
            AcknowledgingConsumerAwareBatchMessageListener<K, V> batchListener =
                    (AcknowledgingConsumerAwareBatchMessageListener<K, V>) delegate;
            batchListener.onMessage(Collections.singletonList(consumerRecord), acknowledgment, consumer);
        } else {
            // Previously the record was silently discarded here; surface the misconfiguration instead.
            throw new IllegalStateException("Unsupported delegate listener type: "
                    + (delegate != null ? delegate.getClass().getName() : "null"));
        }
    }
}

The bean configuration is:

/**
 * Builds the listener container factory registered under the bean name {@code "abc"}.
 *
 * <p>The factory is configured for batch listening with manual acknowledgment, and every
 * container it creates has its message listener wrapped in a
 * {@link DelayedMessageListenerAdapter} with a 30 second default delay.
 *
 * @param backoffManager the backoff manager handed to each delayed adapter
 * @return the configured container factory
 */
@Bean(name = "abc")
public ConcurrentKafkaListenerContainerFactory<String, MyClass> ecomCustidDepositAnalytic_Consumer(
        KafkaConsumerBackoffManager backoffManager) {

    ConcurrentKafkaListenerContainerFactory<String, MyClass> factory = new ConcurrentKafkaListenerContainerFactory<>();

    // Consumer factory for the "event-A" configuration
    factory.setConsumerFactory(getConsumerFactoryFor("event-A"));

    // Batch listening with manual acknowledgment
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

    // Wrap each container's listener with the delaying adapter
    factory.setContainerCustomizer(container -> {
        Object originalListener = container.getContainerProperties().getMessageListener();

        // The adapter's value type matches the factory's declared value type (MyClass).
        DelayedMessageListenerAdapter<String, MyClass> delayedAdapter = new DelayedMessageListenerAdapter<>(
                originalListener,
                backoffManager,
                container.getListenerId()
        );

        // Default delay of 30 seconds for all topics
        delayedAdapter.setDefaultDelay(Duration.ofSeconds(30));

        // Replace the original listener with the delayed adapter
        container.getContainerProperties().setMessageListener(delayedAdapter);
    });

    return factory;
}

Upvotes: 0

Views: 19

Answers (0)

Related Questions