user2894829

Reputation: 815

How to set Kafka offset for consumer?

Assume there are already 10 messages in my topic. When I start my consumer written in Flink, it starts consuming from the 11th message.

Therefore, I have 3 questions:

  1. How do I get the number of partitions of the current topic and the latest offset of each partition?
  2. How do I manually set the starting position of each partition for the consumer?
  3. If the Flink consumer crashes and recovers after several minutes, how will it know where to restart?

Any help is appreciated. Here is my sample code (I also tried FlinkKafkaConsumer08 and FlinkKafkaConsumer10, but both threw exceptions):

import java.util.Properties;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;

public class kafkaConsumer {

    public static void main(String[] args) throws Exception {
        // Create the execution environment and checkpoint every 5 seconds.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000);

        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.95.2:9092");
        properties.setProperty("group.id", "test");
        properties.setProperty("auto.offset.reset", "earliest");

        FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(
                "game_event", new SimpleStringSchema(), properties);

        DataStream<String> stream = env.addSource(myConsumer);

        stream.map(new MapFunction<String, String>() {
            private static final long serialVersionUID = -6867736771747690202L;

            @Override
            public String map(String value) throws Exception {
                return "Stream Value: " + value;
            }
        }).print();

        env.execute();
    }
}

And pom.xml:

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>1.6.1</version>
    </dependency>

Upvotes: 3

Views: 11840

Answers (1)

Giorgos Myrianthous

Reputation: 39930

  1. In order to consume messages from a partition starting from a particular offset, you can refer to the Flink documentation:

You can also specify the exact offsets the consumer should start from for each partition:

Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 0), 23L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 1), 31L);
specificStartOffsets.put(new KafkaTopicPartition("myTopic", 2), 43L);

myConsumer.setStartFromSpecificOffsets(specificStartOffsets);

The above example configures the consumer to start from the specified offsets for partitions 0, 1, and 2 of topic myTopic. The offset values should be the next record that the consumer should read for each partition. Note that if the consumer needs to read a partition which does not have a specified offset within the provided offsets map, it will fallback to the default group offsets behaviour (i.e. setStartFromGroupOffsets()) for that particular partition.
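Putting this together with the consumer from the question, a minimal sketch of the available start-position methods might look like the following (broker address, topic name, and offset values are placeholders taken from the question, not a working cluster):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer09;
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition;

public class StartPositionSketch {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.95.2:9092");
        properties.setProperty("group.id", "test");

        FlinkKafkaConsumer09<String> myConsumer = new FlinkKafkaConsumer09<>(
                "game_event", new SimpleStringSchema(), properties);

        // Option 1: start from the committed group offsets (the default).
        myConsumer.setStartFromGroupOffsets();

        // Option 2: always start from the earliest / latest record,
        // ignoring committed offsets.
        // myConsumer.setStartFromEarliest();
        // myConsumer.setStartFromLatest();

        // Option 3: start from explicit per-partition offsets; partitions
        // missing from the map fall back to the group-offsets behaviour.
        Map<KafkaTopicPartition, Long> specificStartOffsets = new HashMap<>();
        specificStartOffsets.put(new KafkaTopicPartition("game_event", 0), 0L);
        myConsumer.setStartFromSpecificOffsets(specificStartOffsets);
    }
}
```

Only one of these options takes effect; the last one called before the job starts wins.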

Note that these start position configuration methods do not affect the start position when the job is automatically restored from a failure or manually restored using a savepoint. On restore, the start position of each Kafka partition is determined by the offsets stored in the savepoint or checkpoint (please see the next section for information about checkpointing to enable fault tolerance for the consumer).
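If you want those checkpointed offsets to survive a manual job cancellation as well (not just automatic failure recovery), checkpoints can be externalized so they behave like savepoints. A sketch against the Flink 1.6 API; the interval and cleanup policy are illustrative choices, not requirements:

```java
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class CheckpointConfigSketch {

    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Checkpoint every 5 seconds with exactly-once guarantees; on
        // restore, the Kafka source resumes from the offsets stored in
        // the checkpoint, not from any start-position configuration.
        env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);

        // Keep the latest completed checkpoint even when the job is
        // cancelled, so it can be used to restart the job later.
        env.getCheckpointConfig().enableExternalizedCheckpoints(
                CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    }
}
```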

  2. In case one of the consumers crashes, once it recovers, Kafka will refer to the __consumer_offsets topic in order to continue processing messages from the point where it left off before crashing. __consumer_offsets is an internal topic used to store metadata about the committed offsets for each (topic, partition, group) triple. It is also periodically compacted so that only the latest offsets are available. You can also refer to Flink's Kafka Connectors Metrics:

Flink’s Kafka connectors provide some metrics through Flink’s metrics system to analyze the behavior of the connector. The producers export Kafka’s internal metrics through Flink’s metric system for all supported versions. The consumers export all metrics starting from Kafka version 0.9. The Kafka documentation lists all exported metrics in its documentation.

In addition to these metrics, all consumers expose the current-offsets and committed-offsets for each topic partition. The current-offsets refers to the current offset in the partition. This refers to the offset of the last element that we retrieved and emitted successfully. The committed-offsets is the last committed offset.
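Those metrics answer question 1 for a running Flink job. To inspect the partition count and latest offsets of a topic outside of Flink, the plain kafka-clients consumer can be used. A sketch, assuming a kafka-clients dependency (0.10.1+ for `endOffsets`) and the broker address from the question:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;

public class OffsetLookupSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.95.2:9092");
        props.setProperty("group.id", "test");
        props.setProperty("key.deserializer", StringDeserializer.class.getName());
        props.setProperty("value.deserializer", StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            // Number of partitions of the topic.
            List<TopicPartition> partitions = new ArrayList<>();
            for (PartitionInfo info : consumer.partitionsFor("game_event")) {
                partitions.add(new TopicPartition(info.topic(), info.partition()));
            }
            System.out.println("partitions: " + partitions.size());

            // Log end offset (offset of the next record to be written)
            // for each partition.
            for (Map.Entry<TopicPartition, Long> e
                    : consumer.endOffsets(partitions).entrySet()) {
                System.out.println(e.getKey() + " -> end offset " + e.getValue());
            }
        }
    }
}
```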

The Kafka Consumers in Flink commit the offsets back to Zookeeper (Kafka 0.8) or the Kafka brokers (Kafka 0.9+). If checkpointing is disabled, offsets are committed periodically. With checkpointing, the commit happens once all operators in the streaming topology have confirmed that they’ve created a checkpoint of their state. This provides users with at-least-once semantics for the offsets committed to Zookeeper or the broker. For offsets checkpointed to Flink, the system provides exactly once guarantees.

The offsets committed to ZK or the broker can also be used to track the read progress of the Kafka consumer. The difference between the committed offset and the most recent offset in each partition is called the consumer lag. If the Flink topology is consuming the data slower from the topic than new data is added, the lag will increase and the consumer will fall behind. For large production deployments we recommend monitoring that metric to avoid increasing latency.
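As a toy illustration of the lag computation described above (the offset numbers below are made up, not taken from a real cluster):

```java
import java.util.Map;
import java.util.TreeMap;

public class ConsumerLagDemo {

    // Lag of one partition: how far the committed offset trails the
    // most recent (log end) offset of that partition.
    static long lag(long logEndOffset, long committedOffset) {
        return Math.max(0L, logEndOffset - committedOffset);
    }

    public static void main(String[] args) {
        // partition -> {log end offset, committed offset} (hypothetical values)
        Map<Integer, long[]> offsets = new TreeMap<>();
        offsets.put(0, new long[] {120L, 100L});
        offsets.put(1, new long[] {80L, 80L});

        long totalLag = 0L;
        for (Map.Entry<Integer, long[]> e : offsets.entrySet()) {
            long partitionLag = lag(e.getValue()[0], e.getValue()[1]);
            System.out.println("partition " + e.getKey() + " lag = " + partitionLag);
            totalLag += partitionLag;
        }
        System.out.println("total lag = " + totalLag); // prints "total lag = 20"
    }
}
```

If total lag keeps growing over time, the consumer is falling behind the producers.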

Upvotes: 2
