JOo

Reputation: 11

How do I set a specific table in the Kafka JDBC sink connector?

I am using the Confluent Kafka JDBC sink connector. I want it to insert data into a specific table, TB_TEST_KAFKA, which I have already created.

auto.create is set to false.

I don't know which sink connector property sets the table name. This is the configuration I tried in order to insert into TB_TEST_KAFKA:

connector.class=io.confluent.connect.jdbc.JdbcSinkConnector
errors.log.include.messages=true
dialect.name=SqlServerDatabaseDialect
connection.password=####
tasks.max=1
topics=test-topic
auto.evolve=false
connection.user=kafkauser
auto.create=false
connection.url=jdbc:sqlserver://####:1433;databaseName=TEST
errors.log.enable=true
insert.mode=insert
db.name=TB_TEST_KAFKA

But I get this error:

Caused by: org.apache.kafka.connect.errors.ConnectException: Table "TB_TEST_KAFKA" is missing and auto-creation is disabled
at io.confluent.connect.jdbc.sink.DbStructure.create(DbStructure.java:116)
at io.confluent.connect.jdbc.sink.DbStructure.createOrAmendIfNecessary(DbStructure.java:68)
at io.confluent.connect.jdbc.sink.BufferedRecords.add(BufferedRecords.java:123)
at io.confluent.connect.jdbc.sink.JdbcDbWriter.write(JdbcDbWriter.java:73)
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:75)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:586)
... 10 more

Is there any way to write the data into this table?

Upvotes: 1

Views: 1971

Answers (2)

gauti

Reputation: 567

You can implement TableNamingStrategy and override its resolveTableName() method. The property below is documented for the Debezium JDBC sink connector:

table.naming.strategy

Default: io.debezium.connector.jdbc.naming.DefaultTableNamingStrategy

Specifies the fully-qualified class name of a TableNamingStrategy implementation that the connector uses to resolve table names from incoming event topic names.

The default behavior is to:

Replace the ${topic} placeholder in the table.name.format configuration property with the event’s topic.

Sanitize the table name by replacing dots (.) with underscores (_).
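Because the default behavior only substitutes the topic into table.name.format, a format string with no ${topic} placeholder resolves to a fixed table name, and no custom class is needed. The Confluent JDBC sink connector from the question also has a table.name.format property, with ${topic} as its default. A minimal sketch, assuming TB_TEST_KAFKA already exists in the schema the connection user resolves by default:

```properties
# Destination table name; the default value is ${topic}
table.name.format=TB_TEST_KAFKA
```

With a fixed name like this, every topic listed in topics is written to the same table, so it fits best when the connector consumes a single topic.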

Upvotes: 0

Iskuskov Alexander

Reputation: 4375

The JDBC sink connector uses the topic name as the basis for the name of the table it populates. You can use the RegexRouter transform to change that name (add these lines to your connector properties):

transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.renameTopic.regex=test-topic
transforms.renameTopic.replacement=TB_TEST_KAFKA
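RegexRouter matches the pattern against the whole topic name, so the same transform can map several topics to tables through a capture group. A minimal sketch, assuming hypothetical topics named orders-topic and customers-topic and existing tables TB_orders and TB_customers (none of these names come from the question):

```properties
# Hypothetical topic names, used only for illustration
topics=orders-topic,customers-topic
transforms=renameTopic
transforms.renameTopic.type=org.apache.kafka.connect.transforms.RegexRouter
# (.*) captures everything before the "-topic" suffix
transforms.renameTopic.regex=(.*)-topic
# $1 is replaced by the captured text, e.g. orders-topic -> TB_orders
transforms.renameTopic.replacement=TB_$1
```

Topics that do not match the pattern pass through with their original name.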

Upvotes: 2
