Reputation: 145
I have created the test class below to produce an event using KafkaAvroSerializer.
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = "classpath:application-test.properties")
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        return new KafkaTemplate(producerFactory());
    }
}
But when I send an event using kafkaTemplate().send(appEventsTopic, applicationEvent);
I get the exception below.
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema Not Found; error code: 404001
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getIdFromRegistry(MockSchemaRegistryClient.java:79)
at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getId(MockSchemaRegistryClient.java:273)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)
When I use MockSchemaRegistryClient, why is it trying to look up the schema?
Upvotes: 6
Views: 5953
Reputation: 437
The Avro schema registry client supports a "mock" pseudo-protocol:
schema.registry.url=mock://localhost.something
Basically, any URL prefixed with mock will do the job.
Refer to the source code of AbstractKafkaAvroSerDeConfig:
public static final String SCHEMA_REGISTRY_URL_DOC =
    "Comma-separated list of URLs for schema registry instances that can be used to register "
        + "or look up schemas. "
        + "If you wish to get a connection to a mocked schema registry for testing, "
        + "you can specify a scope using the 'mock://' pseudo-protocol. For example, "
        + "'mock://my-scope-name' corresponds to "
        + "'MockSchemaRegistry.getClientForScope(\"my-scope-name\")'.";
Also set:
auto.register.schemas=true
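Since the question loads application-test.properties via @TestPropertySource, a sketch of what the relevant entries could look like in a Spring Boot test, using Spring Boot's spring.kafka.properties.* passthrough to the Kafka clients (the scope name my-scope-name is illustrative):

spring.kafka.properties.schema.registry.url=mock://my-scope-name
spring.kafka.properties.auto.register.schemas=true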
Upvotes: 10
Reputation: 1831
You are configuring the producer not to auto-register new schemas when producing a message (AUTO_REGISTER_SCHEMAS is false), so it only tries to fetch the schema from the schema registry and does not find it there.
Also, I don't see you setting the schema registry URL, so I guess it is taking the default value.
To answer your question: the mock imitates the behavior of a real schema registry, including the lookup, but it has clear disadvantages.
You may look at the documentation for more information.
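If you want to keep auto-registration disabled, one option is to register the event's schema with the MockSchemaRegistryClient up front, so the serializer's lookup succeeds. A minimal sketch, assuming the default TopicNameStrategy subject name "&lt;topic&gt;-value"; the topic name and the inline schema (a stand-in for the generated ApplicationEvent schema) are illustrative:

```java
import io.confluent.kafka.schemaregistry.avro.AvroSchema;
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient;

public class PreRegisterSchemaSketch {
    public static void main(String[] args) throws Exception {
        // With auto.register.schemas=false the serializer only *looks up*
        // schemas, so register the event schema with the mock client first.
        MockSchemaRegistryClient client = new MockSchemaRegistryClient();

        // Stand-in for the generated ApplicationEvent Avro schema.
        AvroSchema schema = new AvroSchema(
            "{\"type\":\"record\",\"name\":\"ApplicationEvent\","
                + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");

        // Default TopicNameStrategy: subject is "<topic>-value".
        int id = client.register("app-events-topic-value", schema);
        System.out.println("registered schema id: " + id);
    }
}
```

The same client instance must then be passed to the KafkaAvroSerializer (as in the question's kafkaAvroSerializer() bean), otherwise the serializer will look up the schema in a different registry.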
Upvotes: 0