Short info about my system. My sink connector config properties:
{
"name": "sink-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "public.*",
"connection.url": "jdbc:postgresql://postgres:5432/db",
"connection.username": "postgres",
"connection.password": "postgres",
"insert.mode": "insert",
"table.name.format": "${topic}",
"primary.key.mode": "none",
"schema.evolution": "basic",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
}
}
When I add the sink connector to Kafka Connect, I get this error:
Caused by: java.sql.SQLException: Cannot ALTER table '[TableName]' because field '[PrimaryKeyOfTable]' is not optional but has no default value
After adding the connector, the destination table is created in the database, but with only one row. The behavior is the same for other tables.
Data in the Kafka broker:
{
"schema": {
"type": "struct",
"fields": [
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "..."
},
/* List of fields */
],
"optional": true,
"name": "...",
"field": "before"
},
{
"type": "struct",
"fields": [
{
"type": "int32",
"optional": false,
"field": "...."
},
/* List of fields */
],
"optional": true,
"name": "...",
"field": "after"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "version"
},
{
"type": "string",
"optional": false,
"field": "connector"
},
{
"type": "string",
"optional": false,
"field": "name"
},
{
"type": "int64",
"optional": false,
"field": "ts_ms"
},
{
"type": "string",
"optional": true,
"name": "io.debezium.data.Enum",
"version": 1,
"parameters": {
"allowed": "true,last,false,incremental"
},
"default": "false",
"field": "snapshot"
},
{
"type": "string",
"optional": false,
"field": "db"
},
{
"type": "string",
"optional": true,
"field": "sequence"
},
{
"type": "string",
"optional": false,
"field": "schema"
},
{
"type": "string",
"optional": false,
"field": "table"
},
{
"type": "int64",
"optional": true,
"field": "txId"
},
{
"type": "int64",
"optional": true,
"field": "lsn"
},
{
"type": "int64",
"optional": true,
"field": "xmin"
}
],
"optional": false,
"name": "io.debezium.connector.postgresql.Source",
"field": "source"
},
{
"type": "string",
"optional": false,
"field": "op"
},
{
"type": "int64",
"optional": true,
"field": "ts_ms"
},
{
"type": "struct",
"fields": [
{
"type": "string",
"optional": false,
"field": "id"
},
{
"type": "int64",
"optional": false,
"field": "total_order"
},
{
"type": "int64",
"optional": false,
"field": "data_collection_order"
}
],
"optional": true,
"name": "event.block",
"version": 1,
"field": "transaction"
}
],
"optional": false,
"name": "public.[TableName].Envelope",
"version": 1
},
"payload": {
"before": null,
"after": {
"Column1": 1,
"Column2": "Val1"
/* List of column names and its values */
},
"source": {
"version": "2.3.0.Final",
"connector": "postgresql",
"name": "db",
"ts_ms": 1689113287151,
"snapshot": "false",
"db": "postgres",
"sequence": "[\"29921128\",\"29921128\"]",
"schema": "public",
"table": "[TableName]",
"txId": 788,
"lsn": 29921128,
"xmin": null
},
"op": "u",
"ts_ms": 1689113287337,
"transaction": null
}
}
Expected behavior:
I want the CDC data in the Kafka broker to be consumed and saved to the destination PostgreSQL database.
Answer:
Debezium creates the destination tables using the mask "{table_prefix}.{table_schema}.{table_name}" (the full topic name, since the config uses "table.name.format": "${topic}"), and that dotted name was the main problem. To fix it, add the parameter "quote.identifiers": "true" to the sink connector configuration.
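To make this concrete, here is a sketch of the corrected sink configuration; it is exactly the configuration from the question with only "quote.identifiers" added:
{
"name": "sink-connector",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics.regex": "public.*",
"connection.url": "jdbc:postgresql://postgres:5432/db",
"connection.username": "postgres",
"connection.password": "postgres",
"insert.mode": "insert",
"table.name.format": "${topic}",
"primary.key.mode": "none",
"schema.evolution": "basic",
"quote.identifiers": "true",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": "true",
"value.converter.schemas.enable": "true"
}
}
With identifier quoting enabled, the sink treats the whole dotted topic name as a single quoted table identifier rather than splitting or case-folding it, so each topic maps to its own table. That is my reading of the behavior; after updating and restarting the connector, verify the resulting table names in the destination database.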