wulitaotao

Reputation: 21

Can I merge handleBatch calls in debezium-embedded?

I'm using debezium-embedded 2.7.0 (without Kafka) to listen to MySQL binlog data and write it to another storage system.

However, the handleBatch method of ChangeConsumer is executed too often (every 5 seconds) and handles only 3 or 4 records each time, so I want to use a cache list to merge them.

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

import java.util.ArrayList;
import java.util.List;

class MysqlChangeBatchConsumer implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
    // Records carried over from earlier handleBatch calls until the threshold is reached
    private final List<ChangeEvent<String, String>> recordsCache = new ArrayList<>();

    @Override
    public void handleBatch(List<ChangeEvent<String, String>> records, DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer) throws InterruptedException {
        recordsCache.addAll(records);

        // Only flush (and acknowledge) once at least 100 records have accumulated
        if (recordsCache.size() >= 100) {
            // put records to other storage

            for (ChangeEvent<String, String> record : recordsCache) {
                committer.markProcessed(record);
            }

            committer.markBatchFinished();

            recordsCache.clear();
        }
    }
}

With this approach, committer.markProcessed(record) and committer.markBatchFinished() do not run on every handleBatch call. Is that OK, or will it lead to offset problems?
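To make the concern concrete, here is a minimal, dependency-free sketch of the same buffering logic. It assumes a hypothetical stand-in `Committer` interface and plain `String` records instead of Debezium's real `RecordCommitter` and `ChangeEvent`, and uses a threshold of 5 instead of 100. It only shows when the acknowledgement calls happen, not how Debezium reacts to them.

```java
import java.util.ArrayList;
import java.util.List;

public class BufferedBatchDemo {

    /** Hypothetical stand-in for illustration only; NOT Debezium's RecordCommitter. */
    interface Committer<R> {
        void markProcessed(R record);
        void markBatchFinished();
    }

    /** Counts acknowledgement calls so the timing can be observed. */
    static class CountingCommitter implements Committer<String> {
        int processed = 0;
        int batchesFinished = 0;

        @Override
        public void markProcessed(String record) { processed++; }

        @Override
        public void markBatchFinished() { batchesFinished++; }
    }

    /** Same shape as the consumer in the question: buffer first, acknowledge on flush. */
    static class BufferingConsumer {
        private final int threshold;
        private final List<String> cache = new ArrayList<>();

        BufferingConsumer(int threshold) { this.threshold = threshold; }

        void handleBatch(List<String> records, Committer<String> committer) {
            cache.addAll(records);
            if (cache.size() >= threshold) {
                // (writing to the other storage system would happen here)
                for (String record : cache) {
                    committer.markProcessed(record);
                }
                committer.markBatchFinished();
                cache.clear();
            }
        }

        /** Records received but not yet acknowledged. */
        int pending() { return cache.size(); }
    }

    public static void main(String[] args) {
        CountingCommitter committer = new CountingCommitter();
        BufferingConsumer consumer = new BufferingConsumer(5);

        consumer.handleBatch(List.of("a", "b", "c"), committer);
        System.out.println("after call 1: pending=" + consumer.pending()
                + " processed=" + committer.processed);

        consumer.handleBatch(List.of("d", "e", "f"), committer);
        System.out.println("after call 2: pending=" + consumer.pending()
                + " processed=" + committer.processed);
    }
}
```

After the first call, three records are sitting in the cache and none has been acknowledged; only the second call acknowledges all six at once. Records that are still pending exist only in memory, so what happens to them if the process stops between flushes is exactly the part I am unsure about.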

Upvotes: 2

Views: 7

Answers (0)
