Usernameless
Usernameless

Reputation: 61

Why I receive a lot of duplicates with debezium?

I'm testing Debezium platform in a local deployment with docker-compose. Here's my test case:

  1. run postgres, kafka, zookeeper and 3 replicas of debezium/connect:1.3
  2. configure connector in one of the replica with the following configs:
{
  "name": "database-connector",  
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector", 
    "plugin.name": "wal2json",
    "slot.name": "database",
    "database.hostname": "debezium_postgis_1", 
    "database.port": "5432", 
    "database.user": "postgres", 
    "database.password": "postgres", 
    "database.dbname" : "database", 
    "database.server.name": "database",
    "heartbeat.interval.ms": 5000,
    "table.whitelist": "public.outbox",
    "transforms.outbox.table.field.event.id": "event_uuid",
    "transforms.outbox.table.field.event.key": "event_name",
    "transforms.outbox.table.field.event.payload": "payload",
    "transforms.outbox.table.field.event.payload.id": "event_uuid",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "transforms.outbox.route.by.field": "topic",
    "transforms": "outbox",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "max.batch.size": 1,
    "offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy",
    "binary.handling.mode": "bytes"
  }
}
  1. run a script that executes 2000 insert in outbox table by calling this method from another class
    @Transactional
    public void write(String eventName, String topic, byte[] payload) {
        Outbox newRecord = new Outbox(eventName, topic, payload);
        repository.save(newRecord);
        repository.delete(newRecord);
    }
  1. After some seconds (when I see the first messages on Kafka), I kill the replica who's handling the stream. Let's say it delivered successfully 200 messages on the right topic.
  2. I get from the topic where debezium stores offsets the last offset message:
{
   "transaction_id": null,
   "lsn_proc": 24360992,
   "lsn": 24495808,
   "txId": 560,
   "ts_usec": 1595337502556806
}
  1. then I open a db shell and run the following SELECT slot_name, restart_lsn - pg_lsn('0/0') as restart_lsn, confirmed_flush_lsn - pg_lsn('0/0') as confirmed_flush_lsn FROM pg_replication_slots; and postgres reply:
[
  {
    "slot_name": "database",
    "restart_lsn": 24360856,
    "confirmed_flush_lsn": 24360992
  }
]
  1. After 5 minutes I killed the replica, Kafka rebalances connectors and it deploy a new running task on one of the living replicas.
  2. The new connector starts handling the stream, but it seems that it starts from the beginning because after it finish I found 2200 messages on Kafka. With that configuration (max.batch.size: 1 and AlwaysCommitPolicy) I expect to see max 2001 messages. Where am I wrong ?

Upvotes: 2

Views: 1758

Answers (1)

Usernameless
Usernameless

Reputation: 61

I found the problem in my configuration: "offset.commit.policy": "io.debezium.engine.spi.OffsetCommitPolicy.AlwaysCommitOffsetPolicy" works only with the Embedded API.

Moreover the debezium/connect:1.3 docker image has a default value for OFFSET_FLUSH_INTERVAL_MS of 1 minute. So if I stop the container within its first 1 minute, no offsets will be stored on kafka

Upvotes: 1

Related Questions