KeepItSimple
KeepItSimple

Reputation: 23

how to use recovery fallback and error handler for message driven channel adapter

Can anyone please provide an example of how recovery fallback and error handler is used on int-kafka:message-driven-channel-adapter ? I am trying to use ConcurrentMessageListenerContainer while consuming messages for kafka where I need to commit offset manually where I am doing,

<int-kafka:message-driven-channel-adapter
            id="adapter-event"
            listener-container="testEventListenerContainer"
            auto-startup="true"
            phase="10"
            send-timeout="5000"
            error-message-strategy="ems"
            channel="kafka-input-channel-test-Event"
            error-channel="errorChannel"
    />

    <bean id="testEventListenerContainer" class="org.springframework.kafka.listener.ConcurrentMessageListenerContainer">
        <constructor-arg ref = "consumerFactoryTestEvent"/>
        <property name="concurrency" value="3"/>
        <constructor-arg>
            <bean class="org.springframework.kafka.listener.ContainerProperties">
                <constructor-arg name="topics" value="xyz"/>
                <property name="ackMode" value="MANUAL_IMMEDIATE"/>
            </bean>
        </constructor-arg>
    </bean>

I am also suppose to make listener acknowledge after every message is processed which I am figuring out how it can be done for user defined object instead of spring integration object. Can you please help?

Upvotes: 0

Views: 123

Answers (1)

Artem Bilan
Artem Bilan

Reputation: 121542

See docs for recovery-callback:

    <xsd:attribute name="recovery-callback" type="xsd:string">
        <xsd:annotation>
            <xsd:documentation>
                Used in conjunction with a 'retry-template'; in most cases this will be
                an 'ErrorMessageSendingRecoverer'. Omitting this element will cause an
                exception to be thrown to the listener container after retries are exhausted.
            </xsd:documentation>
            <xsd:appinfo>
                <tool:annotation kind="ref">
                    <tool:expected-type type="org.springframework.retry.RecoveryCallback" />
                </tool:annotation>
            </xsd:appinfo>
        </xsd:annotation>
    </xsd:attribute>

So, no retry-template - no recovery. Although you might talk about some other option which I fail to find on that channel adapter.

The error-channel is like a try..catch where the failed message is going to be sent for further consideration. See docs about error handling in Spring Integration: https://docs.spring.io/spring-integration/docs/current/reference/html/error-handling.html#error-handling

To ack messages manually you need to get a KafkaHeaders.ACKNOWLEDGMENT of type Acknowledgment from message headers you are processing downstream.

Upvotes: 0

Related Questions