Reputation: 1
I have two Kafka connectors: a Debezium PostgreSQL source and a JDBC sink for PostgreSQL. When I add a record to the source database (kc-source), I want it to appear in the sink database (kc-sink). Both connectors use the same topic.
Kafka Connect sink connector (PostgreSQL):
{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.password":"***",
"connection.user":"postgres",
"topics":"post.public.test",
"name":"kc-sink",
"auto.create":"false",
"connection.url":"***",
"database.hostname":"postgresql-sink"
}
Kafka Connect source connector (Debezium PostgreSQL):
{"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"database.dbname":"postgres",
"database.user":"postgres",
"topic.prefix":"post",
"database.hostname":"postgresql-source",
"database.password":"***",
"name":"kc-source",
"transforms":"extractField",
"transforms.extractField.field":"after",
"transforms.extractField.type":"org.apache.kafka.connect.transforms.ExtractField$Value"
}
I use the ExtractField transform to keep only the "after" payload. When I add a row, the consumer shows:
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":true,"field":"value"}],"optional":true,"name":"postgresql.public.test.Value"},"payload":{"id":20,"value":"AAA"}}
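To illustrate the transform step described above, here is a minimal Python sketch of what ExtractField$Value does conceptually: it replaces the record value with one of its fields. The change_event dictionary is a simplified, assumed stand-in for a Debezium change event, and extract_field is a hypothetical helper, not part of Kafka Connect.

```python
# Simplified stand-in for a Debezium change event value (assumed shape,
# real events carry more fields such as "source" and "ts_ms").
change_event = {
    "before": None,
    "after": {"id": 20, "value": "AAA"},
    "op": "c",
}


def extract_field(value, field):
    """Return the named field of a record value (hypothetical helper)."""
    if value is None:
        # In this sketch, a null value is passed through unchanged.
        return None
    return value[field]


# Keep only the "after" state of the row, as the connector config requests.
row = extract_field(change_event, "after")
print(row)
```

The extracted row has the same fields as the payload shown above, which is why the consumer sees only id and value rather than the full change event.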
A record in this form works:
{"schema":{"type":"struct","fields":[{"type":"string","field":"value"}]},"payload":{"value":"AAA"}}
So how can I convert the failing record into the working one?
Unfortunately, I get this error message:
Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "postgresql"."public"."test" ("id","value") VALUES (('22'::int4),('4F677F7F-5C16-4996-9736-B0E93DFA6E49')) was aborted: ERROR: cross-database references are not implemented: "postgresql.public.test" Position: 13 Call getNextException to see other errors in the batch. org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "postgresql.public.test" Position: 13 org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "postgresql.public.test" Position: 13
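The error above shows the INSERT targeting "postgresql"."public"."test", a three-part name. PostgreSQL reads the first part of a three-part name as a database and raises this error when it is not the current one. The sketch below only illustrates how a dotted name breaks into those parts; split_table_name is a hypothetical helper and not the sink connector's actual code.

```python
def split_table_name(name):
    """Split a dotted name into (database, schema, table).

    Missing leading parts are returned as None (hypothetical helper).
    """
    parts = name.split(".")
    if len(parts) > 3:
        raise ValueError(f"too many parts in table name: {name!r}")
    # Right-align the parts so the last one is always the table.
    padded = [None] * (3 - len(parts)) + parts
    return tuple(padded)


# The name from the error message has three parts.
print(split_table_name("postgresql.public.test"))
# A schema-qualified name has no database part.
print(split_table_name("public.test"))
```

Under this reading, a dotted name with three parts would carry a database qualifier, while a one-part or two-part name would not.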
The Kafka Connect worker config:
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
Upvotes: 0
Views: 47