kotesh
kotesh

Reputation: 51

Confluent: ERROR Failed to run query for table TimestampIncrementingTableQuerier mysql-jdbc

I am trying to use mode timestamp with MySQL, with limited rows as my table size is 2.6 GB.

Here are the connector properties that I am using:

{
        "name": "jdbc_source_mysql_registration_query",
        "config": {
                 "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
                 "key.converter": "io.confluent.connect.avro.AvroConverter",
                 "key.converter.schema.registry.url": "http://localhost:8081",
                 "value.converter": "io.confluent.connect.avro.AvroConverter",
                 "value.converter.schema.registry.url": "http://localhost:8081",
                 "connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
                 "query": "SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28' ",
                 "mode": "timestamp",
                 "timestamp.column.name": "DateUpdated",
                 "validate.non.null": "false",
                 "topic.prefix": "mysql-prod-kot-"
        }
}

I getting as below:

INFO TimestampIncrementingTableQuerier{table=null, query='SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28'', topicPrefix='mysql-prod-kot-', incrementingColumn='', timestampColumns=[DateUpdated]} prepared SQL query: SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28' WHERE DateUpdated > ? AND DateUpdated < ? ORDER BY DateUpdated ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:161) [2018-11-29 17:29:00,981] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28'', topicPrefix='mysql-prod-kot-', incrementingColumn='', timestampColumns=[DateUpdated]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:328) java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHERE DateUpdated > '1970-01-01 00:00:00.0' AND DateUpdated < '2018-11-29 17' at line 1

Upvotes: 3

Views: 3240

Answers (2)

Giorgos Myrianthous
Giorgos Myrianthous

Reputation: 39850

This happens because you are trying to use both "mode": "timestamp" and query. TimestampIncrementingTableQuerier appends a WHERE clause to the query that conflicts with existing WHERE clauses in the query.

JDBC source connector docs is clear on this:

query

If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connector will only copy data using this query -- whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.

As a workaround, you can modify your query to (depending on what SQL flavour are you using)

SELECT * FROM ( SELECT * FROM table WHERE ...)

or

WITH a AS
   SELECT * FROM b
    WHERE ...
SELECT * FROM a

For example, in your case the query should be

"query":"SELECT * FROM (SELECT matriid,DateUpdated  from users.employee WHERE date(DateUpdated)>='2018-11-28') o"

Upvotes: 6

Robin Moffatt
Robin Moffatt

Reputation: 32100

The error is as shown:

java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; 
check the manual that corresponds to your MySQL server version for the right syntax to use near 
'WHERE `DateUpdated` > '1970-01-01 00:00:00.0' AND `DateUpdated` < '2018-11-29 17' at line 1

This is because you're using query but also "mode": "timestamp" and thus the connector tries to append it's own WHERE clause when you have also specified one in the query, which results in the invalid SQL

Per docs for the JDBC source connector:

in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.

Upvotes: 2

Related Questions