Ray S

Reputation: 629

How to have multiple kafka consumer groups in application properties

I am using spring kafka and would like to have multiple consumer groupIds mentioned in the application yaml that I can use in my kafkaListener class where I am listening to multiple topics.

My Kafka properties in the application.yml file currently look like this:

kafka:
    properties:
      topics:
        topic1: topic1
        topic2: topic2
    bootstrap-servers: server1,server2
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
      retries: 4
    consumer:
      group-id: mygroupid
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

Is there a way to have multiple group IDs in the consumer block above?

Alternatively, I can give each @KafkaListener a different groupId in my Spring code, as follows, but I am not sure how to set up the rest of the properties, such as auto-offset-reset and key-deserializer:

@KafkaListener(topics = "topic1", groupId = "cd1")
public void consumeMessage(String message) throws Exception {
    // some code goes here
}

Please let me know how to accomplish what I am trying to do as I am new to kafka.

Upvotes: 3

Views: 6749

Answers (1)

Gary Russell

Reputation: 174554

Boot only autoconfigures one consumer factory (DefaultKafkaConsumerFactory) and one listener container factory from the yml, so all the properties are shared across all consumers. That's why we added the groupId attribute to @KafkaListener (the listener's id is used if groupId is not provided); this is the most common property that changes between consumers.

You can, of course, use property placeholders so groupId = "${group.one}" will use the property group.one from the yml.
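
As a rough sketch of the placeholder approach, the group IDs could live under a custom top-level key in application.yml. The key group.two and both values are made up for illustration; only group.one is mentioned above.

```yaml
# Hypothetical custom keys, resolved by property placeholders in @KafkaListener
group:
  one: cd1
  two: cd2
```

One listener would then declare groupId = "${group.one}" and another groupId = "${group.two}", while both keep sharing the remaining consumer settings from the yml.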

To change more fundamental stuff like serializers/deserializers, if you are using a version earlier than 2.2.4, you will need to create multiple factories and container factories.

However, starting with version 2.2.4, you can now set any arbitrary kafka consumer property in the KafkaListener annotation...

/**
 * Kafka consumer properties; they will supersede any properties with the same name
 * defined in the consumer factory (if the consumer factory supports property overrides).
 * <h3>Supported Syntax</h3>
 * <p>The supported syntax for key-value pairs is the same as the
 * syntax defined for entries in a Java
 * {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
 * <ul>
 * <li>{@code key=value}</li>
 * <li>{@code key:value}</li>
 * <li>{@code key value}</li>
 * </ul>
 * {@code group.id} and {@code client.id} are ignored.
 * @return the properties.
 * @since 2.2.4
 * @see org.apache.kafka.clients.consumer.ConsumerConfig
 * @see #groupId()
 * @see #clientIdPrefix()
 */
String[] properties() default {};
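
To get a feel for the three syntaxes listed in that javadoc, here is a small standalone sketch that uses only java.util.Properties, with no Spring involved. The class name ListenerPropertySyntax and the helper parse are made up for illustration; it mirrors the documented syntax and is not the framework's actual parsing code.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class ListenerPropertySyntax {

    // Loads each entry with java.util.Properties#load and merges the results,
    // so every entry may use "key=value", "key:value" or "key value".
    static Properties parse(String... entries) {
        Properties merged = new Properties();
        for (String entry : entries) {
            try {
                merged.load(new StringReader(entry));
            } catch (IOException e) {
                // A StringReader should not fail; rethrow unchecked just in case.
                throw new UncheckedIOException(e);
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        Properties props = parse(
                "auto.offset.reset=latest",     // key=value
                "max.poll.interval.ms:60000",   // key:value
                "max.poll.records 100");        // key value
        // Iteration order is not guaranteed for Properties.
        props.forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

All three entries end up as ordinary key/value pairs, so whichever separator you pick in the properties attribute is a matter of taste.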

Note that these properties use the native Kafka dotted format (auto.offset.reset), not the Boot hyphenated or camel-case property names.

Here's an example from the documentation:

@KafkaListener(topics = "myTopic", groupId="group", properties= {
    "max.poll.interval.ms:60000",
    ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})

Again, the values can be property placeholders.

On the producer side, you need multiple factories still.

Upvotes: 1
