I'm using debezium-embedded 2.7.0 (without Kafka) to listen to MySQL binlog data and write it to another storage system.
However, the handleBatch method of ChangeConsumer is executed too often (every 5 seconds) and handles only 3 or 4 records each time, so I want to use a cached list to merge the records into larger batches.
import java.util.ArrayList;
import java.util.List;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

class MysqlChangeBatchConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {

    // Records accumulated across several handleBatch calls
    private final List<ChangeEvent<String, String>> recordsCache = new ArrayList<>();

    @Override
    public void handleBatch(List<ChangeEvent<String, String>> records,
                            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
            throws InterruptedException {
        recordsCache.addAll(records);
        // Only flush (and commit) once at least 100 records have been cached
        if (recordsCache.size() >= 100) {
            // put records to other storage
            for (ChangeEvent<String, String> record : recordsCache) {
                committer.markProcessed(record);
            }
            committer.markBatchFinished();
            recordsCache.clear();
        }
    }
}
So committer.markProcessed(record) and committer.markBatchFinished() are not called every time handleBatch is executed. Is that OK, or will it lead to offset problems?
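One side effect of the size-only check above is that fewer than 100 cached records would sit in memory, uncommitted, until more arrive. A rough sketch of a buffer that also flushes by age might look like the following. SizeOrAgeBuffer, its parameters, and the sink callback are hypothetical names for illustration and are not part of Debezium; the current time is passed in explicitly so the logic can be checked without real waiting.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper (not a Debezium class): buffers items and hands them
// to a sink when either a size threshold or a maximum age is reached.
class SizeOrAgeBuffer<T> {
    private final int maxSize;
    private final long maxAgeMillis;
    private final Consumer<List<T>> sink;
    private final List<T> buffer = new ArrayList<>();
    // Arrival time of the oldest buffered item; -1 while the buffer is empty
    private long oldestMillis = -1;

    SizeOrAgeBuffer(int maxSize, long maxAgeMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxAgeMillis = maxAgeMillis;
        this.sink = sink;
    }

    // Adds items and flushes if the buffer is full or too old.
    // Returns true when a flush happened during this call.
    boolean add(List<T> items, long nowMillis) {
        if (!items.isEmpty() && buffer.isEmpty()) {
            oldestMillis = nowMillis;
        }
        buffer.addAll(items);
        boolean full = buffer.size() >= maxSize;
        boolean stale = !buffer.isEmpty() && nowMillis - oldestMillis >= maxAgeMillis;
        if (full || stale) {
            // Pass a copy so the sink may keep the list after the buffer is cleared
            sink.accept(new ArrayList<>(buffer));
            buffer.clear();
            oldestMillis = -1;
            return true;
        }
        return false;
    }

    int size() {
        return buffer.size();
    }
}
```

In handleBatch this could be called as buffer.add(records, System.currentTimeMillis()), with the sink writing to the other storage system. Note that the age check only runs when add is called, so it does not help if handleBatch is not invoked at all for a long time, and it does not by itself settle whether deferring markProcessed across handleBatch calls is safe for offsets.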