Reputation: 23
I have created a Kafka Connect sink connector that writes data to another storage system. I want auto.offset.reset set to latest when a new connector is created through the Kafka Connect REST API, so I set consumer.auto.offset.reset: latest in the connector config:
{
  "name": "test_v14",
  "config": {
    "name": "test_v14",
    "consumer.auto.offset.reset": "latest",
    "connector.class": "...",
    ...
  }
}
But when the task starts, the Kafka consumer still polls records from the earliest offset. Is there another way to set auto.offset.reset to latest?
Upvotes: 2
Views: 6763
Reputation: 191701
Prior to Kafka 2.3, consumer.auto.offset.reset needs to be set in the connect-distributed.properties file (the Worker configuration).
It cannot be applied to any particular Connector unless that connector class is explicitly creating and loading its own Consumer objects that read in that property.
Upvotes: 4
Reputation: 32080
As of Apache Kafka 2.3, it is now possible to set this as part of a connector's configuration.
On the worker set:
connector.client.config.override.policy=All
Then in the connector you can specify
"consumer.override.auto.offset.reset": "latest"
See this for more details: https://rmoff.net/2019/08/09/starting-a-kafka-connect-sink-connector-at-the-end-of-a-topic/
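To tie this back to the REST API call in the question, here is a minimal Python sketch of building and submitting such a connector config. It assumes the worker already runs with connector.client.config.override.policy=All and that its REST API is reachable at http://localhost:8083. The connector class and topic name in the example are placeholders, not values from the question.

```python
import json
import urllib.request

# Assumption: the Connect worker's REST API listens here (8083 is the default port).
CONNECT_URL = "http://localhost:8083"


def build_sink_config(name, connector_class, topics):
    """Return the JSON payload for POST /connectors.

    Relies on the worker being started with
    connector.client.config.override.policy=All (Kafka 2.3+);
    without it the consumer.override.* key is not permitted.
    """
    return {
        "name": name,
        "config": {
            "connector.class": connector_class,
            "topics": topics,
            # Per-connector consumer override, used instead of the
            # worker-wide consumer.auto.offset.reset setting.
            "consumer.override.auto.offset.reset": "latest",
        },
    }


def create_connector(payload, base_url=CONNECT_URL):
    """POST the payload to the Connect REST API and return the parsed response."""
    request = urllib.request.Request(
        f"{base_url}/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )
    with urllib.request.urlopen(request) as response:
        return json.load(response)


if __name__ == "__main__":
    # Placeholder class and topic; replace with your own connector's values.
    payload = build_sink_config("test_v14", "com.example.MySinkConnector", "my_topic")
    print(json.dumps(payload, indent=2))
```

Calling create_connector(payload) would send the config to the worker. Keep in mind that auto.offset.reset only applies when the connector's consumer group has no committed offsets, so a connector name that has already consumed from the topic resumes from its stored position.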
Upvotes: 4