Reputation: 17
I am trying to serialize payload values sent to Azure Event Hub in Avro format. I'm deploying Kafka Connect to Kubernetes using Strimzi, with Event Hub as the broker in place of native Kafka. When I deploy the container, the plugins load correctly, but the schema registry produces errors.
I've already verified that messages reach Event Hub using the configuration below without Avro serialization, so the problem has been isolated to the schema registry.
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "false"
spec:
  version: 3.7.1
  replicas: 1
  bootstrapServers: mynamespace.servicebus.windows.net:9093
  authentication:
    type: plain
    username: $MyConnectionString
    passwordSecret:
      secretName: mysecret
      password: mypassword
  config:
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: https://mynamespace.servicebus.windows.net/myschemagroup
    value.converter.enhanced.avro.schema.support: true
    group.id: dev.mygroup.cdc.hub
    offset.storage.topic: dev.mygroup.offsets.hub
    config.storage.topic: dev.mygroup.configs.hub
    status.storage.topic: dev.mygroup.status.hub
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1
  build:
    output:
      type: docker
      image: mycontainerregistry
      pushSecret: mysecret
    plugins:
      - name: mongodb
        artifacts:
          - type: jar
            url: https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.5.1/mongo-kafka-connect-1.5.1-all.jar
          - type: jar
            url: https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
      - name: avro-converter
        artifacts:
          - type: zip
            url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.7.2/confluentinc-kafka-connect-avro-converter-7.7.2.zip
I also tried, unsuccessfully, to use http://localhost:8081 for the value.converter.schema.registry.url.
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mytopic:
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:107)
at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema"string"
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:917)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:187)
at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:177)
at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:94)
... 16 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unable to parse error message from schema registry: '(<Fault xmlns="http://schemas.microsoft.com/ws/2005/05/envelope/none"><Code><Value>Receiver</Value><Subcode><Value xmlns:a="http://schemas.microsoft.com/net/2005/12/windowscommunicationfoundation/dispatcher">a:InternalServiceFault</Value></Subcode></Code><Reason><Text xml:lang="en-US">The server was unable to process the request due to an internal error. For more information about the error, either turn on IncludeExceptionDetailInFaults (either from ServiceBehaviorAttribute or from the <serviceDebug> configuration behavior) on the server in order to send the exception information back to the client, or turn on tracing as per the Microsoft .NET Framework SDK documentation and inspect the server trace logs.</Text></Reason></Fault>)'; error code: 50005
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:345)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:418)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:634)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:618)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:610)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:337)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:458)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:434)
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.registerWithResponse(AbstractKafkaSchemaSerDe.java:550)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:120)
... 18 more
Upvotes: 0
Views: 61
Reputation: 191671
It's unclear exactly what you're expecting to happen, but your Kafka producer clients must use KafkaAvroSerializer with a matching schema.registry.url.
You cannot retroactively deserialize different data formats.
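As a rough illustration of the producer-side settings mentioned above, here is a minimal sketch that only builds the configuration. The class name AvroProducerConfig, the build helper, and the localhost addresses are placeholders of mine, not something from your setup. The serializer classes are given as strings so the snippet compiles without the Kafka or Confluent jars; a real producer would pass these properties to KafkaProducer. Whether a particular registry endpoint accepts requests from this serializer is something you would still need to verify.

```java
import java.util.Properties;

public class AvroProducerConfig {

    // Hypothetical helper: collects the producer settings that pair the
    // Avro serializer with the schema registry it should register against.
    static Properties build(String bootstrapServers, String schemaRegistryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // Keys stay plain strings in this sketch.
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        // Values are Avro-encoded by the Confluent serializer.
        props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Consumers must point their deserializer at the same registry.
        props.put("schema.registry.url", schemaRegistryUrl);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder addresses; substitute your own broker and registry.
        Properties props = build("localhost:9092", "http://localhost:8081");
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Whatever reads the topic later needs the matching deserializer and the same schema.registry.url, otherwise the bytes on the topic will not decode.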
Upvotes: 1