SmartTechie

Reputation: 145

SpringBoot Embedded Kafka to produce Event using Avro Schema

I have created the below test class to produce an event using AvroSerializer.

@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
@TestPropertySource(locations = ("classpath:application-test.properties"))
@ContextConfiguration(classes = { TestAppConfig.class })
@DirtiesContext
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
class EntitlementEventsConsumerServiceImplTest {


    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;

    @Bean
    MockSchemaRegistryClient mockSchemaRegistryClient() {
        return new MockSchemaRegistryClient();
    }

    @Bean
    KafkaAvroSerializer kafkaAvroSerializer() {
        return new KafkaAvroSerializer(mockSchemaRegistryClient());
    }

    @Bean
    public DefaultKafkaProducerFactory producerFactory() {
        Map<String, Object> props = KafkaTestUtils.producerProps(embeddedKafkaBroker);
        props.put(KafkaAvroSerializerConfig.AUTO_REGISTER_SCHEMAS, false);
        return new DefaultKafkaProducerFactory(props, new StringSerializer(), kafkaAvroSerializer());
    }

    @Bean
    public KafkaTemplate<String, ApplicationEvent> kafkaTemplate() {
        KafkaTemplate<String, ApplicationEvent> kafkaTemplate = new KafkaTemplate(producerFactory());
        return kafkaTemplate;
    }
}

But when I send an event using kafkaTemplate().send(appEventsTopic, applicationEvent); I get the exception below.

Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema Not Found; error code: 404001
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getIdFromRegistry(MockSchemaRegistryClient.java:79)
    at io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient.getId(MockSchemaRegistryClient.java:273)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:82)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
    at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:902)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:862)
    at org.springframework.kafka.core.DefaultKafkaProducerFactory$CloseSafeProducer.send(DefaultKafkaProducerFactory.java:781)
    at org.springframework.kafka.core.KafkaTemplate.doSend(KafkaTemplate.java:562)
    at org.springframework.kafka.core.KafkaTemplate.send(KafkaTemplate.java:363)

Since I am using MockSchemaRegistryClient, why is it trying to look up the schema?

Upvotes: 6

Views: 5953

Answers (2)

shashi ranjan

Reputation: 437

The Avro schema registry supports a "mock" pseudo-protocol:

schema.registry.url=mock://localhost.something

Basically, any URL with the mock:// prefix will do the job.

Refer to the source code for AbstractKafkaAvroSerDeConfig:

  public static final String SCHEMA_REGISTRY_URL_DOC =
      "Comma-separated list of URLs for schema registry instances that can be used to register "
      + "or look up schemas. "
      + "If you wish to get a connection to a mocked schema registry for testing, "
      + "you can specify a scope using the 'mock://' pseudo-protocol. For example, "
      + "'mock://my-scope-name' corresponds to "
      + "'MockSchemaRegistry.getClientForScope(\"my-scope-name\")'.";

Also set:

auto.register.schemas=true
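
As a rough sketch of how these two settings could be passed to a producer in a test, the snippet below builds the property map using plain string keys. The scope name `test-scope` and the class and method names are hypothetical. The serializer class name is the one that appears in the stack trace in the question. In a Spring test, a map like this could be merged into the properties returned by KafkaTestUtils.producerProps(...).

```java
import java.util.HashMap;
import java.util.Map;

public class MockRegistryProps {

    // Hypothetical scope name: the text after "mock://" only identifies the mock scope.
    static final String MOCK_REGISTRY_URL = "mock://test-scope";

    // Builds the producer settings suggested above, keyed by their config names.
    static Map<String, Object> avroProducerOverrides() {
        Map<String, Object> props = new HashMap<>();
        // Point the Avro serializer at an in-memory mock registry instead of a real one.
        props.put("schema.registry.url", MOCK_REGISTRY_URL);
        // Let the serializer register the schema on first send, so the lookup cannot miss.
        props.put("auto.register.schemas", true);
        // Serializer class taken from the stack trace in the question.
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        return props;
    }

    public static void main(String[] args) {
        // Print the settings so they can be compared with application-test.properties.
        avroProducerOverrides().forEach((k, v) -> System.out.println(k + "=" + v));
    }
}
```

With this approach the serializer is configured through properties, so there is no need to construct a MockSchemaRegistryClient by hand.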

Upvotes: 10

Ran Lupovich

Reputation: 1831

You have configured the producer not to auto-register new schemas when producing a message, so the serializer only tries to fetch the schema from the Schema Registry (SR), and it does not find it there.

I also don't see a schema registry URL configured, so I guess it is using default values.
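
The lookup-only behavior can be illustrated with a toy in-memory model. This is not Confluent's code: `ToyRegistry`, `resolveId`, and the topic name `app-events` are hypothetical, and the model keys schemas by subject only, which is a simplification. It only shows why an empty registry combined with auto-registration turned off ends in a "not found" error.

```java
import java.util.HashMap;
import java.util.Map;

public class ToyRegistryDemo {

    /** Toy stand-in for a schema registry: subject -> schema id, held in memory only. */
    static class ToyRegistry {
        private final Map<String, Integer> ids = new HashMap<>();
        private int nextId = 1;

        /** Registers the subject if it is new and returns its id. */
        int register(String subject) {
            return ids.computeIfAbsent(subject, s -> nextId++);
        }

        /** Looks the subject up without registering; fails if it is unknown. */
        int lookup(String subject) {
            Integer id = ids.get(subject);
            if (id == null) {
                throw new IllegalStateException("Schema Not Found: " + subject);
            }
            return id;
        }
    }

    /** Mimics the serializer's choice: register when allowed, otherwise look up only. */
    static int resolveId(ToyRegistry registry, String subject, boolean autoRegister) {
        return autoRegister ? registry.register(subject) : registry.lookup(subject);
    }

    public static void main(String[] args) {
        ToyRegistry registry = new ToyRegistry();
        String subject = "app-events-value"; // hypothetical topic name plus "-value"

        try {
            // Auto-registration off and an empty registry: the lookup has nothing to find.
            resolveId(registry, subject, false);
        } catch (IllegalStateException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }

        // Auto-registration on: the first send registers, later lookups succeed.
        int id = resolveId(registry, subject, true);
        System.out.println("same id on lookup: " + (id == resolveId(registry, subject, false)));
    }
}
```

In the test from the question, the equivalent fix would be either to enable auto-registration or to register the schema with the mock client before the first send.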


To answer your question: the mock imitates the behavior of a real schema registry, but it has clear limitations:

/**
 * Mock implementation of SchemaRegistryClient that can be used for tests. This version is NOT
 * thread safe. Schema data is stored in memory and is not persistent or shared across instances.
 */

See the source for more information:

https://github.com/confluentinc/schema-registry/blob/master/client/src/main/java/io/confluent/kafka/schemaregistry/client/MockSchemaRegistryClient.java#L47

Upvotes: 0
