Yamada

Reputation: 723

Kafka Group Coordinator Failure Recovery on 0.11.0.1

Is there any configuration to enable automatic group coordinator recovery after a crash?

I have a test topology with 3 brokers. Once the group coordinator broker is shut down, the topic partitions (2 partitions with rf=2) get correctly rebalanced and the producer is not affected, but the consumer group stops receiving messages. If I shut down any other broker, everything works as expected.

I am using the Java Kafka Clients API 0.10.2.1 for both the producer and the consumer:

<dependency>
   <groupId>org.apache.kafka</groupId>
   <artifactId>kafka-clients</artifactId>
   <version>0.10.2.1</version>
</dependency>

Monitoring the console output of each broker that remains running, I don't find any reference to a new GroupCoordinator assignment. All consumers resume receiving messages as soon as I restart the original group coordinator broker. The broker elected as coordinator is always broker.id=0, no matter the startup order.

Client Config:

private static Consumer<String, String> createFixMessageConsumer(int id) {
    Properties props = new Properties();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092,localhost:9093,localhost:9094");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "6100");
    props.put(ConsumerConfig.GROUP_ID_CONFIG, MYCONSUMERGROUP);
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, id + "");
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    // The deserializers passed to the constructor take precedence over the two props above.
    return new KafkaConsumer<>(props, new StringDeserializer(), new FixMessageDeserializer());
}

Consumer Worker snippet:

    @Override
    public void run() {
        try {
            consumer.subscribe(topics);

            while (true) {
                ConsumerRecords<String, FixMessage> records = consumer.poll(2000);
                FixMessage message = null;
                for (ConsumerRecord<String, FixMessage> record : records) {
                    message = record.value();
                    message.setConsumerId(id);
                    message.setKafkaPartition(record.partition());
                    message.setPartitionOffset(BigInteger.valueOf(record.offset()));
                    Map<String, Object> data = new HashMap<>();
                    data.put("partition", record.partition());
                    data.put("offset", record.offset());
                    if (message.getIdfixMessage() == null)
                        createFixMessage(message, data);
                    data.put("value", message.getIdfixMessage());
                    System.out.println(this.id + ": " + data);
                }
            }
        } catch (WakeupException e) {
            // ignore, thrown by consumer.wakeup() during shutdown
        } catch (Exception e) {
            System.out.println(e.toString());
        } finally {
            consumer.close();
        }
    }

Upvotes: 3

Views: 3128

Answers (2)

Binita Bharati

Reputation: 5898

I was having the same issue with Kafka 2.11-1.0.0: when the broker hosting the consumer group coordinator was shut down, no new coordinator was discovered and message consumption halted completely, even though the producer could keep producing to the newly elected partition leader (a new leader came into the picture because one of the partitions lived on the shut-down broker and was auto-reassigned to one of its ISRs). After I increased the replication factor of the internal topic __consumer_offsets to 3 (I have a cluster of 3 brokers), automatic failover of the consumer group coordinator started happening, and all messages that had been produced in the meantime were consumed once the new coordinator was discovered. To increase the RF of the internal topic __consumer_offsets, refer to: http://kafka.apache.org/documentation.html#basic_ops_increase_replication_factor
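
For reference, a minimal sketch of that reassignment, assuming a 3-broker cluster with broker ids 0, 1 and 2, ZooKeeper on localhost:2181, and an example file name increase-offsets-rf.json (both the file name and the replica assignments are illustrative). __consumer_offsets has 50 partitions by default, so the real file needs one entry per partition; only the first three are shown here:

{"version": 1,
 "partitions": [
   {"topic": "__consumer_offsets", "partition": 0, "replicas": [0, 1, 2]},
   {"topic": "__consumer_offsets", "partition": 1, "replicas": [1, 2, 0]},
   {"topic": "__consumer_offsets", "partition": 2, "replicas": [2, 0, 1]}
 ]}

bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-offsets-rf.json --execute
bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 --reassignment-json-file increase-offsets-rf.json --verify

The second command just reports whether the reassignment has completed.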

Upvotes: 3

amethystic

Reputation: 7091

Ensure the replication factor of the internal topic __consumer_offsets is greater than 1 in your case. Before 0.11.0.0 the broker-side setting offsets.topic.replication.factor was not strictly enforced, so if fewer brokers were alive when this internal topic was first created, it very likely ended up with a lower replication factor than the one you configured.
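
To check the current replication factor, one option (a sketch, assuming ZooKeeper on localhost:2181 and the CLI tools that ship with the broker) is to describe the internal topic and look at the Replicas column of each partition:

bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic __consumer_offsets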

Upvotes: 6
