Reputation: 6548
I have an a SpringBoot Application that is using Kafka with EhCache to perform Cache Synchronization among different MicroServices and Instances. I'm using SpringBoot 2.2.4 with the matching Kafka-Client version. How can I test that my Kafka Client is working correctly with an Embedded Kafka.
I've tried:
Test Class
@RunWith(SpringRunner.class)
@SpringBootTest()
@ActiveProfiles({"inmemory", "test", "kafka-test"})
@WebAppConfiguration
@DirtiesContext
public class CachePropagatorTest
{
private static final String topic = "com.allstate.d3.sh.test.cache";
//private static final String topic2 = "com.allstate.sh.test.alloc";
//@Rule
@ClassRule
public static final EmbeddedKafkaRule embeddedKafkaRule;
static
{
embeddedKafkaRule = new EmbeddedKafkaRule(1, true, topic);
embeddedKafkaRule
.getEmbeddedKafka().brokerListProperty("spring.kafka.bootstrap-servers");
}
//@Autowired
private EmbeddedKafkaBroker embeddedKafka;
@Autowired
KafkaTemplate<String, CacheMessage> kafkaTemplate;
@Autowired
KafkaSHProperties properties;
//@Autowired
@SpyBean
CachePropagator propagator;
//CachePropagationHelper propagator;
BlockingQueue<CacheMessage> records = new LinkedBlockingQueue<>();
/* read sent messages */
Consumer<Integer, CacheMessage> consumer;
private String topic1;
@Before
public void setUp() throws Exception
{
embeddedKafka = embeddedKafkaRule.getEmbeddedKafka();
topic1 = properties.getCacheTopic();
assertThat(topic1, is(topic));
//embeddedKafka.getEmbeddedKafka().addTopics(topic1);
try { embeddedKafka.addTopics(topic1); }
catch (KafkaException Ignored) { }
Mockito.doAnswer(new Answer<Void>()
{
@Override
public Void answer(InvocationOnMock invocation) throws Throwable
{
System.out.println("Cache Message Receive");
records.add((CacheMessage) invocation.getArgument(0));
return (Void)invocation.callRealMethod();
}
}).when(propagator).receive(ArgumentMatchers.any(),
ArgumentMatchers.anyString());
//prove raw template usage
CacheMessage cm = new CacheMessage("Test","Test","put",
true,"");
kafkaTemplate.send(topic1, cm);
Map<String, Object> consumerProps =
KafkaTestUtils.consumerProps(properties.getCacheConsumptionGroup(),
"false", embeddedKafka);
DefaultKafkaConsumerFactory<Integer, CacheMessage> cf =
new DefaultKafkaConsumerFactory<Integer, CacheMessage>(consumerProps);
consumer = cf.createConsumer();
embeddedKafka.consumeFromAllEmbeddedTopics(consumer);
Set<String> topics = embeddedKafka.getTopics();
assertThat(topics.size(),is(1) );
assertThat(topics,hasItem(topic1) );
//prove sent message received
ConsumerRecord<Integer, CacheMessage> received =
KafkaTestUtils.getSingleRecord(consumer, topic1, 30000);
assertThat(received.value(), is("Test"));
}
@After
public void tearDown() throws Exception { }
@Test
public void putExperiment() throws Exception
{
Date now = new Date();
JsonNode emptyNode = new ObjectMapper().readTree("");
List<BucketDetail> buckets = new ArrayList<>();
buckets.add(new BucketDetail("99-1", "Kafka Bucket 1",
0.5, emptyNode));
buckets.add(new BucketDetail("99-2", "Kafka Bucket 2",
0.5, emptyNode));
buckets.add(new BucketDetail());
ExperimentDetail exp = new ExperimentDetail("99", 1,
"KafkaTest",
"SH_TEST_PROFILE_9",
buckets, LifecycleStage.CONFIGURED,
now, null, "Mete Test Notes");
propagator.putExperiment(exp);
//TODO: test the allocation was correct
ConsumerRecord<Integer, CacheMessage> received =
KafkaTestUtils.getSingleRecord(consumer, topic1, 10000);
//TODO: how much should this verify in the message
assertThat(received.value().getAction(), is("put"));
assertThat(received.value().getItem().toString(),
containsString(exp.getExperimentID()));
}
}
Kafka in application-test.yml
spring:
kafka:
bootstrap-servers: localhost:2181
listener:
#add topics after start
missing-topics-fatal: false
properties:
sasl:
kerberos:
service:
name: kafka
security:
protocol: SASL_PLAINTEXT
consumer:
properties:
spring:
json:
trusted:
packages: com.allstate.d3.sh.commons.messaging
bootstrap-servers: localhost:2181
auto-offset-reset: latest
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
producer:
bootstrap-servers: localhost:2181
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
profiles:
active: inmemory,kafka-test
When run tests it fails in the Unit setup()
method with
java.lang.IllegalStateException: No records found for topic
at org.springframework.kafka.test.utils.KafkaTestUtils.getSingleRecord(KafkaTestUtils.java:187)
at com.allstate.d3.sh.execution.event.CachePropagatorTest.setUp(CachePropagatorTest.java:169)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:24)
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86)
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:251)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:97)
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61)
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70)
at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:48)
at org.junit.rules.RunRules.evaluate(RunRules.java:20)
at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:190)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:106)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:66)
at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:117)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:155)
at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:137)
at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
at org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
at org.gradle.internal.concurrent.ManagedExecutorImpl$1.run(ManagedExecutorImpl.java:46)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at org.gradle.internal.concurrent.ThreadFactoryImpl$ManagedThreadRunnable.run(ThreadFactoryImpl.java:55)
at java.lang.Thread.run(Thread.java:748)
So why can't I find any sent records for the topic?
UPDATE
The Answer by @QuickSilver below pointed out running in parrallel.
Could my @SpyBean CachePropagator propagator;
be interfering with my tests.
Cache Propegator has a listener method defined like so:
@KafkaListener(topics = "#{kafkaSHProperties.cacheTopic}",
groupId = "#{kafkaSHProperties.cacheConsumptionGroup}")
public void receive(@Payload CacheMessage message,
@Header(KafkaHeaders.RECEIVED_MESSAGE_KEY) String key)
{
if (!envName.equals(message.getEnv())) { return; }
log.info("07e9d084-1b8c-4c4c-b9be-9e7bb2716c3c -- Cache sync message: {}, {}, {}",
key, message.getEnv(), message.getCacheName());
processMessage(message);
}
Could that be grabbing my messages? If so, shouldn't they still be available on the broker? If not is that a setting I can change?
Upvotes: 0
Views: 3792
Reputation: 4045
Can you please below items in your code,
Below test on my machine
@SpringBootTest
@RunWith(SpringRunner.class)
public class TestKafkaConfig {
@ClassRule
// By default it creates two partitions.
public static KafkaEmbedded embeddedKafka = new KafkaEmbedded(1, true, TOPIC_NAME);
private static String TOPIC_NAME = "testTopic";
@Test
public void testKafkaConfig() throws InterruptedException, ExecutionException {
Map<String, Object> senderProps = KafkaTestUtils.producerProps(embeddedKafka);
KafkaProducer<Integer, String> producer = new KafkaProducer<>(senderProps);
producer.send(new ProducerRecord<>(TOPIC_NAME, 0, 0, "ABC")).get();
producer.send(new ProducerRecord<>(TOPIC_NAME, 0, 1, "XYZ")).get();
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps("testConsumer", "false", embeddedKafka);
consumerProps.put("auto.offset.reset", "earliest");
final List<String> receivedMessages = Lists.newArrayList();
final CountDownLatch latch = new CountDownLatch(2);
ExecutorService executorService = Executors.newSingleThreadExecutor();
executorService.execute(() -> {
KafkaConsumer<Integer, String> kafkaConsumer = new KafkaConsumer<>(consumerProps);
kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
try {
while (true) {
ConsumerRecords<Integer, String> records = kafkaConsumer.poll(100);
records.iterator().forEachRemaining(record -> {
receivedMessages.add(record.value());
latch.countDown();
});
}
} finally {
kafkaConsumer.close();
}
});
latch.await(10, TimeUnit.SECONDS);
assertTrue(receivedMessages.containsAll(Arrays.asList("ABC", "XYZ")));
}
}
Upvotes: 1