Reputation: 111
We are building an application that uses Kafka Streams. We are looking for an example that shows how to write a test case for a scenario in which the topology calls an external service. That external call needs to be mocked somehow, since the service might not always be running. We are using TopologyTestDriver to write the test case. Because of the external call, the test fails with this error: org.springframework.web.client.ResourceAccessException: I/O error on POST request for "http://localhost:8080/endPointName": Connection refused: connect; nested exception is java.net.ConnectException: Connection refused: connect
Sample code for which we want to write a test case:
@Bean
public RestTemplate restTemplate() {
return new RestTemplate();
}
public void method(StreamsBuilder builder) {
builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
.peek((s, customObj) -> LOG.info(customObj))
.mapValues(this::getResult)
.peek((s, result) -> LOG.info(result))
.to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));
}
private Result getResult(Custom customObj) {
HttpHeaders headers = new HttpHeaders();
headers.setAccept(Collections.singletonList(MediaType.APPLICATION_JSON));
HttpEntity<Custom> request = new HttpEntity<>(customObj, headers);
return restTemplate.postForEntity(restCompleteUri, request, Result.class).getBody();
}
Sample test case:
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
@SpringBootTest
public class TopologyTest {
private static TopologyTestDriver topologyTestDriver;
private static final Logger LOG = LogManager.getLogger();
@Autowired
private ConfigurableApplicationContext appContext;
@BeforeAll
void setUp() {
Properties properties = getProperties();
StreamsBuilder builder = new StreamsBuilder();
appContext.getBean(PublisherSubscriberTopology.class).withBuilder(builder);
Topology topology = builder.build();
topologyTestDriver = new TopologyTestDriver(topology, properties);
}
private Properties getProperties() {
Properties properties = new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "test:9092");
properties.put(StreamsConfig.DEFAULT_DESERIALIZATION_EXCEPTION_HANDLER_CLASS_CONFIG, LogDeserializationExceptionHandler.class.getName());
properties.put(StreamsConfig.DEFAULT_PRODUCTION_EXCEPTION_HANDLER_CLASS_CONFIG, CustomProductionExceptionHandler.class.getName());
return properties;
}
@Test
void testAppFlow() {
Custom customObj = getCustomObj();
Result result = getResult();
ConsumerRecordFactory<String, Custom> resultFactory =
new ConsumerRecordFactory<>(inTopic,
StreamsSerdes.String().serializer(), StreamsSerdes.CustomSerde().serializer());
topologyTestDriver.pipeInput(resultFactory.create(
inTopic,
"1001",
customObj
));
ProducerRecord<String, Result> record =
topologyTestDriver.readOutput(
outputTopic,
StreamsSerdes.String().deserializer(),
StreamsSerdes.ResultSerde().deserializer()
);
assertAll(() -> assertEquals("abc123", record.value().getABC()));
}
private Custom getCustomObj() {
Custom customObj = new Custom();
//setting customObj
return customObj;
}
private Result getResult() {
Result result = new Result();
//setting resultObj
return result;
}
@AfterAll
static void tearDown() {
try {
topologyTestDriver.close();
} catch (Exception e) {
LOG.error(e.getMessage());
}
}
}
Upvotes: 3
Views: 2868
Reputation: 42551
In this particular case, consider refactoring the existing code: abstract the HTTP call behind an interface and mock it. Since you're using Spring anyway, inject a bean that handles the HTTP call, and instead of this:
public void method(StreamsBuilder builder) {
builder.stream(inTopic,Consumed.with(StreamsSerdes.String(),new StreamsSerdes.CustomSerde()))
.peek((s, customObj) -> LOG.info(customObj))
.mapValues(this::getResult)
.peek((s, result) -> LOG.info(result))
.to(outputTopic,Produced.with(StreamsSerdes.String(),new ResultSerde()));
}
private Result getResult(Custom customObj) {
... HTTP call here ...
}
Use something like this:
class Whatever {
    @Autowired
    private HttpFacade httpFacade;

    public void method(StreamsBuilder builder) {...}

    private Result getResult(Custom customObj) {
        // delegate the HTTP call to the injected facade
        return httpFacade.callRemoteService(customObj);
    }
}

@Component
class HttpFacade {
    public Result callRemoteService(Custom customObj) {
        // ... here comes the code that works with the HTTP client ...
    }
}
With this setup you can mock HttpFacade in the test (with @MockBean, or with plain Mockito if you're running a unit test without Spring) and specify the expectations.
That covers your concrete case.
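To make the idea concrete without any framework, here is a minimal JDK-only sketch of the same seam. Everything in it is hypothetical: `Custom` and `Result` are stand-ins for the types in the question, `HttpFacade` is written as an interface so that a lambda can act as a hand-written stub, and `ResultMapper` stands for whatever class owns the function passed to `mapValues`. With Mockito or @MockBean the stub line would be replaced by a mock, but the structure stays the same.

```java
public class FacadeStubSketch {

    // Hypothetical stand-in for the Custom type from the question.
    static final class Custom {
        final String id;
        Custom(String id) { this.id = id; }
    }

    // Hypothetical stand-in for the Result type from the question.
    static final class Result {
        final String abc;
        Result(String abc) { this.abc = abc; }
    }

    // The seam: the topology code depends on this interface, not on RestTemplate.
    interface HttpFacade {
        Result callRemoteService(Custom customObj);
    }

    // Owns the value-mapping step that mapValues(...) would reference.
    static final class ResultMapper {
        private final HttpFacade httpFacade;

        ResultMapper(HttpFacade httpFacade) {
            this.httpFacade = httpFacade;
        }

        Result getResult(Custom customObj) {
            return httpFacade.callRemoteService(customObj);
        }
    }

    public static void main(String[] args) {
        // Hand-written stub: returns a canned Result, no network involved.
        HttpFacade stub = customObj -> new Result("abc123");
        ResultMapper mapper = new ResultMapper(stub);

        Result result = mapper.getResult(new Custom("1001"));
        System.out.println(result.abc); // prints abc123
    }
}
```

Because the mapping step only sees the interface, the topology test never needs the real service to be running.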
In general, if you have to test that the HTTP request is populated with the right URL, headers, body, and so on, you can use WireMock.
Since Kafka Streams is a client library for Kafka, you can also start a Kafka Docker container with Testcontainers (and maybe ZooKeeper) before the test, set it up to create the required topics, and you're good to go.
This makes sense when you want to test real Kafka interaction and make sure that the message reaches the Kafka topic, gets consumed by your consumer, and so on; in other words, for more complicated cases.
If you are using Spring Kafka, there is also the option of Embedded Kafka. I'm not sure whether it works with Kafka Streams, so you should try it, but it is at least a viable direction.
Update (based on the OP's comment):
When using a mock bean in a Spring-driven test, you have to specify expectations on that bean:
@SpringBootTest // or whatever configuration you have to run spring driven test here
public class MyTest {
@MockBean
private HttpFacade httpFacade;
@Test
public void myTest() {
Mockito.when(httpFacade.callRemoteService(eq(<YOUR_EXPECTED_CUSTOM_OBJECT_1>))).thenReturn(<EXPECTED_RESPONSE_1>);
// ... it's possible to specify as many expectations as you wish ...
// ... run the test code here that you probably already have ...
}
}
The point is that you don't really need to make HTTP calls to test your Kafka Streams code!
Upvotes: 1
Reputation: 192023
The problem is not related to Kafka Streams. You're depending on an HTTP Client in the middle of the topology (not recommended, by the way), so you need to be asking how you would test that.
You need to inject a mocked RestTemplate and restCompleteUri variable to communicate with some fake HTTP endpoint.
For example, WireMock or see these posts
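As a rough illustration of the "fake HTTP endpoint" idea, the sketch below uses the JDK's built-in `com.sun.net.httpserver.HttpServer` in place of WireMock, and `java.net.http.HttpClient` in place of RestTemplate, so that it runs without extra dependencies. The helper names `startFakeEndpoint` and `post` and the canned JSON are assumptions made for the example; in the real test, the computed URI is what you would inject as restCompleteUri.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;

public class FakeEndpointSketch {

    // Starts a throwaway HTTP server on a free port that answers every
    // request to /endPointName with the given canned JSON body.
    static HttpServer startFakeEndpoint(String cannedJson) throws IOException {
        HttpServer server = HttpServer.create(new InetSocketAddress("localhost", 0), 0);
        server.createContext("/endPointName", exchange -> {
            exchange.getRequestBody().readAllBytes(); // drain the request body
            byte[] body = cannedJson.getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders().add("Content-Type", "application/json");
            exchange.sendResponseHeaders(200, body.length);
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }

    // POSTs a JSON payload to the given URI and returns the response body.
    static String post(String uri, String json) throws IOException, InterruptedException {
        HttpClient client = HttpClient.newBuilder()
                .version(HttpClient.Version.HTTP_1_1)
                .build();
        HttpRequest request = HttpRequest.newBuilder(URI.create(uri))
                .header("Content-Type", "application/json")
                .POST(HttpRequest.BodyPublishers.ofString(json))
                .build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        HttpServer server = startFakeEndpoint("{\"abc\":\"abc123\"}");
        try {
            // In a real test this value would be injected as restCompleteUri.
            String restCompleteUri =
                    "http://localhost:" + server.getAddress().getPort() + "/endPointName";
            System.out.println(post(restCompleteUri, "{\"id\":\"1001\"}"));
        } finally {
            server.stop(0); // shut the fake endpoint down immediately
        }
    }
}
```

Binding to port 0 lets the operating system pick a free port, which avoids clashes with a real service on localhost:8080.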
Upvotes: 0