Reputation: 629
I am using spring kafka and would like to have multiple consumer groupIds mentioned in the application yaml that I can use in my kafkaListener class where I am listening to multiple topics.
My kafka properties in application.yml file look like this for now
kafka:
properties:
topics:
topic1: topic1
topic2: topic2
bootstrap-servers: server1,server2
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
retries: 4
consumer:
group-id: mygroupid
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
Is there a way to have multiple groupIDs in the consumer block above.
Alternatively, I have give different groupIDs in kafkaListener in my spring code as follows, I am not sure how to set up the rest of the propeties like auto-offset-reset, key-deserializer etc:
@KafkaListener(topics = "topic1", groupId = "cd1")
public void consumeMessage(String message) throws Exception {
// some code goes here
}
Please let me know how to accomplish what I am trying to do as I am new to kafka.
Upvotes: 3
Views: 6749
Reputation: 174554
Boot only autoconfigures one DefaultKafkaConsumerFactory
and DefaultKafkaConsumerFactory
from the yml, so all the properties are shared across all consumers. That's why we added the groupId
(and use the id
if groupId
is not provided); this is the most common property that changes between consumers.
You can, of course, use property placeholders so groupId = "${group.one}"
will use the property group.one
from the yml.
To change more fundamental stuff like serializers/deserializers, if you are using a version earlier than 2.2.4, you will need to create multiple factories and container factories.
However, starting with version 2.2.4, you can now set any arbitrary kafka consumer property in the KafkaListener
annotation...
/**
* Kafka consumer properties; they will supersede any properties with the same name
* defined in the consumer factory (if the consumer factory supports property overrides).
* <h3>Supported Syntax</h3>
* <p>The supported syntax for key-value pairs is the same as the
* syntax defined for entries in a Java
* {@linkplain java.util.Properties#load(java.io.Reader) properties file}:
* <ul>
* <li>{@code key=value}</li>
* <li>{@code key:value}</li>
* <li>{@code key value}</li>
* </ul>
* {@code group.id} and {@code client.id} are ignored.
* @return the properties.
* @since 2.2.4
* @see org.apache.kafka.clients.consumer.ConsumerConfig
* @see #groupId()
* @see #clientIdPrefix()
*/
String[] properties() default {};
Note that these properties are in the native kafka dotted format (auto.offset.reset
) not the boot hyphenated or camel case properties.
Here's an example from the documentation:
@KafkaListener(topics = "myTopic", groupId="group", properties= {
"max.poll.interval.ms:60000",
ConsumerConfig.MAX_POLL_RECORDS_CONFIG + "=100"
})
Again, the values can be property placeholders.
On the producer side, you need multiple factories still.
Upvotes: 1