magiccrafter

Reputation: 5482

Spring Integration DSL KafkaProducerContext configuration

I am trying to adapt the following example: https://github.com/joshlong/spring-and-kafka

with the latest stable versions of the following libraries:

org.apache.kafka > kafka_2.10 > 0.8.2.2
org.springframework.integration > spring-integration-kafka > 1.2.1.RELEASE
org.springframework.integration > spring-integration-java-dsl > 1.1.0.RELEASE

The integration DSL library seems to have been refactored, probably because of the introduction of the new KafkaProducer.

Here is the code of my Producer configuration:

@Bean(name = OUTBOUND_ID)
IntegrationFlow producer() {
    log.info("starting producer flow..");

    return flowDefinition -> {
        ProducerMetadata<String, String> getProducerMetadata = new ProducerMetadata<>(this.kafkaConfig.getTopic(),
                    String.class, String.class, new StringSerializer(), new StringSerializer());


        KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
                props.put("timeout.ms", "35000"))
                .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
                .addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
                .get();

        flowDefinition
                .handle(kafkaProducerMessageHandler);
    };
}

And the code for message generation:

@Bean
@DependsOn(OUTBOUND_ID)
CommandLineRunner kickOff(@Qualifier(OUTBOUND_ID + ".input") MessageChannel in) {
    return args -> {
        for (int i = 0; i < 1000; i++) {
            in.send(MessageBuilder.withPayload("#" + i).setHeader(KafkaHeaders.TOPIC, this.kafkaConfig.getTopic()).build());
            log.info("sending message #" + i);
        }
    };
}

That's the exception I get:

Caused by: org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler#0]; nested exception is java.lang.NullPointerException
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:84)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:101)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:97)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:287)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:245)
at jc.DemoApplication$ProducerConfiguration.lambda$kickOff$0(DemoApplication.java:104)
at org.springframework.boot.SpringApplication.runCommandLineRunners(SpringApplication.java:673)
... 10 more
Caused by: java.lang.NullPointerException
at org.springframework.integration.kafka.support.KafkaProducerContext.getTopicConfiguration(KafkaProducerContext.java:67)
at org.springframework.integration.kafka.support.KafkaProducerContext.send(KafkaProducerContext.java:201)
at org.springframework.integration.kafka.outbound.KafkaProducerMessageHandler.handleMessageInternal(KafkaProducerMessageHandler.java:88)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
... 18 more

UPDATE:
The full working source can be found in my fork:
https://github.com/magiccrafter/spring-and-kafka

Upvotes: 2

Views: 732

Answers (1)

Artem Bilan

Reputation: 121542

Sorry for the delay.

Your problem is caused by instantiating the IntegrationComponentSpec too early:

KafkaProducerMessageHandler kafkaProducerMessageHandler = Kafka.outboundChannelAdapter(props ->
          props.put("timeout.ms", "35000"))
          .messageKey(m -> m.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
          .addProducer(getProducerMetadata, this.kafkaConfig.getBrokerAddress())
          .get();

You should not call .get() yourself. The KafkaProducerMessageHandlerSpec implements ComponentsRegistration, and only the Spring Integration Java DSL can resolve it correctly. The relevant code in the spec looks like this:

public Collection<Object> getComponentsToRegister() {
    this.kafkaProducerContext.setProducerConfigurations(this.producerConfigurations);
    return Collections.<Object>singleton(this.kafkaProducerContext);
}

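Because this code is never invoked, this.producerConfigurations is never set on this.kafkaProducerContext, which matches the NullPointerException in KafkaProducerContext.getTopicConfiguration() from your stack trace. The KafkaProducerContext must be registered as a bean in any case.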

So, to fix your problem, work only with the IntegrationComponentSpec inside the DSL definition.

Just obtain the KafkaProducerMessageHandlerSpec (that is, drop the .get() call) and pass it to .handle(). I am not sure there is a reason to extract this object at all, since Kafka.outboundChannelAdapter() can be used directly inside .handle().
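
To illustrate the mechanism, here is a minimal, library-free sketch of why an early .get() leaves the context unconfigured. All class and method names in it (EarlyGetSketch, ProducerContext, Handler, HandlerSpec, the static handle() helper) are hypothetical stand-ins that only mimic the shape of the real Spring Integration types; they are not the library's API, and the real DSL does more than this when it resolves a spec.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;

final class EarlyGetSketch {

    // Hypothetical stand-in for KafkaProducerContext.
    static class ProducerContext {
        private List<String> producerConfigurations; // null until someone sets it

        void setProducerConfigurations(List<String> configurations) {
            this.producerConfigurations = configurations;
        }

        String send(String topic) {
            // Throws NullPointerException if the configurations were never populated.
            return this.producerConfigurations.contains(topic)
                    ? "sent to " + topic
                    : "no producer for " + topic;
        }
    }

    // Hypothetical stand-in for KafkaProducerMessageHandler.
    static class Handler {
        private final ProducerContext context;

        Handler(ProducerContext context) {
            this.context = context;
        }

        String handle(String topic) {
            return this.context.send(topic);
        }
    }

    // Hypothetical stand-in for KafkaProducerMessageHandlerSpec.
    static class HandlerSpec {
        private final ProducerContext context = new ProducerContext();
        private final List<String> producerConfigurations = new ArrayList<>();
        private final Handler handler = new Handler(this.context);

        HandlerSpec addProducer(String topic) {
            this.producerConfigurations.add(topic);
            return this;
        }

        // Returns the handler as-is; it does NOT push the configurations into the context.
        Handler get() {
            return this.handler;
        }

        // Only this callback hands the collected configurations to the context.
        Collection<Object> getComponentsToRegister() {
            this.context.setProducerConfigurations(this.producerConfigurations);
            return Collections.<Object>singleton(this.context);
        }
    }

    // Hypothetical stand-in for what the DSL does when it is given the spec itself.
    static Handler handle(HandlerSpec spec) {
        spec.getComponentsToRegister();
        return spec.get();
    }

    public static void main(String[] args) {
        // Early get(): the registration callback never runs, so sending fails.
        Handler early = new HandlerSpec().addProducer("demo").get();
        try {
            early.handle("demo");
        } catch (NullPointerException e) {
            System.out.println("early get(): NullPointerException");
        }

        // Passing the spec: the callback runs first, so sending works.
        Handler resolved = handle(new HandlerSpec().addProducer("demo"));
        System.out.println(resolved.handle("demo"));
    }
}
```

In the sketch, the only difference between the failing and the working path is who calls get(): when the flow receives the spec, the registration callback gets a chance to run before the handler is used.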

Upvotes: 1
