I use Debezium to sync data from Postgres to Flink, and I create the engine with this code:
    this.engine = DebeziumEngine.create(Connect.class)
            .using(properties)
            .notifying(debeziumConsumer)
            // completion callback: report the error if the engine stops unsuccessfully
            .using((success, message, error) -> {
                if (!success && error != null) {
                    this.reportError(error);
                }
            })
            .build();
I want to call ChangeEventSourceCoordinator#commitOffset when Flink takes a checkpoint, but coordinator is private in BaseSourceTask and task is private in EmbeddedEngine, so I can't call commitOffset from my code. Is there another way to commit offsets manually?
The relevant fields (excerpts):

    public final class EmbeddedEngine implements DebeziumEngine<SourceRecord> {
        private SourceTask task;
        // ...
    }

    public abstract class BaseSourceTask extends SourceTask {
        private ChangeEventSourceCoordinator coordinator;
        // ...
    }
I hit a similar issue: I thought I needed this ability because, in testing, I was not seeing checkpoints pushed to the server.
I learned that the RecordCommitter methods markProcessed(record) and markBatchFinished(), which your DebeziumEngine.ChangeConsumer calls from handleBatch, are responsible for conveying the offsets to the ChangeEventSourceCoordinator implementation of commitOffset.
The issue for me turned out to be that my test was not waiting long enough after committer.markBatchFinished() for the ChangeEventSourceCoordinator to invoke commitOffset and push it to the server. My tests behaved as if a checkpoint was never set at the server because, in fact, it wasn't.
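Since the commit reaches the server some time after markBatchFinished() returns, a test can poll for it instead of asserting right away. This is a JDK-only sketch: Await.awaitCondition is a hypothetical helper, and the condition you pass would be whatever your test actually checks, such as the offset recorded on the server.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

final class Await {
    private Await() {}

    // Polls `condition` every `pollInterval` until it returns true or `timeout` elapses.
    // Returns true if the condition held before the deadline, false otherwise.
    static boolean awaitCondition(BooleanSupplier condition, Duration timeout, Duration pollInterval)
            throws InterruptedException {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (deadline - System.nanoTime() > 0) { // overflow-safe nanoTime comparison
            if (condition.getAsBoolean()) {
                return true;
            }
            Thread.sleep(pollInterval.toMillis());
        }
        return condition.getAsBoolean(); // final check at the deadline
    }
}
```

Returning a boolean rather than throwing lets the test decide how to report a missing commit; the timeout should be generous compared with how often the engine commits offsets.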
I ran some tests with OffsetCommitPolicy.always() and observed the behavior I was expecting to see.
So, in general, it appears that you never need to push these to the server manually, but you may need to tune the engine's OffsetCommitPolicy depending on your workflow. I think OffsetCommitPolicy.always() would get you as close as possible to the behavior you want, but there is still an asynchronous aspect to when your commit is actually pushed.
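To illustrate what tuning the policy means, here is a JDK-only sketch. CommitPolicy is a local stand-in that mirrors the performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit) method of Debezium's OffsetCommitPolicy; everyInterval is a hypothetical helper that only approximates the idea behind OffsetCommitPolicy.periodic(...). With Debezium itself, the policy should be passed to the engine builder, e.g. .using(OffsetCommitPolicy.always()).

```java
import java.time.Duration;

// Local stand-in with the same method shape as io.debezium.engine.spi.OffsetCommitPolicy.
@FunctionalInterface
interface CommitPolicy {
    boolean performCommit(long numberOfMessagesSinceLastCommit, Duration timeSinceLastCommit);

    // Commit after every batch, like OffsetCommitPolicy.always().
    static CommitPolicy always() {
        return (messages, elapsed) -> true;
    }

    // Hypothetical: commit only once `interval` has elapsed since the last commit.
    static CommitPolicy everyInterval(Duration interval) {
        return (messages, elapsed) -> elapsed.compareTo(interval) >= 0;
    }
}
```

A policy only decides whether the engine commits offsets after a batch; it does not make the update on the server synchronous.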
The test case YugabyteDBExplicitCheckpointingTest.java helped me understand this commit behavior.