Philippe Boileau

Reputation: 21

Kafka Connect JDBC sink issue with table name containing a " . "

I am trying to build a Kafka Connect JDBC sink connector. The problem is that the database table name contains a dot, and when the connector is created, the name is split in two, so the database table cannot be found. I tried several ways to escape the dot so that it is read as part of the table name, but nothing worked.

Here is the actual name:

"table.name.format":"Bte3_myname.centrallogging",

Here is the error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Table \"Bte3_myname\".\"centrallogrecord\" is missing.

Here is my config file:

{
    "name": "jdbc-connect-central-logging-sink",
    "config": 
    {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "tasks.max": "3",
        "topics": "central_logging",

        "connection.url": "...",         
        "connection.user": "...",
        "connection.password": "...",
        "table.name.format":"Bte3_myname.centrallogging",
        "pk.mode": "kafka",

        "auto.create": "false",
        "auto.evolve": "false"
    }
}

Does anyone have an idea how to get this parsed correctly in the config file?

Thanks a lot!

Upvotes: 1

Views: 5730

Answers (2)

Dmitry

Reputation: 56

When a topic name contains dots for naming reasons but the table name is only one part of it, as in topic.prefix.MY_TABLE_NAME.topic.suffix, the sink connector can be configured with a RegexRouter transformation that extracts MY_TABLE_NAME for the sink.

The transformation may look like:

"transforms": "changeTopicName",
"transforms.changeTopicName.type": "org.apache.kafka.connect.transforms.RegexRouter",
"transforms.changeTopicName.regex": "topic.prefix.(MY_.*).topic.suffix",
"transforms.changeTopicName.replacement": "$1",

The connector will then use MY_TABLE_NAME as the table name.

P.S. The regex should really be stricter (for example, the unescaped dots match any character), but that depends on the case, right? ;)
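
To check what such a pattern captures before deploying it, here is a minimal Python sketch that mimics a whole-topic match followed by a "$1" replacement. It is only an illustration, not the connector's code: the function name route_topic and the sample topic names are made up, and the dots are escaped here so that they match literally.

```python
import re

# Same shape as the regex in the answer, but with the dots escaped so that
# "." only matches a literal dot (assumption: that is the intended behavior).
TOPIC_PATTERN = re.compile(r"topic\.prefix\.(MY_.*)\.topic\.suffix")


def route_topic(topic: str) -> str:
    """Return the captured table name, or the topic unchanged if it does not match."""
    # fullmatch requires the whole topic name to match the pattern.
    match = TOPIC_PATTERN.fullmatch(topic)
    return match.group(1) if match else topic


if __name__ == "__main__":
    print(route_topic("topic.prefix.MY_TABLE_NAME.topic.suffix"))
    print(route_topic("some.other.topic"))
```

A topic that does not match the pattern is passed through unchanged, so it is worth testing a few non-matching names as well.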

Upvotes: 3

selston

Reputation: 1

If bte3_myname is actually your schema, this may work:

"table.name.format": "bte3_myname.${topic}"

(give or take one extra underscore).

I also notice you are using mixed case, so you may need to set "quote.sql.identifiers" accordingly.
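
As a rough illustration of the suggestion above, the following Python sketch expands the ${topic} placeholder and then splits the result into schema and table at the last dot. The split is an assumption inferred from the error message in the question, which quotes the two parts separately; both helper names are hypothetical and this is not the connector's actual code.

```python
from typing import Optional, Tuple


def expand_table_name(table_name_format: str, topic: str) -> str:
    """Substitute the ${topic} placeholder with the topic name."""
    return table_name_format.replace("${topic}", topic)


def split_schema_and_table(qualified_name: str) -> Tuple[Optional[str], str]:
    """Split 'schema.table' at the last dot (assumed behavior, see lead-in)."""
    schema, _, table = qualified_name.rpartition(".")
    return (schema or None, table)


if __name__ == "__main__":
    # "central_logging" is the topic from the question's config.
    name = expand_table_name("bte3_myname.${topic}", "central_logging")
    print(name)
    print(split_schema_and_table(name))
```

With the topic from the question, the table part comes out as central_logging rather than centrallogging, which is the "extra underscore" mentioned above.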

Upvotes: 0
