lloiacono

Reputation: 5050

Kafka Connect JDBC connector query + incrementing mode chokes with large datasets at initial poll

I'm using the JDBC connector to move data from MySQL into Kafka. The data I'm interested in comes from a select joining 3 tables, so I've configured my connector with mode: incrementing and query:

{
    "name": "stats",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "key.converter": "io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "value.converter": "io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url": "http://schema-registry.kafka-broker:8081",
        "connection.url": "jdbc:mysql://DB_HOST:3306/SCHEMA?user=USER&password=PASSWORD&zeroDateTimeBehavior=CONVERT_TO_NULL&useSSL=false",
        "mode": "incrementing",
        "validate.non.null": "false",
        "topic.prefix": "t",
        "incrementing.column.name": "s.id",
        "transforms": "createKey,extractString",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "transforms.createKey.fields": "uuid",
        "transforms.extractString.type": "org.apache.kafka.connect.transforms.ExtractField$Key",
        "transforms.extractString.field": "uuid",
        "quote.sql.identifiers":"never",
        "query": "select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id",
        "errors.tolerance": "all",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "batch.max.rows": "100",
        "poll.interval.ms": "60000"
    }
}

When checking the connector status, I see that it is running:

curl http://connect:8083/connectors/stats/status

{
    "name": "stats",
    "connector": {
        "state": "RUNNING",
        "worker_id": "connect-3:38083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "connect-1:18083"
        }
    ],
    "type": "source"
}

But after an hour I still don't see the topic created. I've checked which queries are running in MySQL with show full processlist; and I see two queries like this:

select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC

So basically the query is the same as the one I provided in query in the connector configuration, plus WHERE s.id > -1 ORDER BY s.id ASC. Since this join produces a result set of 21 million rows, MySQL keeps sending data for a long time. When I check again with show full processlist; I now see 4 queries like this, then 8, then 16, and so on.

The questions are:

  1. Why is Kafka Connect trying to get ALL the rows at once when adding s.id > -1 ORDER BY s.id ASC?
  2. Is it possible to configure the connector not to do this, and instead fetch a smaller amount?
  3. Is "batch.max.rows": "100" only controlling the batch size after the initial poll?

Update:

There is an open topic for this issue. I think this question can be closed.

Upvotes: 3

Views: 4670

Answers (2)

Ben

Reputation: 41

query.suffix was added in version 5.5. I used it to add a LIMIT clause and it worked great; it just appends the suffix to the end of the generated query.

see issue
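
For example, with the config from the question this might look something like the following (a sketch, assuming Confluent JDBC connector 5.5 or later; the LIMIT value of 5000 is an arbitrary illustration):

    "query.suffix": "LIMIT 5000"

The suffix gets appended to the generated statement, so each poll would run something like ... WHERE s.id > lastIncrementedValue ORDER BY s.id ASC LIMIT 5000 and fetch at most 5000 rows, with the next poll continuing from the last committed offset.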

Upvotes: 4

Bartosz Wardziński

Reputation: 6593

The JDBC Source Connector with incrementing mode and a passed query executes that query with the following WHERE clause appended: WHERE incrementingColumnName > lastIncrementedValue ORDER BY incrementingColumnName ASC. (If you use incrementing mode together with query, you can't put a WHERE clause in the query itself.)

At the first poll lastIncrementedValue is -1, so it tries to query all records. After each record is extracted, lastIncrementedValue increases, so subsequent queries only poll new data. batch.max.rows refers to how many records SourceTask::poll(...) will return to the Kafka Connect framework. It is the maximum size of the batch that will be sent to Kafka at once.
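
Concretely, the statements the connector issues look roughly like this (a sketch based on the query from the question; the offset 5000000 in the second statement is just an illustrative committed value, not one from the question):

-- first poll: no stored offset yet, so lastIncrementedValue = -1
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > -1 ORDER BY s.id ASC

-- a later poll, after some offset (here 5000000) has been committed
select s.id, concat(database(), '_', s.id) as uuid, c.email, s.type as type, s.created_at as log_date, a.type as a_type from stats s join concact c on c.id = s.recipient_id join address a on a.id = s.address_id WHERE s.id > 5000000 ORDER BY s.id ASC

batch.max.rows only splits whatever that result set returns into batches of 100 records handed to the framework; it does not limit the SQL query itself.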

I think when you fetch data from a single table it works faster, because the query executes faster (it is less complicated). If you executed those queries using other SQL tools, they would perform similarly.

Upvotes: 1
