Jonathan Henrique

Reputation: 124

Spring Cloud Stream Multi-Cluster and Multi-Input Bindings from a Single Consumer

In my project, I need to connect to two different Kafka brokers and consume all events from two topics, one topic on each broker.

My application.yaml looks somewhat like this:

spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two # I switched this binder between kafka-one and kafka-two manually while testing; the orderProcessedListener-in-1 binding has no dedicated producer of its own
        orderProcessedListener-in-0: # CONSUME FROM KAFKA ONE
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedListener-in-1: # CONSUMER FROM KAFKA TWO
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedListener-in-1:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"

But it didn't work when I ran the application: the consumer binding to Kafka One (orderProcessedListener-in-0) works normally, but the consumer binding to Kafka Two (orderProcessedListener-in-1) never consumes anything.

I have the two Kafka clusters running fine in my development environment as Docker containers, one exposed on port 9092 and the other on port 9093.

Architecture example: [diagram: project architecture]

Kafka One, with consumers subscribed on all topics: [screenshot]

Kafka Two, with no consumers ever subscribed on any topic: [screenshot]

How can I fix this?

Upvotes: 0

Views: 77

Answers (1)

Jonathan Henrique

Reputation: 124

The simple approach to solve this is to declare two function beans that delegate to a single entry-point method.

Example:


import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Component;

@Component
public class OrderProcessedListener {

    // Shared entry point: both bindings funnel into the same business logic.
    public void consume(final Message<Order> message) {
        // your business logic to process the message
    }

    // Bound to orderProcessedFromKafkaOneListener-in-0 (binder kafka-one).
    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaOneListener() {
        return this::consume;
    }

    // Bound to orderProcessedFromKafkaTwoListener-in-0 (binder kafka-two).
    @Bean
    public Consumer<Message<Order>> orderProcessedFromKafkaTwoListener() {
        return this::consume;
    }

}
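
Because each bean is a separate function, Spring Cloud Stream derives an independent input binding for each one (orderProcessedFromKafkaOneListener-in-0 and orderProcessedFromKafkaTwoListener-in-0), and each binding can be assigned its own binder.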

The properties configuration should be:

spring:
  cloud:
    function:
      definition: orderCreatedListener;orderProcessedFromKafkaOneListener;orderProcessedFromKafkaTwoListener
    stream:
      bindings:
        orderCreatedProducer-out-0:
          destination: order-created
          binder: kafka-one
        orderCreatedListener-in-0:
          destination: order-created
          group: spot
          binder: kafka-one
        orderCreatedListener-out-0:
          destination: order-processed
          binder: kafka-two
        orderProcessedFromKafkaOneListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-one
        orderProcessedFromKafkaTwoListener-in-0:
          destination: order-processed
          group: spot
          binder: kafka-two
      kafka:
        binder:
          auto-create-topics: true
          configuration:
            security:
              protocol: SASL_PLAINTEXT
            sasl:
              mechanism: PLAIN
        bindings:
          orderCreatedListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-created-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaOneListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
          orderProcessedFromKafkaTwoListener-in-0:
            consumer:
              enableDlq: true
              dlqName: order-processed-dlq
              autoCommitOnError: true
              autoCommitOffset: true
      binders:
        kafka-one:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9092
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
        kafka-two:
          type: kafka
          environment:
            spring:
              cloud:
                stream:
                  kafka:
                    binder:
                      brokers: localhost:9093
                      configuration:
                        sasl:
                          jaas:
                            config: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"spot\" password=\"spot\";"
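
Compared with the original configuration, the changes are the three function names in spring.cloud.function.definition and the two single-input bindings orderProcessedFromKafkaOneListener-in-0 and orderProcessedFromKafkaTwoListener-in-0, each pinned to its own binder.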

That said, the best solution for this kind of problem is to use reactive functions.
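
For completeness, a minimal sketch of the reactive flavor, assuming Reactor is on the classpath and the same Order type as above; the function and binding names stay the same, so the configuration above applies unchanged:

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.Message;

import reactor.core.publisher.Flux;

@Configuration
public class ReactiveOrderProcessedListener {

    // Shared entry point: both bindings run the same processing logic.
    private void consume(final Message<Order> message) {
        // your business logic to process the message
    }

    // Bound to orderProcessedFromKafkaOneListener-in-0 (binder kafka-one).
    @Bean
    public Consumer<Flux<Message<Order>>> orderProcessedFromKafkaOneListener() {
        return flux -> flux.doOnNext(this::consume).subscribe();
    }

    // Bound to orderProcessedFromKafkaTwoListener-in-0 (binder kafka-two).
    @Bean
    public Consumer<Flux<Message<Order>>> orderProcessedFromKafkaTwoListener() {
        return flux -> flux.doOnNext(this::consume).subscribe();
    }

}

Note that with a Consumer<Flux<...>> signature the framework hands you the whole stream once at startup, and you are responsible for subscribing to it, as done above.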

Upvotes: 0
