Reputation: 151
I am using Spring Cloud Version 2023.0.1 (spring cloud stream version 4.1.1) and I have written a simple kafka consumer in batch mode to simulate an error scenario.
@Bean
Consumer<Message<List<String>>> consumer1() {
return message -> {
final List<String> payload = message.getPayload();
final MessageHeaders messageHeaders = message.getHeaders();
payload.forEach(System.out::println);
payload.forEach(p -> {
if(p.startsWith("a")) {
throw new RuntimeException("Intentional Exception");
}
});
System.out.println(messageHeaders);
System.out.println("Done");
};
}
My application.yml
file looks like this
spring:
cloud:
function:
definition: consumer1;
stream:
bindings:
consumer1-in-0:
destination: topic1
group: consumer1-in-0-v0.1
consumer:
batch-mode: true
use-native-decoding: true
max-attempts: 3
kafka:
binder:
brokers:
- localhost:9092
default:
consumer:
configuration:
max.poll.records: 1000
max.partition.fetch.bytes: 31457280
fetch.max.wait.ms: 200
bindings:
consumer1-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
dlqProducerProperties:
configuration:
value.serializer: org.apache.kafka.common.serialization.StringSerializer
key.serializer: org.apache.kafka.common.serialization.StringSerializer
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
I have also specified a ListenerContainerWithDlqAndRetryCustomizer
to customize the retries
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> template) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
@Nullable BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
@Nullable BackOff backOff) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(template,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return false;
}
};
}
The issue When an error happens, the message batch goes straight to the DLQ. And no retries are attempted.
However the problem is that there can be transient errors because of which the batch failed processing, and I want the batch to be retried a few times before sending it to DLQ. But I am not able to get it to work.
What am I doing wrong?
Upvotes: 0
Views: 217
Reputation: 5924
The original question exposed a bug in the binder, which I believe was causing the issue you saw before. See this issue for details: https://github.com/spring-cloud/spring-cloud-stream/issues/2951.
If you try the binder version 4.1.2-SNAPSHOT
, the above code works for me:
@SpringBootApplication
public class So78485425Application {
public static void main(String[] args) {
SpringApplication.run(So78485425Application.class, args);
}
@Bean
Consumer<Message<List<String>>> consumer1() {
return message -> {
final List<String> payload = message.getPayload();
final MessageHeaders messageHeaders = message.getHeaders();
payload.forEach(System.out::println);
payload.forEach(p -> {
if(p.startsWith("a")) {
throw new RuntimeException("Intentional Exception");
}
});
System.out.println(messageHeaders);
System.out.println("Done");
};
}
@Bean
public KafkaTemplate<String, String> stringStringKafkaTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf, Map.of(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class,
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}
@Bean
ListenerContainerWithDlqAndRetryCustomizer cust(KafkaTemplate<?, ?> stringStringKafkaTemplate) {
return new ListenerContainerWithDlqAndRetryCustomizer() {
@Override
public void configure(AbstractMessageListenerContainer<?, ?> container, String destinationName,
String group,
BiFunction<ConsumerRecord<?, ?>, Exception, TopicPartition> dlqDestinationResolver,
BackOff backOff) {
ConsumerRecordRecoverer dlpr = new DeadLetterPublishingRecoverer(stringStringKafkaTemplate,
dlqDestinationResolver);
container.setCommonErrorHandler(new DefaultErrorHandler(dlpr, backOff));
}
@Override
public boolean retryAndDlqInBinding(String destinationName, String group) {
return false;
}
};
}
}
and the corresponding configuration:
spring:
cloud:
function:
definition: consumer1
stream:
bindings:
consumer1-in-0:
destination: topic1
group: consumer1-in-0-v0.1
consumer:
batch-mode: true
use-native-decoding: true
max-attempts: 3
kafka:
binder:
brokers:
- localhost:9092
default:
consumer:
configuration:
max.poll.records: 1000
max.partition.fetch.bytes: 31457280
fetch.max.wait.ms: 200
bindings:
consumer1-in-0:
consumer:
enableDlq: true
dlqName: dlq-topic
configuration:
key.deserializer: org.apache.kafka.common.serialization.StringDeserializer
value.deserializer: org.apache.kafka.common.serialization.StringDeserializer
Upvotes: 1
Reputation: 151
In case anyone stumbles upon this in future, i figured out what was wrong.
I had to remove enableDlq
, dlqName
and dlqProducerProperties
from application.yml
file.
Then it worked.
In the java code, I also removed ListenerContainerWithDlqAndRetryCustomizer
and just used ListenerContainerCustomizer
.
The code looked something like this:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<byte[], byte[]>> customizer(DefaultErrorHandler errorHandler) {
return (container, dest, group) -> container.setCommonErrorHandler(errorHandler);
}
@Bean
public DefaultErrorHandler errorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
return new DefaultErrorHandler(deadLetterPublishingRecoverer, new FixedBackOff(0, 4));
}
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaOperations<?, ?> stringTemplate,
KafkaOperations<?, ?> bytesTemplate,
KafkaOperations<?, ?> longTemplate) {
Map<Class<?>, KafkaOperations<?, ?>> templates = new LinkedHashMap<>();
templates.put(String.class, stringTemplate);
templates.put(byte[].class, bytesTemplate);
templates.put(Long.class, longTemplate);
return new DeadLetterPublishingRecoverer(templates);
}
@Bean
public KafkaTemplate<String, String> stringTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf,
Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}
@Bean
public KafkaTemplate<String, String> bytesTemplate(ProducerFactory<String, String> pf) {
return new KafkaTemplate<>(pf);
}
@Bean
public KafkaTemplate<String, Long> longTemplate(ProducerFactory<String, Long> pf) {
return new KafkaTemplate<>(pf,
Map.of(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, LongSerializer.class, ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class));
}
Upvotes: 0