Anadi Misra

Reputation: 2103

How to receive a Kafka message when testing a Kafka outbound channel adapter

I'm sending a webhook push from a third-party app as a message to a Kafka adapter for further processing by a downstream microservice. The integration flow is as follows:

@Slf4j
@Configuration
@AllArgsConstructor(onConstructor = @__(@Autowired))
public class WebhookConsumerFlowConfiguration extends MessageHeadersConfiguration {

    @Autowired
    private final KafkaProducerMessageHandler<String, String> kafkaHandler;

    @Bean
    public SubscribableChannel httpInAdapterPubSubChannel() {
        return MessageChannels.publishSubscribe("httpInAdapterPubSubChannel").get();
    }

    @Bean
    public IntegrationFlow webhookConsumerFlow(@Autowired MessagingGatewaySupport webhookNotificationHandler) {
        return IntegrationFlows.from(webhookNotificationHandler)
                .channel(httpInAdapterPubSubChannel())
                .handle(message -> kafkaHandler.handleMessage(message)).get();
    }
}

The Kafka handler is defined in a separate configuration class:

@Slf4j
@Configuration
public class KafkaHandlerConfiguration extends MessageHeadersConfiguration {

    @Value("${spring.kafka.bootstrap-servers:localhost:9092}")
    private String bootstrapServers;

    @Value("${spring.kafka.producer.key-serializer}")
    private String kafkaKeySerializer;

    @Value("${spring.kafka.producer.value-serializer}")
    private String kafkaValueSerializer;

    @Bean
    public KafkaProducerMessageHandler<String, String> kafkaProducerMessageHandler() {
        KafkaProducerMessageHandler<String, String> handler = new KafkaProducerMessageHandler<>(kafkaTemplate());
        handler.setHeaderMapper(mapper());
        handler.setLoggingEnabled(TRUE);
        handler.setTopicExpression(
                new SpelExpressionParser()
                        .parseExpression("headers['" + upstreamType + "'] + headers['" + upstreamTypeInstance + "']"));
        handler.setMessageKeyExpression(new SpelExpressionParser().parseExpression("payload['key']"));
        handler.setSendSuccessChannel(kafkaPublishSuccessChannel());
        handler.setSendFailureChannel(kafkaFailuresChannel());
        return handler;
    }

    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {

        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, kafkaKeySerializer);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, kafkaValueSerializer);
        return new KafkaTemplate<>(new DefaultKafkaProducerFactory<>(props));
    }

    private DefaultKafkaHeaderMapper mapper() {
        return new DefaultKafkaHeaderMapper();
    }
}

I had written a failing test for this flow which, well, is still failing :-)

@Test
public void testPushNotificationIsSavedToMongo(
        @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {

    consumerRecords = new LinkedBlockingQueue<>();
    ContainerProperties containerProperties = new ContainerProperties("alm_jira");
    Map<String, Object> consumerProperties = KafkaTestUtils.consumerProps(
            "test", "false", embeddedKafkaBroker);
    consumerProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    ConsumerFactory<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProperties);
    container = new KafkaMessageListenerContainer<>(consumer, containerProperties);
    container.setupMessageListener((MessageListener<String, String>) record -> {
        log.debug("Listened message='{}'", record.toString());
        consumerRecords.add(record);
    });
    container.start();
    ContainerTestUtils.waitForAssignment(container, embeddedKafkaBroker.getPartitionsPerTopic());

    //Post Payload to Gateway
    String payloadJson = new String(Files.readAllBytes(Paths.get(jiraWebhookPayload.getFile().getAbsolutePath())));
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(APPLICATION_JSON);
    HttpEntity<String> entity = new HttpEntity<>(payloadJson, headers);
    restTemplate.postForEntity("http://localhost:8080/webhooks/alm/jira/some-project", entity, String.class);

    //assert message published in Kafka
    ConsumerRecord<String, String> received = consumerRecords.poll(10, SECONDS);

    assertThat(received).isNotNull();
    assertThat(received.key()).isEqualTo("JRASERVER-2000");

    container.stop();
}

Am I wiring the Kafka adapter the wrong way, or am I listening for the published messages the wrong way?

Update:

The test class containing this method:

@Slf4j
@SpringJUnitConfig
@SpringBootTest(webEnvironment = DEFINED_PORT)
@EmbeddedKafka(
        bootstrapServersProperty = "spring.kafka.bootstrap-servers",
        topics = {"alm_jira"}
)
@TestPropertySource(properties = {
        "spring.main.banner-mode=OFF",
        "spring.data.mongodb.database=cloud_stream_int",
        "spring.data.mongodb.port=27017",
        "spring.data.mongodb.host=localhost",
        "spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer",
        "spring.kafka.producer.value-serializer=org.springframework.kafka.support.serializer.JsonSerializer"
})
public class WebhooksConsumerApplicationTests {

    final RestTemplate restTemplate = new RestTemplateBuilder().rootUri(
            "http://localhost:8080/webhooks").build();

    @Autowired
    MongoTemplate mongoTemplate;

    @Autowired
    EmbeddedKafkaBroker embeddedKafkaBroker;
    ...

Upvotes: 1

Views: 1000

Answers (2)

Anadi Misra

Reputation: 2103

The problem was in the topic expression. I expected the values of the two headers to be joined with an underscore, but the expression contained no underscore, so the producer published to a topic name without it while the test consumer listened on alm_jira.

Changing the topic expression to the following fixed it:

handler.setTopicExpression(
                new SpelExpressionParser()
                        .parseExpression(
                                "headers['" + upstreamType + "'] + '_' + headers['" + upstreamTypeInstance + "']"));
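
To make the mismatch concrete, here is a minimal plain-Java sketch that mimics the two concatenations without using SpEL. The header names (upstreamType, upstreamTypeInstance) and the values (alm, jira) are assumptions for illustration only; the real header names come from MessageHeadersConfiguration, which is not shown in the question.

```java
import java.util.Map;

public class TopicNameSketch {

    // Mirrors the original expression: headers[a] + headers[b]
    static String topicWithoutSeparator(Map<String, String> headers) {
        return headers.get("upstreamType") + headers.get("upstreamTypeInstance");
    }

    // Mirrors the fixed expression: headers[a] + '_' + headers[b]
    static String topicWithSeparator(Map<String, String> headers) {
        return headers.get("upstreamType") + "_" + headers.get("upstreamTypeInstance");
    }

    public static void main(String[] args) {
        // Hypothetical header values, chosen so the fixed form yields "alm_jira"
        Map<String, String> headers = Map.of(
                "upstreamType", "alm",
                "upstreamTypeInstance", "jira");

        // Topic the test consumer subscribes to
        String subscribed = "alm_jira";

        String before = topicWithoutSeparator(headers);
        String after = topicWithSeparator(headers);

        System.out.println(before + " matches subscribed topic: " + before.equals(subscribed));
        System.out.println(after + " matches subscribed topic: " + after.equals(subscribed));
    }
}
```

Under these assumed values the first form resolves to almjira, so the record would go to a topic the test never reads from, and the consumer would time out rather than fail with an obvious error.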

Using KafkaTestUtils, as suggested by @Artem, also made the test code more compact:

@Test
public void testPushNotificationIsSavedToMongo(
        @Value("classpath:webhooks/jira/test-payload.json") Resource jiraWebhookPayload) throws IOException, InterruptedException {

    Map<String, Object> configs = new HashMap<>(
            KafkaTestUtils.consumerProps("consumer", "false", embeddedKafkaBroker));
    configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    Consumer<String, Object> consumer = new DefaultKafkaConsumerFactory<>(configs, new StringDeserializer(),
                                                                          new JsonDeserializer<>()).createConsumer();
    consumer.subscribe(singleton("alm_jira"));

    //Post Payload to Gateway
    String payloadJson = new String(Files.readAllBytes(Paths.get(jiraWebhookPayload.getFile().getAbsolutePath())));
    HttpHeaders headers = new HttpHeaders();
    headers.setContentType(APPLICATION_JSON);
    HttpEntity<String> entity = new HttpEntity<>(payloadJson, headers);
    restTemplate.postForEntity("http://localhost:8080/webhooks/alm/jira/some-project", entity, String.class);

    //assert message published in Kafka
    ConsumerRecord<String, Object> received = KafkaTestUtils.getSingleRecord(consumer, "alm_jira");

    assertThat(received).isNotNull();
    assertThat(received.key()).isEqualTo("JRASERVER-2000");
    assertThat(((Map<?, ?>) received.value()).get("id")).isEqualTo("13953");
    consumer.close();
}

Upvotes: 0

Artem Bilan

Reputation: 121272

For testing, consider this utility method in KafkaTestUtils instead of setting up a whole listener container:

/**
 * Get a single record for the group from the topic/partition. Optionally, seeking to the current last record.
 * @param brokerAddresses the broker address(es).
 * @param group the group.
 * @param topic the topic.
 * @param partition the partition.
 * @param seekToLast true to fetch an existing last record, if present.
 * @param commit commit offset after polling or not.
 * @param timeout the timeout.
 * @return the record or null if no record received.
 * @since 2.3
 */
@Nullable
@SuppressWarnings({ "rawtypes", "unchecked" })
public static ConsumerRecord<?, ?> getOneRecord(String brokerAddresses, String group, String topic, int partition,
        boolean seekToLast, boolean commit, long timeout) {

Also see how to configure @EmbeddedKafka to work with Spring Boot auto-configuration properties: https://docs.spring.io/spring-boot/docs/current/reference/html/spring-boot-features.html#boot-features-embedded-kafka

Upvotes: 2
