Reputation: 87
I have a service that sends a message:
@Service
class ExportTaskService {
@Autowired
private KafkaTemplate<String, Object> template;
public void exportNewTask(ImportTaskRequest req) {
template.send('my-topic-name', req)
}
}
I configured the beans consumerFactory, producerFactory, and kafkaTemplate (in src/main/java).
If I run the application and execute the method, everything is OK and the message arrives in the real message broker.
Now I need a Spring test that calls ExportTaskService.exportNewTask(request) and waits for the message on the same topic.
My code, which is not working (I can't receive the message):
/**
 * Integration test: publishes a request through {@code ExportTaskService} and reads it
 * back from the embedded Kafka broker.
 *
 * NOTE(review): the topic consumed here must be the same one the service publishes to;
 * confirm against the service's configuration.
 */
@RunWith(SpringRunner.class)
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
    topics = "new-bitrix-leads", ports = 9092
)
public class ExportingLeadTests {
    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    @Autowired
    ExportTaskService exportTaskService;
    @Autowired
    ConsumerFactory<String, Object> consumerFactory;

    /**
     * Sends one request and expects exactly one record on the topic.
     *
     * @throws Exception if waiting for the partition assignment or polling fails
     */
    @Test
    public void test() throws Exception {
        assert embeddedKafkaBroker != null;
        assert exportTaskService != null;

        Consumer<String, Object> consumer = consumerFactory.createConsumer();
        try {
            // subscribe() alone is lazy: partitions are only assigned during a later poll().
            // This helper subscribes and polls until the assignment has actually happened,
            // so the consumer is ready before anything is published.
            embeddedKafkaBroker.consumeFromAnEmbeddedTopic(consumer, "new-bitrix-leads");

            // Resolve the fetch position of every assigned partition now; otherwise an
            // "auto.offset.reset=latest" consumer could resolve it after the send and skip the record.
            consumer.assignment().forEach(consumer::position);

            exportTaskService.exportNewTask(ImportTaskRequest.builder()
                    .description("descr")
                    .title("title")
                    .build());

            ConsumerRecords<String, Object> records = consumer.poll(Duration.ofSeconds(3));
            assert records.count() == 1 : "expected exactly one record but got " + records.count();
        } finally {
            // Release the consumer's network resources and leave the group.
            consumer.close();
        }
    }
}
How can I read this message? What do I need to do? I have no ideas...
Big thanks :)!
Upvotes: 0
Views: 1505
Reputation: 87
My simple solution is:
/**
 * Integration test: publishes a request through {@code ExportTaskService} and verifies
 * that it arrives on the embedded Kafka broker.
 *
 * A listener container is started once for the whole class and pushes every received
 * record into a blocking queue, which the test then polls with a timeout.
 */
@SpringBootTest
@DirtiesContext
@TestPropertySource(locations="classpath:test.properties")
@EnableKafka
@EmbeddedKafka(
    topics = ExportingLeadTests.TOPIC, ports = 9092
)
@TestInstance(TestInstance.Lifecycle.PER_CLASS)
public class ExportingLeadTests {
    /** Topic created on the embedded broker and consumed by the listener container. */
    static final String TOPIC = "new-bitrix-leads";

    // Values are read with StringDeserializer, so records are typed <String, String> throughout.
    private BlockingQueue<ConsumerRecord<String, String>> records;
    private KafkaMessageListenerContainer<String, String> container;

    @Autowired
    private EmbeddedKafkaBroker embeddedKafkaBroker;
    @Autowired
    ExportTaskService exportTaskService;

    /**
     * Starts a listener container on {@link #TOPIC} and blocks until its partitions
     * are assigned, so no record published by the test can be missed.
     */
    @BeforeAll
    void setUp() {
        DefaultKafkaConsumerFactory<String, String> consumerFactory =
                new DefaultKafkaConsumerFactory<>(getConsumerProperties());
        ContainerProperties containerProperties = new ContainerProperties(TOPIC);
        container = new KafkaMessageListenerContainer<>(consumerFactory, containerProperties);
        records = new LinkedBlockingQueue<>();
        container.setupMessageListener((MessageListener<String, String>) records::add);
        container.start();
        ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
    }

    /** Stops the listener container if it was created. */
    @AfterAll
    void tearDown() {
        // Guard against setUp() failing before the container was built, which would
        // otherwise raise a NullPointerException here and hide the original failure.
        if (container != null) {
            container.stop();
        }
    }

    /**
     * Builds the consumer configuration pointing at the embedded broker.
     *
     * @return consumer properties using String key/value deserializers
     */
    private Map<String, Object> getConsumerProperties() {
        Map<String, Object> map = new HashMap<>();
        map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafkaBroker.getBrokersAsString());
        map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
        map.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
        map.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "10");
        map.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "60000");
        map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        map.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return map;
    }

    /**
     * Publishes one request and expects a record containing "gclid" within five seconds.
     *
     * @throws InterruptedException if interrupted while waiting on the queue
     */
    @Test
    public void test() throws InterruptedException {
        exportTaskService.exportNewTask(ImportTaskRequest.builder()
                .description("descr")
                .title("title")
                .gclid("gclid")
                .id("id")
                .name("my name")
                .website("http://website.com")
                .yclid("yclid")
                .source("Source")
                .build());

        ConsumerRecord<String, String> record = records.poll(5, TimeUnit.SECONDS);
        assert record != null : "no record received within 5 seconds";
        assertThat(record.value(), containsString("gclid"));
    }
}
This really works. Nice.
Upvotes: 2