Reputation: 42
I am building a prototype to sync data from MySQL to Salesforce, running Kafka in Docker containers on my laptop. I followed the Quick Start for Confluent Platform to install and run Confluent Platform. To take data from MySQL and send it over to Salesforce I am using the Debezium MySQL CDC Source Connector (a sketch of its config follows the script below) and the Salesforce Bulk API Connector. Everything works fine, except that when updates from MySQL arrive one by one with a small delay between them (just over half a second), the connector creates a separate batch for each update. So, running the following bash script creates 3 batches in Salesforce:
for i in $(seq 3); do
  QUERY="UPDATE some_table SET state = 'state_$i' WHERE id = 1;"
  mysql -u user_name -h host_name -e "$QUERY" -p<mysql_password>
  sleep .7
done
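For reference, the source side of the pipeline is configured roughly like this. This is only a sketch, assuming Debezium 1.x property names; the hostnames, credentials, topic names and table names are placeholders, not my exact config:

{
  "name": "mysql-cdc-source",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "host_name",
    "database.port": "3306",
    "database.user": "user_name",
    "database.password": "<mysql_password>",
    "database.server.id": "1",
    "database.server.name": "mysql_server",
    "table.include.list": "some_db.some_table",
    "database.history.kafka.bootstrap.servers": "broker:29092",
    "database.history.kafka.topic": "schema-changes.some_db"
  }
}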
The problem here is that the Salesforce connector pushes records to Salesforce too often, while Salesforce limits the maximum number of batches to 15,000 per 24 hours. To be sure of staying within this limit, I'd like to force the connector to buffer updates before pushing them to Salesforce. To achieve that I am trying to set the fetch.min.bytes and fetch.max.wait.ms consumer properties. The document Salesforce Bulk API Sink Connector Configuration Properties says:
You can override producer-specific properties by using the confluent.topic.producer. prefix and consumer-specific properties by using the confluent.topic.consumer. prefix.
So my attempt was to put these properties into the Salesforce connector config (here I want the connector to buffer records for 10 seconds or until they reach 1000 bytes):
"confluent.topic.consumer.fetch.max.wait.ms": "10000",
"confluent.topic.consumer.fetch.min.bytes": "1000"
It seems to have no effect: fetch.max.wait.ms stays at 500 ms, which is the default value.
So, first I'd like to make sure that I am looking in the right direction in trying to use the fetch.min.bytes and fetch.max.wait.ms properties, i.e. that they are applicable to the connector at all. And second, how can I set these properties properly?
Upvotes: 1
Views: 693
Reputation: 42
Thank you @OneCricketeer, the link you provided helped me solve the problem. I didn't even have to change the connector.client.config.override.policy setting on the worker as described in the "Connector-level producer/consumer configuration overrides" section; just setting fetch.max.wait.ms and fetch.min.bytes on the connector with the consumer.override. prefix was enough. The final version of the connector's config now looks like this:
{
  "name": "SalesforceBulkApiSinkConnectorConnector_0",
  "config": {
    "connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.tolerance": "all",
    "topics": "sf_sink",
    "errors.deadletterqueue.topic.name": "sf_sink_dlq",
    "salesforce.instance": "https://<instance>.my.salesforce.com",
    "salesforce.username": "<username>",
    "salesforce.password": "<password>",
    "salesforce.password.token": "<token>",
    "salesforce.object": "KafkaEvent__e",
    "salesforce.sink.object.operation": "insert",
    "override.event.type": "true",
    "salesforce.use.custom.id.field": "false",
    "behavior.on.api.errors": "log",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "broker:29092",
    "consumer.override.fetch.max.wait.ms": "10000",
    "consumer.override.fetch.min.bytes": "5000"
  }
}
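For completeness, one way to apply the updated config is through the Kafka Connect REST API. This is only a sketch, assuming the worker's REST interface is reachable on localhost:8083 (the default port) and that the inner "config" object above has been saved locally as salesforce-sink.json:

# PUT .../config updates the connector's configuration in place
# (or creates the connector if it does not exist yet).
# The request body must contain only the configuration map itself,
# i.e. the contents of the "config" object, not the name/config wrapper.
curl -s -X PUT \
  -H "Content-Type: application/json" \
  --data @salesforce-sink.json \
  http://localhost:8083/connectors/SalesforceBulkApiSinkConnectorConnector_0/config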
Upvotes: 2