VarutJ
VarutJ

Reputation: 11

How to define multiple DLQ destination with Spring Cloud Stream

With Spring Cloud Stream, we can define a binding with multiple destinations like this:

bindings:
  functionA-in-0:
     destination: topic-1,topic-2
     binder: kafka
     group: local.kafka-sink
     consumer:
          max-attempts: 3

Any message from topic-1 or topic-2 will be consumed by 'functionA', but can we use a comma-delimited list with the Kafka binder DLQ as well?

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
        bindings:
            functionA-in-0:
              consumer:
                enable-dlq: true
                dlq-name: topic-1,topic-2

Is it possible to do it like this, or is this not the way we should approach it with Spring Cloud Stream?

I'm using spring-boot-starter-parent 3.3.1

I've tried this configuration, but it's not working; I got the exception below.

org.apache.kafka.common.errors.InvalidTopicException: Invalid topics: [topic-1-dlq,topic-2-dlq]

2024-07-23T19:30:21.601+07:00 ERROR 18584 --- [container-0-C-1] o.s.c.s.b.k.KafkaMessageChannelBinder    : Error sending to DLQ  a message with key='byte[5]' and payload='byte[5]' received from 0

org.springframework.kafka.KafkaException: Send failed
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:816) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.core.KafkaTemplate.observeSend(KafkaTemplate.java:773) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:576) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder$DlqSender.sendToDlq(KafkaMessageChannelBinder.java:1607) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.handleRecordForDlq(KafkaMessageChannelBinder.java:1273) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$6(KafkaMessageChannelBinder.java:1130) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
    at java.base/java.lang.Iterable.forEach(Iterable.java:75) ~[na:na]
    at org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder.lambda$getErrorMessageHandler$7(KafkaMessageChannelBinder.java:1129) ~[spring-cloud-stream-binder-kafka-4.1.0.jar:4.1.0]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:235) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java:192) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.integration.channel.AbstractMessageChannel.sendInternal(AbstractMessageChannel.java:390) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.integration.channel.AbstractMessageChannel.sendWithMetrics(AbstractMessageChannel.java:361) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:331) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:304) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:99) ~[spring-messaging-6.1.10.jar:6.1.10]
    at org.springframework.integration.core.ErrorMessagePublisher.publish(ErrorMessagePublisher.java:168) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer.recover(ErrorMessageSendingRecoverer.java:83) ~[spring-integration-core-6.3.1.jar:6.3.1]
    at org.springframework.retry.support.RetryTemplate.handleRetryExhausted(RetryTemplate.java:560) ~[spring-retry-2.0.6.jar:na]
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:405) ~[spring-retry-2.0.6.jar:na]
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:233) ~[spring-retry-2.0.6.jar:na]
    at org.springframework.integration.kafka.inbound.KafkaInboundEndpoint.doWithRetry(KafkaInboundEndpoint.java:70) ~[spring-integration-kafka-6.3.1.jar:6.3.1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:457) ~[spring-integration-kafka-6.3.1.jar:6.3.1]
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:422) ~[spring-integration-kafka-6.3.1.jar:6.3.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2800) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2778) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.lambda$doInvokeRecordListener$53(KafkaMessageListenerContainer.java:2701) ~[spring-kafka-3.2.1.jar:3.2.1]
    at io.micrometer.observation.Observation.observe(Observation.java:565) ~[micrometer-observation-1.13.1.jar:1.13.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2699) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2541) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2430) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:2085) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1461) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1426) ~[spring-kafka-3.2.1.jar:3.2.1]
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1296) ~[spring-kafka-3.2.1.jar:3.2.1]
    at java.base/java.util.concurrent.CompletableFuture$AsyncRun.run$$$capture(CompletableFuture.java:1804) ~[na:na]

But if I specify only topic-1, every exception that occurs while consuming topic-1 or topic-2 is routed to topic-1-dlq, which is probably correct and the way it should behave. But how should I approach what I'm aiming for?

Upvotes: 1

Views: 85

Answers (1)

VarutJ
VarutJ

Reputation: 11

Answering my own question here.

Although we can't do that directly from the YAML configuration, it seems we can implement a DlqDestinationResolver bean and resolve the DLQ destination ourselves, according to https://docs.spring.io/spring-cloud-stream/reference/kafka/kafka-binder/dlq-partition.html

My bad for not seeing this documentation page at first.

Upvotes: 0

Related Questions