user3554599

Reputation: 81

Salesforce Camel Kafka Connector converting message to weird format

We are producing events from Salesforce to Kafka using a Salesforce feature called Platform Events. To get these events into Kafka, we are using Camel's Salesforce Kafka Source Connector. Documentation for this connector can be found here:

https://camel.apache.org/camel-kafka-connector/latest/reference/connectors/camel-salesforce-kafka-source-connector.html

The events generated from Salesforce are in JSON format. The format looks something like this:

{
  "data": {
    "schema": "NhgeDyLTvEyVPQ9uOzDqeQ",
    "payload": {
      "AccountId__c": "00119000013q2g3AAA",
      "AccountUUID__c": "4654fefb-e3d1-4b08-a4e2-5dabaa504abb",
      "GUID__c": null,
      "CreatedById": "0056g000005YeBAAA0",
      "CreatedDate": "2021-10-10T15:24:43.819Z"
    },
    "event": {
      "replayId": "1256093"
    }
  },
  "channel": "/event/Order_Completed__e"
}
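For clarity, this envelope is ordinary JSON; for example, the fields of interest can be pulled out with Python's standard json module (the values below come from the sample above):

```python
import json

# Platform-event envelope as delivered by Salesforce (sample from above).
raw = """
{
  "data": {
    "schema": "NhgeDyLTvEyVPQ9uOzDqeQ",
    "payload": {
      "AccountId__c": "00119000013q2g3AAA",
      "AccountUUID__c": "4654fefb-e3d1-4b08-a4e2-5dabaa504abb",
      "GUID__c": null,
      "CreatedById": "0056g000005YeBAAA0",
      "CreatedDate": "2021-10-10T15:24:43.819Z"
    },
    "event": {"replayId": "1256093"}
  },
  "channel": "/event/Order_Completed__e"
}
"""

event = json.loads(raw)
print(event["data"]["event"]["replayId"])  # -> 1256093
print(event["channel"])                    # -> /event/Order_Completed__e
```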

This leads us to our issue. We are using the following configuration for our source connector:

{
  "name": "sf_order_p_event_connector",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "true",
    "connector.class": "org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector",
    "camel.component.salesforce.loginUrl": "<redacted>",
    "camel.component.salesforce.instanceUrl": "<redacted>",
    "topics": "<redacted>",
    "camel.source.endpoint.rawPayload": "true",
    "camel.source.path.topicName": "/event/Order_Completed__e",
    "camel.source.endpoint.replayId": "-1",
    "camel.component.salesforce.authenticationType": "USERNAME_PASSWORD",
    "camel.component.salesforce.clientId": "<redacted>",
    "camel.component.salesforce.clientSecret": "<redacted>",
    "camel.component.salesforce.password": "<redacted>",
    "camel.component.salesforce.userName": "<redacted>",
    "camel.source.endpoint.apiVersion": "52.0"
  }
}
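For context, my understanding is that JsonConverter with schemas.enable=true expects every record to carry a Connect schema (it wraps the value in a schema/payload envelope). Since this source does not appear to attach a schema, one adjustment worth trying (an assumption on my part, not verified against this connector version) is to disable the schema envelope:

```json
{
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter.schemas.enable": "false"
}
```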

When using this value.converter from the configuration above, we get the following exception:

ERROR [sf_account_change_connector|task-0] WorkerSourceTask{id=sf_account_change_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:184)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:298)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:324)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:248)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException
    at org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:677)
    at org.apache.kafka.connect.json.JsonConverter.convertToJsonWithEnvelope(JsonConverter.java:592)
    at org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:346)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:298)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more

It seems the converter cannot serialize whatever value is coming through to JSON. To see what was actually reaching the topic, I changed value.converter to "org.apache.kafka.connect.storage.StringConverter". With that change, this is the message I saw arriving in Kafka from Salesforce:

{data={schema=NhgeDyLTvEyVPQ9uOzDqeQ, payload={AccountId__c=00119000013q2g3AAA, 
AccountUUID__c=4654fefb-e3d1-4b08-a4e2-5dabaa504abb, GUID__c=null, 
CreatedById=0056g000005YeBAAA0, CreatedDate=2021-10-10T15:24:43.819Z}, event= 
{replayId=1256093}}, channel=/event/Order_Completed__e}
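That output looks like the toString() of a Java Map (nested key=value pairs) rather than JSON. A quick check (the value below is copied from the topic) confirms a JSON parser rejects it:

```python
import json

# Record value as seen on the topic with StringConverter.
value = (
    "{data={schema=NhgeDyLTvEyVPQ9uOzDqeQ, payload={AccountId__c=00119000013q2g3AAA, "
    "AccountUUID__c=4654fefb-e3d1-4b08-a4e2-5dabaa504abb, GUID__c=null, "
    "CreatedById=0056g000005YeBAAA0, CreatedDate=2021-10-10T15:24:43.819Z}, "
    "event={replayId=1256093}}, channel=/event/Order_Completed__e}"
)

# Keys are unquoted and pairs use '=', so this is not valid JSON.
try:
    json.loads(value)
except json.JSONDecodeError as err:
    print("not valid JSON:", err)
```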

It seems Kafka is not handling this value as JSON but as some key=value rendering, which looks like the toString() of a Java Map. My questions are: why am I seeing this format instead of JSON, and what source-connector configuration (if any) will get past this and produce the value as JSON? I need the value in JSON so my faust agent can process it. I have tried several different configurations for the source connector, but nothing has worked so far.

Any help would be appreciated. Thank you in advance!

Upvotes: 1

Views: 685

Answers (1)

Zeus

Reputation: 45

Please use camel.source.endpoint.rawPayload=true in your connector config.

Upvotes: -1
