Reputation: 23
Can anyone please provide an example of how a recovery fallback and an error handler are used with int-kafka:message-driven-channel-adapter? I am using a ConcurrentMessageListenerContainer to consume messages from Kafka, and I need to commit offsets manually. This is my current configuration:
<int-kafka:message-driven-channel-adapter
id="adapter-event"
listener-container="testEventListenerContainer"
auto-startup="true"
phase="10"
send-timeout="5000"
error-message-strategy="ems"
channel="kafka-input-channel-test-Event"
error-channel="errorChannel"
/>
<bean id="testEventListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
    <constructor-arg ref="consumerFactoryTestEvent"/>
    <property name="concurrency" value="3"/>
    <constructor-arg>
        <bean class="org.springframework.kafka.listener.ContainerProperties">
            <constructor-arg name="topics" value="xyz"/>
            <property name="ackMode" value="MANUAL_IMMEDIATE"/>
        </bean>
    </constructor-arg>
</bean>
I am also supposed to have the listener acknowledge each message after it is processed, and I am still figuring out how to do that for a user-defined object instead of a Spring Integration object. Can you please help?
Upvotes: 0
Views: 123
Reputation: 121542
See the docs for recovery-callback:
<xsd:attribute name="recovery-callback" type="xsd:string">
<xsd:annotation>
<xsd:documentation>
Used in conjunction with a 'retry-template'; in most cases this will be
an 'ErrorMessageSendingRecoverer'. Omitting this element will cause an
exception to be thrown to the listener container after retries are exhausted.
</xsd:documentation>
<xsd:appinfo>
<tool:annotation kind="ref">
<tool:expected-type type="org.springframework.retry.RecoveryCallback" />
</tool:annotation>
</xsd:appinfo>
</xsd:annotation>
</xsd:attribute>
So, no retry-template means no recovery. You may be talking about some other option, but I cannot find one on that channel adapter.
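A minimal sketch of wiring retry and recovery onto the adapter from the question, assuming your version of the adapter exposes a retry-template attribute alongside recovery-callback. The bean ids retryTemplate and recoveryCallback and the maxAttempts value are placeholders chosen for illustration. The error-channel attribute is left out on purpose: as far as I recall, the adapter rejects an error-channel when a retry-template is set, and the ErrorMessageSendingRecoverer sends the failed message to the given channel instead.

```xml
<int-kafka:message-driven-channel-adapter
    id="adapter-event"
    listener-container="testEventListenerContainer"
    channel="kafka-input-channel-test-Event"
    retry-template="retryTemplate"
    recovery-callback="recoveryCallback"/>

<!-- Assumed retry settings: up to 3 delivery attempts per record -->
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
    <property name="retryPolicy">
        <bean class="org.springframework.retry.policy.SimpleRetryPolicy">
            <property name="maxAttempts" value="3"/>
        </bean>
    </property>
</bean>

<!-- After retries are exhausted, publish an ErrorMessage to errorChannel -->
<bean id="recoveryCallback"
      class="org.springframework.integration.handler.advice.ErrorMessageSendingRecoverer">
    <constructor-arg ref="errorChannel"/>
</bean>
```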
The error-channel works like a try..catch: the failed message is sent there for further handling. See the docs about error handling in Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/error-handling.html#error-handling
To ack messages manually, you need to get the KafkaHeaders.ACKNOWLEDGMENT header, of type Acknowledgment, from the headers of the message you are processing downstream.
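One way to hand that header to your own code, sketched under a few assumptions: the int prefix is mapped to the core Spring Integration namespace, the input channel is subscribable, and eventHandler is a hypothetical bean of yours with a handle(payload, ack) method that processes the user-defined object and then calls ack.acknowledge(). The string 'kafka_acknowledgment' is the value of the KafkaHeaders.ACKNOWLEDGMENT constant.

```xml
<!-- eventHandler is a hypothetical bean: handle(Object payload, Acknowledgment ack)
     processes the payload and then calls ack.acknowledge() -->
<int:service-activator
    input-channel="kafka-input-channel-test-Event"
    expression="@eventHandler.handle(payload, headers['kafka_acknowledgment'])"/>
```

With ackMode set to MANUAL_IMMEDIATE as in the question, the offset is committed when acknowledge() is called, so the call should come only after processing has succeeded.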
Upvotes: 0