Reputation: 233
How can I handle messages that fail to be produced to Kafka in Spring Integration?
I don't see 'error-channel' as an option on 'int-kafka:outbound-channel-adapter'. Where should I add the error-channel information so that my ErrorHandler receives "failed to produce to Kafka" errors of every kind (configuration, network, etc.)?
Also, inputToKafka is a queue channel; where should I add an error-channel to handle a potential queue-full error?
<int:gateway id="myGateway"
    service-interface="someGateway"
    default-request-channel="transformChannel"
    error-channel="errorChannel"
    default-reply-channel="replyChannel"
    async-executor="MyThreadPoolTaskExecutor"/>

<int:transformer id="transformer" input-channel="transformChannel" method="transform" output-channel="inputToKafka">
    <bean class="Transformer"/>
</int:transformer>

<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
    kafka-template="template"
    auto-startup="false"
    channel="inputToKafka"
    topic="foo"
    message-key-expression="'bar'"
    partition-id-expression="2">
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
        task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>

<bean id="kafkaExecutor" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
    ....
</bean>

<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    ...
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
</bean>

<int:service-activator input-channel="errorChannel" output-channel="replyChannel" method="process">
    <bean class="ErrorHandler"/>
</int:service-activator>
Edit
<property name="producerListener">
    <bean id="producerListener" class="org.springframework.kafka.support.ProducerListenerAdapter"/>
</property>
Upvotes: 1
Views: 924
Reputation: 174799
Any errors in the downstream flow will be sent to the error-channel on your gateway. However, since Kafka sends are async by default, you won't get send failures that way. You can set sync="true" on the outbound adapter; an exception will then be thrown if there's a problem.
Bear in mind, though, that it will be much slower.
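As a rough sketch of the synchronous option, here is the adapter from the question with only the sync attribute added. Everything else is copied unchanged from the question; this assumes a spring-integration-kafka version whose outbound-channel-adapter schema includes sync.

```xml
<!-- Adapter from the question; sync="true" makes the send block until Kafka
     responds, so a failed send raises an exception instead of passing unnoticed. -->
<int-kafka:outbound-channel-adapter id="kafkaOutboundChannelAdapter"
        kafka-template="template"
        auto-startup="false"
        channel="inputToKafka"
        sync="true"
        topic="foo"
        message-key-expression="'bar'"
        partition-id-expression="2">
    <int:poller fixed-delay="200" time-unit="MILLISECONDS" receive-timeout="0"
            task-executor="kafkaExecutor"/>
</int-kafka:outbound-channel-adapter>
```

Each message now waits for the broker's response before the poller picks up the next one, which is where the slowdown comes from.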
You can get async exceptions by adding a ProducerListener to your KafkaTemplate.
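A minimal sketch of wiring a listener into the template bean from the question might look like this. The class com.example.FailureReportingProducerListener is hypothetical: it stands for your own ProducerListener implementation that overrides the error callback. As far as I know, the ProducerListenerAdapter base class has empty callbacks, so using it directly would not surface anything.

```xml
<bean id="template" class="org.springframework.kafka.core.KafkaTemplate">
    <constructor-arg>
        <bean class="org.springframework.kafka.core.DefaultKafkaProducerFactory">
            <constructor-arg>
                <map>
                    <entry key="bootstrap.servers" value="localhost:9092" />
                    <!-- remaining producer properties as in the question -->
                </map>
            </constructor-arg>
        </bean>
    </constructor-arg>
    <!-- producerListener is a setter property on KafkaTemplate -->
    <property name="producerListener">
        <!-- Hypothetical class: your own ProducerListener whose error callback
             logs the failure or forwards it to your error handling -->
        <bean class="com.example.FailureReportingProducerListener"/>
    </property>
</bean>
```

The listener is invoked from the producer's callback, not from the gateway's calling thread, so whatever it does with the failure has to be arranged explicitly.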
Upvotes: 3