Shivani Bhansali

Reputation: 109

How to commit offset manually in Kafka Sink Connector

I have a Kafka Sink Task that receives records from a Kafka topic through its put() method.
I do not want offsets to be committed automatically, because I run some processing logic on each record after it is fetched.
I want to commit the offset only if processing succeeds; otherwise the task should read from the same offset again.

I can see that the Kafka consumer has a commitSync() method, but I cannot find an equivalent for a Sink Connector.

Upvotes: 3

Views: 1878

Answers (2)

0gam

Reputation: 1443

Sink Kafka Connector-Commit

Even when enable.auto.commit is false, Kafka Connect commits offsets on its own schedule: every 60 seconds by default, as controlled by the offset.flush.interval.ms option below. If your put() method completes without an error, the offsets are committed normally.

offset.flush.interval.ms
Interval at which to try committing offsets for tasks.

Type: long
Default: 60000
Importance: low
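
For reference, this setting belongs to the Connect worker configuration. A minimal fragment, assuming a worker configured through a properties file; the file name and the 10-second value are illustrative assumptions, not recommendations:

```properties
# Connect worker config (for example connect-distributed.properties)
# Try to commit task offsets every 10 s instead of the default 60 s.
offset.flush.interval.ms=10000
```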

To manage offsets in a Kafka sink task

By default, Kafka Connect commits all the offsets it passes to the task via preCommit. If your preCommit returns an empty map of offsets, Kafka Connect records no offsets at all.

SinkTask.java

/**
 * Pre-commit hook invoked prior to an offset commit.
 *
 * The default implementation simply invokes {@link #flush(Map)} and is thus able to assume all {@code currentOffsets} are committable.
 *
 * @param currentOffsets the current offset state as of the last call to {@link #put(Collection)}},
 *                       provided for convenience but could also be determined by tracking all offsets included in the {@link SinkRecord}s
 *                       passed to {@link #put}.
 *
 * @return an empty map if Connect-managed offset commits are not desired, otherwise a map of committable offsets by topic-partition.
 */
public Map<TopicPartition, OffsetAndMetadata> preCommit(Map<TopicPartition, OffsetAndMetadata> currentOffsets) {
    flush(currentOffsets);
    return currentOffsets;
}
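
One way to use preCommit for "commit only what was processed" is to track the committable offsets yourself and return them instead of currentOffsets. The sketch below shows only that bookkeeping, with no Kafka dependencies. The class CommittableOffsets, its methods, and the String partition key are hypothetical names for illustration and are not part of the Connect API.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical helper (not part of Kafka Connect): remembers, per partition,
 * the next offset that is safe to commit.
 */
final class CommittableOffsets {
    // Partition key (for example "orders-0") -> offset to commit.
    private final Map<String, Long> next = new HashMap<>();

    /** Call only after a record has been processed successfully. */
    void markProcessed(String partition, long offset) {
        // Kafka commits the offset of the NEXT record to read, hence offset + 1.
        // Long::max keeps the highest value if records are marked out of order.
        next.merge(partition, offset + 1, Long::max);
    }

    /** Copy of the committable offsets; an empty map means "commit nothing". */
    Map<String, Long> snapshot() {
        return new HashMap<>(next);
    }
}
```

In a real SinkTask, put() would call markProcessed after each record is handled successfully, and preCommit would convert the snapshot into a Map<TopicPartition, OffsetAndMetadata> and return it. The sketch assumes records within a partition are processed in order, so the highest processed offset implies all earlier ones succeeded. For the re-read half of the question, the put() Javadoc allows a task to throw RetriableException so that the framework retries the same call.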

or

SinkTaskContext.java

/**
 * Request an offset commit. Sink tasks can use this to minimize the potential for redelivery
 * by requesting an offset commit as soon as they flush data to the destination system.
 *
 * This is a hint to the runtime and no timing guarantee should be assumed.
 */
void requestCommit();

Upvotes: 2

Add this property: ("enable.auto.commit", "false")

enable.auto.commit has a default value of true, and a second property, auto.commit.interval.ms, has a default value of 5000.

Upvotes: 0
