Reputation: 1096
I am using the Kafka JDBC sink connector to sink data to Azure SQL Server. The connector worked fine when I tested it with one database, but after I added more databases I started seeing the following error:
USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
Config:
tasks.max: 1
topics: topic_name
connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
connection.user: dbuser
connection.password: dbpass
transforms: unwrap
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
auto.create: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
insert.mode: upsert
delete.enabled: true
pk.mode: record_key
Stack:
2020-12-10 11:56:36,990 ERROR WorkerSinkTask{id=NAME-sqlserver-jdbc-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted. Error: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
(org.apache.kafka.connect.runtime.WorkerSinkTask) [task-thread-NAME-sqlserver-jdbc-sink-0]
org.apache.kafka.connect.errors.ConnectException: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
at io.confluent.connect.jdbc.sink.JdbcSinkTask.put(JdbcSinkTask.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:560)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:323)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:226)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:198)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.sql.SQLException: com.microsoft.sqlserver.jdbc.SQLServerException: USE statement is not supported to switch between databases. Use a new connection to connect to a different database.
Upvotes: 1
Views: 1414
Reputation: 1096
I have identified the issue. At first I thought it was caused by having multiple databases on the DB server, but it turned out that the topic name is prefix.dbo.table_name instead of just table_name. Hence, the connector treats the prefix as a reference to another database: by default the sink uses the topic name as the table name, so prefix.dbo.table_name is presumably read as database.schema.table.
The solution is to strip the prefix from the topic name with a RegexRouter transform (named dropPrefix below). For example, to save data from the topics hello.dbo.table1 and hello.dbo.table2 to the tables table1 and table2 in the database, use the following config:
tasks.max: 1
topics: hello.dbo.table1, hello.dbo.table2
connection.url: jdbc:sqlserver://server:port;database=dbname;user=dbuser
connection.user: dbuser
connection.password: dbpass
transforms: dropPrefix,unwrap
transforms.dropPrefix.type: org.apache.kafka.connect.transforms.RegexRouter
transforms.dropPrefix.regex: hello\.dbo\.(.*)
transforms.dropPrefix.replacement: $1
transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones: false
auto.create: true
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: true
insert.mode: upsert
delete.enabled: true
pk.mode: record_key
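To check what the dropPrefix regex does to a topic name before deploying the connector, here is a minimal Java sketch. It assumes RegexRouter renames a topic only when the whole name matches the pattern and leaves it unchanged otherwise; the class name TopicRouteSketch and the topic other.dbo.table3 are made up for illustration.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TopicRouteSketch {
    // Same pattern and replacement as in the dropPrefix transform config above.
    static final Pattern REGEX = Pattern.compile("hello\\.dbo\\.(.*)");
    static final String REPLACEMENT = "$1";

    // Rename the topic only when the whole name matches; otherwise return it unchanged.
    static String route(String topic) {
        Matcher m = REGEX.matcher(topic);
        return m.matches() ? m.replaceFirst(REPLACEMENT) : topic;
    }

    public static void main(String[] args) {
        // other.dbo.table3 is a hypothetical topic that does not match the prefix.
        String[] topics = {"hello.dbo.table1", "hello.dbo.table2", "other.dbo.table3"};
        for (String t : topics) {
            System.out.println(t + " -> " + route(t));
        }
    }
}
```

Running it prints hello.dbo.table1 -> table1 and hello.dbo.table2 -> table2, while other.dbo.table3 stays as it is, so a topic with a different prefix would still reach the sink with its three-part name.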
Upvotes: 2
Reputation: 32090
If it doesn't work as a single connector, then you'll need to create one connector per database, each with its own connection.url.
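One way to picture the one-connector-per-database approach is to generate a config per target database. This is only a sketch under assumptions: the database names (salesdb, hrdb) and topic prefixes (sales, hr) are hypothetical, each database's topics are assumed to be named prefix.dbo.table, and server:port is the same placeholder used in the question. The generated maps would still have to be submitted to Kafka Connect separately.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class PerDatabaseSinkSketch {
    // Build the config for one sink connector that writes to a single database.
    // Assumes topicPrefix contains no regex metacharacters other than dots.
    static Map<String, String> sinkConfig(String database, String topicPrefix) {
        String escaped = (topicPrefix + ".dbo.").replace(".", "\\.");
        Map<String, String> c = new LinkedHashMap<>();
        c.put("connector.class", "io.confluent.connect.jdbc.JdbcSinkConnector");
        c.put("tasks.max", "1");
        // Subscribe only to the topics that belong to this database.
        c.put("topics.regex", escaped + ".*");
        // Each connector gets its own connection.url naming one database.
        c.put("connection.url", "jdbc:sqlserver://server:port;database=" + database);
        // Strip the prefix so the table name is just the last part of the topic.
        c.put("transforms", "dropPrefix,unwrap");
        c.put("transforms.dropPrefix.type", "org.apache.kafka.connect.transforms.RegexRouter");
        c.put("transforms.dropPrefix.regex", escaped + "(.*)");
        c.put("transforms.dropPrefix.replacement", "$1");
        c.put("transforms.unwrap.type", "io.debezium.transforms.ExtractNewRecordState");
        return c;
    }

    public static void main(String[] args) {
        // Hypothetical database-to-prefix mapping, for illustration only.
        Map<String, String> prefixes = new LinkedHashMap<>();
        prefixes.put("salesdb", "sales");
        prefixes.put("hrdb", "hr");
        prefixes.forEach((db, prefix) -> {
            System.out.println("# connector for " + db);
            sinkConfig(db, prefix).forEach((k, v) -> System.out.println(k + ": " + v));
        });
    }
}
```

The remaining settings from the question (credentials, converters, insert.mode, pk.mode and so on) are left out for brevity and would be added to each map unchanged.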
Upvotes: 0