Reputation: 353
I want to reset the offsets of all partitions to specific values. I see that kafka-consumer-groups.sh provides a --from-file option ("Reset offsets to values defined in CSV file").
Can anyone please share the contents/format of this CSV file and an example command for it?
For example:
./kafka_2.12-2.1.1/bin/kafka-consumer-groups.sh --bootstrap-server ${KAFKA_BROKER} --group ${GROUP_NAME} --topic ${TOPIC} --reset-offsets --from-file offsets.csv --execute
What are the contents/format of offsets.csv?
Upvotes: 11
Views: 8207
Reputation: 19685
@wardzinski's answer is the key information requested, but I can add the following useful tidbit:
You can use the --export option of kafka-consumer-groups to create the CSV file from existing information, and adding --dry-run ensures nothing is changed. For example:
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--export --group $GROUP_NAME --topic $TOPIC \
--reset-offsets --to-current \
--dry-run
The --to-current option can be swapped for other reset options, such as --to-datetime, --by-duration, etc.
The output of that command is the CSV file required by --from-file.
One very useful use case for this is to copy the offsets from one consumer group to another consumer group, for example:
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--export --group $FROM_GROUP_NAME --topic $TOPIC \
--reset-offsets --to-current \
--dry-run > offsets.txt
bin/kafka-consumer-groups \
--bootstrap-server $KAFKA \
--execute --group $TO_GROUP_NAME \
--reset-offsets --from-file offsets.txt
Upvotes: 9
Reputation: 18525
If you plan to alter the offsets from within a Java application, you can use the AdminClient API method alterConsumerGroupOffsets.
Here is a simple example tested with Kafka 2.8.0:
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// The statements below belong inside a method body.
String brokers = "localhost:9092";
String consumerGroupName = "test1337";
TopicPartition topicPartition = new TopicPartition("test", 0);
long offset = 4L;
Map<TopicPartition, OffsetAndMetadata> toOffset = new HashMap<>();
toOffset.put(topicPartition, new OffsetAndMetadata(offset));

// Create AdminClient; try-with-resources closes it even if a call fails
Properties properties = new Properties();
properties.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
try (AdminClient adminClient = AdminClient.create(properties)) {
    // Check offsets before altering
    Map<TopicPartition, OffsetAndMetadata> before = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
    System.out.println("Before: " + before);
    // Alter offsets; get() blocks until the result for this partition is available
    adminClient.alterConsumerGroupOffsets(consumerGroupName, toOffset).partitionResult(topicPartition).get();
    // Check offsets after altering
    Map<TopicPartition, OffsetAndMetadata> after = adminClient.listConsumerGroupOffsets(consumerGroupName).partitionsToOffsetAndMetadata().get();
    System.out.println("After: " + after);
} catch (InterruptedException | ExecutionException e) {
    e.printStackTrace();
}
This prints:
Before: {test-0=OffsetAndMetadata{offset=1, leaderEpoch=null, metadata=''}}
After: {test-0=OffsetAndMetadata{offset=4, leaderEpoch=null, metadata=''}}
You can extend that example to load a CSV file that contains the consumer group, topic, partition, and new offset for each partition.
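As a rough sketch of the loading step, the following parses rows into plain Java objects using only the standard library. It assumes the three-column layout topic,partition,offset (the same layout the command-line tool reads), with the consumer group supplied separately; the class names OffsetCsvParser and PartitionOffset are hypothetical and not part of any Kafka API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class OffsetCsvParser {

    /** One CSV row: topic, partition and target offset (hypothetical helper type). */
    public static final class PartitionOffset {
        public final String topic;
        public final int partition;
        public final long offset;

        public PartitionOffset(String topic, int partition, long offset) {
            this.topic = topic;
            this.partition = partition;
            this.offset = offset;
        }
    }

    /** Parses lines of the form "topic,partition,offset"; blank lines are skipped. */
    public static List<PartitionOffset> parse(List<String> lines) {
        List<PartitionOffset> result = new ArrayList<>();
        for (String line : lines) {
            String trimmed = line.trim();
            if (trimmed.isEmpty()) {
                continue;
            }
            String[] fields = trimmed.split(",");
            if (fields.length != 3) {
                throw new IllegalArgumentException(
                        "Expected topic,partition,offset but got: " + line);
            }
            result.add(new PartitionOffset(
                    fields[0].trim(),
                    Integer.parseInt(fields[1].trim()),
                    Long.parseLong(fields[2].trim())));
        }
        return result;
    }

    public static void main(String[] args) {
        // Sample rows; in practice they could come from Files.readAllLines(path)
        List<String> lines = Arrays.asList("someTopic1,0,1", "someTopic2,1,5");
        for (PartitionOffset po : parse(lines)) {
            System.out.println(po.topic + "-" + po.partition + " -> " + po.offset);
        }
    }
}
```

Each parsed row could then be added to the toOffset map as new TopicPartition(po.topic, po.partition) mapped to new OffsetAndMetadata(po.offset) before calling alterConsumerGroupOffsets.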
Upvotes: 3
Reputation: 6623
The CSV file format is as follows (each line describes one partition):
topicName,partitionNumber,offset
topicName,partitionNumber,offset
Sample CSV content (reset-policy.csv):
someTopic1,0,1
someTopic2,1,5
The command to reset offsets based on the CSV file is:
./bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --group gr1 --from-file reset-policy.csv --reset-offsets --execute
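If a topic has many partitions, writing the file by hand gets tedious. Here is a minimal sketch that generates rows in the topic,partition,offset format from a map of partition number to target offset. The class name OffsetCsvWriter and the sample offsets are made up for illustration.

```java
import java.util.Map;
import java.util.TreeMap;

public class OffsetCsvWriter {

    /** Builds one "topic,partition,offset" line per partition, ordered by partition number. */
    public static String toCsv(String topic, Map<Integer, Long> offsetsByPartition) {
        StringBuilder csv = new StringBuilder();
        // TreeMap copy gives a stable, ascending partition order
        for (Map.Entry<Integer, Long> entry : new TreeMap<>(offsetsByPartition).entrySet()) {
            csv.append(topic).append(',')
               .append(entry.getKey()).append(',')
               .append(entry.getValue()).append('\n');
        }
        return csv.toString();
    }

    public static void main(String[] args) {
        // Hypothetical target offsets for three partitions of someTopic1
        Map<Integer, Long> offsets = new TreeMap<>();
        offsets.put(0, 100L);
        offsets.put(1, 250L);
        offsets.put(2, 75L);
        System.out.print(toCsv("someTopic1", offsets));
    }
}
```

Saving the printed output as reset-policy.csv gives a file that can be passed to --from-file as in the command above.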
Upvotes: 12