Reputation: 37
I have a multi module maven project with several modules (parent, service, updater1, updater2). The @SpringBootApplication is in 'service' module and the others doesn't have artifacts.
'updater1' is a module which have a Kafka listener and a http client, and when receives a kafka event launches a request to an external API. I want to create integration tests in this module with testcontainers
, so I've created the containers and a Kafka producer to send a KafkaTemplate to my consumer.
My problem is the Kafka producer is autowiring to null, so the tests throws a NullPointerException. I think it should be a Spring configuration problem, but I can't find the problem. Can you help me? Thank's!
This is my test class:
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = {KafkaConfiguration.class, CacheConfiguration.class, ClientConfiguration.class})
public class InvoicingTest {
@ClassRule
public static final Containers containers = Containers.Builder.aContainer()
.withKafka()
.withServer()
.build();
private final MockHttpClient mockHttpClient =
new MockHttpClient(containers.getHost(SERVER),
containers.getPort(SERVER));
@Autowired
private KafkaEventProducer kafkaEventProducer;
@BeforeEach
@Transactional
void setUp() {
mockHttpClient.reset();
}
@Test
public void createElementSuccesfullResponse() throws ExecutionException, InterruptedException, TimeoutException {
mockHttpClient.whenPost("/v1/endpoint")
.respond(HttpStatusCode.OK_200);
kafkaEventProducer.produce("src/test/resources/event/invoiceCreated.json");
mockHttpClient.verify();
}
And this is the event producer:
@Component
public class KafkaEventProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
private final String topic;
@Autowired
KafkaInvoicingEventProducer(KafkaTemplate<String, String> kafkaTemplate,
@Value("${kafka.topic.invoicing.name}") String topic){
this.kafkaTemplate = kafkaTemplate;
this.topic = topic;
}
public void produce(String event){
kafkaTemplate.send(topic, event);
}
}
Upvotes: 0
Views: 889
Reputation: 423
You haven't detailed how KafkaEventProducer
is implemented (is it a @Component
?), neither your test class is annotated with @SpringBootTest
and the runner @RunWith
.
Check out this sample, using Apache KakfaProducer:
import org.apache.kafka.clients.producer.KafkaProducer;
public void sendRecord(String topic, String event) {
try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerProps(bootstrapServers, false))) {
send(producer, topic, event);
}
}
where
public void send(KafkaProducer<String, byte[]> producer, String topic, String event) {
try {
ProducerRecord<String, byte[]> record = new ProducerRecord<>(topic, event.getBytes());
producer.send(record).get();
} catch (InterruptedException | ExecutionException e) {
fail("Not expected exception: " + e.getMessage());
}
}
protected Properties producerProps(String bootstrapServer, boolean transactional) {
Properties producerProperties = new Properties();
producerProperties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
producerProperties.put(KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProperties.put(VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class.getName());
if (transactional) {
producerProperties.put(TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
}
return producerProperties;
}
and bootstrapServers
is taken from kafka container:
KafkaContainer kafka = new KafkaContainer();
kafka.start();
bootstrapServers = kafka.getBootstrapServers();
Upvotes: 1