cicada_

Reputation: 336

Documents not getting deleted in Kafka Streams

I'm trying to sync my PostgreSQL database to Elasticsearch, but I'm encountering some difficulties with the deletion of records.

Here's some information about what I'm trying to achieve.

  1. Get all the tables into Kafka topics through Kafka Connect (Debezium Postgres connector). Each table has its own topic.
  2. Do stream processing on them: read the Kafka topics as KTables using SpecificAvroSerde and join them to get a complete doc with embedded information (see the sketch right after this list).
  3. Write the result of the join to an output topic.
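
Here's a rough sketch of what I mean by step 2. This isn't my exact code: the topic names just follow Debezium's <server>.<schema>.<table> convention from the source config, the classes ProductKey, Product, CategoryKey, Category, ProductCategoryKey, ProductCategory and EnrichedProduct stand in for the generated Avro classes, and one possible way to wire the joins is with foreign-key KTable joins (Kafka Streams 2.4+):

StreamsBuilder builder = new StreamsBuilder();

// Each Debezium topic is read as a KTable (latest value per key), using the
// default SpecificAvroSerde configured in the Streams properties further down.
KTable<ProductKey, Product> products = builder.table("cdc.my_schema.product");
KTable<CategoryKey, Category> categories = builder.table("cdc.my_schema.category");
KTable<ProductCategoryKey, ProductCategory> bridge = builder.table("cdc.my_schema.product_category");

// Foreign-key KTable joins: resolve each bridging row to its product and then
// to its category, giving one enriched record per bridging row.
KTable<ProductCategoryKey, EnrichedProduct> enriched = bridge
        .join(products,
              pc -> new ProductKey(pc.getProductId()),
              (pc, product) -> new EnrichedProduct(product, pc.getCategoryId(), null))
        .join(categories,
              ep -> new CategoryKey(ep.getCategoryId()),
              (ep, category) -> new EnrichedProduct(ep.getProduct(), ep.getCategoryId(), category));

// The joined documents go to the topic that the ES sink connector reads.
enriched.toStream().to("output-topic");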

Here's my source config:

{
    "name": "pg-source-1",
    "config": {
        "slot.name" : "debezium",
        "database.server.name": "cdc",
        "slot.drop_on_stop": true,
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "tasks.max": "1",
        "database.hostname": "postgres",
        "database.port": "5432",
        "database.user": "postgres",
        "database.password": "postgres",
        "database.dbname": "postgres",
        "schema.whitelist": "my_schema",
        "override.message.max.bytes": "524288000",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schemas.enable": "false",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "transforms": "unwrap",
        "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter.schema.registry.url": "http://schema-registry:8081",
        "database.history.kafka.topic": "schema-changes.my_schema
    }
}

Right now I have 3 tables: product (PK id), product_category (only FKs, one referencing product.id and the other category.id), and category (PK id). Here product_category is a bridging table. When I delete a relationship between a product and a category, i.e. a record in the product_category table, the deletion is not reflected on the ES side.

These are the things that do work:

  1. Updating a product name in the product table.
  2. Creating a new entry in the product_category table.

Here's the ES sink connector config:

{
  "name": "es-sink-1",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "tasks.max": "1",
    "topics": "output-topic",
    "connection.url": "http://elasticsearch:9200",
    "type.name": "_doc",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "key.ignore": false,
    "transforms": "extract",
    "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
    "transforms.createKey.fields": "id",
    "transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
    "transforms.extract.field": "id",
    "behavior.on.null.values" : "delete"
  }
}

Here's the configuration of the Streams application:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-application-1");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 1000);
props.put(CommonClientConfigs.METADATA_MAX_AGE_CONFIG, 500);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, true);

I followed this link.
Let me know if any more info is needed. Thanks :)

Upvotes: 1

Views: 437

Answers (1)

cicada_

Reputation: 336

I was able to fix this after looking more into the Debezium documentation.

For a consumer to be able to process a delete event generated for a table that does not have a primary key, set the table’s REPLICA IDENTITY to FULL. When a table does not have a primary key and the table’s REPLICA IDENTITY is set to DEFAULT or NOTHING, a delete event has no before field.

So I just had to change the REPLICA IDENTITY to FULL for the bridging table, since it has no PK, only FKs.
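
For reference, assuming the schema name from the source config in the question, that is a one-line change on the Postgres side:

 ALTER TABLE my_schema.product_category REPLICA IDENTITY FULL;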

There was also one more change that I made, in the source connector config from the question:

 "transforms.unwrap.drop.tombstones":false,

After this I started receiving events with keys and null values for the deleted records, and I just had to honor that in the Streams application.
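
"Honoring that" just means that any step which touches the value has to treat null as "row deleted" and forward it, so that a tombstone lands on output-topic and the ES sink (behavior.on.null.values=delete) removes the document. A simplified sketch, reusing the placeholder class names from the sketch in the question and a hypothetical buildDocument() mapping:

KStream<ProductCategoryKey, ProductCategory> bridgeStream =
        builder.stream("cdc.my_schema.product_category");

bridgeStream
        // with drop.tombstones=false a deleted row arrives as (key, null)
        .mapValues(pc -> pc == null
                ? null                // keep the tombstone so the delete propagates downstream
                : buildDocument(pc))  // hypothetical mapping for live rows
        .to("output-topic");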

Upvotes: 2
