Reputation: 41
I am creating a data pipeline using Kafka source and sink connectors. The source connector consumes from a SQL database and publishes into a topic, and the sink connector subscribes to the topic and writes into another SQL database. The table has 16 GB of data. Now the problem is that the data is not getting transferred from one DB to the other. However, if the table is small, e.g. 1000 rows, then the data is transferred successfully.
Source connector config:
"config": {
"connector.class":
"io.confluent.connect.jdbc.JdbcSourceConnector",
"tasks.max": "1",
"connection.url": "",
"mode": "incrementing",
"incrementing.column.name": "ID",
"topic.prefix": "migration_",
"name": "jdbc-source",
"validate.non.null": false,
"batch.max.rows":5
}
Source connector logs:
INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:55,403] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets(org.apache.kafka.connect.runtime.WorkerSourceTask:397)
Can anyone guide me on how to tune my Kafka source connector to transfer large amounts of data?
Upvotes: 4
Views: 3880
Reputation: 336
I managed to overcome this problem by limiting the number of records returned in a single query to the database, e.g. 5000 at a time.
The exact solution depends on the database and SQL dialect. The examples below work and manage offsets properly for a single table. The incrementing ID column and the timestamp column must be set up as per the instructions here: https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/index.html#incremental-query-modes
Example table myTable has the following columns:
- id - increments every time a new record is added
- lastUpdatedTimestamp - updated every time the record gets updated

id and lastUpdatedTimestamp must uniquely identify a record in the dataset.
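For illustration, such a table could be defined like this (a minimal MySQL sketch of my own; the name and age columns are only assumed so the DDL matches the example queries below):

-- Hypothetical table definition, not prescribed by the connector docs
CREATE TABLE myTable (
    id                   BIGINT NOT NULL AUTO_INCREMENT PRIMARY KEY,
    name                 VARCHAR(255),
    age                  INT,
    -- refreshed automatically on every INSERT and UPDATE
    lastUpdatedTimestamp TIMESTAMP NOT NULL
        DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);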
The connector constructs the query as follows:
config.query + Kafka Connect WHERE clause for the selected mode + config.query.suffix
PostgreSQL / MySQL
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"table.whitelist": "myTable",
"query.suffix": "LIMIT 5000"
...
}
Will result in:
SELECT *
FROM "myTable"
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
LIMIT 5000
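Since the generated WHERE and ORDER BY clauses operate on lastUpdatedTimestamp and id, a composite index on those two columns keeps each 5000-row poll from scanning the whole table. This is my own suggestion, not something the connector requires:

-- Optional: supports the generated WHERE/ORDER BY shown above
CREATE INDEX idx_mytable_ts_id ON myTable (lastUpdatedTimestamp, id);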
Or the following, if you want to add an additional condition to the WHERE clause:
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT * FROM ( SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE Age > 18) myQuery",
"query.suffix": "LIMIT 5000"
...
}
Will result in:
SELECT *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE Age > 18
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
LIMIT 5000
SQL Server
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT TOP 5000 * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable) myQuery",
...
}
Will result in:
SELECT TOP 5000 *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE Age > 18
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
Oracle
"config": {
...
"poll.interval.ms" : 10000,
"mode":"timestamp+incrementing",
"incrementing.column.name": "id",
"timestamp.column.name": "lastUpdatedTimestamp",
"query": "SELECT * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE ROWNUM <= 5000) myQuery",
...
}
Will result in:
SELECT *
FROM (
SELECT id, lastUpdatedTimestamp, name, age
FROM myTable
WHERE ROWNUM <= 5000
) myQuery
WHERE "myTable"."lastUpdatedTimestamp" < ?
AND (
("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
OR
"myTable"."lastUpdatedTimestamp" > ?
)
ORDER BY
"myTable"."lastUpdatedTimestamp",
"myTable"."id" ASC
This approach will not work with bulk mode. It works with timestamp+incrementing mode, and it may work with timestamp or incrementing mode, depending on the table characteristics.
Joining many tables - an idea that I have not tested!
It gets more complicated if the query performs joins across many tables. The joined result would still need an incrementing ID and a timestamp that together uniquely identify each record.
There is a limit on how large the ID can be, due to the Java long datatype used by Kafka Connect, i.e. 9,223,372,036,854,775,807.
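A quick sanity check I would run against the source table (my own addition):

-- Compare against 9,223,372,036,854,775,807 (max signed 64-bit long)
SELECT MAX(id) FROM myTable;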
Upvotes: 1