Reputation: 629
I am new to Kafka, and I don't really understand the meaning of the Kafka configuration. Can anyone explain it to me in a more understandable way?
Here is my code:
import org.apache.kafka.common.serialization.StringDeserializer

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "master:9092,slave1:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "GROUP_2017",
  "auto.offset.reset" -> "latest", // earliest or latest
  "enable.auto.commit" -> (true: java.lang.Boolean)
)
What does each of these settings mean in my code?
Upvotes: 33
Views: 108534
Reputation: 4458
I will explain the meaning of each setting, but I highly suggest reading the configuration section of the Kafka web site.
"bootstrap.servers" -> "master:9092,slave1:9092"
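Essentially the Kafka cluster configuration: the list of brokers (host:port pairs) the consumer contacts to make its initial connection. The client discovers the rest of the cluster from them, so the list does not need to contain every broker, although listing more than one helps in case one is down.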
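As a minimal sketch of the expected format (JDK only), the value is a comma-separated list of host:port pairs. BootstrapServers and its parse method are hypothetical helpers for illustration; the Kafka client parses this setting itself.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, NOT part of the Kafka client: it only illustrates
// the "host:port,host:port" format that bootstrap.servers expects.
final class BootstrapServers {
    static final class HostPort {
        final String host;
        final int port;

        HostPort(String host, int port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return host + ":" + port;
        }
    }

    static List<HostPort> parse(String value) {
        List<HostPort> result = new ArrayList<>();
        for (String entry : value.split(",")) {
            String trimmed = entry.trim();
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("Expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1));
            result.add(new HostPort(host, port));
        }
        return result;
    }

    public static void main(String[] args) {
        // The value from the question: two brokers, both listening on port 9092.
        System.out.println(parse("master:9092,slave1:9092")); // [master:9092, slave1:9092]
    }
}
```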
"key.deserializer" -> classOf[StringDeserializer]
"value.deserializer" -> classOf[StringDeserializer]
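These two settings tell the consumer how to turn the raw bytes of each record's key and value back into objects; StringDeserializer decodes them as strings (UTF-8 by default). This SO answer explains the purpose in more detail.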
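To show what a deserializer does, here is a minimal, JDK-only stand-in. The interface SimpleDeserializer and the class Utf8StringDeserializer are hypothetical; they only mirror the shape of Kafka's Deserializer<T> (a deserialize(String topic, byte[] data) method) and the default UTF-8 behavior of StringDeserializer.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for Kafka's Deserializer<T> interface (not the real one).
interface SimpleDeserializer<T> {
    T deserialize(String topic, byte[] data);
}

// Mimics StringDeserializer with its default UTF-8 encoding:
// raw record bytes in, String out, and a null payload stays null.
final class Utf8StringDeserializer implements SimpleDeserializer<String> {
    @Override
    public String deserialize(String topic, byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] rawValue = "hello".getBytes(StandardCharsets.UTF_8); // bytes as stored in the topic
        System.out.println(new Utf8StringDeserializer().deserialize("my-topic", rawValue)); // hello
    }
}
```

In the real client you only name the class, as the question does with classOf[StringDeserializer], and the consumer instantiates it and calls it for every record it fetches.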
"group.id" -> "GROUP_2017"
A consumer process belongs to a consumer "group". A group can have multiple consumers, and Kafka assigns each partition to exactly one consumer process in the group (for consuming messages). If the number of consumers is greater than the number of partitions available, some consumer processes will be idle.
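The rule "each partition goes to exactly one consumer in the group" can be sketched with a toy round-robin assignment. This is a simplified, hypothetical model: GroupAssignment is not a Kafka class, and Kafka's built-in assignors (range, round-robin, sticky) distribute partitions in their own ways. It does show why a fourth consumer on a three-partition topic stays idle.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified model of partition assignment inside ONE consumer group.
// Invariant: every partition has exactly one owner, so extra consumers get nothing.
final class GroupAssignment {
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            // Round-robin: partition p goes to consumer p mod N.
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // 3 partitions, 4 consumers in the same group: one consumer stays idle.
        System.out.println(assign(List.of("c1", "c2", "c3", "c4"), 3));
        // {c1=[0], c2=[1], c3=[2], c4=[]}
    }
}
```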
"enable.auto.commit" -> (true: java.lang.Boolean)
When that flag is true, the consumer periodically commits the offsets of the messages it has fetched from Kafka, so that the last offset it read is persisted. (Older Kafka consumers stored these offsets in ZooKeeper; the newer consumer configured with bootstrap.servers, as in your code, stores them in Kafka itself, in the internal __consumer_offsets topic.) This approach is not the best choice when you want a more robust solution for a production system, because it does not ensure that the records you fetched were correctly processed (by the logic you wrote in your code) before their offsets were committed. If this flag is false and you do not commit offsets yourself, Kafka will not know which offset was read last, so when you restart the process, it will start reading from the 'earliest' or the 'latest' offset depending on the value of your next flag (auto.offset.reset). Finally, this Cloudera article explains in detail how to manage offsets properly.
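A toy model (plain Java, no Kafka involved; CommitOrdering and its parameters are hypothetical) helps show why the timing of commits matters. Auto-commit is driven by a timer rather than by your processing logic, so depending on how your application processes records, a commit can cover records that were not processed successfully. The sketch contrasts committing before processing with committing after processing, when the consumer crashes on one record and then restarts from the committed offset.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model: a partition is a list of records and "committed" is the
// offset a restarted consumer resumes from.
final class CommitOrdering {
    /** Processes records, crashes at offset crashAt, then restarts from the committed offset. */
    static List<String> run(List<String> partition, int crashAt, boolean commitBeforeProcessing) {
        List<String> processed = new ArrayList<>();
        int committed = 0; // survives the crash

        // First run: the consumer dies while handling the record at offset crashAt.
        for (int offset = 0; offset < partition.size(); offset++) {
            if (commitBeforeProcessing) {
                committed = offset + 1; // the commit lands before the work is done
            }
            if (offset == crashAt) {
                break; // crash: this record is never processed in the first run
            }
            processed.add(partition.get(offset));
            if (!commitBeforeProcessing) {
                committed = offset + 1; // commit only after successful processing
            }
        }

        // Restart: resume from whatever was committed.
        for (int offset = committed; offset < partition.size(); offset++) {
            processed.add(partition.get(offset));
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> partition = List.of("a", "b", "c", "d");
        System.out.println(run(partition, 2, true));  // [a, b, d]  (record c is lost)
        System.out.println(run(partition, 2, false)); // [a, b, c, d]
    }
}
```

Committing after processing avoids the loss, at the cost of possible duplicates if a crash happens after a record is processed but before its offset is committed; that is the usual at-least-once trade-off.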
"auto.offset.reset" -> "latest"
This flag tells Kafka where to start reading offsets in case you do not have any commit yet. In other words, it will start either from the 'earliest' or from the 'latest' offset if you have not committed any offset yet (manually or via the enable.auto.commit flag).
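The decision can be sketched as a small function. This is a simplified model under stated assumptions: StartPosition, logStart (the oldest offset still stored) and logEnd (the offset the next new message will receive) are hypothetical names, and the real consumer makes this decision per partition internally.

```java
import java.util.OptionalLong;

// Toy model of how a consumer picks its starting offset for one partition.
final class StartPosition {
    static long resolve(OptionalLong committed, String autoOffsetReset, long logStart, long logEnd) {
        if (committed.isPresent()) {
            return committed.getAsLong(); // a commit exists: auto.offset.reset is not consulted
        }
        switch (autoOffsetReset) {
            case "earliest": return logStart; // replay everything still stored
            case "latest":   return logEnd;   // only messages produced from now on
            case "none":     throw new IllegalStateException("No committed offset for this group");
            default:         throw new IllegalArgumentException("Invalid value: " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // The partition holds offsets 0..99; the next message will get offset 100.
        System.out.println(resolve(OptionalLong.empty(), "earliest", 0, 100)); // 0
        System.out.println(resolve(OptionalLong.empty(), "latest", 0, 100));   // 100
        System.out.println(resolve(OptionalLong.of(42), "latest", 0, 100));    // 42
    }
}
```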
Upvotes: 28
Reputation: 8335
auto.offset.reset is ONLY at play when there is no valid committed offset, such as the first time you start the system, or after a committed offset expires and is deleted because it's too old.
enable.auto.commit is about a choice between having offsets committed automatically in the background and explicit manual control in the foreground.
auto.offset.reset
What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted):
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
Type: string
Default: latest
Valid Values: [latest, earliest, none]
Importance: medium
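The second trigger in that description (the committed offset no longer exists on the server) can be modeled the same way. In this hypothetical sketch (OutOfRangeReset is not a Kafka class), a committed offset outside the range of offsets still stored is treated as invalid and the reset policy decides where to resume; for none, the model throws, standing in for the exception the real consumer raises.

```java
// Toy model: logStart = oldest offset still stored, logEnd = offset of the next new message.
final class OutOfRangeReset {
    static long position(long committed, String autoOffsetReset, long logStart, long logEnd) {
        boolean inRange = committed >= logStart && committed <= logEnd;
        if (inRange) {
            return committed; // valid commit: resume exactly there
        }
        switch (autoOffsetReset) {
            case "earliest": return logStart;
            case "latest":   return logEnd;
            default: throw new IllegalStateException(
                    "Offset " + committed + " out of range [" + logStart + ", " + logEnd + "]");
        }
    }

    public static void main(String[] args) {
        // The group committed offset 120, but retention has since deleted offsets below 150.
        System.out.println(position(120, "earliest", 150, 300)); // 150
        System.out.println(position(120, "latest", 150, 300));   // 300
        System.out.println(position(200, "latest", 150, 300));   // 200 (still valid)
    }
}
```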
enable.auto.commit
If true, the consumer's offset will be periodically committed in the background.
Type: boolean
Default: true
Importance: medium
auto.commit.interval.ms
The frequency in milliseconds that the consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true.
Type: int
Default: 5000 (5 seconds)
Valid Values: [0,...]
Importance: low
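With the defaults above, the timing can be sketched as follows. This is a simplified model under an assumption worth stating: the commit check is made when your code calls poll(), and a commit happens if auto.commit.interval.ms has elapsed since the previous one. AutoCommitTimer is hypothetical, and the real client also commits in other situations, such as when the consumer is closed.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of enable.auto.commit=true with a given auto.commit.interval.ms.
final class AutoCommitTimer {
    /** Given the times (ms) at which poll() is called, returns the times a commit happens. */
    static List<Long> commitTimes(long[] pollTimesMs, long intervalMs) {
        List<Long> commits = new ArrayList<>();
        long lastCommit = 0; // assume the consumer started at t = 0
        for (long now : pollTimesMs) {
            if (now - lastCommit >= intervalMs) {
                commits.add(now); // interval elapsed: this poll() triggers a commit
                lastCommit = now;
            }
        }
        return commits;
    }

    public static void main(String[] args) {
        long[] polls = {1000, 4000, 6000, 9000, 12000};
        System.out.println(commitTimes(polls, 5000)); // [6000, 12000]
    }
}
```

Between two commits the stored offset lags behind what you have read, so after a crash up to roughly one interval's worth of records may be delivered again.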
The full set of consumer configuration parameters is documented on the Apache Kafka web site at https://kafka.apache.org/documentation.html#newconsumerconfigs
Upvotes: 9
Reputation: 18475
Adding more details on the configurations mentioned in the title: "Not clear about the meaning of auto.offset.reset and enable.auto.commit in Kafka"
With the auto.offset.reset configuration you can steer the behavior of your consumer (as part of a consumer group) in situations where your consumer group has never consumed and committed from a particular topic, or where the last committed offset of that consumer group was deleted (e.g. through a cleanup policy).
Each message in a partition of a Kafka topic has a unique identifier, its offset. Offsets are unique per partition, not per topic. A consumer usually commits back the offsets for each partition of the topic it consumed. That way, a restarted consumer can resume where it left off instead of reading the same messages again.
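The bookkeeping described here can be sketched as a map keyed by consumer group, topic and partition. OffsetStore and the topic name events are hypothetical; the sketch follows Kafka's convention that the committed offset is the offset of the next record to read (last processed offset + 1). It also shows why a new group name, or a partition nobody has committed for, falls back to auto.offset.reset.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.OptionalLong;

// Toy model of committed offsets: one entry per (group, topic, partition).
final class OffsetStore {
    private final Map<String, Long> committed = new HashMap<>();

    private static String key(String group, String topic, int partition) {
        return group + "/" + topic + "/" + partition;
    }

    // Stores the offset of the NEXT record to read (last processed + 1).
    void commit(String group, String topic, int partition, long nextOffset) {
        committed.put(key(group, topic, partition), nextOffset);
    }

    OptionalLong lookup(String group, String topic, int partition) {
        Long offset = committed.get(key(group, topic, partition));
        return offset == null ? OptionalLong.empty() : OptionalLong.of(offset);
    }

    public static void main(String[] args) {
        OffsetStore store = new OffsetStore();
        store.commit("GROUP_2017", "events", 0, 42);
        System.out.println(store.lookup("GROUP_2017", "events", 0)); // OptionalLong[42]
        // A renamed group has no commits, so auto.offset.reset decides where it starts.
        System.out.println(store.lookup("GROUP_2018", "events", 0)); // OptionalLong.empty
        // Offsets are per partition: nothing has been committed for partition 1 yet.
        System.out.println(store.lookup("GROUP_2017", "events", 1)); // OptionalLong.empty
    }
}
```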
Imagine you have a consumer reading from a topic for the first time (or after you change the consumer group name). The consumer group has therefore never committed any offsets. According to the configuration docs, you can choose between the following behaviors with the auto.offset.reset configuration:
earliest: automatically reset the offset to the earliest offset
latest: automatically reset the offset to the latest offset
none: throw exception to the consumer if no previous offset is found for the consumer's group
anything else: throw exception to the consumer.
The default setting is latest.
As mentioned above, it is critical to think about your offsets and their commits when consuming messages from Kafka. When enable.auto.commit is set to true, the consumer offsets are committed automatically in the background.
In the JavaDocs of KafkaConsumer you will find a nice example of how to commit offsets manually in a consumer client (with enable.auto.commit set to false) using:
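KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props); // props sets "enable.auto.commit" to "false"
// ... call consumer.poll(...) and process the returned records ...
consumer.commitSync(); // commits the offsets returned by the last poll()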
To emphasize the importance of offset management in your consumer client once more, it is worth reading the whole JavaDocs description or the Confluent Kafka documentation on offset management.
Upvotes: 7