edi

Reputation: 233

Spring Integration Kafka outbound adapter error handling

How can I handle messages that fail to be produced to Kafka in Spring Integration?

I don't see 'error-channel' as an option on 'int-kafka:outbound-channel-adapter'. Where should I add the error-channel information so that my ErrorHandler receives "failed to produce to Kafka" errors of every kind (configuration, network, etc.)?

Also, inputToKafka is a queue channel. Where should I add an error-channel to handle a potential queue-full error?

<int:gateway id="myGateway" 
            service-interface="someGateway" 
            default-request-channel="transformChannel" 
            error-channel="errorChannel"  
            default-reply-channel="replyChannel" 
            async-executor="MyThreadPoolTaskExecutor"/>

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka">
    <bean class="Transformer"/>  
</int:transformer>

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2">
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
                    task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>

<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    ....
</bean>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel='errorChannel' output-channel="replyChannel" method='process'>
    <bean class="ErrorHandler"/>
</int:service-activator>

Edit

<property name="producerListener">
    <bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/>
</property>

Upvotes: 1

Views: 924

Answers (1)

Gary Russell

Reputation: 174799

Any errors in the downstream flow will be sent to the error-channel on your gateway. However, since Kafka sends are asynchronous by default, you won't get producer errors that way. You can set sync="true" on the outbound adapter, and then an exception will be thrown if there's a problem.

Bear in mind, though, it will be much slower.
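As a rough sketch, this is the adapter from the question with only the sync attribute added; everything else is copied unchanged, and nothing here has been tested against a specific spring-integration-kafka version:

```xml
<!-- Adapter from the question; sync="true" makes the send block until the
     broker responds, so a failed send is thrown as an exception in the flow. -->
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
                                    kafka-template="template"
                                    auto-startup="false"
                                    channel="inputToKafka"
                                    topic="foo"
                                    sync="true"
                                    message-key-expression="'bar'"
                                    partition-id-expression="2">
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
                task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>
```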

You can get async exceptions by adding a ProducerListener to your KafkaTemplate.
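A minimal sketch of that wiring, based on the template bean from the question. LoggingProducerListener is used here only as a stand-in because it ships with spring-kafka and simply logs failed sends; to forward failures to your own ErrorHandler you would presumably swap in your own ProducerListener implementation, whose method signatures depend on the spring-kafka version in use:

```xml
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <!-- remaining producer properties as in the question -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <!-- Called back on asynchronous send failures; this stand-in only logs them. -->
    <property name="producerListener">
        <bean class="org.springframework.kafka.support.LoggingProducerListener"/>
    </property>
</bean>
```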

Upvotes: 3
