
Reputation: 353

How to reset offsets to arbitrary value in Kafka Consumer Group?

I want to reset offsets of all partitions to specific values .... I see provides option of --from-file Reset offsets to values defined in CSV file

Can anyone please share contents/format of this csv file and example command for it ?

for example: ./kafka_2.12-2.1.1/bin/ --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute

Whats contents/format of offsets.csv ?

Upvotes: 11

Views: 8207

Answers (3)


Reputation: 19685

@wardzinski's answer is the key information requested, but I can add the following useful tidbit:

You can use the --export command of kafka-consumer-groups to create the CSV file from existing information, without changing anything via --dry-run. For example:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \

The value --to-current can be changed to various other values, such as --to-datetime, --by-period, etc.

The output of that command is the required CSV file necessary for --from-file.

One very useful use case for this is to copy the offsets from one consumer group to another consumer group, for example:

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --export --group $FROM_GROUP_NAME --topic $TOPIC \
  --reset-offsets --to-current \
  --dry-run > offsets.txt

bin/kafka-consumer-groups \
  --bootstrap-server $KAFKA \
  --execute --group $TO_GROUP_NAME \
  --reset-offsets --from-file offsets.txt

Upvotes: 9

Michael Heil
Michael Heil

Reputation: 18525

In case you plan to alter the offset within a Java application, you can make use of the AdminClient's API alterConsumerGroupOffsets.

Here is a simple example tested with Kafka 2.8.0:

String brokers = "localhost:9092";
String consumerGroupName = "test1337";
TopicPartition topicPartition = new TopicPartition("test", 0);
Long offset = 4L;

Map<TopicPartition, OffsetAndMetadata> toOffset = new HashMap<>();
toOffset.put(topicPartition, new OffsetAndMetadata(offset));

// Create AdminClient
final Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
AdminClient adminClient = AdminClient.create(properties);

try {
  // Check offsets before altering
  KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsBeforeResetFuture = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
  System.out.println("Before: " + offsetsBeforeResetFuture.get().toString());

  // Alter offsets
  adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffset).partitionResult(topicPartition).get();

  // Check offsets after altering
  KafkaFuture<Map<TopicPartition, OffsetAndMetadata>> offsetsAfterResetFuture = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata();
  System.out.println("After:  " + offsetsAfterResetFuture.get().toString());
} catch (InterruptedException e) {
} catch (ExecutionException e) {
} finally {

This will print out

Before: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
After:  {test-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}

You can extend that example to load a csv file that contains all information on consumer group, topic, partition and the new offset for that partition.

Upvotes: 3

Bartosz Wardziński
Bartosz Wardziński

Reputation: 6623

Csv file format is (Each line contains information about one partition):


Sample csv content (reset-policy.csv).


Command to reset offset based on csv file is:

./bin/ --bootstrap-server localhost:9092 --group gr1 --from-file reset-policy.csv --reset-offsets --execute

Upvotes: 12

Related Questions