Reputation: 61
I'm testing the Debezium platform in a local deployment with docker-compose. Here's my test case:
{
  "name": "database-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "wal2json",
    "slot.name": "database",
    "database.hostname": "debezium_postgis_1",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "database",
    "database.server.name": "database",
    "heartbeat.interval.ms": 5000,
    "table.whitelist": "public.outbox",
    "transforms.outbox.table.field.event.id": "event_uuid",
    "transforms.outbox.table.field.event.key": "event_name",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.payload.id": "event_uuid",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "transforms.outbox.route.by.field": "topic",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "max.batch.size": 1,
    "offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy",
    "binary.handling.mode": "bytes"
  }
}
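@Transactional
public void write(String eventName, String topic, byte[] payload) {
    // Insert the outbox row and delete it in the same transaction:
    // both changes are written to the WAL, while the table itself stays empty.
    Outbox newRecord = new Outbox(eventName, topic, payload);
    repository.save(newRecord);
    repository.delete(newRecord);
}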
{
"transaction_id": null,
"lsn_proc": 24360992,
"lsn": 24495808,
"txId": 560,
"ts_usec": 1595337502556806
}
SELECT slot_name, restart_lsn - pg_lsn('0/0') as restart_lsn, confirmed_flush_lsn - pg_lsn('0/0') as confirmed_flush_lsn FROM pg_replication_slots;
and the Postgres reply:
[
  {
    "slot_name": "database",
    "restart_lsn": 24360856,
    "confirmed_flush_lsn": 24360992
  }
]
(max.batch.size: 1 and AlwaysCommitPolicy) I expect to see max 2001 messages.
Where am I wrong?
Upvotes: 2
Views: 1758
Reputation: 61
I found the problem in my configuration:
"offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy"
works only with the Embedded API.
Moreover, the debezium/connect:1.3 Docker image sets OFFSET_FLUSH_INTERVAL_MS to 1 minute by default. So if I stop the container within its first minute, no offsets will be stored in Kafka.
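For a test like this, one option is to shorten the flush interval on the Connect container. Below is a minimal docker-compose sketch, not my actual file: the service name, the Kafka address, the topic names and the 5000 ms value are all assumptions for illustration. Only the image tag and the OFFSET_FLUSH_INTERVAL_MS variable come from the setup described above.

```yaml
# Hypothetical docker-compose fragment; service name, broker address and topic names are assumed.
version: "3"
services:
  connect:
    image: debezium/connect:1.3
    environment:
      BOOTSTRAP_SERVERS: kafka:9092          # assumed broker address
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs  # assumed topic name
      OFFSET_STORAGE_TOPIC: connect_offsets  # assumed topic name
      STATUS_STORAGE_TOPIC: connect_statuses # assumed topic name
      # Flush offsets every 5 s instead of the image default of 60000 ms (1 minute).
      OFFSET_FLUSH_INTERVAL_MS: 5000
```

With a shorter interval, fewer events should be replayed after a restart, though offsets are still flushed periodically rather than after every record.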
Upvotes: 1