Rishika Modi

Reputation: 41

Kafka Source and Sink connector not working for large data

I am building a data pipeline with Kafka Connect source and sink connectors. The source connector reads from a SQL database and publishes to a topic; the sink connector subscribes to that topic and writes to another SQL database. The table holds 16 GB of data. The problem is that the data is not transferred from one database to the other. However, when the table is small, around 1000 rows, the data is transferred successfully.

Source connector config:

"config": {
       "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
       "tasks.max": "1",
       "connection.url": "",
       "mode": "incrementing",
       "incrementing.column.name": "ID",
       "topic.prefix": "migration_",
       "name": "jdbc-source",
       "validate.non.null": false,
       "batch.max.rows": 5
     }

Source connector logs:

INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit 
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets
[2019-03-08 16:48:45,402] INFO WorkerSourceTask{id=cmc-migration-source-0} flushing 0 outstanding messages for offset commit
[2019-03-08 16:48:55,403] INFO WorkerSourceTask{id=cmc-migration-source-0} Committing offsets(org.apache.kafka.connect.runtime.WorkerSourceTask:397)

Can anyone guide me on how to tune my Kafka source connector to transfer a large amount of data?

Upvotes: 4

Views: 3880

Answers (1)

kaminzo

Reputation: 336

I managed to overcome this problem by limiting the number of records returned by a single database query, e.g. 5000 at a time.

The solution depends on the database and SQL dialect. The examples below work and manage offsets properly for a single table. The incrementing ID column and the timestamp column must be set up according to the instructions here: https://docs.confluent.io/kafka-connect-jdbc/current/source-connector/index.html#incremental-query-modes

Example table myTable has the following columns:

  • id - incremented every time a new record is added
  • lastUpdatedTimestamp - updated every time the record is updated
  • some other attributes

Together, id and lastUpdatedTimestamp must uniquely identify a record in the dataset.

The connector constructs the query as follows:

config.query + Kafka Connect WHERE clause for a selected mode + config.query.suffix
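To make the concatenation order concrete, here is a minimal sketch in Python. It only joins strings; the helper name build_query and the WHERE_TS_INC constant are made up for illustration and are not part of Kafka Connect, and the WHERE text is a simplified stand-in for what the connector generates in timestamp+incrementing mode.

```python
def build_query(query: str, where_clause: str, suffix: str = "") -> str:
    """Join the three parts in the order described above, skipping empty ones."""
    parts = [query.strip(), where_clause.strip(), suffix.strip()]
    return " ".join(p for p in parts if p)


# Hypothetical, simplified stand-in for the connector-generated clause.
WHERE_TS_INC = (
    'WHERE "lastUpdatedTimestamp" < ? '
    'AND (("lastUpdatedTimestamp" = ? AND "id" > ?) '
    'OR "lastUpdatedTimestamp" > ?) '
    'ORDER BY "lastUpdatedTimestamp", "id" ASC'
)

final_sql = build_query("SELECT * FROM myTable", WHERE_TS_INC, "LIMIT 5000")
print(final_sql)
```

The point is that query.suffix always lands after the generated WHERE and ORDER BY, which is why a LIMIT can go there but a TOP cannot.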

PostgreSQL / MySQL

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "table.whitelist": "myTable",
    "query.suffix": "LIMIT 5000"
    ...
    }

Will result in:

SELECT *
FROM "myTable"
WHERE "myTable"."lastUpdatedTimestamp" < ?
    AND (
        ("myTable"."lastUpdatedTimestamp" = ? AND "myTable"."id" > ?)
        OR
        "myTable"."lastUpdatedTimestamp" > ?
        )
ORDER BY
    "myTable"."lastUpdatedTimestamp",
    "myTable"."id" ASC
LIMIT 5000
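The effect of the LIMIT on polling can be tried out without Kafka. The sketch below is an assumption-laden simulation, not connector code: it uses an in-memory SQLite table, integer timestamps, and a hand-written loop (the names poll and drain are invented here) that remembers the last (timestamp, id) pair the way the connector's stored offset would.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE myTable (id INTEGER PRIMARY KEY, lastUpdatedTimestamp INTEGER, name TEXT)"
)
# 23 rows; several rows share each timestamp value.
conn.executemany(
    "INSERT INTO myTable VALUES (?, ?, ?)",
    [(i, 1000 + i // 4, f"row{i}") for i in range(1, 24)],
)

PAGE_SQL = """
SELECT id, lastUpdatedTimestamp, name FROM myTable
WHERE lastUpdatedTimestamp < ?
  AND ((lastUpdatedTimestamp = ? AND id > ?) OR lastUpdatedTimestamp > ?)
ORDER BY lastUpdatedTimestamp, id ASC
LIMIT ?
"""


def poll(conn, last_ts, last_id, now, limit):
    """One simulated poll: fetch at most `limit` rows past the stored offset."""
    return conn.execute(PAGE_SQL, (now, last_ts, last_id, last_ts, limit)).fetchall()


def drain(conn, limit, now=10**9):
    """Poll repeatedly until no rows are left, returning the batches."""
    last_ts, last_id = -1, -1
    batches = []
    while True:
        rows = poll(conn, last_ts, last_id, now, limit)
        if not rows:
            return batches
        batches.append(rows)
        # Advance the offset to the last row of the batch.
        last_id, last_ts = rows[-1][0], rows[-1][1]


batches = drain(conn, limit=5)
transferred = [row[0] for batch in batches for row in batch]
print(len(batches), "polls,", len(transferred), "rows")
```

Every row is picked up exactly once even though a batch can end in the middle of a group of rows with the same timestamp, because the id breaks the tie.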

Or use the following if you want to add an additional condition in the WHERE clause:

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM ( SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE Age > 18) myQuery",
    "query.suffix": "LIMIT 5000"
    ...
    }

Will result in:

SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE Age > 18
    ) myQuery
WHERE "lastUpdatedTimestamp" < ?
    AND (
        ("lastUpdatedTimestamp" = ? AND "id" > ?)
        OR
        "lastUpdatedTimestamp" > ?
        )
ORDER BY
    "lastUpdatedTimestamp",
    "id" ASC
LIMIT 5000

SQL Server

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT TOP 5000 * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable) myQuery",
    ...
    }

Will result in:

SELECT TOP 5000 *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    ) myQuery
WHERE "lastUpdatedTimestamp" < ?
    AND (
        ("lastUpdatedTimestamp" = ? AND "id" > ?)
        OR
        "lastUpdatedTimestamp" > ?
        )
ORDER BY
    "lastUpdatedTimestamp",
    "id" ASC

Oracle

"config": {
    ...
    "poll.interval.ms" : 10000,
    "mode":"timestamp+incrementing",
    "incrementing.column.name": "id",
    "timestamp.column.name": "lastUpdatedTimestamp",
    "query": "SELECT * FROM (SELECT id, lastUpdatedTimestamp, name, age FROM myTable WHERE ROWNUM <= 5000) myQuery",
    ...
    }

Will result in:

SELECT *
FROM (
    SELECT id, lastUpdatedTimestamp, name, age
    FROM myTable
    WHERE ROWNUM <= 5000
    ) myQuery
WHERE "lastUpdatedTimestamp" < ?
    AND (
        ("lastUpdatedTimestamp" = ? AND "id" > ?)
        OR
        "lastUpdatedTimestamp" > ?
        )
ORDER BY
    "lastUpdatedTimestamp",
    "id" ASC

This approach will not work with bulk mode. It works with timestamp+incrementing mode and it may work with timestamp or incrementing modes depending on the table characteristics.
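One table characteristic that matters for timestamp-only mode is whether many rows share the same timestamp. The sketch below is a simplified model, assuming the timestamp mode filters with "ts > last seen AND ts < now" and keeps only the last timestamp as its offset; the table t and the function drain_timestamp_only are invented for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE t (id INTEGER PRIMARY KEY, ts INTEGER)")
# Six rows written with the same timestamp, e.g. by a single bulk load.
conn.executemany("INSERT INTO t VALUES (?, ?)", [(i, 100) for i in range(1, 7)])


def drain_timestamp_only(conn, limit, now=10**9):
    """Poll with a timestamp-only offset and a row limit until nothing is returned."""
    last_ts, seen = -1, []
    while True:
        rows = conn.execute(
            "SELECT id, ts FROM t WHERE ts > ? AND ts < ? ORDER BY ts ASC LIMIT ?",
            (last_ts, now, limit),
        ).fetchall()
        if not rows:
            return seen
        seen.extend(r[0] for r in rows)
        # Only the timestamp is remembered, so ties cannot be resumed.
        last_ts = rows[-1][1]


seen = drain_timestamp_only(conn, limit=4)
missed = 6 - len(seen)
print("transferred:", len(seen), "missed:", missed)
```

With a limit smaller than the number of rows sharing one timestamp, the remaining rows of that group are never fetched, which is the situation the timestamp+incrementing mode avoids.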

Joining many tables - idea that I have not tested!

It gets more complicated if the query performs joins across many tables. It would require the following:

  • provide an id that uniquely identifies a row, e.g. concatenated tableA.id + tableB.id
  • provide the timestamp of the most recently updated record among the joined tables
  • order the query records appropriately

There is a limit on how large the ID can be, because Kafka Connect uses the Java long datatype, whose maximum is 9,223,372,036,854,775,807.
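As a rough way to check whether a concatenated ID still fits, here is a small sketch. It assumes the two IDs are non-negative integers joined as decimal digits, with the second one zero-padded to a fixed width; composite_id and b_digits are hypothetical names, and like the join idea itself this is untested against a real connector.

```python
JAVA_LONG_MAX = 2**63 - 1  # 9,223,372,036,854,775,807


def composite_id(id_a: int, id_b: int, b_digits: int) -> int:
    """Concatenate id_a and id_b (zero-padded to b_digits) into one integer."""
    if id_a < 0 or id_b < 0:
        raise ValueError("IDs must be non-negative")
    if id_b >= 10 ** b_digits:
        raise ValueError("id_b does not fit in b_digits digits")
    combined = id_a * 10 ** b_digits + id_b
    if combined > JAVA_LONG_MAX:
        raise OverflowError("composite ID exceeds the Java long range")
    return combined


print(composite_id(42, 7, 9))
```

A signed 64-bit long holds 18 full decimal digits, so the digits of both IDs together have to stay within that budget. Whether the combined value also increases with every new joined row depends on the data and has to be checked separately.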

Upvotes: 1
