Reputation: 175
I want to load data from multiple tables in Oracle into Postgres using Kafka Connect.
Here is the source configuration I'm using:
curl -i -X PUT http://localhost:8083/connectors/src_1/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:oracle:thin:@//localhost:1521/db",
"connection.user":"<user>",
"connection.password":"<psswd>",
"mode":"bulk",
"table.whitelist":"EMP,CUS",
"poll.interval.ms":7200000,
"quote.sql.identifiers":"never"
}'
With the above configuration I am able to create two topics named "EMP" and "CUS". Now I would like to load the data from these topics into the respective tables, which already exist in Postgres.
Here is my sink configuration:
curl -X PUT http://localhost:8083/connectors/tgt_1/config \
-H "Content-Type: application/json" \
-d '{
"connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
"connection.url":"jdbc:postgresql://localhost:5432/postgres",
"connection.user":"<user>",
"connection.password":"<passwd>",
"tasks.max" : "1",
"topics":"CUS,EMP",
"insert.mode":"insert",
"quote.sql.identifiers":"never"
}'
With the above configuration I get an error about a mismatched table or auto creation being disabled. But when I created a separate sink connector and set the attribute table.name.format to the table name in lower case, it worked fine.
So I tried providing the table names in table.whitelist in lower case, so that the topics would be created in lower case and match the table names at the target. But then the source connector neither throws an error nor runs its tasks.
Can anyone please suggest a suitable solution for my issue? Thanks!
Upvotes: 0
Views: 1259
Reputation: 4375
I guess your PostgreSQL sink table names are in lower case (cus, emp), so they do not match the topic names (CUS, EMP).
The Kafka Connect JDBC connector uses the getTables() method to check whether a table exists, and its tableNamePattern parameter is case sensitive (according to the docs, it "must match the table name as it is stored in the database").
You can use the ChangeTopicCase transformation from Kafka Connect Common Transformations to convert the topic names to lower case before the sink resolves the table names. Another option is to rename the sink tables to upper case.
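As a rough sketch of the first option, the sink configuration from the question could be extended with the transformation as shown below. This assumes the Kafka Connect Common Transformations plugin is installed on the Connect worker. The transform alias `lowercaseTopic` is an arbitrary name chosen for illustration, and the class name and the `from`/`to` values are written as I recall them from that plugin's documentation, so please verify them against the version you install.

```shell
# Sketch only: same sink connector as in the question, plus a topic-case transform.
# Assumption: the Common Transformations plugin is on the worker's plugin.path.
curl -X PUT http://localhost:8083/connectors/tgt_1/config \
  -H "Content-Type: application/json" \
  -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:postgresql://localhost:5432/postgres",
    "connection.user": "<user>",
    "connection.password": "<passwd>",
    "tasks.max": "1",
    "topics": "CUS,EMP",
    "insert.mode": "insert",
    "quote.sql.identifiers": "never",
    "transforms": "lowercaseTopic",
    "transforms.lowercaseTopic.type": "com.github.jcustenborder.kafka.connect.transform.common.ChangeTopicCase",
    "transforms.lowercaseTopic.from": "UPPER_UNDERSCORE",
    "transforms.lowercaseTopic.to": "LOWER_UNDERSCORE"
  }'
```

With this in place the sink should see the records as coming from topics cus and emp, so the default table.name.format (the topic name) would resolve to the lower-case tables. The "topics" setting still lists the original upper-case names, because the transformation is applied after the records are consumed.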
Upvotes: 1