
Reputation: 1025

Kafka schema registry RestClientException: Unauthorized; error code: 401

I am trying to read data from a Kafka Avro topic using the Avro schema from the Confluent Schema Registry. I am using version 5.4.1 of the io.confluent library. This is the entry in the Gradle file:

    compile (group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.1') {
        exclude group: 'org.apache.avro', module: 'avro'
    }

I receive the following error.

    Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Unauthorized; error code: 401

This is the pipeline code:

    public PCollection<KV<String, GenericRecord>> apply(Pipeline p) {
        ConfluentSchemaRegistryDeserializerProvider<GenericRecord> valDeserializerProvider =
                ConfluentSchemaRegistryDeserializerProvider.of(params.schemaUrl, "topic-value");

        PCollection<KafkaRecord<String, GenericRecord>> records = p.apply("GetDataFromKafka", KafkaIO.<String, GenericRecord>read()
                // ... rest of the KafkaIO configuration omitted
                );

        return records.apply("TopicAndDataInput", MapElements.via(new SimpleFunction<KafkaRecord<String, GenericRecord>, KV<String, GenericRecord>>() {
            @Override
            public KV<String, GenericRecord> apply(KafkaRecord<String, GenericRecord> input) {
                String topic = input.getTopic();
                GenericRecord data = input.getKV().getValue();
                return KV.of(topic, data);
            }
        }));
    }

What am I missing here? Could someone point me in the right direction? Thanks in advance.

This is the function that builds the consumer properties:

    public Map<String, Object> getConsumerProps() {
        Map<String, Object> props = new HashMap<> ();

        props.put("group.id", groupId);
        props.put("auto.offset.reset", "earliest");
        props.put("max.partition.fetch.bytes", 8388608);
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "registry_key:secret");
        props.put("sasl.jaas.config","org.apache.kafka.common.security.plain.PlainLoginModule required username='"+ apiKey +"' password='" + apiSecret +"';");
        return props;
    }

I also tried the following props and still get the same unauthorized error:

    props.put("basic.auth.credentials.source", "USER_INFO");
    props.put("schema.registry.basic.auth.user.info", "<registry key>:<value>");
    props.put("schema.registry.url", schemaUrl);

Upvotes: 6

Views: 16993

Answers (2)

Konstantin B.

Reputation: 485

I had the same issue with version 5.3.0 of the libraries. I resolved it by updating to


I am using the following props to connect to Schema Registry:

    "schema.registry.url": "<URL>"
    "schema.registry.basic.auth.credentials.source": "USER_INFO"
    "schema.registry.basic.auth.user.info": "<API_KEY>:<API_SECRET>"

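As a rough sketch, the three settings above could be assembled in Java like this. The class and method names (`SchemaRegistryProps`, `build`) and the example URL are hypothetical; only the property keys come from this answer, and whether the client accepts the `schema.registry.`-prefixed form of `basic.auth.credentials.source` may depend on the serializer version.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper; only the property keys are taken from the answer above.
final class SchemaRegistryProps {

    static Map<String, Object> build(String url, String apiKey, String apiSecret) {
        Map<String, Object> props = new HashMap<>();
        props.put("schema.registry.url", url);
        props.put("schema.registry.basic.auth.credentials.source", "USER_INFO");
        // user.info is the registry key and secret joined with a colon
        props.put("schema.registry.basic.auth.user.info", apiKey + ":" + apiSecret);
        return props;
    }

    public static void main(String[] args) {
        // Placeholder values; substitute the real registry URL and credentials.
        Map<String, Object> props =
                build("https://registry.example.com", "<API_KEY>", "<API_SECRET>");
        props.forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```

The resulting map could then be merged into the one returned by `getConsumerProps()` in the question, for example with `props.putAll(...)`.
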
Upvotes: 4

Gerard Klijs

Reputation: 401

It seems you need to add the Schema Registry's own API key and secret to the properties:

    props.put("schema.registry.basic.auth.user.info", "<SCHEMA_REGISTRY_API_KEY>:<SCHEMA_REGISTRY_API_SECRET>");

(from https://docs.confluent.io/cloud/current/cp-component/streams-cloud-config.html)
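
When debugging a 401 it can help to know what happens to this value. With `USER_INFO`, the `<key>:<secret>` string is, as far as I know, sent to the registry as a standard HTTP Basic `Authorization` header, that is, the Base64 encoding of the string. Below is a minimal sketch of that encoding using only the JDK; the class and method names are hypothetical and not part of any Confluent API.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical helper that mirrors standard HTTP Basic authentication.
final class BasicAuthHeader {

    // Turns "key:secret" into the value of an Authorization header.
    static String fromUserInfo(String userInfo) {
        String encoded = Base64.getEncoder()
                .encodeToString(userInfo.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials taken from the question's example value.
        System.out.println("Authorization: " + fromUserInfo("registry_key:secret"));
    }
}
```

Sending the resulting header with a plain HTTP GET to the registry's `/subjects` endpoint is one way to check the key and secret independently of the Kafka consumer.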

Upvotes: 0
