Reputation: 426
I am writing a JUnit test case using embedded Kafka. We have a pipeline: producer > topic > consumer > do work() > produce. I am using a third-party schema registry (mocked in my test by supplying a fake URL) and the specific serdes tied to it. After discussing this on the Kafka user group, the suggested approach was to use a mock registry to serialize the data manually and pass the byte[] itself to the producer instead of the Avro record. However, my consumer will fail in this case, since it is expecting a specific-record payload. Any ideas on how to work around this?
// Listener method
@KafkaListener(topics = "test1", id = "tesId1")
public void onMessage(@Payload Log log,
        @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
        @Header(KafkaHeaders.OFFSET) Long offset) throws Exception {
}
// test class
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(topics = { "test1" })
@TestPropertySource(properties = { "spring.kafka.bootstrap-servers=${spring.embedded.kafka.brokers}" })
public class ConsumerTests {
}
Upvotes: 2
Views: 7373
Reputation: 174789
Just use a raw KafkaTemplate (no generics) and a byte array serializer. For example, with JSON and a StringSerializer:
@SpringBootApplication
public class So53179695Application {

    public static void main(String[] args) {
        SpringApplication.run(So53179695Application.class, args);
    }

    @Bean
    public RecordMessageConverter converter() {
        return new StringJsonMessageConverter();
    }

    @KafkaListener(id = "foo", topics = "foo")
    public void listen(Foo in) {
        System.out.println(in);
    }

    public static class Foo {

        private String bar;

        public Foo() {
            super();
        }

        Foo(String bar) {
            this.bar = bar;
        }

        public String getBar() {
            return this.bar;
        }

        public void setBar(String bar) {
            this.bar = bar;
        }

        @Override
        public String toString() {
            return "Foo [bar=" + this.bar + "]";
        }

    }

}
and
@RunWith(SpringRunner.class)
@SpringBootTest
public class So53179695ApplicationTests {

    @ClassRule
    public static EmbeddedKafkaRule embeddedKafka =
            new EmbeddedKafkaRule(1, false, "foo");

    @BeforeClass
    public static void setup() {
        System.setProperty("spring.kafka.bootstrap-servers",
                embeddedKafka.getEmbeddedKafka().getBrokersAsString());
    }

    @Autowired
    public KafkaTemplate<String, Foo> template;

    @SuppressWarnings("rawtypes")
    @Autowired
    public KafkaTemplate rawTemplate;

    @SuppressWarnings("unchecked")
    @Test
    public void test() throws Exception {
        // template.send("foo", new Foo("bar"));
        rawTemplate.send("foo", "{\"bar\":\"baz\"}");
        Thread.sleep(10_000);
    }

}
and the output is
Foo [bar=baz]
Note that both templates point to the same physical template object; the generic types don't matter at runtime because of Java's type erasure.
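A quick plain-Java illustration of the same erasure point, using a List instead of a KafkaTemplate (the names here are illustrative, not from Spring Kafka):

```java
import java.util.ArrayList;
import java.util.List;

public class ErasureDemo {

    public static void main(String[] args) {
        // One physical object...
        List<String> typed = new ArrayList<>();
        @SuppressWarnings("rawtypes")
        List raw = typed;

        // ...seen through two differently-typed references. The raw
        // reference accepts any Object, just as the raw KafkaTemplate
        // accepts a pre-serialized payload.
        raw.add("{\"bar\":\"baz\"}");

        // At runtime both references have the same class: the type
        // parameter was erased at compile time.
        System.out.println(raw == typed);                       // true
        System.out.println(typed.getClass() == raw.getClass()); // true
        System.out.println(typed.get(0));                       // {"bar":"baz"}
    }
}
```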
This assumes you are still using an Avro deserializer (or, in this example, JSON) on the consumer side. Or you could use your mock deserializer on the consumer side to create a Log.
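If you take the mock-deserializer route, the core of it can be as small as the sketch below. The Log fields and the wire format (plain UTF-8 here) are assumptions for illustration; in a real test you would wrap this logic in an implementation of org.apache.kafka.common.serialization.Deserializer&lt;Log&gt; and configure it as the consumer's value deserializer.

```java
import java.nio.charset.StandardCharsets;

public class MockLogDeserializerSketch {

    // Hypothetical payload type standing in for the asker's Log record.
    public static class Log {
        private final String message;

        public Log(String message) {
            this.message = message;
        }

        public String getMessage() {
            return this.message;
        }
    }

    // Core of a mock deserializer: turn the raw bytes the test producer
    // sent into the Log instance the @KafkaListener method expects.
    // The assumed wire format here is simply a UTF-8 string.
    public static Log deserialize(byte[] data) {
        if (data == null) {
            return null; // Kafka deserializers return null for null payloads
        }
        return new Log(new String(data, StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        byte[] payload = "hello from the test producer".getBytes(StandardCharsets.UTF_8);
        System.out.println(deserialize(payload).getMessage());
    }
}
```

This keeps the producer side free to send raw byte[] while the listener still receives the typed payload it declares.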
Upvotes: 2