eomercelik

Reputation: 595

How to apply retention time configuration for Dead Letter Queue in Spring Cloud Stream Kafka Binder?

I have an application using Spring Cloud Stream Kafka. For user-defined topics I can delete records by applying the configuration shown below, but this configuration doesn't work for DLQ topics.

For example, in the configuration below I set the retention properties at the binder level. My producer topic (student-topic), defined under the bindings level, is configured correctly: I can verify that records are deleted once the topic log exceeds the specified retention bytes (300000000).

But the binder-level retention configuration doesn't work for the DLQ topic (person-topic-error-dlq). Is there a different configuration for cleaning records from DLQ topics other than retention time?

How can I do this?

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                    binder:
                      brokers: localhost:19092
      bindings:
        person-topic-in:
          binder: defaultKafka
          destination: person-topic
          contentType: application/json
          group: person-topic-group
        student-topic-out:
          binder: defaultKafka
          destination: student-topic
          contentType: application/json

Upvotes: 1

Views: 975

Answers (1)

Gary Russell

Reputation: 174779

You are only setting the (default) properties for producer bindings.

That said, this still doesn't work for me:

      binders:
        defaultKafka:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    default:
                      producer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000
                      consumer:
                        topic:
                          properties:
                            retention.bytes: 300000000
                            segment.bytes: 300000000

(the properties are not applied even to the primary topic).

It looks like there is a problem with the default Kafka consumer binding properties.

This works for me; the properties are applied to both the primary and dead letter topics:

spring:
  cloud:
    stream:
      kafka:
        bindings:
          person-topic-in:
            consumer:
              enableDlq: true
              dlqName: person-topic-error-dlq
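              # topic properties set on the consumer binding are applied to both person-topic and person-topic-error-dlq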
              topic:
                properties:
                  retention.bytes: 300000000
                  segment.bytes: 300000000
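
To confirm that the binding-level properties actually reached the broker, you can read the DLQ topic's configuration back with the Kafka AdminClient. The following is only a minimal verification sketch, assuming kafka-clients is on the classpath and using the broker address and DLQ topic name from the question:

import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class DlqTopicConfigCheck {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Broker address taken from the binder configuration in the question.
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");

        try (AdminClient admin = AdminClient.create(props)) {
            ConfigResource dlqTopic =
                    new ConfigResource(ConfigResource.Type.TOPIC, "person-topic-error-dlq");

            // Fetch the effective topic configuration from the broker.
            Map<ConfigResource, Config> configs =
                    admin.describeConfigs(List.of(dlqTopic)).all().get();

            // Print the two properties set in the binding above.
            configs.get(dlqTopic).entries().stream()
                    .filter(e -> "retention.bytes".equals(e.name())
                            || "segment.bytes".equals(e.name()))
                    .forEach(e -> System.out.println(e.name() + " = " + e.value()));
        }
    }
}

If the binding properties were applied, both values should come back as 300000000.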

Upvotes: 1
