eric zhao

Reputation: 175

Debezium: how to commit offsets manually

I use Debezium to sync data from Postgres to Flink, and I create the engine with this code:

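// Build an embedded engine that hands change events to debeziumConsumer.
this.engine = DebeziumEngine.create(Connect.class)
        .using(properties)
        .notifying(debeziumConsumer)
        // Completion callback: invoked when the engine stops; report any failure.
        .using((success, message, error) -> {
            if (!success && error != null) {
                this.reportError(error);
            }
        })
        .build();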

I want to call ChangeEventSourceCoordinator#commitOffset when Flink takes a checkpoint. However, coordinator is a private field in BaseSourceTask, and task is a private field in EmbeddedEngine, so I can't call commitOffset from my code. Is there another way to commit offsets manually?

public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
    private SourceTask task;
}

public abstract class BaseSourceTask extends SourceTask {
    private ChangeEventSourceCoordinator coordinator;
}

Upvotes: 1

Views: 400

Answers (1)

ribram

Reputation: 2460

I hit a similar issue. I thought I needed this ability because, in testing, I was not seeing checkpoints pushed to the server.

I learned that DebeziumEngine.ChangeConsumer#handleBatch receives a RecordCommitter, and that its markProcessed(record) and markBatchFinished() methods are responsible for conveying the offsets to the ChangeEventSourceCoordinator implementation for commitOffset.
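To illustrate the flow described above, here is a minimal sketch of a ChangeConsumer that acknowledges records through the RecordCommitter. The class name AckingConsumer, the process hook, and the counter are hypothetical and only for illustration. The consumer is generic in R so that it does not assume a particular event type for the engine. It needs the debezium-api artifact on the classpath.

```java
import io.debezium.engine.DebeziumEngine;
import java.util.List;

// Hypothetical consumer: acknowledges every record it handles so the engine
// learns which offsets are safe to commit.
public class AckingConsumer<R> implements DebeziumEngine.ChangeConsumer<R> {

    // Illustration only: counts the records acknowledged so far.
    private long acknowledged;

    @Override
    public void handleBatch(List<R> records, DebeziumEngine.RecordCommitter<R> committer)
            throws InterruptedException {
        for (R record : records) {
            process(record);
            // Tell the engine this record's offset may be committed.
            committer.markProcessed(record);
            acknowledged++;
        }
        // Signal the end of the batch; the engine's OffsetCommitPolicy then
        // decides whether the offsets are actually flushed now or later.
        committer.markBatchFinished();
    }

    // Placeholder for real work, for example forwarding to a Flink source context.
    protected void process(R record) {
    }

    public long getAcknowledged() {
        return acknowledged;
    }
}
```

An instance would be passed to the builder with .notifying(new AckingConsumer<>()). Note that marking a batch finished does not by itself guarantee an immediate commit; the timing still depends on the commit policy.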

The issue for me turned out to be that my test was not waiting long enough after committer.markBatchFinished() for the ChangeEventSourceCoordinator to invoke commitOffset against the server. My tests behaved as if a checkpoint was never set at the server because, in fact, it wasn't.

I ran some tests with OffsetCommitPolicy.always() and observed the behavior I was expecting to see.

So, in general, it appears that you never need to push these to the server manually, but you may need to tune the engine's OffsetCommitPolicy depending on your workflow. I think OffsetCommitPolicy.always() would get you as close as possible to the behavior you want, but there is still an asynchronous aspect to when your commit is actually pushed.
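As a rough sketch of that tuning: if you keep the default periodic policy, the offset.flush.interval.ms engine property controls how often commits are attempted (the default is one minute). The class name OffsetFlushSettings, the file path, and the 1000 ms value below are placeholders, not recommendations. Alternatively, the policy can be set on the builder with .using(OffsetCommitPolicy.always()).

```java
import java.util.Properties;

// Hypothetical helper that collects the offset-related engine properties.
public class OffsetFlushSettings {

    public static Properties offsetProperties() {
        Properties props = new Properties();
        // Where the engine persists offsets; a file-backed store is used here.
        props.setProperty("offset.storage",
                "org.apache.kafka.connect.storage.FileOffsetBackingStore");
        // Placeholder path, adjust for your environment.
        props.setProperty("offset.storage.file.filename", "/tmp/debezium-offsets.dat");
        // Attempt an offset commit roughly every second instead of every minute.
        props.setProperty("offset.flush.interval.ms", "1000");
        return props;
    }
}
```

These entries would be merged into the properties object passed to the builder's using(properties) call, alongside the connector settings.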

This test case helped explain the behavior: YugabyteDBExplicitCheckpointingTest.java

Upvotes: 0
