I'm running a Kafka docker-compose stack (Confluent 7.7.0). I want to use the JDBC sink connector (confluentinc/kafka-connect-jdbc); my source connector is debezium/debezium-connector-postgresql.
I'm hitting this error: Failed to deserialize data for topic dev_internal.public.constant_wits to Avro
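I create both connectors by POSTing the JSON configs below to the Connect REST API, roughly like this (the worker port 8083 and localhost are just how my compose file maps things, adjust as needed):

curl -X POST -H "Content-Type: application/json" \
  --data @source.json \
  http://localhost:8083/connectors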
source.json
{
  "name": "source",
  "config": {
    "topic.prefix": "dev_internal",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "*****",
    "database.password": "******",
    "database.dbname": "internal",
    "slot.name": "internal",
    "plugin.name": "pgoutput",
    "publication.name": "publication_debezium",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "table.exclude.list": "public.django_migrations"
  }
}
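With the unwrap SMT (ExtractNewRecordState), each record value on the topics should be just the flattened row state rather than the full Debezium change-event envelope; for a hypothetical table it would look something like this (field names are made up, not from my actual schema):

{
  "id": 1,
  "name": "example",
  "updated_at": "2024-01-01T00:00:00.000000Z"
}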
my sink.json:
{
  "name": "sink",
  "config": {
    "auto.create": "false",
    "auto.evolve": "false",
    "connection.password": "<PASSWORD>",
    "connection.url": "<JDBC_URL>",
    "connection.user": "<USER>",
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "delete.enabled": "true",
    "header.converter": "org.apache.kafka.connect.storage.SimpleHeaderConverter",
    "insert.mode": "upsert",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "max.retries": "10",
    "pk.fields": "id",
    "pk.mode": "record_key",
    "tasks.max": "1",
    "topics.regex": "dev_internal.(?!public.django_session)(?:.*)",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "transforms": "dropTopicPrefix,UpdatedAtTimestamp",
    "transforms.dropTopicPrefix.regex": "dev_internal.public(.*)",
    "transforms.dropTopicPrefix.replacement": "$1",
    "transforms.dropTopicPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.UpdatedAtTimestamp.field": "updated_at",
    "transforms.UpdatedAtTimestamp.format": "yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'",
    "transforms.UpdatedAtTimestamp.target.type": "Timestamp",
    "transforms.UpdatedAtTimestamp.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value",
    ....
  }
}
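The sink is registered the same way, and its task fails almost immediately. A quick way I confirm the task state (again, port 8083 is just my compose mapping):

curl -s http://localhost:8083/connectors/sink/status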
error:
ERROR WorkerSinkTask{id=dev_internal_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:531)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:509)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:243)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:212)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dev_internal.public.constant_wits to Avro:
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:531)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:390)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:264)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
at io.confluent.connect.avro.AvroConverter.toConnectData(
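Since the failure is "Unknown magic byte!", I want to verify whether the messages on that topic are actually in the Confluent Avro wire format (magic byte 0x0 followed by a 4-byte schema id). This is roughly what I'd run from inside the stack (container names and bootstrap address are from my compose file and may differ):

kafka-avro-console-consumer \
  --bootstrap-server broker:29092 \
  --topic dev_internal.public.constant_wits \
  --from-beginning \
  --property schema.registry.url=http://schema-registry:8081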