hampi2017

Reputation: 721

default consumer group id in kafka

I am working with Kafka 2.11 and am fairly new to it. I am trying to understand Kafka consumer groups. I have 3 Spark applications consuming from the same topic, and each of them receives all the messages from that topic. As I have not mentioned any consumer group id in the applications, I'm assuming that Kafka is assigning a distinct consumer group id to each of them. I need to reset the Kafka offset for one of the applications using the command below. As I don't know the consumer group name of my application, I'm stuck. Do I need to explicitly assign a group id in the application and then use it in the command below?

./kafka-consumer-groups.sh --bootstrap-server localhost:9092 --reset-offsets --to-datetime 2017-11-1907:52:43:00:000 --group <group_name> --topic <topic_name> --execute

If this is true, how can I get consumer group id of each application? I can't

Upvotes: 3

Views: 10297

Answers (3)

Bartosz Wardziński

Reputation: 6593

If you look at the Spark source code, you will find the KafkaSourceProvider class, which is responsible for the Kafka source reader. There you can see that a random group.id is generated:

private[kafka010] class KafkaSourceProvider extends DataSourceRegister

  override def createSource(
    sqlContext: SQLContext,
    metadataPath: String,
    schema: Option[StructType],
    providerName: String,
    parameters: Map[String, String]): Source = {
      validateStreamOptions(parameters)
      // Each running query should use its own group id. Otherwise, the query may be only assigned
      // partial data since Kafka will assign partitions to multiple consumers having the same group
      // id. Hence, we should generate a unique id for each query.
      val uniqueGroupId = s"spark-kafka-source-${UUID.randomUUID}-${metadataPath.hashCode}"
    ...
  }

You can search for group ids with the spark-kafka-source prefix, but because the rest of the id is random, you can't tell which group.id belongs to a particular application.

To list all consumer group ids, you can use the following command: ./kafka-consumer-groups.sh --bootstrap-server KAFKA_ADDRESS --list

To check a consumer group's offsets, you can use the following command: ./kafka-consumer-groups.sh --bootstrap-server KAFKA_ADDRESS --group=GROUP_ID --describe
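
The two commands above can be combined to narrow the list down to the Spark-generated groups. This is only a minimal sketch: it assumes the script sits in the current directory and the broker is at localhost:9092, and the KAFKA_GROUPS and BOOTSTRAP variables and both function names are hypothetical helpers, not part of Kafka.

```shell
#!/bin/sh
# Assumed defaults; override them in the environment to match your setup.
KAFKA_GROUPS="${KAFKA_GROUPS:-./kafka-consumer-groups.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# Print only the group ids that start with the prefix Spark's Kafka source uses.
list_spark_groups() {
  "$KAFKA_GROUPS" --bootstrap-server "$BOOTSTRAP" --list | grep '^spark-kafka-source-'
}

# Describe every matching group so its topic and offsets can be
# matched to an application by hand.
describe_spark_groups() {
  list_spark_groups | while read -r group; do
    echo "== $group"
    # </dev/null keeps the tool from consuming the remaining group ids on stdin.
    "$KAFKA_GROUPS" --bootstrap-server "$BOOTSTRAP" --group "$group" --describe </dev/null
  done
}
```

Calling describe_spark_groups prints one block per Spark-generated group. The topic and current offsets in each block are the only hints as to which application owns the group.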

Upvotes: 2

ppatierno

Reputation: 10065

As I have not mentioned any consumer group id in the applications, I'm assuming that Kafka is assigning a distinct consumer group id to each of them

The Kafka brokers don't assign consumer group names to the consumers connected to them. When a consumer connects and subscribes to a topic, it "joins" a group that the consumer itself names. If you are running a Spark application without specifying a consumer group, the library or framework you use to connect to Kafka from Spark must be assigning the consumer group name itself.

Upvotes: 1

Monzurul Shimul

Reputation: 8386

The consumer group.id is mandatory. If you do not set it, you will get an exception. So either you are setting it somewhere in your code, or the framework or library you are using is setting it internally. You should always set group.id yourself.

You can get the consumer group ids by using the following command:

bin/kafka-consumer-groups.sh --list --bootstrap-server <kafka-broker-ip>:9092
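
Once the group id is known, the reset command from the question can be previewed before it is applied. The following is a rough sketch, not a definitive script: reset_offsets, its arguments, and the KAFKA_GROUPS and BOOTSTRAP variables are hypothetical names, and the broker address is an assumption. With --dry-run the tool only prints the planned offsets, and --execute applies them. The tool expects the group to have no active consumers during a reset, and --to-datetime takes a timestamp in the form YYYY-MM-DDTHH:mm:SS.sss.

```shell
#!/bin/sh
# Assumed defaults; override them in the environment to match your setup.
KAFKA_GROUPS="${KAFKA_GROUPS:-bin/kafka-consumer-groups.sh}"
BOOTSTRAP="${BOOTSTRAP:-localhost:9092}"

# reset_offsets GROUP TOPIC DATETIME [MODE]
# MODE is --dry-run (the default, only prints the plan) or --execute.
reset_offsets() {
  group="$1"
  topic="$2"
  datetime="$3"
  mode="${4:---dry-run}"   # falls back to --dry-run when no fourth argument is given
  "$KAFKA_GROUPS" --bootstrap-server "$BOOTSTRAP" \
    --group "$group" --topic "$topic" \
    --reset-offsets --to-datetime "$datetime" "$mode"
}
```

For example, reset_offsets <group_name> <topic_name> 2017-11-19T07:52:43.000 shows what would change, and adding --execute as the fourth argument performs the reset.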

Upvotes: 4
