Reputation: 379
I set up a Kafka JDBC sink to send events to PostgreSQL. I wrote this simple producer that sends JSON data together with its (Avro) schema to a topic as follows:
producer.py (kafka-python)
biometrics = {
    "heartbeat": self.pulse,          # integer
    "oxygen": self.oxygen,            # integer
    "temprature": self.temprature,    # float
    "time": time                      # string
}
avro_value = {
    "schema": open(BASE_DIR + "/biometrics.avsc").read(),
    "payload": biometrics
}
producer.send(
    "biometrics",
    key="some_string",
    value=avro_value
)
Value Schema:
{
    "type": "record",
    "name": "biometrics",
    "namespace": "athlete",
    "doc": "athletes biometrics",
    "fields": [
        {
            "name": "heartbeat",
            "type": "int",
            "default": 0
        },
        {
            "name": "oxygen",
            "type": "int",
            "default": 0
        },
        {
            "name": "temprature",
            "type": "float",
            "default": 0.0
        },
        {
            "name": "time",
            "type": "string",
            "default": ""
        }
    ]
}
Connector config (without hosts, passwords etc)
{
    "name": "jdbc_sink",
    "connector.class": "io.aiven.connect.jdbc.JdbcSinkConnector",
    "tasks.max": "1",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "topics": "biometrics",
    "insert.mode": "insert",
    "auto.create": "true"
}
But my connector fails hard, with three errors and I am unable to spot the reason for any of them:
TL;DR log version
(Error 1) Caused by: org.apache.kafka.connect.errors.DataException: biometrics
(Error 2) Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
(Error 3) Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Full log
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:498)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:475)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:325)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:185)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:235)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.connect.errors.DataException: biometrics
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:98)
at org.apache.kafka.connect.storage.Converter.toConnectData(Converter.java:87)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:498)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Avro message for id -1
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
Could someone help me understand those errors and the underlying reason?
Upvotes: 0
Views: 860
Reputation: 191743
The error occurs because your connector needs to use the JsonConverter class (org.apache.kafka.connect.json.JsonConverter) with value.converter.schemas.enabled=true, since JSON is what your producer actually wrote to the topic. However, the "schema" field that JsonConverter expects is not an Avro schema describing the payload, so it might still fail with those changes alone.
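To illustrate the shape JsonConverter reads when schemas are enabled, here is a minimal sketch of the biometrics record wrapped in Kafka Connect's own JSON schema format rather than Avro syntax. The field names come from the question; the helper names, the sample values, and the choice of "optional": False for every field are assumptions made for illustration.

```python
import json

# Kafka Connect's JSON schema for the biometrics record (not Avro syntax).
# Field names mirror the question; the "optional" flags are an assumption.
BIOMETRICS_CONNECT_SCHEMA = {
    "type": "struct",
    "name": "athlete.biometrics",
    "optional": False,
    "fields": [
        {"field": "heartbeat", "type": "int32", "optional": False},
        {"field": "oxygen", "type": "int32", "optional": False},
        {"field": "temprature", "type": "float", "optional": False},
        {"field": "time", "type": "string", "optional": False},
    ],
}


def to_connect_envelope(payload):
    """Wrap a payload dict in the {"schema", "payload"} envelope."""
    return {"schema": BIOMETRICS_CONNECT_SCHEMA, "payload": payload}


def serialize_value(payload):
    """Serialize the envelope to UTF-8 JSON bytes (hypothetical helper)."""
    return json.dumps(to_connect_envelope(payload)).encode("utf-8")


# Made-up sample values, only to show the resulting message.
sample = {"heartbeat": 72, "oxygen": 98, "temprature": 36.6,
          "time": "2021-01-01T00:00:00Z"}
value_bytes = serialize_value(sample)
print(value_bytes.decode("utf-8"))
```

With kafka-python, a function like serialize_value could be passed as value_serializer when constructing KafkaProducer, so that producer.send("biometrics", value=biometrics) writes this envelope to the topic.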
If you want to actually send Avro, then use the AvroProducer from the confluent-kafka library, which requires a running Schema Registry.
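As background for the "Unknown magic byte!" error, here is a small sketch of the framing that the AvroConverter looks for: messages written by Confluent's Avro serializers begin with a zero byte followed by a 4-byte big-endian schema ID, whereas JSON text begins with "{". That mismatch is presumably also why the log reports id -1, since no schema ID could be read. The helper name, the sample bytes, and the schema ID 42 are made up for illustration.

```python
import struct

MAGIC_BYTE = 0  # first byte of the Confluent Schema Registry wire format


def parse_confluent_header(value):
    """Return the schema ID if value starts with the 5-byte header, else None."""
    if len(value) < 5 or value[0] != MAGIC_BYTE:
        return None
    # Bytes 1-4 hold the schema ID as a big-endian 32-bit integer.
    (schema_id,) = struct.unpack(">I", value[1:5])
    return schema_id


# A JSON message like the one the kafka-python producer sends.
json_message = b'{"payload": {"heartbeat": 72}}'
# A framed message with a made-up schema ID and a made-up Avro body.
avro_framed = b"\x00" + struct.pack(">I", 42) + b"\x90\x01"

print(parse_confluent_header(json_message))  # None: first byte is "{" (0x7b)
print(parse_confluent_header(avro_framed))   # 42
```

The AvroProducer adds this header itself after registering the schema, which is why it needs the Schema Registry to be running.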
Upvotes: 1