mukh007

Reputation: 339

Flink not committing offsets to Kafka

I have a Flink streaming job that reads data from Kafka and just logs it. I have enabled checkpointing.

I cannot see the committed offsets in Kafka; instead, I get the error below when I describe the consumer group.

Any help is much appreciated.

$KAFKA_HOME/bin/kafka-consumer-groups.sh --new-consumer --bootstrap-server localhost:9092 --describe --group flink-consumer-group
Error while executing consumer group command Group flink-consumer-group with protocol type '' is not a valid consumer group
java.lang.IllegalArgumentException: Group flink-consumer-group with protocol type '' is not a valid consumer group
    at kafka.admin.AdminClient.describeConsumerGroup(AdminClient.scala:152)
    at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describeGroup(ConsumerGroupCommand.scala:308)
    at kafka.admin.ConsumerGroupCommand$ConsumerGroupService$class.describe(ConsumerGroupCommand.scala:89)
    at kafka.admin.ConsumerGroupCommand$KafkaConsumerGroupService.describe(ConsumerGroupCommand.scala:296)
    at kafka.admin.ConsumerGroupCommand$.main(ConsumerGroupCommand.scala:68)
    at kafka.admin.ConsumerGroupCommand.main(ConsumerGroupCommand.scala)

Versions

Kafka server kafka_2.11-0.10.1.0, with flink-connector-kafka-0.10_2.11

Upvotes: 4

Views: 3287

Answers (2)

mukh007

Reputation: 339

So I figured out that Flink does commit the offsets to Kafka on checkpointing, because FlinkKafkaConsumer#setCommitOffsetsOnCheckpoints is set to true by default.
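For reference, a minimal sketch of such a job, assuming the Flink Scala API with the 0.10 Kafka connector. The topic name `my-topic`, the job name, and the 5-second checkpoint interval are placeholders, and the package of `SimpleStringSchema` depends on the Flink version (older releases ship it under `org.apache.flink.streaming.util.serialization`).

```scala
import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

object LogKafkaRecords {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Offsets are committed back to Kafka when a checkpoint completes,
    // so checkpointing has to be enabled (interval in milliseconds, placeholder value).
    env.enableCheckpointing(5000)

    val props = new Properties()
    props.setProperty("bootstrap.servers", "localhost:9092")
    props.setProperty("group.id", "flink-consumer-group")

    // "my-topic" is a hypothetical topic name.
    val consumer = new FlinkKafkaConsumer010[String]("my-topic", new SimpleStringSchema(), props)
    // Already the default; set explicitly here only to make the behavior visible.
    consumer.setCommitOffsetsOnCheckpoints(true)

    // Log every record, as in the question.
    env.addSource(consumer).print()
    env.execute("log-kafka-records")
  }
}
```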

Unfortunately, these offsets are not visible via the Kafka offset checker CLI.

To get the offsets from Kafka, we implemented a Scala Kafka consumer that connects to Kafka with the same consumer group but does not subscribe to the topic.
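A rough sketch of what such a consumer could look like (not our exact code). It assumes the plain Kafka Java client called from Scala 2.11, a hypothetical topic name `my-topic`, and a client version that still offers the single-partition `committed` call, as the 0.10 client does.

```scala
import java.util.Properties

import scala.collection.JavaConverters._

import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object ShowCommittedOffsets {
  def main(args: Array[String]): Unit = {
    val topic = "my-topic" // hypothetical topic name

    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    // Same group id as the Flink job, so we read the offsets Flink committed.
    props.put("group.id", "flink-consumer-group")
    props.put("enable.auto.commit", "false")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer")

    val consumer = new KafkaConsumer[Array[Byte], Array[Byte]](props)
    try {
      // No subscribe() or assign(): we only look up metadata and committed offsets.
      for (info <- consumer.partitionsFor(topic).asScala) {
        val tp = new TopicPartition(info.topic, info.partition)
        // committed() returns null when the group has no offset for this partition.
        val offset = Option(consumer.committed(tp)).map(_.offset.toString).getOrElse("none")
        println(s"$tp committed offset: $offset")
      }
    } finally {
      consumer.close()
    }
  }
}
```

Because this consumer never subscribes, it does not join the group or trigger a rebalance; it only reads the offsets stored for the group.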

Note: Starting with the Kafka 0.9 consumer, the Flink Kafka consumer exports all standard Kafka metrics; see the documentation.

Upvotes: 3

TobiSH

Reputation: 2921

Flink handles the offsets on its own, as part of its checkpointed state. The offsets that are committed to Kafka (or to ZooKeeper in older versions or setups) are more or less just for your information or for monitoring purposes.

Your error looks like you mixed up different Kafka versions (broker version vs. client version). Maybe you can double-check this.

Upvotes: 1
