YVS1997

Reputation: 680

How to deploy Kafka sink connectors with multiple topics and table destinations

Following up on my previous question, I have decided to focus more on consumer deployment for real-time database synchronization with Kafka in distributed mode. Same case: I have more than a hundred tables that I want to pull from PostgreSQL into SQL Server. From PostgreSQL to Kafka I use Debezium connectors with the wal2json plugin, and from Kafka to SQL Server I use JDBC sink connectors. I have three brokers with identical settings (different addresses):

broker.id=0
broker.rack=1
port=9093
listeners=PLAINTEXT://0.0.0.0:9093
advertised.listeners=PLAINTEXT://localhost:9093
log.dir=/home/admin/kafka/tmp/kafka_log1
offsets.topic.num.partitions=1
offsets.topic.replication.factor=3
min.insync.replicas=2
default.replication.factor=3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
zookeeper.connection.timeout.ms=7200000
delete.topic.enable=true
message.max.bytes=50497182 
replica.fetch.max.bytes=50497182
group.max.session.timeout.ms=7200000
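
For reference, the Debezium source connector mentioned above is registered once for the whole database (step 4 in the deployment below). A minimal sketch of that registration, assuming Debezium's PostgreSQL connector with the wal2json plugin; host, credentials, server name, and table list are placeholders:

# Older Debezium option names shown to match wal2json-era setups;
# newer versions use table.include.list / topic.prefix instead.
curl -X POST http://localhost:8085/connectors -H 'Content-Type:application/json' -d '{
  "name": "postgres_source",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "plugin.name": "wal2json",
    "database.hostname": "-",
    "database.port": "5432",
    "database.user": "-",
    "database.password": "-",
    "database.dbname": "-",
    "database.server.name": "db",
    "table.whitelist": "public.my_table"
  }
}' | jq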

I have tried some possible solutions like this:

  1. Set topics to use 1 partition and 3 replicas. Since my table names contain _, I get a warning about it. (A loop version over the same CSV used later is sketched after this list.)
kafka-topics.sh --create --bootstrap-server localhost:9093,localhost:9094,localhost:9095  --replication-factor 3 --partitions 1 --topic $topic_name --config retention.ms=5400000
  2. I separate the Debezium and JDBC connectors onto different workers. I have two workers with the same configuration (except for the REST port, 8085 for Debezium and 8084 for the sink) like this:
bootstrap.servers=localhost:9093,localhost:9094,localhost:9095
group.id=debezium-cluster
key.converter.schemas.enable=true
value.converter.schemas.enable=true
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
offset.storage.topic=connect-offsets-debezium
offset.storage.replication.factor=3
config.storage.topic=connect-configs-debezium
status.storage.topic=connect-status-debezium
producer.buffer.memory=29999999
producer.max.buffered.records=19999999
producer.max.request.size=51497182 
producer.retries=100
producer.max.in.flight.requests.per.connection=1
producer.request.timeout.ms=20000
producer.enable.idempotence=true
producer.retry.backoff.ms=500
producer.send.buffer.bytes=50497182
producer.receive.buffer.bytes=50497182
producer.acks=1
offset.flush.timeout.ms=300000
producer.buffer.memory=51497182
consumer.enable.auto.commit=true
consumer.retries=100
consumer.auto.commit.interval.ms=100000
consumer.max.partition.fetch.bytes=50497182
consumer.max.poll.records=10000
consumer.request.timeout.ms=20000
consumer.retry.backoff.ms=50000
consumer.session.timeout.ms=50000
consumer.auto.offset.reset=latest
consumer.isolation.level=read_committed
consumer.max.poll.interval.ms=5400000
consumer.fetch.max.bytes=50497182
rest.port=8085
plugin.path=/home/admin/kafka/connectors
  3. Loop over the sink connectors one by one:
#!/bin/bash
# Register one JDBC sink connector per table, reading table name, primary key
# and insert mode from the CSV list.
CSV_LIST="/home/admin/kafka/main/config/tables/table_lists.csv"
DATA=${CSV_LIST}

while IFS=',' read -r table pk mode; do
topic_name=${table}
curl -X POST http://localhost:8084/connectors -H 'Content-Type:application/json' -d '{"name" :"sqlservercon_'$topic_name'",
    "config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "topics":"'$table'",
            "connection.url":"jdbc:sqlserver://-:1433",
            "connection.user":"-",
            "connection.password":"-",
            "transforms":"unwrap",
            "transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
            "transforms.unwrap.drop.tombstones":"false",
            "auto.create":"true",
            "insert.mode":"'$mode'",
            "pk.fields":"'$pk'",
            "pk.mode":"record_value",
            "destination.table.format":"db.dbo.'$table'"
}}' | jq
done < "${DATA}"
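
The topic-creation loop mentioned in point 1 could look like this; a sketch only, assuming the same table_lists.csv drives topic creation:

#!/bin/bash
# Create one topic per table, using the same CSV as the sink-connector loop.
CSV_LIST="/home/admin/kafka/main/config/tables/table_lists.csv"

while IFS=',' read -r table pk mode; do
  kafka-topics.sh --create \
    --bootstrap-server localhost:9093,localhost:9094,localhost:9095 \
    --replication-factor 3 --partitions 1 \
    --topic "$table" --config retention.ms=5400000
done < "${CSV_LIST}"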

Here is how I deploy it:

  1. Start ZooKeeper and the Kafka brokers
  2. Create the topics
  3. Start the Kafka Connect worker for the Debezium source (a start-up sketch for both workers follows this list)
  4. Add the Debezium connector (since one database needs only one connector)
  5. Start the Kafka Connect worker for the sink
  6. Add the JDBC sink connectors with the loop above
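
Starting the two workers in steps 3 and 5 is just connect-distributed.sh run once per worker properties file; a minimal sketch, with hypothetical file paths (each file is the worker config shown above with its own rest.port and storage topics):

# Worker for the Debezium source connectors (rest.port=8085)
bin/connect-distributed.sh /home/admin/kafka/config/connect-distributed-debezium.properties

# Worker for the JDBC sink connectors (rest.port=8084)
bin/connect-distributed.sh /home/admin/kafka/config/connect-distributed-sink.properties

Note that two independent Connect clusters need distinct group.id and distinct offset/config/status storage topics; if both property files share them, the two workers join the same cluster.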

Unfortunately, this still does not move all the data to the new SQL Server database to my satisfaction, due to several deadlock cases and consumers that do not seem to pick up their topics. I want to know if there is a good suggestion for an optimal consumer deployment. Do I need to add one worker per connector, or do something like switching between topics?
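
For reference, the state of each sink can be inspected through the Connect REST API and kafka-consumer-groups.sh; by default each sink connector consumes with group id connect-<connector name>. The connector name below is just an example:

# Connector and task state as seen by the sink worker
curl http://localhost:8084/connectors/sqlservercon_my_table/status | jq

# Per-topic lag of that connector's consumer group
kafka-consumer-groups.sh --bootstrap-server localhost:9093 \
  --describe --group connect-sqlservercon_my_table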

Upvotes: 3

Views: 3136

Answers (1)

YVS1997

Reputation: 680

I have checked, and I think that because Kafka Connect JDBC uses a batch size (batch.size) to control how many records are sent to SQL Server at once, there is a problem when I use upsert with a large number of records. I assume I must reduce the batch size to 1, both in source and sink. This is still a preliminary answer. Also, if someone knows how to show the SQL query that Kafka Connect JDBC uses for inserts, that would help me learn the JDBC behaviour and how to tackle the deadlock.
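
A sketch of both ideas, with an illustrative connector name: lowering batch.size on an existing sink (PUT replaces the whole connector config, so the rest of the original settings must be resent too), and raising the io.confluent.connect.jdbc log level, which should make the sink's activity, including the statements it prepares, visible in the worker log (the dynamic /admin/loggers endpoint needs Kafka 2.4 or newer):

# Lower the sink batch size to 1 (only the changed/required keys are shown here)
curl -X PUT http://localhost:8084/connectors/sqlservercon_my_table/config \
  -H 'Content-Type:application/json' -d '{
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "my_table",
    "connection.url": "jdbc:sqlserver://-:1433",
    "batch.size": "1"
  }' | jq

# Turn up JDBC sink logging on the sink worker
curl -X PUT http://localhost:8084/admin/loggers/io.confluent.connect.jdbc \
  -H 'Content-Type:application/json' -d '{"level": "TRACE"}'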

And the best practice, as far as my experience goes, if the target database exists but has no tables in it yet, is to prioritize which tables must be inserted first, wait until they are done, and not use insert mode for the rest. Tables with fewer than 100,000 rows can be combined into one group, but large dimension tables must be pulled separately.
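
As an illustration of that grouping, several small-table topics can share one sink connector while a large dimension table keeps its own connector; topic names here are hypothetical and the remaining settings follow the loop in the question:

curl -X POST http://localhost:8084/connectors -H 'Content-Type:application/json' -d '{
  "name": "sqlservercon_small_tables",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "topics": "small_table_a,small_table_b,small_table_c",
    "connection.url": "jdbc:sqlserver://-:1433",
    "connection.user": "-",
    "connection.password": "-",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "auto.create": "true",
    "insert.mode": "insert",
    "pk.mode": "record_value"
  }
}' | jq

With the default table.name.format of ${topic}, each topic still lands in its own table, so one connector per group keeps the number of consumers down without merging tables.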

Upvotes: 0
