arstan
arstan

Reputation: 31

DEBEZIUM. Cannot ALTER table because field is not optional but has no default value

Short info about system:

  1. Target Postgres db with data. For this DB need to add CDC Debezium
  2. Kafka connect with zookepeer and debezium connector for change data capture.
  3. Source connector PostgreSQL. It's working fine and properly sends data to kafka connect.
  4. Sink connector for consume messages from kafka and insert it to destination PostgreSQL database

My sink connector config properties:

{
    "name": "sink-connector",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics.regex": "public.*",
        "connection.url": "jdbc:postgresql://postgres:5432/db",
        "connection.username": "postgres",
        "connection.password": "postgres",
        "insert.mode": "insert",
        "table.name.format": "${topic}",
        "primary.key.mode": "none",
        "schema.evolution": "basic",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "key.converter.schemas.enable": "true",
        "value.converter.schemas.enable": "true"
    }
}

When I add sink connector to kafka connect then I get this error: Caused by: java.sql.SQLException: Cannot ALTER table '[TableName]' because field '[PrimaryKeyOfTable]' is not optional but has no default value After adding connector destination table has been added to database, but only with one row. And this behavior is same for other tables.

Data in kafka broker:

{
    "schema": {
        "type": "struct",
        "fields": [
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "..."
                    },
                    /* List of fields */
                ],
                "optional": true,
                "name": "...",
                "field": "before"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "int32",
                        "optional": false,
                        "field": "...."
                    },
                    /* List of fields */
                ],
                "optional": true,
                "name": "...",
                "field": "after"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "version"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "connector"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "name"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "ts_ms"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "name": "io.debezium.data.Enum",
                        "version": 1,
                        "parameters": {
                            "allowed": "true,last,false,incremental"
                        },
                        "default": "false",
                        "field": "snapshot"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "db"
                    },
                    {
                        "type": "string",
                        "optional": true,
                        "field": "sequence"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "schema"
                    },
                    {
                        "type": "string",
                        "optional": false,
                        "field": "table"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "txId"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "lsn"
                    },
                    {
                        "type": "int64",
                        "optional": true,
                        "field": "xmin"
                    }
                ],
                "optional": false,
                "name": "io.debezium.connector.postgresql.Source",
                "field": "source"
            },
            {
                "type": "string",
                "optional": false,
                "field": "op"
            },
            {
                "type": "int64",
                "optional": true,
                "field": "ts_ms"
            },
            {
                "type": "struct",
                "fields": [
                    {
                        "type": "string",
                        "optional": false,
                        "field": "id"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "total_order"
                    },
                    {
                        "type": "int64",
                        "optional": false,
                        "field": "data_collection_order"
                    }
                ],
                "optional": true,
                "name": "event.block",
                "version": 1,
                "field": "transaction"
            }
        ],
        "optional": false,
        "name": "public.[TableName].Envelope",
        "version": 1
    },
    "payload": {
        "before": null,
        "after": {
            "Column1": 1,
            "Column2": "Val1"
            /* List of column names and its values */
        },
        "source": {
            "version": "2.3.0.Final",
            "connector": "postgresql",
            "name": "db",
            "ts_ms": 1689113287151,
            "snapshot": "false",
            "db": "postgres",
            "sequence": "[\"29921128\",\"29921128\"]",
            "schema": "public",
            "table": "[TableName]",
            "txId": 788,
            "lsn": 29921128,
            "xmin": null
        },
        "op": "u",
        "ts_ms": 1689113287337,
        "transaction": null
    }
}

I tried:

I want to see that CDC data in kafka broker will be consume and save in destination PostgreSQL Db.

Upvotes: 1

Views: 1704

Answers (1)

arstan
arstan

Reputation: 31

Debezium creates tables with mask: "{table_prefix}.{table_schema}.{table_name}". And it was main problem. To fix it you should add param "quote.identifiers": "true"

Upvotes: 2

Related Questions