Reputation: 51
I am trying to use mode timestamp with MySQL, with limited rows as my table size is 2.6 GB.
Here are the connector properties that I am using:
{
"name": "jdbc_source_mysql_registration_query",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://localhost:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://localhost:8081",
"connection.url": "jdbc:mysql://localhost:3310/users?zeroDateTimeBehavior=ROUND&useCursorFetch=true&defaultFetchSize=1000&user=kotesh&password=kotesh",
"query": "SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28' ",
"mode": "timestamp",
"timestamp.column.name": "DateUpdated",
"validate.non.null": "false",
"topic.prefix": "mysql-prod-kot-"
}
}
I getting as below:
INFO TimestampIncrementingTableQuerier{table=null, query='SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28'', topicPrefix='mysql-prod-kot-', incrementingColumn='', timestampColumns=[DateUpdated]} prepared SQL query: SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28' WHERE
DateUpdated
> ? ANDDateUpdated
< ? ORDER BYDateUpdated
ASC (io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier:161) [2018-11-29 17:29:00,981] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28'', topicPrefix='mysql-prod-kot-', incrementingColumn='', timestampColumns=[DateUpdated]}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask:328) java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near 'WHEREDateUpdated
> '1970-01-01 00:00:00.0' ANDDateUpdated
< '2018-11-29 17' at line 1
Upvotes: 3
Views: 3240
Reputation: 39850
This happens because you are trying to use both "mode": "timestamp"
and query
. TimestampIncrementingTableQuerier
appends a WHERE
clause to the query that conflicts with existing WHERE
clauses in the query
.
JDBC source connector docs is clear on this:
query
If specified, the query to perform to select new or updated rows. Use this setting if you want to join tables, select subsets of columns in a table, or filter data. If used, this connector will only copy data using this query -- whole-table copying will be disabled. Different query modes may still be used for incremental updates, but in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.
As a workaround, you can modify your query to (depending on what SQL flavour are you using)
SELECT * FROM ( SELECT * FROM table WHERE ...)
or
WITH a AS
SELECT * FROM b
WHERE ...
SELECT * FROM a
For example, in your case the query should be
"query":"SELECT * FROM (SELECT matriid,DateUpdated from users.employee WHERE date(DateUpdated)>='2018-11-28') o"
Upvotes: 6
Reputation: 32100
The error is as shown:
java.sql.SQLSyntaxErrorException: You have an error in your SQL syntax;
check the manual that corresponds to your MySQL server version for the right syntax to use near
'WHERE `DateUpdated` > '1970-01-01 00:00:00.0' AND `DateUpdated` < '2018-11-29 17' at line 1
This is because you're using query
but also "mode": "timestamp"
and thus the connector tries to append it's own WHERE
clause when you have also specified one in the query, which results in the invalid SQL
Per docs for the JDBC source connector:
in order to properly construct the incremental query, it must be possible to append a WHERE clause to this query (i.e. no WHERE clauses may be used). If you use a WHERE clause, it must handle incremental queries itself.
Upvotes: 2