rookie
rookie

Reputation: 426

Spring kafka embedded testing

I am writing a junit test case using embedded kafka. We have a pipeline where producer > topic > consumer > do work() > produce . I am using a third party schema registry(a mock of this is used for my test by giving a fake url ) and the specific serdes tied to it .After discussing this on kafka user group the way to do this was to use a a mock registry to serialize the data manually and pass byte[] itself in the producer instead of the avro record . How ever my consumer will fail in this case since it is expecting a specific record payload.Any ideas on how to workaround this?

//Listener method


    */
        @KafkaListener(topics = test1,id="tesId1")
        public void onMessage(@Payload Log log, 
                @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
                @Header(KafkaHeaders.OFFSET) Long offset) throws Exception
                 {

               }


    // test class
    @RunWith(SpringRunner.class)
    @SpringBootTest
    @DirtiesContext
    @EmbeddedKafka(topics = { "test1" })
    @TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
    public class ConsumerTests {

    }

Upvotes: 2

Views: 7373

Answers (1)

Gary Russell
Gary Russell

Reputation: 174789

Just use a raw KafkaTemplate (no generics) and a byte array serializer.

For example, with JSON and a StringSerializer:

@SpringBootApplication
public class So53179695Application {

    public static void main(String[] args) {
        SpringApplication.run(So53179695Application.class, args);
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(Foo in) {
        System.out.println(in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}

and

@RunWith(SpringRunner.class)
@SpringBootTest
public class So53179695ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka = 
        new EmbeddedKafkaRule(1, false, "foo");

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
            embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Autowired
    public KafkaTemplate<String, Foo> template;

    @SuppressWarnings("rawtypes")
    @Autowired
    public KafkaTemplate rawTemplate;

    @SuppressWarnings("unchecked")
    @Test
    public void test() throws Exception {
//      template.send("foo", new Foo("bar"));
        rawTemplate.send("foo", "{\"bar\":\"baz\"}");
        Thread.sleep(10_000);
    }

}

and

Foo [bar=baz]

Note that both templates point to the same physical object - it doesn't matter at runtime due to java's type erasure.

This assumes you are still using an Avro deserializer (or JSON in this example) on the consumer side.

Or you could use your mock deserializer on the consumer side to create a Log.

Upvotes: 2

Related Questions