Curling https://right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry/schemas/ids/8?fetchMaxId=false&subject=test1-value returns:
{"schema": "{\"type\":\"record\",\"name\":\"User\",\"namespace\":\"fr.potato\",\"fields\":[{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"favorite_number\",\"type\":\"long\"},{\"name\":\"timestamp_string\",\"type\":\"string\"}]}"}
so it seems the problem is with my Kafka Streams app and not the schema registry. I have tried every configuration under the sun, but I am still getting the exception below. (A standalone registry-client check is sketched right after the stack trace.)
2024-07-15 12:50:00 DEBUG StreamThread:1201 - stream-thread [enrichement-app-14-7e65669d-662f-44a6-b47a-30af74085b4e-StreamThread-1] Main Consumer poll completed in 133 ms and fetched 1 records from partitions [test1-0]
2024-07-15 12:50:00 DEBUG RestService:292 - Sending GET with input null to https://right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry/schemas/ids/8?fetchMaxId=false&subject=test1-value
2024-07-15 12:50:00 ERROR LogAndFailExceptionHandler:39 - Exception caught during Deserialization, taskId: 0_0, topic: test1, partition: 0, offset: 0
org.apache.kafka.common.errors.SerializationException: Error retrieving Avro value schema for id 8
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:805) ~[kafka-schema-serializer-7.6.1.jar:?]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:415) ~[kafka-avro-serializer-7.6.1.jar:?]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserialize(AbstractKafkaAvroDeserializer.java:188) ~[kafka-avro-serializer-7.6.1.jar:?]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:107) ~[kafka-avro-serializer-7.6.1.jar:?]
at io.confluent.kafka.serializers.KafkaAvroDeserializer.deserialize(KafkaAvroDeserializer.java:102) ~[kafka-avro-serializer-7.6.1.jar:?]
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:63) ~[kafka-streams-avro-serde-5.2.1.jar:?]
at io.confluent.kafka.streams.serdes.avro.GenericAvroDeserializer.deserialize(GenericAvroDeserializer.java:39) ~[kafka-streams-avro-serde-5.2.1.jar:?]
at org.apache.kafka.common.serialization.Deserializer.deserialize(Deserializer.java:62) ~[kafka-clients-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.SourceNode.deserializeValue(SourceNode.java:58) ~[kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.RecordDeserializer.deserialize(RecordDeserializer.java:66) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.RecordQueue.updateHead(RecordQueue.java:204) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:128) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:285) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:1039) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.TaskManager.addRecordsToTasks(TaskManager.java:1782) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:1208) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runOnceWithoutProcessingThreads(StreamThread.java:909) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:686) [kafka-streams-3.7.1.jar:?]
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:645) [kafka-streams-3.7.1.jar:?]
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: null; error code: 0
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:336) ~[kafka-schema-registry-client-7.6.1.jar:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409) ~[kafka-schema-registry-client-7.6.1.jar:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:916) ~[kafka-schema-registry-client-7.6.1.jar:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:900) ~[kafka-schema-registry-client-7.6.1.jar:?]
at io.confluent.kafka.schemaregistry.client.rest.RestService.getId(RestService.java:880) ~[kafka-schema-registry-client-7.6.1.jar:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaByIdFromRegistry(CachedSchemaRegistryClient.java:333) ~[kafka-schema-registry-client-7.6.1.jar:?]
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getSchemaBySubjectAndId(CachedSchemaRegistryClient.java:464) ~[kafka-schema-registry-client-7.6.1.jar:?]
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer$DeserializationContext.schemaFromRegistry(AbstractKafkaAvroDeserializer.java:398) ~[kafka-avro-serializer-7.6.1.jar:?]
... 17 more
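To separate registry connectivity from the Streams configuration, here is a minimal standalone check, assuming the same registry URL and the username:pw placeholder from the app below (the cache capacity of 10 is arbitrary); it fetches the same schema id 8 that the deserializer fails on:

import java.util.Map;

import io.confluent.kafka.schemaregistry.ParsedSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;

public class RegistryCheck {
    public static void main(String[] args) throws Exception {
        // Same registry URL the Streams app uses; "username:pw" is the placeholder from above.
        String url = "https://right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry";
        Map<String, Object> config = Map.of(
                "basic.auth.credentials.source", "USER_INFO",
                "basic.auth.user.info", "username:pw");

        // Cache capacity of 10 is arbitrary for a one-off check.
        SchemaRegistryClient client = new CachedSchemaRegistryClient(url, 10, config);

        // Fetch the same schema id the deserializer fails on.
        ParsedSchema schema = client.getSchemaById(8);
        System.out.println(schema.canonicalString());
    }
}

If this prints the User schema while the Streams app still fails, the difference lies in how the app configures its registry client.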
This is my app:
package fr.potato;

import java.util.Collections;
import java.util.Properties;
import java.util.Map;

import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.errors.LogAndContinueExceptionHandler;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import io.confluent.kafka.serializers.AbstractKafkaSchemaSerDeConfig;
import io.confluent.kafka.streams.serdes.avro.GenericAvroSerde;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

public class Enrichement {

    static String auth = "username:pw";
    static String sourceTopicName = "test1";
    static String targetTopicName = "test2";
    static String schemaRegistryUrl = "https://right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry";

    private static final Logger logger = LogManager.getLogger(Enrichement.class);

    // @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "enrichement-app-14");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "https://right-boa-11231-eu1-kafka.upstash.io:9092");
        props.put("sasl.mechanism", "SCRAM-SHA-256");
        props.put("security.protocol", "SASL_SSL");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"username\" password=\"pw\";");
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", auth);
        // https://right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry/schemas/ids/8?fetchMaxId=false&subject=test1-value
        props.put("schema.registry.url", "right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry");
        props.put("debug", "true");
        props.put("ssl.endpoint.identification.algorithm", "https");
        props.put("key.converter", "org.apache.kafka.connect.storage.StringConverter");
        props.put("value.converter", "io.confluent.connect.avro.AvroConverter");
        props.put("value.converter.schema.registry.url", "right-boa-11231-eu1-rest-kafka.upstash.io/schema-registry");
        props.put("key.converter.basic.auth.credentials.source", "USER_INFO");
        props.put("key.converter.basic.auth.user.info", auth);
        props.put("value.converter.basic.auth.credentials.source", "USER_INFO");
        props.put("value.converter.basic.auth.user.info", auth);
        props.put("auto.register.schemas", false);
        props.put("use.latest.version", true);

        final Map<String, String> serdeConfig = Collections.singletonMap(
                "schema.registry.url", schemaRegistryUrl);
        final Serde<GenericRecord> valueGenericAvroSerde = new GenericAvroSerde();
        valueGenericAvroSerde.configure(serdeConfig, false); // `false` for record values

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, GenericRecord> inputStream = builder.stream(sourceTopicName,
                Consumed.with(Serdes.String(), valueGenericAvroSerde));
        inputStream
                .peek((key, value) -> System.out.println("Key: " + key + ", Value: " + value))
                .to(targetTopicName, Produced.with(Serdes.String(), valueGenericAvroSerde));

        try {
            KafkaStreams streams = new KafkaStreams(builder.build(), props);
            streams.setUncaughtExceptionHandler((Thread thread, Throwable throwable) -> {
                logger.error("Uncaught exception in thread " + thread, throwable);
            });
            streams.start();
            System.out.println("Kafka Streams app started successfully.");
            Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
        } catch (Exception e) {
            System.err.println("Error starting Kafka Streams app: " + e.getMessage());
            e.printStackTrace();
        }
    }
}
It may be too late, but I think you forgot to give the full URL of the schema registry: schema.registry.url is set to the bare host name, without the https:// scheme that your successful curl call uses.
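If that is the cause, a sketch of the corrected properties in the app's main method would be (reusing the schemaRegistryUrl constant already defined there, which does include the scheme):

// Pass the full registry URL, including the https:// scheme,
// instead of the bare host name:
props.put("schema.registry.url", schemaRegistryUrl);
// The same applies to the Connect-style converter property, if kept:
props.put("value.converter.schema.registry.url", schemaRegistryUrl);

This keeps every registry property consistent with the URL that worked for curl.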