Jeahyun Kim
Jeahyun Kim

Reputation: 313

Can I set Kafka Stream consumer group.id?

I'm using Kafka Stream library for streaming application. I wanted to set kafka consumer group id. Then, I put Kafka stream configuration like below.



    streamsCopnfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JoinTestApp");
    streamsCopnfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "JonTestClientId1");
    streamsCopnfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
    streamsCopnfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstreapServer);
    streamsCopnfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    streamsCopnfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
    streamsCopnfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    streamsCopnfiguration.put(StreamsConfig.consumerPrefix("group.id"), "groupId1");
//    streamsCopnfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId1");

But, my console log kafka consumer configuration not seted group.id. Just stream application id..

2019-03-19 17:17:03,206 [main] INFO  org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values: 
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [....]
check.crcs = true
client.dns.lookup = default
client.id = JonTestClientId111-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = JoinTestApp
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false

Can I set kafka stream consumer group.id??

Upvotes: 19

Views: 12931

Answers (1)

mukesh210
mukesh210

Reputation: 2892

No, you can't. For this purpose in Kafka Streams you need to use application.id.

Kafka Streams application.id is used at various places to isolate resources used by application from others.

application.id is used as Kafka consumer group.id for coordination. That's why you can not set group.id explicitly.

From Kafka Streams Official documentation, application.id is also used at following places:

 - As the default Kafka consumer and producer client.id prefix
 - As the name of the subdirectory in the state directory (cf. state.dir)
 - As the prefix of internal Kafka topic names

Upvotes: 22

Related Questions