Reputation: 91
The code does not work with the latest Spring Boot version. My Spring Boot version is 3.4.3 and my Kafka version is 3.3.3. It throws the following exception: java.lang.ClassCastException: class org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter cannot be cast to class org.springframework.kafka.listener.MessageListener (org.springframework.kafka.listener.adapter.FilteringBatchMessageListenerAdapter and org.springframework.kafka.listener.MessageListener are in unnamed module of loader 'app')
The DelayedMessageListenerAdapter is:
/**
 * Listener adapter that holds records back until a configured delay has elapsed since the
 * record's timestamp, and then forwards them to the wrapped delegate listener.
 *
 * <p>The delay is looked up per topic; topics without an explicit entry use the default delay,
 * which is {@link Duration#ZERO} until changed with {@link #setDefaultDelay(Duration)}.
 *
 * @param <K> the record key type
 * @param <V> the record value type
 */
public class DelayedMessageListenerAdapter<K, V> extends AbstractDelegatingMessageListenerAdapter<Object>
        implements AcknowledgingConsumerAwareMessageListener<K, V>, AcknowledgingConsumerAwareBatchMessageListener<K, V> {

    // Topic-specific delays; topics that are absent fall back to defaultDelay.
    private final Map<String, Duration> delaysPerTopic = new HashMap<>();

    // Delay applied to every topic that has no entry in delaysPerTopic.
    private Duration defaultDelay = Duration.ZERO;

    // Asked to back off when a record is not yet due.
    private final KafkaConsumerBackoffManager backoffManager;

    // Identifier of the listener, passed to the back-off manager when building a context.
    private final String listenerId;

    /**
     * Creates an adapter around the given delegate.
     *
     * @param delegate the listener that receives records once they are due
     * @param backoffManager the manager consulted when a record is not yet due
     * @param listenerId the listener identifier handed to the back-off manager
     */
    public DelayedMessageListenerAdapter(Object delegate, KafkaConsumerBackoffManager backoffManager, String listenerId) {
        super(delegate);
        this.backoffManager = backoffManager;
        this.listenerId = listenerId;
    }

    /**
     * Sets the delay for a specific topic, overriding the default delay for that topic.
     *
     * @param topic the topic name
     * @param delay the delay to apply to records of that topic; must not be null
     * @throws NullPointerException if {@code delay} is null
     */
    public void setDelayForTopic(String topic, Duration delay) {
        // Reject null here: a null entry would otherwise only fail later, while consuming.
        this.delaysPerTopic.put(topic, java.util.Objects.requireNonNull(delay, "delay"));
    }

    /**
     * Sets the delay used for every topic without a topic-specific delay.
     *
     * @param delay the default delay; must not be null
     * @throws NullPointerException if {@code delay} is null
     */
    public void setDefaultDelay(Duration delay) {
        this.defaultDelay = java.util.Objects.requireNonNull(delay, "delay");
    }

    /**
     * Handles a single record: backs off if it is not yet due, otherwise forwards it to the delegate.
     */
    @Override
    public void onMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        processMessage(consumerRecord, acknowledgment, consumer);
    }

    /**
     * Handles a batch by applying the single-record logic to each record in order.
     * Every record is processed with the same {@code acknowledgment} instance.
     */
    @Override
    public void onMessage(List<ConsumerRecord<K, V>> consumerRecords, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        for (ConsumerRecord<K, V> consumerRecord : consumerRecords) {
            processMessage(consumerRecord, acknowledgment, consumer);
        }
    }

    // Applies the delay check to one record and then hands it to the delegate.
    private void processMessage(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        Duration delay = this.delaysPerTopic.getOrDefault(consumerRecord.topic(), this.defaultDelay);
        long delayMillis = delay.toMillis();
        long recordTimestamp = consumerRecord.timestamp();
        long currentTime = System.currentTimeMillis();

        if (currentTime - recordTimestamp < delayMillis) {
            // The record is not due yet: describe when it becomes due and let the manager back off.
            Context context = this.backoffManager.createContext(
                    recordTimestamp + delayMillis,
                    this.listenerId,
                    new org.apache.kafka.common.TopicPartition(consumerRecord.topic(), consumerRecord.partition()),
                    consumer);
            this.backoffManager.backOffIfNecessary(context);
            // Deliberately no early return: if the manager returns normally it has decided that
            // no back-off is needed, so the record must still be delivered instead of being
            // silently skipped.
            // NOTE(review): Spring Kafka's stock back-off manager is expected to signal a required
            // back-off by throwing KafkaBackoffException - confirm for the implementation in use.
        }

        invokeDelegate(consumerRecord, acknowledgment, consumer);
    }

    // Forwards one record to the delegate, wrapping it in a singleton list for batch delegates.
    private void invokeDelegate(ConsumerRecord<K, V> consumerRecord, Acknowledgment acknowledgment, Consumer<?, ?> consumer) {
        Object delegate = getDelegate();
        if (delegate instanceof AcknowledgingConsumerAwareMessageListener) {
            @SuppressWarnings("unchecked") // key/value types are fixed by whoever wired the delegate
            AcknowledgingConsumerAwareMessageListener<K, V> recordListener =
                    (AcknowledgingConsumerAwareMessageListener<K, V>) delegate;
            recordListener.onMessage(consumerRecord, acknowledgment, consumer);
        } else if (delegate instanceof AcknowledgingConsumerAwareBatchMessageListener) {
            @SuppressWarnings("unchecked") // key/value types are fixed by whoever wired the delegate
            AcknowledgingConsumerAwareBatchMessageListener<K, V> batchListener =
                    (AcknowledgingConsumerAwareBatchMessageListener<K, V>) delegate;
            batchListener.onMessage(Collections.singletonList(consumerRecord), acknowledgment, consumer);
        } else {
            // Fail loudly: silently returning here would drop every record handed to this adapter.
            throw new IllegalStateException("Unsupported delegate listener type: "
                    + (delegate == null ? "null" : delegate.getClass().getName()));
        }
    }
}
The bean configuration is:
/**
 * Builds the batch listener container factory registered under the bean name "abc".
 *
 * <p>The factory uses manual acknowledgment and installs a container customizer that wraps
 * each container's message listener in a {@code DelayedMessageListenerAdapter} with a default
 * delay of 30 seconds.
 *
 * @param backoffManager the back-off manager handed to every delayed adapter
 * @return the configured container factory
 */
@Bean(name = "abc")
public ConcurrentKafkaListenerContainerFactory<String, MyClass> ecomCustidDepositAnalytic_Consumer(
        KafkaConsumerBackoffManager backoffManager) {
    ConcurrentKafkaListenerContainerFactory<String, MyClass> factory = new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(getConsumerFactoryFor("event-A"));
    // Listeners created from this factory receive batches and acknowledge manually.
    factory.setBatchListener(true);
    factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);

    factory.setContainerCustomizer(container -> {
        // Take the listener currently set on the container and wrap it.
        Object originalListener = container.getContainerProperties().getMessageListener();
        // The adapter's value type matches the factory's (MyClass) so the generics stay consistent.
        DelayedMessageListenerAdapter<String, MyClass> delayedAdapter = new DelayedMessageListenerAdapter<>(
                originalListener,
                backoffManager,
                container.getListenerId());
        // No topic-specific delays are configured, so every topic gets the 30-second default.
        delayedAdapter.setDefaultDelay(Duration.ofSeconds(30));
        // Replace the original listener with the delaying wrapper.
        container.getContainerProperties().setMessageListener(delayedAdapter);
    });
    return factory;
}
Upvotes: 0
Views: 19