gt1

Reputation: 17

Problems with the Schema Registry associated with Azure Event Hubs from Kafka Connect using Strimzi

I am trying to serialize payload values sent to Azure Event Hubs in Avro format. I'm deploying Kafka Connect to Kubernetes using Strimzi, with Event Hubs as the broker in place of native Kafka. When I deploy the container, the plugins load correctly, but calls to the schema registry fail.

I've already verified that messages reach Event Hubs using the configuration below without Avro serialization, so the problem is isolated to the schema registry:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  annotations:
    strimzi.io/use-connector-resources: "false"
spec:
  version: 3.7.1
  replicas: 1
  bootstrapServers: mynamespace.servicebus.windows.net:9093 
  authentication:
    type: plain 
    username: $MyConnectionString
    passwordSecret:
      secretName: mysecret
      password: mypassword
  config:
    # Confluent Avro converter for record values
    value.converter: io.confluent.connect.avro.AvroConverter
    value.converter.schema.registry.url: https://mynamespace.servicebus.windows.net/myschemagroup
    value.converter.enhanced.avro.schema.support: true

    group.id: dev.mygroup.cdc.hub
    offset.storage.topic: dev.mygroup.offsets.hub
    config.storage.topic: dev.mygroup.configs.hub
    status.storage.topic: dev.mygroup.status.hub
    # -1 = use the broker's default replication factor
    config.storage.replication.factor: -1
    offset.storage.replication.factor: -1
    status.storage.replication.factor: -1

  build:
    output:
      type: docker
      image: mycontainerregistry
      pushSecret: mysecret
    plugins:
      - name: mongodb
        artifacts:
          - type: jar
            url: https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.5.1/mongo-kafka-connect-1.5.1-all.jar 
          - type: jar
            url: https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar           
      - name: avro-converter
        artifacts:
          - type: zip
            url: https://d2p6pa21dvn84.cloudfront.net/api/plugins/confluentinc/kafka-connect-avro-converter/versions/7.7.2/confluentinc-kafka-connect-avro-converter-7.7.2.zip
          

I also tried, unsuccessfully, to use http://localhost:8081 for value.converter.schema.registry.url. Here is the full stack trace:

org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
        at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
        at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
        at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
        at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic mytopic:
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:107)
        at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
        ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema"string"
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:917)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:187)
        at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:177)
        at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:94)
        ... 16 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unable to parse error message from schema registry: '(<Fault xmlns="http://schemas.microsoft.com/ws/2005/05/envelope/none"><Code><Value>Receiver</Value><Subcode><Value xmlns:a="http://schemas.microsoft.com/net/2005/12/windowscommunicationfoundation/dispatcher">a:InternalServiceFault</Value></Subcode></Code><Reason><Text xml:lang="en-US">The server was unable to process the request due to an internal error.  For more information about the error, either turn on IncludeExceptionDetailInFaults (either from ServiceBehaviorAttribute or from the &lt;serviceDebug&gt; configuration behavior) on the server in order to send the exception information back to the client, or turn on tracing as per the Microsoft .NET Framework SDK documentation and inspect the server trace logs.</Text></Reason></Fault>)'; error code: 50005
        at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:345)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:418)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:634)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:618)
        at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:610)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:337)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:458)
        at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerWithResponse(CachedSchemaRegistryClient.java:434)
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.registerWithResponse(AbstractKafkaSchemaSerDe.java:550)
        at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:120)
        ... 18 more
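
For reference, the failing call should be reproducible outside of Connect with the same Confluent registry client the converter uses internally. This is a minimal sketch; the URL and subject are placeholders mirroring my config, and "string" is the schema the converter tried to register per the trace above:

import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryProbe {
    public static void main(String[] args) throws Exception {
        // Same URL as value.converter.schema.registry.url (placeholder)
        SchemaRegistryClient client = new CachedSchemaRegistryClient(
                "https://mynamespace.servicebus.windows.net/myschemagroup", 10);

        // The converter registers under "<topic>-value"; "string" is the
        // primitive Avro schema shown in the SerializationException above
        int id = client.register("mytopic-value", new AvroSchema("\"string\""));
        System.out.println("Registered schema id: " + id);
    }
}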

Upvotes: 0

Views: 61

Answers (1)

OneCricketeer

Reputation: 191671

It's unclear what you're expecting to happen, exactly, but your Kafka producer clients must use KafkaAvroSerializer with a matching schema.registry.url.

You cannot retroactively deserialize data that was produced in a different format.
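
As a sketch of what that looks like on the producer side (the topic, schema, and URL here are placeholders, and the SASL settings Event Hubs requires are omitted):

import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

public class AvroProducerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "mynamespace.servicebus.windows.net:9093");
        props.put("key.serializer", StringSerializer.class.getName());
        // The serializer registers the schema, then writes Avro prefixed with the schema id
        props.put("value.serializer", KafkaAvroSerializer.class.getName());
        // Must match the registry the converters/consumers are configured with
        props.put("schema.registry.url", "https://mynamespace.servicebus.windows.net/myschemagroup");

        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Example\",\"fields\":"
                + "[{\"name\":\"f1\",\"type\":\"string\"}]}");
        GenericRecord value = new GenericData.Record(schema);
        value.put("f1", "hello");

        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("mytopic", "key", value));
        }
    }
}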

Upvotes: 1
