kuti

Reputation: 211

confluent - kafka-connect - JDBC source connector - ORA-00933: SQL command not properly ended

I have the following SQL query in my Kafka JDBC source connector properties file:

query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'

If I run the same query in SQL Developer, it works fine and returns results. But if I use the same query in "jdbc_workflow_connect.properties", I get the following error:

(io.confluent.connect.jdbc.source.JdbcSourceTaskConfig:223)
[2018-09-19 12:32:15,130] INFO WorkerSourceTask{id=Workflow-DB-source-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask:158)
[2018-09-19 12:32:15,328] ERROR Failed to run query for table TimestampIncrementingTableQuerier{name='null', query='SELECT * FROM JENNY.WORKFLOW where ID = '565231'', topicPrefix='workflow_data1', timestampColumn='null', incrementingColumn='ID'}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:247)
java.sql.SQLSyntaxErrorException: ORA-00933: SQL command not properly ended

at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:450)
at oracle.jdbc.driver.T4CTTIoer.processError(T4CTTIoer.java:399)
at oracle.jdbc.driver.T4C8Oall.processError(T4C8Oall.java:1017)
at oracle.jdbc.driver.T4CTTIfun.receive(T4CTTIfun.java:655)
at oracle.jdbc.driver.T4CTTIfun.doRPC(T4CTTIfun.java:249)
at oracle.jdbc.driver.T4C8Oall.doOALL(T4C8Oall.java:566)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:215)
at oracle.jdbc.driver.T4CPreparedStatement.doOall8(T4CPreparedStatement.java:58)
at oracle.jdbc.driver.T4CPreparedStatement.executeForDescribe(T4CPreparedStatement.java:776)
at oracle.jdbc.driver.OracleStatement.executeMaybeDescribe(OracleStatement.java:897)
at oracle.jdbc.driver.OracleStatement.doExecuteWithTimeout(OracleStatement.java:1034)
at oracle.jdbc.driver.OraclePreparedStatement.executeInternal(OraclePreparedStatement.java:3820)
at oracle.jdbc.driver.OraclePreparedStatement.executeQuery(OraclePreparedStatement.java:3867)
at oracle.jdbc.driver.OraclePreparedStatementWrapper.executeQuery(OraclePreparedStatementWrapper.java:1502)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.executeQuery(TimestampIncrementingTableQuerier.java:201)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:84)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:55)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:225)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:179)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:170)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:214)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)

Here is the content of my JDBC source connector properties file:

name=Workflow-DB-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
connection.password = ******
connection.url = jdbc:oracle:thin:@1.1.1.1:****/****
connection.user = *****
table.types=TABLE
query=SELECT * FROM JENNY.WORKFLOW where ID = '565231'

mode=incrementing
incrementing.column.name=ID
topic.prefix=workflow_data1
timestamp.delay.interval.ms=60000

transforms:createKey
transforms.createKey.type:org.apache.kafka.connect.transforms.ValueToKey
transforms.createKey.fields:ID

I'm using ojdbc7.jar

Observation:

If I remove the WHERE clause, the query works fine, as below:

SELECT * FROM JENNY.WORKFLOW

Please let me know if I'm doing something wrong or if any changes are required in the JDBC source connector settings.

Thanks in advance.

Upvotes: 1

Views: 1220

Answers (1)

Marmite Bomber

Reputation: 21075

The documentation of the JDBC connector configuration options says the following about the query setting:

If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connector will only copy data using this query – whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used).

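In incrementing mode the connector appends its own WHERE clause to your query, so a query that already has a top-level WHERE clause ends up with two of them, which Oracle rejects with ORA-00933. If you really want to consider only the part of the table with a given ID, you must wrap the query in a subquery as follows: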

 select * from (SELECT * FROM JENNY.WORKFLOW where ID = '565231')

Before relying on this, please check the documentation of the configuration options and make sure you understand the role of the query parameter.
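To see why the wrapping helps, here is a rough Python sketch of the string the connector ends up sending to Oracle. It is only an illustration, not the connector's code: the helper name `append_incrementing_clause` is made up, and the exact text of the appended clause (`WHERE ID > ? ORDER BY ID ASC`, without identifier quoting) is an assumption based on the documentation quoted above.

```python
def append_incrementing_clause(base_query: str, column: str) -> str:
    """Hypothetical helper: mimic incrementing mode appending a WHERE clause."""
    # Assumption: the real connector builds something similar but may quote
    # identifiers differently; this only shows where the clause lands.
    return f"{base_query} WHERE {column} > ? ORDER BY {column} ASC"


original = "SELECT * FROM JENNY.WORKFLOW where ID = '565231'"
wrapped = f"select * from ({original})"

# Two top-level WHERE clauses in a row: invalid SQL, hence ORA-00933.
broken_sql = append_incrementing_clause(original, "ID")

# The filter now lives inside the subquery, so the appended WHERE is the
# only top-level one and the statement parses.
valid_sql = append_incrementing_clause(wrapped, "ID")

print(broken_sql)
print(valid_sql)
```

Comparing the two printed statements should make it clear that the problem is the position of the appended clause, not the filter itself.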

Upvotes: 2
