Vadim Rudkov

Reputation: 42

Salesforce Bulk API Sink Connector creates too many batches

I am building a prototype to sync data from MySQL to Salesforce, running Kafka in Docker containers on my laptop. I followed the Quick Start for Confluent Platform to install and run Confluent Platform. To take data from MySQL and send it over to Salesforce I am using the Debezium MySQL CDC Source Connector and the Salesforce Bulk API Sink Connector. Everything works fine, except that when updates arrive from MySQL one by one with a small delay between them (anything over about half a second), the connector creates a separate batch for each update. So, running the following bash script creates 3 batches in Salesforce:

# Issue three single-row updates roughly 0.7 s apart; each one ends up in its own Salesforce batch
for i in $(seq 3); do
    QUERY="UPDATE some_table SET state = 'state_$i' WHERE id = 1;"
    mysql -u user_name -h host_name -D <database_name> -e "$QUERY" -p<mysql_password>
    sleep .7
done

The problem here is that the Salesforce connector pushes records to Salesforce too often, while Salesforce limits the Bulk API to a maximum of 15,000 batches per 24 hours. To guarantee I stay within this limit I'd like to force the connector to buffer updates before pushing them to Salesforce. To achieve that I am trying to set the fetch.min.bytes and fetch.max.wait.ms properties. The Salesforce Bulk API Sink Connector Configuration Properties document says:

You can override producer-specific properties by using the confluent.topic.producer. prefix and consumer-specific properties by using the confluent.topic.consumer. prefix.

So my attempt was to put these properties into the Salesforce connector config (here I want the connector to buffer records for 10 seconds or until at least 1000 bytes have accumulated):

    "confluent.topic.consumer.fetch.max.wait.ms": "10000",
    "confluent.topic.consumer.fetch.min.bytes": "1000"

It seems to have no effect: fetch.max.wait.ms stays at 500 ms, which is the default value.
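For reference, this is roughly how I checked the effective value; it is only a sketch and assumes the Connect worker runs in a Docker container named connect (Kafka consumers print their ConsumerConfig values when a connector task starts):

# Rough check of the effective consumer settings, assuming the Connect worker
# runs in a Docker container named "connect"; the consumer logs its
# ConsumerConfig values when the sink task starts.
docker logs connect 2>&1 | grep -E "fetch.max.wait.ms|fetch.min.bytes" | tail -n 4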

So, first I'd like to make sure that I am looking in the right direction in trying to use the fetch.min.bytes and fetch.max.wait.ms properties, i.e. that they are applicable to the connector at all. And second, how can I set these properties properly?

Upvotes: 1

Views: 693

Answers (1)

Vadim Rudkov

Reputation: 42

Thank you @OneCricketeer, the link you provided helped me solve the problem. Although I didn't change the connector.client.config.override.policy setting on the worker as described in the "Connector-level producer/consumer configuration overrides" section, just setting fetch.max.wait.ms and fetch.min.bytes on the connector with the consumer.override. prefix was enough. The final version of the connector's config now looks like this:

{
  "name": "SalesforceBulkApiSinkConnectorConnector_0",
  "config": {
    "connector.class": "io.confluent.connect.salesforce.SalesforceBulkApiSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "errors.tolerance": "all",
    "topics": "sf_sink",
    "errors.deadletterqueue.topic.name": "sf_sink_dlq",
    "salesforce.instance": "https://<instance>.my.salesforce.com",
    "salesforce.username": "<username>",
    "salesforce.password": "<password>",
    "salesforce.password.token": "<token>",
    "salesforce.object": "KafkaEvent__e",   
    "salesforce.sink.object.operation": "insert",
    "override.event.type": "true",
    "salesforce.use.custom.id.field": "false",
    "behavior.on.api.errors": "log",
    "reporter.result.topic.replication.factor": "1",
    "reporter.error.topic.replication.factor": "1",
    "reporter.bootstrap.servers": "broker:29092",
    "consumer.override.fetch.max.wait.ms": "10000",
    "consumer.override.fetch.min.bytes": "5000"
  } 
} 
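To apply it I pushed the updated config to the Kafka Connect REST API, roughly like this (a sketch that assumes the worker's REST endpoint is reachable on localhost:8083 and the JSON above is saved as sf-sink.json):

# Update the running connector with the "config" object from the file above
# (assumes the Connect REST API is on localhost:8083 and the full JSON,
# including the "name"/"config" wrapper, is saved as sf-sink.json)
jq .config sf-sink.json | curl -s -X PUT -H "Content-Type: application/json" \
    -d @- http://localhost:8083/connectors/SalesforceBulkApiSinkConnectorConnector_0/config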

Upvotes: 2
