locklockls
locklockls

Reputation: 1

Kafka Connector use the same topic between debezium source and sink postgres

I have two kafka connector : debezium source and sink postgres. I would like to add a record on bdd kc-source, the record would be on db kc-sink, it is use the same topic.

kafka connector sink postgres

    {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.password":"***",
    "connection.user":"postgres",
    "topics":"post.public.test",
    "name":"kc-sink",
    "auto.create":"false",
    "connection.url":"***"
    "database.hostname":"postgresql-sink",
    "auto.create: false"
}

kafka connector postgres debezium

    {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
    "database.dbname":"postgres",
    "database.user":"postgres",
    "topic.prefix":"post",
    "database.hostname":"postgresql-source",
    "database.password":"***",
    "name":"kc-source"
    "Transforms":  "extractField"
    "transforms.extractField.field":  "after"
    "transforms.extractField.type":   "org.apache.kafka.connect.transforms.ExtractField$Value"
}

I use filter transform to get the payload, when I add row, the consumer has :


    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"default":0,"field":"id"},{"type":"string","optional":true,"field":"value"}],"optional":true,"name":"postgresql.public.test.Value"},"payload":{"id":20,"value":"AAA"}}

The record works :

    {"schema":{"type":"struct","fields":[{"type":"string","field":"value"}]},"payload":{"value":"AAA"}}

So how to convert, the wrong row record to the correct one.

unfortunately I got this error message

Caused by: org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: Exception chain: java.sql.BatchUpdateException: Batch entry 0 INSERT INTO "postgresql"."public"."test" ("id","value") VALUES (('22'::int4),('4F677F7F-5C16-4996-9736-B0E93DFA6E49')) was aborted: ERROR: cross-database references are not implemented: "postgresql.public.test" Position: 13 Call getNextException to see other errors in the batch. org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "postgresql.public.test" Position: 13 org.postgresql.util.PSQLException: ERROR: cross-database references are not implemented: "postgresql.public.test" Position: 13

The kafka connect config :

    value.converter:                    org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable:     true

So how to convert, the wrong row record to the correct one.

Upvotes: 0

Views: 47

Answers (0)

Related Questions