eren arslan

Reputation: 207

Debezium - value.converter serialization problem

Hello, I am using the Debezium outbox router for Postgres.

This is the JSON in my payload column:

{
    "userId": 107385,
    "chatId": "beb8faec-b75f-4eca-ace0-57b8621c7ca0",
    "fromEcho": true,
    "results": [
        {
            "key": "agentSwitch",
            "value": "Evet",
            "type": "SWITCH"
        },
        {
            "key": "agentStar",
            "value": {
                "chips": [],
                "rate": 5
            },
            "type": "STARS"
        },
        {
            "key": "agentDescription",
            "value": null,
            "type": "TEXT_AREA"
        }
    ]
}

If I use org.apache.kafka.connect.json.JsonConverter as the value converter, the task fails with this status and trace:

{ "id": 0, "state": "FAILED", "worker_id": "10.233.66.78:8083", "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class java.lang.String\n\tat org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:680)\n\tat org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:563)\n\tat org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:313)\n\tat org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)\n\t... 13 more\n" }

Then I switched to org.apache.kafka.connect.storage.StringConverter.

This time the JSON above is sent fine. But if I delete one item from the results array (the agentSwitch entry), like in this JSON:

{
    "userId": 107385,
    "chatId": "beb8faec-b75f-4eca-ace0-57b8621c7ca0",
    "chatUserSegment": "SellerMeal",
    "agentNickName": "",
    "rating": 5,
    "comment": "",
    "duration": 0,
    "orderId": 0,
    "timestamp": 1716448682,
    "platform": "IosV2",
    "fromEcho": true,
    "results": [
        {
            "key": "agentStar",
            "value": {
                "chips": [],
                "rate": 5
            },
            "type": "STARS"
        },
        {
            "key": "agentDescription",
            "value": null,
            "type": "TEXT_AREA"
        }
    ]
}

it produces this to Kafka:

Struct{chatId=beb8faec-b75f-4eca-ace0-57b8621c7ca0,rating=5,userId=107385,comment=,orderId=0,results=[Struct{key=agentStar,type=STARS,value=Struct{rate=5}}, Struct{key=agentDescription,type=TEXT_AREA}],duration=0,fromEcho=true,platform=IosV2,timestamp=1716448682,agentNickName=,chatUserSegment=SellerMeal}

That output is not JSON. What is the solution? In the array, the value field is an object type; I think that is the problem.
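To make the mismatch concrete, the same results[].value field takes three different JSON shapes across my payloads, one fragment per line below (a string, an object, and null):

    "value": "Evet"
    "value": { "chips": [], "rate": 5 }
    "value": null

A single Connect schema presumably cannot describe all three at once, which might explain why JsonConverter fails on the first payload, while StringConverter just writes the Struct's toString() form.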

Connector settings:

{
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
    "slot.name": "slotname",
    "tasks.max": "1",
    "publication.name": "publication_name",
    "database.history.kafka.topic": "topic.history",
    "transforms": "outbox",
    "slot.max.retries": "3",
    "slot.retry.delay.ms": "5000",
    "transforms.outbox.table.expand.json.payload": "true",
    "topic.prefix": "chat-assistant",
    "transforms.outbox.route.topic.replacement": "${routedByValue}",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "database.user": "user",
    "database.dbname": "db",
    "transforms.outbox.table.fields.additional.placement": "header:header:customFields",
    "transforms.outbox.table.field.event.key": "event_id",
    "transforms.outbox.table.json.payload.null.behavior": "optional_bytes",
    "database.server.name": "name",
    "transforms.outbox.route.by.field": "topic",
    "plugin.name": "pgoutput",
    "database.port": "5432",
    "key.converter.schemas.enable": "false",
    "database.hostname": "ip",
    "database.password": "password",
    "name": "connector_name",
    "value.converter.schemas.enable": "false",
    "table.include.list": "public.outbox",
    "transforms.outbox.table.field.event.payload.id": "event_id"
}
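One variant I could still try, as an untested sketch: keep StringConverter but turn off payload expansion, so the payload column should pass through as the original JSON text instead of being turned into a Struct first:

    "transforms.outbox.table.expand.json.payload": "false",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter"

This assumes the downstream consumers are fine with the raw payload string and that nothing else depends on the expanded fields.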

Upvotes: 0

Views: 188

Answers (0)
