Reputation: 1025
I have debezium embedded in spring boot app (kafka-less solution). I connect to mysql db and I store offset in the file.
.with("offset.storage","org.apache.kafka.connect.storage.FileOffsetBackingStore")
I don't want to commit the offset, when the message failed to process. Because there are cases, where I cannot process next changes, when the previous has not been processed.
I couldn't find any API which would allow me to do so. So the only idea is to disable offset commiting policy:
import io.debezium.engine.spi.OffsetCommitPolicy;
public class CustomOffsetCommitPolicy implements OffsetCommitPolicy {
@Override
public boolean performCommit(Map<String, ?> offsets) {
// Always return false to prevent automatic commits
return false;
}
}
And then manually update the offset in the file. However, it looks cumbersome. So maybe you have some better idea how to handle offset management is such a case.
Upvotes: 0
Views: 72