Reputation: 2103
I'm sending webhook push from a third party app as a message to kafka adpater for further processing by a downstream microservice. The integration flow is as follows
@Slf4j
@Configuration
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class WebhookConsumerFlowConfiguration extends MessageHeadersConfiguration {
@Autowired
private final KafkaProducerMessageHandler<String, String> kafkaHandler;
@Bean
public SubscribableChannel httpInAdapterPubSubChannel() {
return MessageChannels.publishSubscribe("httpInAdapterPubSubChannel").get();
}
@Bean
public IntegrationFlow webhookConsumerFlow(@Autowired MessagingGatewaySupport webhookNotificationHandler) {
return IntegrationFlows.from(webhookNotificationHandler)
.channel(httpInAdapterPubSubChannel())
.handle(message -> kafkaHandler.handleMessage(message)).get();
}
}
and then in a separate configuration class for Kafka
@Slf4j
@Configuration
public class KafkaHandlerConfiguration extends MessageHeadersConfiguration {
@Value("${spring.kafka.bootstrap-servers:localhost:9092}")
private String bootstrapServers;
@Value("${spring.kafka.producer.key-serializer}")
private String kafkaKeySerializer;
@Value("${spring.kafka.producer.value-serializer}")
private String kafkaValueSerializer;
@Bean
public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
handler.setHeaderMapper(mapper());
handler.setLoggingEnabled(TRUE);
handler.setTopicExpression(
new SpelExpressionParser()
.parseExpression("headers['" + upstreamType + "'] + headers['" + upstreamTypeInstance + "']"));
handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
handler.setSendFailureChannel(kafkaFailuresChannel());
return handler;
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaKeySerializer);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaValueSerializer);
return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
}
private DefaultKafkaHeaderMapper mapper() {
return new DefaultKafkaHeaderMapper();
}
}
I had written a failing test for this flow which, well, is still failing :-)
@Test
public void testPushNotificationIsSavedToMongo(
@Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
consumerRecords = new LinkedBlockingQueue<>();
ContainerProperties containerProperties = new ContainerProperties("alm_jira");
Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(
"test", "false", embeddedKafkaBroker);
consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
ConsumerFactory<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProperties);
container = new KafkaMessageListenerContainer<>(consumer, containerProperties);
container.setupMessageListener((MessageListener<String, String>) record -> {
log.debug("Listened message='{}'", record.toString());
consumerRecords.add(record);
});
container.start();
ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());
//Post Payload to Gateway
String payloadJson = new String(Files.readAllBytes(Paths.get(jiraWebhookPayload.getFile().getAbsolutePath())));
HttpHeaders headers = new HttpHeaders();
headers.setContentType(APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(payloadJson, headers);
restTemplate.postForEntity("http://localhost:8080/webhooks/alm/jira/some-project", entity, String.class);
//assert message published in Kafka
ConsumerRecord<String, String> received = consumerRecords.poll(10, SECONDS);
assertThat(received).isNotNull();
assertThat(received.key()).isEqualTo("JRASERVER-2000");
container.stop();
}
Am I wiring Kafka adapter in a wrong way? or am I listening the to published messages in a wrong way?
Update:
The test class containing this method
@Slf4j
@SpringJUnitConfig
@SpringBootTest(webEnvironment = DEFINED_PORT)
@EmbeddedKafka(
bootstrapServersProperty = "spring.kafka.bootstrap-servers",
topics = {"alm_jira"}
)
@TestPropertySource(properties = {
"spring.main.banner-mode=OFF",
"spring.data.mongodb.database=cloud_stream_int",
"spring.data.mongodb.port=27017",
"spring.data.mongodb.host=localhost",
"spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
"spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer"
})
public class WebhooksConsumerApplicationTests {
final RestTemplate restTemplate = new RestTemplateBuilder().rootUri(
"http://localhost:8080/webhooks").build();
@Autowired
MongoTemplate mongoTemplate;
@Autowired
EmbeddedKafkaBroker embeddedKafkaBroker;
...
Upvotes: 1
Views: 1000
Reputation: 2103
The problem was in topic expression I was expecting value of two headers joined with _, but the expression did not have an underscore hence producer and consumer were listening to different topics.
Changing the topic expression to following fixed it
handler.setTopicExpression(
new SpelExpressionParser()
.parseExpression(
"headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
and using KafkaTestUtils as suggested by @Artem made the test code compact
@Test
public void testPushNotificationIsSavedToMongo(
@Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {
Map<String, Object> configs = new HashMap<>(
KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
Consumer<String, Object> consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(),
new JsonDeserializer<>()).createConsumer();
consumer.subscribe(singleton("alm_jira"));
//Post Payload to Gateway
String payloadJson = new String(Files.readAllBytes(Paths.get(jiraWebhookPayload.getFile().getAbsolutePath())));
HttpHeaders headers = new HttpHeaders();
headers.setContentType(APPLICATION_JSON);
HttpEntity<String> entity = new HttpEntity<>(payloadJson, headers);
restTemplate.postForEntity("http://localhost:8080/webhooks/alm/jira/some-project", entity, String.class);
//assert message published in Kafka
ConsumerRecord<String, Object> received = KafkaTestUtils.getSingleRecord(consumer, "alm_jira");
assertThat(received).isNotNull();
assertThat(received.key()).isEqualTo("JRASERVER-2000");
assertThat(((Map) received.value()).get("id")).isEqualTo("13953");
consumer.close();
}
Upvotes: 0
Reputation: 121272
See this callback in the KafkaTestUtils
instead of the whole listener container for testing:
/**
* Get a single record for the group from the topic/partition. Optionally, seeking to the current last record.
* @param brokerAddresses the broker address(es).
* @param group the group.
* @param topic the topic.
* @param partition the partition.
* @param seekToLast true to fetch an existing last record, if present.
* @param commit commit offset after polling or not.
* @param timeout the timeout.
* @return the record or null if no record received.
* @since 2.3
*/
@Nullable
@SuppressWarnings({ "rawtypes", "unchecked" })
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
boolean seekToLast, boolean commit, long timeout) {
Also see how to configure an @EmbeddedKafka
to deal with Spring Boot auto-configuration properties: https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-embedded-kafka
Upvotes: 2