Ronan Hughes

Reputation: 218

spring-cloud-stream-binder-kafka configuration for Confluent Cloud Schema Registry Unauthorized error

I'm having trouble configuring a connection to Confluent Cloud when using spring-cloud-stream-binder-kafka. Can anybody see what is wrong?

When I use the example from https://www.confluent.io/blog/schema-registry-avro-in-spring-boot-application-tutorial/, it works fine and I can see messages on Confluent Cloud.

However, when I add the same connection details using the spring-cloud-stream-binder-kafka config, registering the schema fails with an Unauthorized error:

Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"MySchema","namespace":"org.test","fields":[{"name":"value","type":"double"}]}

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

The configuration below produces the above error, and I'm not sure what is going wrong:

spring:
  cloud:
    stream:
      default:
        producer:
          useNativeEncoding: true
      kafka:
        binder:
          brokers: myinstance.us-east1.gcp.confluent.cloud:9092
          producer-properties:
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
            value.serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
            schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
            basic.auth.credentials.source: USER_INFO
            schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
          configuration:
            ssl.endpoint.identification.algorithm: https
            sasl.mechanism: PLAIN
            request.timeout.ms: 20000
            retry.backoff.ms: 500
            sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
            security.protocol: SASL_SSL
      bindings:
        normals-out:
          destination: normals
          contentType: application/*+avro

The example from Confluent, which works fine:

spring:
  kafka:
    bootstrap-servers:
      - myinstance.us-east1.gcp.confluent.cloud:9092
    properties:
      ssl.endpoint.identification.algorithm: https
      sasl.mechanism: PLAIN
      request.timeout.ms: 20000
      retry.backoff.ms: 500
      sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="myKey" password="MySecret";
      security.protocol: SASL_SSL
      schema.registry.url: https://myinstance.us-central1.gcp.confluent.cloud
      basic.auth.credentials.source: USER_INFO
      schema.registry.basic.auth.user.info: mySchemaKey:mySchemaSecret
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
    template:
      default-topic:
logging:
  level:
    root: info
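
One way to rule out the credentials themselves is to call the Schema Registry directly with the same `key:secret` pair, outside of Spring. The following is only a sketch, assuming Java 11+ and that the registry answers `GET /subjects` with HTTP Basic auth; the class and method names (`SchemaRegistryAuthCheck`, `basicAuthHeader`, `listSubjectsStatus`) are made up for illustration, and the URL and credentials are the placeholders from the configuration above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class SchemaRegistryAuthCheck {

    // Builds the HTTP Basic header value from a "key:secret" string,
    // the same format used by schema.registry.basic.auth.user.info.
    static String basicAuthHeader(String userInfo) {
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    // Calls GET <registryUrl>/subjects and returns the HTTP status code.
    // A 401 here means the registry itself rejected the key and secret.
    static int listSubjectsStatus(String registryUrl, String userInfo) throws Exception {
        HttpRequest request = HttpRequest.newBuilder()
                .uri(URI.create(registryUrl + "/subjects"))
                .header("Authorization", basicAuthHeader(userInfo))
                .GET()
                .build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        return response.statusCode();
    }

    public static void main(String[] args) throws Exception {
        // Expected arguments: <registry-url> <key:secret>
        if (args.length < 2) {
            System.out.println("usage: SchemaRegistryAuthCheck <registry-url> <key:secret>");
            return;
        }
        System.out.println("HTTP status: " + listSubjectsStatus(args[0], args[1]));
    }
}
```

If this returns 200 with the same key and secret, the credentials are valid and the 401 is more likely caused by how the application passes them to the serializer.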

Upvotes: 3

Views: 1185

Answers (1)

Ronan Hughes

Reputation: 218

My issue turned out to be nothing more than a missing dependency in my pom.

I should probably delete my question, but I'm leaving it here as a reference that the configuration above does actually work as written.

<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry-client</artifactId>
  <version>5.3.0</version>
</dependency>
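
A quick way to confirm whether the client library actually made it onto the runtime classpath is to try loading one of its classes. This is just a sketch: `ClasspathCheck` and `isOnClasspath` are hypothetical helper names, and it assumes `CachedSchemaRegistryClient` (a class shipped in kafka-schema-registry-client) is a reasonable marker for the dependency being present.

```java
public class ClasspathCheck {

    // Returns true if the named class can be found by this class's loader.
    // The class is looked up without being initialized.
    static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Marker class from the kafka-schema-registry-client artifact.
        String marker = "io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient";
        System.out.println(marker + " present: " + isOnClasspath(marker));
    }
}
```

Run it with the same classpath as the application; if it reports `false`, the dependency above is not being picked up.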

Upvotes: 1
