Reputation: 2920
I have searched quite a lot on this but there doesn't seems to be a good guide around this.
From what I have searched there are a few things to consider:
Question: Is there even a need to reset these topics?
--reset-offsets
to --to-earliest
What would be the best way to restart both a sink and a source connector to read from beginning?
Upvotes: 22
Views: 26699
Reputation: 79
With version 3.6.0, Kafka Connect will add native support for resetting the offsets of both sink and source connectors via the REST API as part of KIP-875.
If you are running version 3.6.0 or later, first issue a PUT request to the /connectors/{name}/stop
endpoint to stop (but not delete) the connector, then reset its offsets by issuing a DELETE request to the /connectors/{name}/offsets
endpoint.
Upvotes: 3
Reputation: 551
a bit late but found another way. Just set the offset.storage.file.name in standalone mode to dev/null:
#worker.properties
offset.storage.file.filename=/dev/null
#cmdline
connect-standalone /data/config/worker.properties /data/config/connector.properties
Upvotes: 0
Reputation: 2901
Source connector Distributed mode - has another option which is producing a new message to the offset topic. For example I use jdbc source connector: When looking on the offset topic I see the following:
./kafka-console-consumer.sh --zookeeper localhost:2181/kafka11-staging --topic kc-staging--offsets --from-beginning --property print.key=true
["referrer-family-jdbc-source",{"query":"query"}] {"incrementing":100}
Now in order to reset this I just produce another message with incrementing:0
For example: how to produce from shell with key from here
./kafka-console-producer.sh \
--broker-list `hostname`:9092 \
--topic kc-staging--offsets \
--property "parse.key=true" \
--property "key.separator=|"
["referrer-family-jdbc-source",{"query":"query"}]|{"incrementing":0}
Please note that you need to do the following:
Upvotes: 5
Reputation: 6613
Source Connector:
/tmp/connect.offsets
) or change connector name.Sink Connector (both modes) one of the following methods:
To reset offset you have to first delete connector, reset offset (./bin/kafka-consumer-groups.sh --bootstrap-server :9092 --group connectorName --reset-offsets --to-earliest --execute --topic topicName
), add same configuration one more time
You can check following question: Reset the JDBC Kafka Connector to start pulling rows from the beginning of time?
Upvotes: 26