mahradbt
mahradbt

Reputation: 400

JDBC sink connector failed - Unknown Magic Byte

I use Kafka docker-compose stack. Confluent 7.7.0 and I want to use JDBC sink (confluentinc/kafka-connect-jdbc) my source connector is debezium/debezium-connector-postgresql

and I faced with this error: Failed to deserialize data for topic dev_internal.public.constant_wits to Avro

source.json

{
        "name": "source",
        "config": {
                "topic.prefix": "dev_internal",
                "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
                "database.hostname": "postgres",
                "database.port": "5432",
                "database.user": "*****",
                "database.password": "******",
                "database.dbname": "internal",
                "slot.name": "internal",
                "plugin.name": "pgoutput",
                "publication.name": "publication_debezium",
                "transforms": "unwrap",
                "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
                "transforms.unwrap.drop.tombstones": "false",
                "table.exclude.list": "public.django_migrations"
        }
}

my sink.json:

{
    "name":"sink",
    "config":{
        "auto.create":"false",
        "auto.evolve":"false",
        "connection.password":"<PASSWORD>",
        "connection.url":"<JDBC_URL>",
        "connection.user":"<USER>",
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "delete.enabled":"true",
        "header.converter":"org.apache.kafka.connect.storage.SimpleHeaderConverter",
        "insert.mode":"upsert",
        "key.converter":"io.confluent.connect.avro.AvroConverter",
        "key.converter.schema.registry.url":"http://schema-registry:8081",
        "max.retries":"10",
        "pk.fields":"id"
        "pk.mode":"record_key",
        "tasks.max":"1",
        "topics.regex":"dev_internal.(?!public.django_session)(?:.*)",
        "value.converter":"io.confluent.connect.avro.AvroConverter",
        "value.converter.schema.registry.url":"http://schema-registry:8081",
        "transforms":"dropTopicPrefix,UpdatedTimestamp",
        "transforms.dropTopicPrefix.regex":"dev_internal.public(.*)",
        "transforms.dropTopicPrefix.replacement":"$1",
        "transforms.dropTopicPrefix.type":"org.apache.kafka.connect.transforms.RegexRouter",
        "transforms.UpdatedAtTimestamp.field":"updated_at",
        "transforms.UpdatedAtTimestamp.format":"yyyy-MM-dd'T'HH:mm:ss.SSSSSS'Z'",
        "transforms.UpdatedAtTimestamp.target.type":"Timestamp",
        "transforms.UpdatedAtTimestamp.type":"org.apache.kafka.connect.transforms.TimestampConverter$Value",
        ....
    }
}

error:

ERROR WorkerSinkTask{id=dev_internal_sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:244)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:166)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:531)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:509)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:345)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:243)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:212)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:229)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:284)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
        at java.base/java.lang.Thread.run(Thread.java:829)
 Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic dev_internal.public.constant_wits to Avro:
        at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:148)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:531)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
        ... 14 more
 Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.<init>(AbstractKafkaAvroDeserializer.java:390)
        at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:264)
        at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
        at io.confluent.connect.avro.AvroConverter.toConnectData(

Upvotes: 0

Views: 33

Answers (0)

Related Questions