Reputation: 259
I'm quite new to Camel and despite having read the Apache Camel documentation, I'm stuck on what I hope is a trivial issue which I've overlooked.
I have a Spring Boot application that defines a Camel route which consumes live prices in csv format from an HTTP call, converts the CSV to a POJO (LivePrice) using Bindy, then persists the data to a store.
Here's the route definition:
@Component
public class LivePricesCSVRoute extends RouteBuilder {
@Override
public void configure() throws Exception {
from("kafka:" + "{{kafka.topic.live.prices.csv}}" + "{{kafka.broker.location}}")
.routeId("live.prices-persistence-route")
.transacted()
.unmarshal()
.bindy(BindyType.Csv, LivePrice.class).id("convertToCsv")
.process(exchange -> {
List<LivePrice> object = (List<LivePrice>) exchange.getIn().getBody();
object.remove(0); // omit the header
logger.info(object);
})
.bean("livePriceServiceImpl", "populateLivePrices").id("populateLivePrices");
}
}
I want to create an integration test for this route where I supply a test csv file containing two lines and a header as an input rather than expecting messages on the topic kafka.topic.live.prices.csv.
Date,Symbol,Open,
2019-07-09,BTCUSD,12347.18
2019-07-08,BTCUSD,11475.07
I also want to intercept the Exchange before it is persisted and send it to the endpoint mock:output where I can perform assertions.
Here's the test I've written:
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@MockEndpoints
@UseAdviceWith
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class LivePricesPersistenceRouteTest {
@Autowired
CamelContext camelContext;
@EndpointInject(uri = "mock:output")
private MockEndpoint mockOutput;
@Test
public void testSendLivePricesCsvToTopic() throws Exception {
camelContext.getRouteDefinition("live-prices-persistence-route")
.adviceWith(camelContext, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
replaceFromWith("file://testCsvFile.csv");
intercept()
.to("mock:output");
}
});
camelContext.start();
Exchange exchange = mockOutput.assertExchangeReceived(0);
List<LivePrice> livePrices = (List<LivePrice>)exchange.getIn().getBody();
assertThat(livePrices.get(0).getDate(), is("2019-07-09"));
// TODO ADD MORE ASSERTIONS
mockOutput.assertIsSatisfied();
}
}
When I run the test, the following is logged by Camel:
2019-07-13 14:35:16.587 INFO 90356 --- [ main] org.apache.camel.model.RouteDefinition : Adviced route before/after as XML:
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="live-prices-persistence-route">
<from uri="kafka:{{kafka.topic.live.prices.csv}}{{kafka.broker.location}}"/>
<transacted>
<unmarshal customId="true" id="convertToCsv">
<bindy type="Csv"/>
</unmarshal>
<process/>
<bean customId="true" id="populateLivePrices" method="populateLivePrices" ref="livePriceServiceImpl"/>
</transacted>
</route>
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<route xmlns="http://camel.apache.org/schema/spring" customId="true" id="live-prices-persistence-route">
<from uri="file://testCsvFile.csv"/>
<intercept>
<to uri="mock:output"/>
</intercept>
<transacted>
<unmarshal customId="true" id="convertToCsv">
<bindy type="Csv"/>
</unmarshal>
<process/>
<bean customId="true" id="populateLivePrices" method="populateLivePrices" ref="livePriceServiceImpl"/>
</transacted>
</route>
However the test fails with the following output:
java.lang.AssertionError: mock://output Not enough messages received. Was: 0
at org.apache.camel.component.mock.MockEndpoint.fail(MockEndpoint.java:1494)
at org.apache.camel.component.mock.MockEndpoint.assertTrue(MockEndpoint.java:1482)
at org.apache.camel.component.mock.MockEndpoint.assertExchangeReceived(MockEndpoint.java:1078)
at com.xxx.liveprices.routes.LivePricesPersistenceRouteTest.testSendLivePricesCsvToTopic(LivePricesPersistenceRouteTest.java:78)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.springframework.test.context.junit4.statements.RunBeforeTestExecutionCallbacks.evaluate(RunBeforeTestExecutionCallbacks.java:74)
at org.springframework.test.context.junit4.statements.RunAfterTestExecutionCallbacks.evaluate(RunAfterTestExecutionCallbacks.java:84)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)
Can anyone please guide me as to why the file isn't being used and why the Exchange isn't being intercepted and sent to the mock endpoint?
Upvotes: 0
Views: 4743
Reputation: 2317
I know it's an old question but in case anyone else runs into the same issue...
I looked at the implementation of MockEndpoint.assertExchangeReceived()
. This method does NOT have any wait mechanism built in, unlike MockEndpoint.assertIsSatisfied()
, which will wait for a configurable amount of time to receive the expected number of exchanges (configurable via resultWaitTime
). The microsecond it takes to go from camelContext.start()
to mockOutput.assertExchangeReceived(0)
is clearly not enough for Camel to process your input file.
You almost got it right but the position of assertIsSatisfied()
is too late and an actual expectation is missing. Here is what I think should work:
mockOutput.expectedMessageCount(1);
camelContext.start();
mockOutput.assertIsSatisfied();
// if the statement above passes, the endpoint is guaranteed
// to have received at least one exchange
Exchange exchange = mockOutput.assertExchangeReceived(0);
List<LivePrice> livePrices = (List<LivePrice>)exchange.getIn().getBody();
assertThat(livePrices.get(0).getDate(), is("2019-07-09"));
// TODO ADD MORE ASSERTIONS
Upvotes: 0
Reputation: 259
After much more reading, I was still unable to determine why the following code didn't read and swap out my input data:
replaceFromWith("file://testCsvFile.csv");
I opted instead for supplying the CSV file's contents as a String and using weaveById to replace the input data.
Here's the test with my objective accomplished:
@RunWith(CamelSpringBootRunner.class)
@SpringBootTest
@MockEndpoints
@UseAdviceWith
@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)
public class LivePricesPersistenceRouteTest {
@Autowired
CamelContext camelContext;
@Autowired
ProducerTemplate producerTemplate;
@EndpointInject(uri = "mock:output")
private MockEndpoint mockOutput;
@Test
public void testSendLivePricesCsvToTopic() throws Exception {
camelContext.getRouteDefinition("live-prices-persistence-route")
.adviceWith(camelContext, new AdviceWithRouteBuilder() {
@Override
public void configure() throws Exception {
replaceFromWith("direct:test");
weaveById("populateLivePrices").replace().inOut("mock:output");
}
});
camelContext.start();
String message = "Date,Symbol,Open,\n" +
"2019-07-09,BTCUSD,12347.18\n" +
"2019-07-08,BTCUSD,11475.07";
producerTemplate.sendBody("direct:test", message);
Exchange exchange = mockOutput.assertExchangeReceived(0);
List<LivePrice> livePrices = (List<LivePrice>)exchange.getIn().getBody();
assertThat(livePrices.get(0).getDate(), is("2019-07-09"));
assertThat(livePrices.get(0).getOpen(), is("12347.18"));
assertThat(livePrices.get(1).getDate(), is("2019-07-08"));
assertThat(livePrices.get(1).getOpen(), is("11475.07"));
mockOutput.assertIsSatisfied();
}
}
Upvotes: 1