Reputation: 5050
I'm using the JDBC connector to move data from MySQL into Kafka. The data I'm interested in comes from a select joining 3 tables, therefore I've configured my connector with mode: incrementing and a query:
{
"name": "stats",
"config": {
"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
"key.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
"connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
"mode": "incrementing",
"validate.non.null": "false",
"topic.prefix": "t",
"incrementing.column.name": "s.id",
"transforms": "createKey,extractString",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.createKey.fields": "uuid",
"transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
"transforms.extractString.field": "uuid",
"quote.sql.identifiers":"never",
"query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true",
"batch.max.rows": "100",
"poll.interval.ms": "60000"
}
}
When checking the connector status, I see that it is running:
curl http://conncet:8083/connectors/stats/status
{
"name": "stats",
"connector": {
"state": "RUNNING",
"worker_id": "connect-3:38083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "connect-1:18083"
}
],
"type": "source"
}
But after an hour I still don't see the topic created. I've checked in MySQL which queries are running with show full processlist;
and I see two queries like this:
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC
So basically the executed query is the same as the one I provided in query in the connector configuration, plus WHERE s.id > -1 ORDER BY s.id ASC. Since the join produces a resultset of 21 million rows, MySQL keeps sending the data for a long time. When I check again with show full processlist;
I now see 4 queries like this, then 8, then 16, and so on.
The questions are: why is the connector running the query with WHERE s.id > -1 ORDER BY s.id ASC, i.e. against the whole resultset? And is "batch.max.rows": "100" only controlling the batch size after the initial poll?
Update:
There is an open topic for this issue. I think this question can be closed.
Upvotes: 3
Views: 4670
Reputation: 41
query.suffix was added in 5.5. I used it to add a LIMIT statement and it worked great; it just appends the suffix to the end of the query.
See the issue.
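For example, adding something like this to the configuration from the question (the LIMIT value here is only illustrative) makes the connector run the generated query with the suffix appended, i.e. ... WHERE s.id > -1 ORDER BY s.id ASC LIMIT 100000:
"query.suffix": "LIMIT 100000"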
Upvotes: 4
Reputation: 6593
The JDBC Source Connector in incrementing mode with a passed query executes that query with the following WHERE clause appended: WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC (if you use incrementing mode together with query, you can't put a WHERE clause in the query yourself).
On the first poll lastIncrementedValue is -1, so it tries to query all records. As records are extracted, lastIncrementedValue increases, so subsequent queries only poll new data.
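With the query and incrementing.column.name from the question, that means roughly the following (the ... stands for the rest of the original select list, and the offset in the second statement is only illustrative):
-- first poll: no offset stored yet, so lastIncrementedValue = -1
select s.id, ... from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC
-- a later poll, after the stored offset has advanced
select s.id, ... from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > 20999900 ORDER BY s.id ASC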
batch.max.rows refers to how many records SourceTask::poll(...) will return to the Kafka Connect framework. It is the maximum size of the batch that will be sent to Kafka at once.
I think that fetching data from a single table works faster because the query itself executes faster (it is less complicated). If you executed those queries using other SQL tools, they would perform similarly.
Upvotes: 1