Reputation: 5014
I am using Kafka connect (confluentinc/cp-kafka-connect:5.4.0) and have MySQL connector installed in it. Basically the following Dockerfile:
FROM confluentinc/cp-kafka-connect:5.4.0
RUN echo "===> Installing MySQL connector" \
&& curl -k -SL "https://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.39.tar.gz" | tar -xzf - -C /usr/share/java/kafka-connect-jdbc/ --strip-components=1 mysql-connector-java-5.1.39/mysql-connector-java-5.1.39-bin.jar
I created a connector with the following configurations:
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{
"name": "test-connector",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"connection.url":"jdbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass",
"topic.prefix": "test-connector-",
"mode":"incrementing",
"query":"$QUERY",
"incrementing.column.name": "eventId",
"validate.non.null": false
}
}'
I tried replacing the "$QUERY" mentioned above with the following query (pasting a formatted version for better readability):
SELECT * FROM (
select DISTINCT t1.id,
t1.name,
t1.email,
t1.department,
t1.modified
from
test as t1
LEFT OUTER JOIN test as t2 ON t1.id > -1
WHERE
t1.id > -1) something
The above connector gets created and works as expected. However, the below query fails. and
SELECT * FROM(
SELECT
HSC.id eventId,
HSC.actions,
HSC.cause,
HSC.metaData,
CRED.uid uidCard,
CRED.is_primary isPrimary,
CRED.type cardType,
PR.keys_ prin,
HOUSE.uid orgUid,
CHILD.uid childUid,
USER.first_name fName,
USER.last_name lName,
CHILD.biz_phone bizPh,
CHILD.cell_phone cellPh,
AOF.state aof_state,
RECORD.uid replacedCardUid
FROM
house_of_cards as HSC
INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id
INNER JOIN prin PR ON CRED.prin_id = PR.id
INNER JOIN child CHILD ON CRED.employee_id = CHILD.id
INNER JOIN user USER ON USER.id = CHILD.user_id
INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id
LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id
LEFT JOIN address AOF ON CRED.address_on_file = AOF.id
LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id
WHERE
HSC.type = 'cardActionKamen' ) playcards
EDIT: 1. Posting full error log (with modified sensitive data of course)
[2020-02-16 20:31:46,463] INFO AbstractConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = null
connection.url = jdbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass
connection.user = null
db.timezone = UTC
dialect.name =
incrementing.column.name = eventId
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query = SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards
quote.sql.identifiers = ALWAYS
schema.pattern = null
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = []
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = card-view-test
validate.non.null = false
(org.apache.kafka.common.config.AbstractConfig)
[2020-02-16 20:31:46,595] INFO AbstractConfig values:
(org.apache.kafka.common.config.AbstractConfig)
[2020-02-16 20:31:46,601] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Connector card-view-test config updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,602] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Rebalance started (org.apache.kafka.connect.runtime.distributed.WorkerCoordinator)
[2020-02-16 20:31:46,602] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] (Re-)joining group (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Successfully joined group with generation 295 (org.apache.kafka.clients.consumer.internals.AbstractCoordinator)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Joined group at generation 295 with protocol version 2 and got assignment: Assignment{error=0, leader='connect-1-d82cc98e-5
619-4972-ad78-3b0a53c3b5bb', leaderUrl='http://10.xx.yy.36:8083/', offset=433, connectorIds=[card-view-test], taskIds=[card-view-test-0], revokedConnectorIds=[], revokedTaskIds=[], delay=0} with rebalance delay: 0 (org.apache
.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Starting connectors and tasks using config offset 433 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Starting connector card-view-test (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,609] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Starting task card-view-test-0 (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:46,610] INFO Creating task card-view-test-0 (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,610] INFO ConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = card-view-test
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig)
[2020-02-16 20:31:46,610] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = card-view-test
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2020-02-16 20:31:46,610] INFO Creating connector card-view-test of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,610] INFO ConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = card-view-test
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig)
[2020-02-16 20:31:46,610] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = card-view-test
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig)
[2020-02-16 20:31:46,610] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = card-view-test
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2020-02-16 20:31:46,610] INFO TaskConfig values:
task.class = class io.confluent.connect.jdbc.source.JdbcSourceTask
(org.apache.kafka.connect.runtime.TaskConfig)
[2020-02-16 20:31:46,610] INFO Instantiated task card-view-test-0 with version 5.4.0 of type io.confluent.connect.jdbc.source.JdbcSourceTask (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,610] INFO Instantiated connector card-view-test with version 5.4.0 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,611] INFO Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector)
[2020-02-16 20:31:46,611] INFO JdbcSourceConnectorConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = null
connection.url = jdbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass
connection.user = null
db.timezone = UTC
dialect.name =
incrementing.column.name = eventId
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query = SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards
quote.sql.identifiers = ALWAYS
schema.pattern = null
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = []
tables = []
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = card-view-test-25
validate.non.null = false
(io.confluent.connect.jdbc.source.JdbcSourceTaskConfig)
[2020-02-16 20:31:46,621] INFO Using JDBC dialect MySql (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2020-02-16 20:31:46,681] INFO Started JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2020-02-16 20:31:46,681] INFO WorkerSourceTask{id=card-view-test-0} Source task finished initialization and start (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-02-16 20:31:46,681] INFO Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
[2020-02-16 20:31:46,723] INFO [Producer clientId=connector-producer-card-view-test-0] Cluster ID: XXOJHDDIEHID-YYGDIEDi82eeh (org.apache.kafka.clients.Metadata)
[2020-02-16 20:31:46,732] INFO Starting thread to monitor tables. (io.confluent.connect.jdbc.source.TableMonitorThread)
[2020-02-16 20:31:46,735] INFO Finished creating connector card-view-test (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:46,736] INFO SourceConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = card-view-test
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.SourceConnectorConfig)
[2020-02-16 20:31:46,737] INFO EnrichedConnectorConfig values:
config.action.reload = restart
connector.class = io.confluent.connect.jdbc.JdbcSourceConnector
errors.log.enable = false
errors.log.include.messages = false
errors.retry.delay.max.ms = 60000
errors.retry.timeout = 0
errors.tolerance = none
header.converter = null
key.converter = null
name = card-view-test
tasks.max = 1
transforms = []
value.converter = null
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig)
[2020-02-16 20:31:46,949] INFO Begin using SQL query: SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard,
CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName,
USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid
FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id
INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id
LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id
LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards events WHERE `eventId` > ? ORDER BY `eventId` ASC (io.confluent.connect.jdbc.source.TableQuerier)
[2020-02-16 20:31:46,954] ERROR Failed to run query for table TimestampIncrementingTableQuerier{table=null, query='SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard,
CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName,
USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid
FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id
INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id
LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id
LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = "cardActionKamen" )', topicPrefix='card-view-test-25', incrementingColumn='eventId', timestampColumns=[]
}: {} (io.confluent.connect.jdbc.source.JdbcSourceTask)
com.mysql.jdbc.exceptions.jdbc4.MySQLSyntaxErrorException: Table 'demo_db.HSC' doesn't exist
at sun.reflect.GeneratedConstructorAccessor48.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.jdbc.Util.handleNewInstance(Util.java:404)
at com.mysql.jdbc.Util.getInstance(Util.java:387)
at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:942)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3966)
at com.mysql.jdbc.MysqlIO.checkErrorPacket(MysqlIO.java:3902)
at com.mysql.jdbc.MysqlIO.sendCommand(MysqlIO.java:2526)
at com.mysql.jdbc.MysqlIO.sqlQueryDirect(MysqlIO.java:2673)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2545)
at com.mysql.jdbc.ConnectionImpl.execSQL(ConnectionImpl.java:2503)
at com.mysql.jdbc.StatementImpl.executeQuery(StatementImpl.java:1369)
at com.mysql.jdbc.Field.getCollation(Field.java:448)
at com.mysql.jdbc.ResultSetMetaData.isCaseSensitive(ResultSetMetaData.java:552)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumn(GenericDatabaseDialect.java:713)
at io.confluent.connect.jdbc.dialect.GenericDatabaseDialect.describeColumns(GenericDatabaseDialect.java:655)
at io.confluent.connect.jdbc.source.SchemaMapping.create(SchemaMapping.java:63)
at io.confluent.connect.jdbc.source.TableQuerier.maybeStartQuery(TableQuerier.java:94)
at io.confluent.connect.jdbc.source.TimestampIncrementingTableQuerier.maybeStartQuery(TimestampIncrementingTableQuerier.java:61)
at io.confluent.connect.jdbc.source.JdbcSourceTask.poll(JdbcSourceTask.java:315)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:265)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:232)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:177)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:227)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2020-02-16 20:31:47,109] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Tasks [card-view-test-0] configs updated (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:47,611] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Finished starting connectors and tasks (org.apache.kafka.connect.runtime.distributed.DistributedHerder)
[2020-02-16 20:31:47,613] INFO [Worker clientId=connect-1, groupId=alpha-core-kafka-connect-wrapper] Handling task config update by restarting tasks [card-view-test-0] (org.apache.kafka.connect.runtime.distributed.Distribute
dHerder)
[2020-02-16 20:31:47,613] INFO Stopping task card-view-test-0 (org.apache.kafka.connect.runtime.Worker)
[2020-02-16 20:31:47,614] INFO Stopping JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2020-02-16 20:31:47,656] INFO Closing resources for JDBC source task (io.confluent.connect.jdbc.source.JdbcSourceTask)
[2020-02-16 20:31:47,656] INFO Closing connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider)
[2020-02-16 20:31:47,657] INFO WorkerSourceTask{id=card-view-test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-02-16 20:31:47,657] INFO WorkerSourceTask{id=card-view-test-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
[2020-02-16 20:31:47,657] INFO [Producer clientId=connector-producer-card-view-test-0] Closing the Kafka producer with timeoutMillis = 30000 ms. (org.apache.kafka.clients.producer.KafkaProducer)
EDIT 2. Adding connetor config
{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "eventId",
"topic.prefix": "test-connector",
"validate.non.null": "false",
"query": "SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards",
"name": "test-connector-",
"connection.url": "dbc:mysql://10.xx.yy.z:3306/demo_db?user=user&password=pass"
}
Both the queries have WHERE clause, use aliases, have JOINS, etc. Their functionalities seem identical to me, how come one works and the other doesn't? Is there any limit on the length of the query? Is this related to the version of Mysql (I am using 5.7 provided by GCP) or the connector version (I have downloaded 5.1.39 as you can see in the above Dockerfile)?
I have also tried creating a MySQL view of both the subqueries (the sub-SELECT queries) mentioned in the above queries and that works okay for both the queries. Any idea of what might be the issue here?
Upvotes: 1
Views: 2037
Reputation: 4375
I tried to reproduce your case.
demo_db
:mysql> show tables;
+-------------------+
| Tables_in_demo_db |
+-------------------+
| address |
| child |
| house |
| house_of_cards |
| play_card |
| prin |
| user |
+-------------------+
7 rows in set (0.00 sec)
curl --location --request PUT 'http://myhost:10900/connectors/test-connector/config' \
--header 'Accept: application/json' \
--header 'Content-Type: application/json' \
--data-raw '{
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"mode": "incrementing",
"incrementing.column.name": "eventId",
"topic.prefix": "test-connector",
"validate.non.null": "false",
"query": "SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = '\''cardActionKamen'\'' ) playcards",
"connection.url": "jdbc:mysql://myhost:3306/demo_db?user=isk&password=123"
}'
[2020-02-17 02:17:17,589] INFO [test-connector|worker] Creating connector test-connector of type io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:251)
[2020-02-17 02:17:17,592] INFO [test-connector|worker] Instantiated connector test-connector with version 5.3.1 of type class io.confluent.connect.jdbc.JdbcSourceConnector (org.apache.kafka.connect.runtime.Worker:254)
[2020-02-17 02:17:17,592] INFO [test-connector|worker] Starting JDBC Source Connector (io.confluent.connect.jdbc.JdbcSourceConnector:69)
[2020-02-17 02:17:17,592] INFO [test-connector|worker] JdbcSourceConnectorConfig values:
batch.max.rows = 100
catalog.pattern = null
connection.attempts = 3
connection.backoff.ms = 10000
connection.password = null
connection.url = jdbc:mysql://myhost:3306/demo_db?user=isk&password=123
connection.user = null
db.timezone = UTC
dialect.name =
incrementing.column.name = eventId
mode = incrementing
numeric.mapping = null
numeric.precision.mapping = false
poll.interval.ms = 5000
query = SELECT * FROM( SELECT HSC.id eventId, HSC.actions, HSC.cause, HSC.metaData, CRED.uid uidCard, CRED.is_primary isPrimary, CRED.type cardType, PR.keys_ prin, HOUSE.uid orgUid, CHILD.uid childUid, USER.first_name fName, USER.last_name lName, CHILD.biz_phone bizPh, CHILD.cell_phone cellPh, AOF.state aof_state, RECORD.uid replacedCardUid FROM house_of_cards as HSC INNER JOIN play_card as CRED ON HSC.debit_card_id = CRED.id INNER JOIN prin PR ON CRED.prin_id = PR.id INNER JOIN child CHILD ON CRED.employee_id = CHILD.id INNER JOIN user USER ON USER.id = CHILD.user_id INNER JOIN house HOUSE ON CHILD.house_id = HOUSE.id LEFT JOIN address SHPADDR ON CRED.shipping_address = SHPADDR.id LEFT JOIN address AOF ON CRED.address_on_file = AOF.id LEFT JOIN play_card RECORD ON CRED.replaced_card_id = RECORD.id WHERE HSC.type = 'cardActionKamen' ) playcards
quote.sql.identifiers = ALWAYS
schema.pattern = null
table.blacklist = []
table.poll.interval.ms = 60000
table.types = [TABLE]
table.whitelist = []
timestamp.column.name = []
timestamp.delay.interval.ms = 0
topic.prefix = test-connector
validate.non.null = false
(io.confluent.connect.jdbc.source.JdbcSourceConnectorConfig:347)
[2020-02-17 02:17:17,594] INFO [test-connector|worker] Attempting to open connection #1 to MySql (io.confluent.connect.jdbc.util.CachedConnectionProvider:87)
[2020-02-17 02:17:17,640] INFO [test-connector|worker] Starting thread to monitor tables. (io.confluent.connect.jdbc.source.TableMonitorThread:73)
[2020-02-17 02:17:17,641] INFO [test-connector|worker] Finished creating connector test-connector (org.apache.kafka.connect.runtime.Worker:273)
So there is no error and everything works fine.
Used versions:
curl -X GET http://localhost:10900 | jq '.version'
"2.4.0"
curl -X GET http://localhost:10900/connector-plugins | jq -c '.[] | select( .class == "io.confluent.connect.jdbc.JdbcSourceConnector") | .version'
"5.3.1"
8.0.19
Upvotes: 2