Reputation: 207
hello I use debezium outbox router for postgres
this is my json in payload column
{
"userId": 107385,
"chatId": "beb8faec-b75f-4eca-ace0-57b8621c7ca0",
"fromEcho": true,
"results": [
{
"key": "agentSwitch",
"value": "Evet",
"type": "SWITCH"
},
{
"key": "agentStar",
"value": {
"chips": [],
"rate": 5
},
"type": "STARS"
},
{
"key": "agentDescription",
"value": null,
"type": "TEXT_AREA"
}
]
}
if I use org.apache.kafka.connect.json.JsonConverter value converter
it throws this error
{ "id": 0, "state": "FAILED", "worker_id": "10.233.66.78:8083", "trace": "org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)\n\tat java.base/java.lang.Thread.run(Thread.java:829)\nCaused by: org.apache.kafka.connect.errors.DataException: Invalid type for STRUCT: class java.lang.String\n\tat org.apache.kafka.connect.json.JsonConverter.convertToJson(JsonConverter.java:680)\n\tat org.apache.kafka.connect.json.JsonConverter.convertToJsonWithoutEnvelope(JsonConverter.java:563)\n\tat org.apache.kafka.connect.json.JsonConverter.fromConnectData(JsonConverter.java:313)\n\tat org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:67)\n\tat org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)\n\t... 13 more\n" }
Then I switched to org.apache.kafka.connect.storage.StringConverter
this time it is okay to send json above. But this time if delete one item from array
like this json
{
"userId": 107385,
"chatId": "beb8faec-b75f-4eca-ace0-57b8621c7ca0",
"chatUserSegment": "SellerMeal",
"agentNickName": "",
"rating": 5,
"comment": "",
"duration": 0,
"orderId": 0,
"timestamp": 1716448682,
"platform": "IosV2",
"fromEcho": true,
"results": [
{
"key": "agentStar",
"value": {
"chips": [],
"rate": 5
},
"type": "STARS"
},
{
"key": "agentDescription",
"value": null,
"type": "TEXT_AREA"
}
]
}
it produces this to kafka
Struct{chatId=beb8faec-b75f-4eca-ace0-57b8621c7ca0,rating=5,userId=107385,comment=,orderId=0,results=[Struct{key=agentStar,type=STARS,value=Struct{rate=5}}, Struct{key=agentDescription,type=TEXT_AREA}],duration=0,fromEcho=true,platform=IosV2,timestamp=1716448682,agentNickName=,chatUserSegment=SellerMeal}
it is not json. what is the solution ? in the array value field is object type. this the problem I think.
connector settings
{
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"transforms.outbox.type": "io.debezium.transforms.outbox.EventRouter",
"slot.name": "slotname",
"tasks.max": "1",
"publication.name": "publication_name",
"database.history.kafka.topic": "topic.history",
"transforms": "outbox",
"slot.max.retries": "3",
"slot.retry.delay.ms": "5000",
"transforms.outbox.table.expand.json.payload": "true",
"topic.prefix": "chat-assistant",
"transforms.outbox.route.topic.replacement": "${routedByValue}",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"database.user": "user",
"database.dbname": "db",
"transforms.outbox.table.fields.additional.placement": "header:header:customFields",
"transforms.outbox.table.field.event.key": "event_id",
"transforms.outbox.table.json.payload.null.behavior": "optional_bytes",
"database.server.name": "name",
"transforms.outbox.route.by.field": "topic",
"plugin.name": "pgoutput",
"database.port": "5432",
"key.converter.schemas.enable": "false",
"database.hostname": "ip",
"database.password": "password",
"name": "connector_name",
"value.converter.schemas.enable": "false",
"table.include.list": "public.outbox",
"transforms.outbox.table.field.event.payload.id": "event_id"
}
Upvotes: 0
Views: 188