Khumar
Khumar

Reputation: 336

schemas.enable=false config not applied to connector

I am using mongokafka sink connector from kafka to mongo and connector jar is mongo-kafka-connect-1.7.0-all.jar. I have added mongodb-driver-core-4.5.0.jar also in plugins path of connect cluster. I am using strimzi kafka and connect. In kafka I could successful send message but in connector I could see error.

curl -X POST -H "Content-Type: application/json" \
    --data '
    {"name": "mongo-sink",
    "config": {
        "connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
        "connection.uri":"mongodb://mongoservice:27017/?replicaSet=rs0",
        "database":"quickstart",
        "collection":"topicData",
        "topics":"q4.s4"
    }
    }
    ' \
http://localhost:8083/connectors -w "\n"

Sample message:

{"id": 1, "name": "hello"}

Error in connector:

{
    "name": "mongo-sink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "localhost:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "FAILED",
            "worker_id": "localhost:8083",
            "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:496)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:473)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:328)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:182)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:231)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires \"schema\" and \"payload\" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.\n\tat org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:328)\n\tat org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertValue(WorkerSinkTask.java:540)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$2(WorkerSinkTask.java:496)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)\n\t... 13 more\n"
        }
    ],
    "type": "sink"
}

I tried to set "schemas.enable":"false" in connector config but still same issue.

Do I need to start any other service like schema registry ? currently I am using strimzi kafka and connect with mongodbv5.0.3 and mongo-kafka-connect-1.7.0-all.jar ,mongodb-driver-core-4.5.0.jar.

Upvotes: 0

Views: 1661

Answers (2)

Jack
Jack

Reputation: 140

This might not be direct answer, but may be helpful. In my case "Confluent S3 Sink Connector", config in request parameter is cached in topic's config. So, I cannot overwrite config value without deleting that topic. This is not written in Confluent Docs...

Upvotes: 0

OneCricketeer
OneCricketeer

Reputation: 191671

I tried to set "schemas.enable":"false" in connector config but still same issue.

The error is for the converter, not the connector. That config is not a valid Mongo Connector property.

Since you are only interested in the value of the record, then set

"value.converter.schemas.enable" : "false"

Do I need to start any other service like schema registry

If you were using kafka-avro-console-producer, or similar tool to send the record, then yes.

currently I am using strimzi kafka

You can use KafkaConnector CRD rather than invoke the Connect API directly.

Upvotes: 0

Related Questions