z.wang

Reputation: 51

Kafka source connector gets stale offset values

I have a SourceTask with a simple poll() method that completes quite fast. I found that the offset value read from context.offsetStorageReader() is mostly stale, meaning it does not match the offset value returned by the previous poll() call.

At the same time, I can see in the logs that the offset value only becomes "fresh" after "commitOffsets successfully" appears.

My question is: is this by design? Should I decrease the "OFFSET_COMMIT_INTERVAL_MS_CONFIG" value to make sure the offset is committed before the next SourceTask.poll() call?

Upvotes: 2

Views: 634

Answers (2)

code-gorilla

Reputation: 2438

I had the same misconception about how Kafka Connect SourceTasks work:

Fetching the current offset in poll() only makes sense if you think about Tasks as "one-off" jobs: The Task is started, then poll() is called, which sends the record(s) to Kafka and persists its offset with it; then the Task is killed and the next Task will pick up the offset and continue reading data from the source.

This would not work well with Kafka Connect, because the connector's source partitions/offsets are not persisted immediately: the worker commits them periodically, and in distributed mode they are stored in a Kafka topic. So there is no guarantee of when the offset returned by one poll() call becomes visible through the OffsetStorageReader. This is why you receive stale offsets in the poll() method.
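To make the timing concrete, here is a deliberately simplified model of the commit cycle. OffsetModel, recordFromPoll(), commit() and read() are hypothetical names, not Kafka Connect API; the model only assumes what the question observed: offsets returned from poll() are buffered and become readable after a periodic commit (in the real worker that interval is the offset.flush.interval.ms setting, which is what OFFSET_COMMIT_INTERVAL_MS_CONFIG refers to).

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical model of the worker's offset handling; not the real Kafka Connect API.
class OffsetModel {
    // Offsets returned by poll() that have not been committed yet.
    private final Map<String, Long> pending = new HashMap<>();
    // Offsets that have been committed and are therefore visible to readers.
    private final Map<String, Long> committed = new HashMap<>();

    // Each record returned from poll() only updates the pending buffer.
    void recordFromPoll(String partition, long offset) {
        pending.put(partition, offset);
    }

    // In the real worker, commits happen periodically (controlled by offset.flush.interval.ms).
    void commit() {
        committed.putAll(pending);
        pending.clear();
    }

    // What an OffsetStorageReader-like reader sees: committed values only (null if none).
    Long read(String partition) {
        return committed.get(partition);
    }

    public static void main(String[] args) {
        OffsetModel model = new OffsetModel();
        model.recordFromPoll("file.txt", 10L);
        model.recordFromPoll("file.txt", 20L);
        System.out.println("before commit: " + model.read("file.txt")); // before commit: null
        model.commit();
        System.out.println("after commit: " + model.read("file.txt"));  // after commit: 20
        model.recordFromPoll("file.txt", 30L);
        System.out.println("next poll: " + model.read("file.txt"));     // next poll: 20 (stale)
    }
}
```

Shortening the commit interval only narrows the window in which the reader lags; it never closes it, which is why the offset should not be re-read in every poll().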

In reality, Tasks are started once, and after that their poll() method is called repeatedly for as long as the Task is running. So the Task can keep its offset in memory (e.g. in a field of your SourceTask subclass) and read/update it during each poll(). One consequence is that if multiple tasks work on the same source partition (e.g. the same file or table), messages can be duplicated, because each task tracks its own offset in memory and those offsets are not kept in sync.
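A minimal sketch of that pattern, written in plain Java so it runs without the Kafka Connect jars. CommittedOffsets stands in for OffsetStorageReader and CounterTask for a SourceTask subclass; both names, the "source"/"position" keys and the record strings are hypothetical. The offset is read once in start() and then tracked in a field, so poll() never depends on how far the committed value lags.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for Kafka Connect's OffsetStorageReader (not the real interface).
interface CommittedOffsets {
    // Returns the last committed offset map for a source partition, or null if there is none.
    Map<String, Object> offset(Map<String, Object> partition);
}

// Sketch of a task that reads its offset once and then tracks it in memory.
// CounterTask and the "source"/"position" keys are illustrative names only.
class CounterTask {
    private final Map<String, Object> partition = Collections.singletonMap("source", "counter");
    private long position; // in-memory offset: the source of truth while the task runs

    // Plays the role of SourceTask.start(): the only place the stored offset is read.
    void start(CommittedOffsets reader) {
        Map<String, Object> stored = reader.offset(partition);
        Object last = (stored != null) ? stored.get("position") : null;
        position = (last != null) ? (Long) last : 0L;
    }

    // Plays the role of SourceTask.poll(): emits records and advances the in-memory offset.
    List<String> poll(int batchSize) {
        List<String> batch = new ArrayList<>();
        for (int i = 0; i < batchSize; i++) {
            position++;
            // A real task would attach Collections.singletonMap("position", position) to each record.
            batch.add("record-" + position);
        }
        return batch;
    }

    long position() {
        return position;
    }

    public static void main(String[] args) {
        // Simulated store that only ever reports the committed value 41.
        CommittedOffsets reader = p -> Collections.singletonMap("position", (Object) 41L);
        CounterTask task = new CounterTask();
        task.start(reader);               // reads 41 once
        System.out.println(task.poll(2)); // [record-42, record-43]
        System.out.println(task.poll(1)); // [record-44]
    }
}
```

In a real task, the offset map attached to each SourceRecord is what the worker later commits, so the stored value catches up eventually; the task just never has to wait for it.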

FileStreamSourceTask from the official Kafka repository is a good example of how reading offsets can be handled in a Source connector:

// stream is only null if the poll method has not run yet - only in this case the offset will be read!
if (stream == null) {
    ...
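    Map<String, Object> offset = context.offsetStorageReader().offset(Collections.singletonMap(FILENAME_FIELD, filename));
    // offset() returns null when no offset has been committed for this partition yet
    Object lastRecordedOffset = (offset != null) ? offset.get(POSITION_FIELD) : null;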
    ...
    streamOffset = (lastRecordedOffset != null) ? (Long) lastRecordedOffset : 0L;
}
    

Upvotes: 0

z.wang

Reputation: 51

The comments of the org.apache.kafka.connect.storage.OffsetStorageWriter class say "Offset data should only be read during startup or reconfiguration of a task...", rather than on every call to the poll() method.

Upvotes: 2
