Reputation: 313
I'm using Kafka Stream library for streaming application. I wanted to set kafka consumer group id. Then, I put Kafka stream configuration like below.
streamsCopnfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "JoinTestApp");
streamsCopnfiguration.put(StreamsConfig.CLIENT_ID_CONFIG, "JonTestClientId1");
streamsCopnfiguration.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 10 * 1000);
streamsCopnfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstreapServer);
streamsCopnfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsCopnfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.Bytes().getClass().getName());
streamsCopnfiguration.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
streamsCopnfiguration.put(StreamsConfig.consumerPrefix("group.id"), "groupId1");
// streamsCopnfiguration.put(ConsumerConfig.GROUP_ID_CONFIG, "groupId1");
But, my console log kafka consumer configuration not seted group.id. Just stream application id..
2019-03-19 17:17:03,206 [main] INFO org.apache.kafka.clients.consumer.ConsumerConfig - ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = earliest
bootstrap.servers = [....]
check.crcs = true
client.dns.lookup = default
client.id = JonTestClientId111-StreamThread-1-consumer
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = false
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = JoinTestApp
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = false
Can I set kafka stream consumer group.id??
Upvotes: 19
Views: 12931
Reputation: 2892
No, you can't. For this purpose in Kafka Streams you need to use application.id
.
Kafka Streams application.id
is used at various places to isolate resources used by application from others.
application.id
is used as Kafka consumer group.id
for coordination. That's why you can not set group.id
explicitly.
From Kafka Streams Official documentation, application.id
is also used at following places:
- As the default Kafka consumer and producer client.id prefix
- As the name of the subdirectory in the state directory (cf. state.dir)
- As the prefix of internal Kafka topic names
Upvotes: 22