Reputation: 11
With Spring cloud stream, we can define bindings with multiple destinations like this
bindings:
functionA-in-0:
destination: topic-1,topic-2
binder: kafka
group: local.kafka-sink
consumer:
max-attempts: 3
any message from topic-1, topic-2 will be consumed by 'functionA' but can we use comma delimited with Kafka binder DLQ as well?
spring:
cloud:
stream:
kafka:
binder:
brokers: localhost:9092
bindings:
functionA-in-0:
consumer:
enable-dlq: true
dlq-name: topic-1,topic-2
Is it possible to do it like this, or this is not the way we should approach with Spring cloud stream?
I'm using spring-boot-starter-parent 3.3.1
I've tried this configuration and it's not working got an exception as below.
org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic-1-dlq,topic-2-dlq]
2024-07-23T19:30:21.601+07:00 ERROR 18584 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder : Error sending to DLQ a message with key='byte[5]' and payload='byte[5]' received from 0
org.springframework.kafka.KafkaException: Send failed
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:816) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:773) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:576) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$DlqSender.sendToDlq(KafkaMessageChannelBinder.java:1607) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.handleRecordForDlq(KafkaMessageChannelBinder.java:1273) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$6(KafkaMessageChannelBinder.java:1130) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$7(KafkaMessageChannelBinder.java:1129) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:235) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:192) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:361) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:331) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.10.jar:6.1.10]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.10.jar:6.1.10]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.10.jar:6.1.10]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.10.jar:6.1.10]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99) ~[spring-messaging-6.1.10.jar:6.1.10]
at org.springframework.integration.core.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:168) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-6.3.1.jar:6.3.1]
at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:560) ~[spring-retry-2.0.6.jar:na]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:405) ~[spring-retry-2.0.6.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:233) ~[spring-retry-2.0.6.jar:na]
at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.3.1.jar:6.3.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457) ~[spring-integration-kafka-6.3.1.jar:6.3.1]
at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:422) ~[spring-integration-kafka-6.3.1.jar:6.3.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.1.jar:3.2.1]
at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.1.jar:1.13.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.1.jar:3.2.1]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.1.jar:3.2.1]
at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]
but if I input only topic-1. All exception occurred on consuming topic-1 and topic-2 will be route to topic-1-dlq. Which is probably correct and acting the way it should. But how should I approach what I aim for?
Upvotes: 1
Views: 85
Reputation: 11
Answering my own question here.
Although we can't do that directly from yaml configuration. It seems we can implement a DlqDestinationResolver bean and resolve our own destination just like that according to https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/dlq-partition.html
My bad that I didn't see this document page at first.
Upvotes: 0