clay
clay

Reputation: 20470

Kafka Connect Offsets. Get/Set?

How do I get, set, or reset the offset of a Kafka Connect connector/task/sink?

I can use the /usr/bin/kafka-consumer-groups tool which runs kafka.admin.ConsumerGroupCommand to see the offsets for all my regular Kafka consumer groups. However, Kafka Connect tasks and groups do not show up with this tool.

Similarly, I can use the zookeeper-shell to connect to Zookeeper and I can see zookeeper entries for regular Kafka consumer groups, but not for Kafka Connect sinks.

Upvotes: 19

Views: 19815

Answers (4)

Ramin Gharib
Ramin Gharib

Reputation: 65

Starting from Kafka 3.6, Kafka Connect supports first-class admin support (retrieve, alter, reset) for offsets: https://www.confluent.io/blog/introducing-apache-kafka-3-6/#first-class%20offsets-support

Upvotes: 1

Pavel
Pavel

Reputation: 336

You can't set offsets, but you can use kafka-consumer-groups.sh tool to "scroll" the feed forward.

The consumer group of your connector has a name of connect-*CONNECTOR NAME*, but you can double check:

unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --list

To view current offset:

unset JMX_PORT; ./bin/kafka-consumer-groups.sh --bootstrap-server *KAFKA HOSTS* --group connect-*CONNECTOR NAME* --describe

To move the offset forward:

unset JMX_PORT; ./bin/kafka-console-consumer.sh --bootstrap-server *KAFKA HOSTS* --topic *TOPIC* --max-messages 10000 --consumer-property group.id=connect-*CONNECTOR NAME* > /dev/null

I suppose you can move the offset backward as well by deleting the consumer group first, using --delete flag.

Don't forget to pause and resume your connector via Kafka Connect REST API.

Upvotes: 6

WesternGun
WesternGun

Reputation: 12817

In my case(testing reading files into producer and consume in console, all in local only), I just saw this in producer output:

offset.storage.file.filename=/tmp/connect.offsets

So I wanted to open it but it is binary, with some hardly recognizable characters.

I deleted it(rename it also works), and then I can write into the same file and get the file content from consumer again. You have to restart the console producer to take effect because it attempts to read the offset file, if not there, create a new one, so that the offset is reset.

If you want to reset it without deletion, you can use:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group-name> --reset-offsets --to-earliest --topic <topic_name>

You can check all group names by:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --list

and check details of each group:

bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group <group_name> --describe

In production environment, this offset is managed by zookeeper, so more steps (and caution) is needed. You can refer to this page:

https://metabroadcast.com/blog/resetting-kafka-offsets https://community.hortonworks.com/articles/81357/manually-resetting-offset-for-a-kafka-topic.html

Steps:

kafka-topics --list --zookeeper localhost:2181
kafka-run-class kafka.tools.GetOffsetShell --broker-list localhost:9092 -topic vital_signs --time -1 // -1 for largest, -2 for smallest

set /consumers/{yourConsumerGroup}/offsets/{yourFancyTopic}/{partitionId} {newOffset}

Upvotes: 2

Ewen Cheslack-Postava
Ewen Cheslack-Postava

Reputation: 1431

As of 0.10.0.0, Connect doesn't provide an API for managing offsets. It's something we want to improve in the future, but not there yet. The ConsumerGroupCommand would be the right tool to manage offsets for Sink connectors. Note that source connector offsets are stored in a special offsets topic for Connect (they aren't like normal Kafka offsets since they are defined by the source system, see offset.storage.topic in the worker configuration docs) and since sink Connectors uses the new consumer, they won't store their offsets in Zookeeper -- all modern clients use native Kafka-based offset storage. The ConsumerGroupCommand can work with these offsets, you just need to pass the --new-consumer option).

Upvotes: 16

Related Questions