CBlew

Reputation: 721

How do @Poller-s work in Spring Integration?

I am building a Spring Integration flow with two PollableChannels:

  1. Regular channel
  2. Error channel

Messages are polled from the regular channel and processed. If there is an error during processing (e.g., an external service is unavailable), the message is sent into the error channel. From the error channel it is re-queued onto the regular channel, and the cycle continues until the message is successfully processed.

The idea is to poll the error channel infrequently, to give the processor some time to (hopefully) recover.

I have simulated this workflow in the following test:

package com.stackoverflow.questions.sipoller;

import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_MINUTES;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;

@SpringBootTest
class SiPollerApplicationTests {

    private final static Logger LOG = LoggerFactory.getLogger(SiPollerApplicationTests.class);

    private final static String QUEUE_CHANNEL_REGULAR = "queueChannelRegular";
    private final static String QUEUE_CHANNEL_ERROR = "queueChannelError";

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

    private final static AtomicInteger NUMBER_OF_ATTEMPTS = new AtomicInteger();
    private final static AtomicInteger NUMBER_OF_SUCCESSES = new AtomicInteger();
    private final static List<Instant> ATTEMPT_INSTANTS = Collections.synchronizedList(new ArrayList<>());

    @Autowired
    @Qualifier(QUEUE_CHANNEL_REGULAR)
    private PollableChannel channelRegular;

    @Test
    void testTimingOfMessageProcessing() {
        channelRegular.send(MessageBuilder.withPayload("Test message").build());

        await()
                .atMost(FIVE_MINUTES)
                .with()
                .pollInterval(ONE_HUNDRED_MILLISECONDS)
                .until(
                        () -> {
                            if (NUMBER_OF_SUCCESSES.intValue() == 1) {
                                reportGaps();
                                return true;
                            }
                            return false;
                        }
                );
    }

    private void reportGaps() {
        List<Long> gaps = IntStream
                .range(1, ATTEMPT_INSTANTS.size())
                .mapToObj(
                        i -> Duration
                                .between(
                                        ATTEMPT_INSTANTS.get(i - 1),
                                        ATTEMPT_INSTANTS.get(i)
                                )
                                .toMillis()
                )
                .collect(Collectors.toList());
        LOG.info("Gaps between attempts (in ms): {}", gaps);
    }

    @Configuration
    @EnableIntegration
    @Import(SiPollerApplicationTestEndpoint.class)
    static class SiPollerApplicationTestConfig {

        @Bean(name = QUEUE_CHANNEL_REGULAR)
        public PollableChannel queueChannelRegular() {
            return MessageChannels.queue(QUEUE_CHANNEL_REGULAR).get();
        }

        @Bean(name = QUEUE_CHANNEL_ERROR)
        public PollableChannel queueChannelError() {
            return MessageChannels.queue(QUEUE_CHANNEL_ERROR).get();
        }

        @Router(
                inputChannel = QUEUE_CHANNEL_ERROR,
                poller = @Poller(fixedRate = POLLER_PERIOD_ERROR)
        )
        public String retryProcessing() {
            return QUEUE_CHANNEL_REGULAR;
        }
    }

    @MessageEndpoint
    static class SiPollerApplicationTestEndpoint {

        @Autowired
        @Qualifier(QUEUE_CHANNEL_ERROR)
        private PollableChannel channelError;

        @ServiceActivator(
                inputChannel = QUEUE_CHANNEL_REGULAR,
                poller = @Poller(fixedRate = POLLER_PERIOD_REGULAR)
        )
        public void handleMessage(Message<String> message) {
            // Count and time attempts
            int numberOfAttempts = NUMBER_OF_ATTEMPTS.getAndIncrement();
            ATTEMPT_INSTANTS.add(Instant.now());

            // First few times - refuse to process message and bounce it into
            // error channel
            if (numberOfAttempts < 5) {
                channelError.send(message);
                return;
            }

            // After that - process message
            NUMBER_OF_SUCCESSES.getAndIncrement();
        }
    }

}

The pom.xml dependencies are:

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-core</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
            <exclusions>
                <exclusion>
                    <groupId>org.junit.vintage</groupId>
                    <artifactId>junit-vintage-engine</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.awaitility</groupId>
            <artifactId>awaitility</artifactId>
            <version>4.0.2</version>
            <scope>test</scope>
        </dependency>

    </dependencies>

Note the configuration for Pollers:

    private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
    private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds

The regular channel is supposed to be polled every half second, and the error channel every three seconds.

The test simulates outages during message processing: the first five attempts to process the message are rejected. Also, the test records the Instant of every processing attempt. In the end, on my machine, the test outputs:

Gaps between attempts (in ms): [1, 0, 0, 0, 0]

In other words, the message is retried almost immediately after each failure.

It seems to me that I fundamentally misunderstand how Pollers work in Spring Integration. So my questions are:

  1. Why is there such a discrepancy between the poller configuration and the actual polling frequency?
  2. Does Spring Integration provide a way to implement the pattern I have described?

Upvotes: 0

Views: 3674

Answers (1)

Gary Russell

Reputation: 174554

There are two settings that can affect this behavior.

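By default, a poller on a QueueChannel drains the queue on every poll: it keeps receiving until no message is left. Call setMaxMessagesPerPoll(1) (or set maxMessagesPerPoll = "1" on the @Poller annotation) to receive only one message per poll.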
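To see why draining matters here, a plain-Java model of a single poll cycle may help. This is only a sketch: it does not use Spring Integration, the class and method names are made up for illustration, and the two channels are collapsed into one queue on the assumption that a failed message comes back before the current poll cycle has finished.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class PollCycleSketch {

    /**
     * Simulates poll cycles until the message is processed and returns how
     * many cycles were needed. A negative maxMessagesPerPoll means "no limit",
     * i.e. the cycle keeps receiving until the queue is empty.
     */
    public static int cyclesUntilSuccess(int maxMessagesPerPoll, int failuresBeforeSuccess) {
        Queue<String> queue = new ArrayDeque<>();
        queue.add("Test message");
        int attempts = 0;
        int cycles = 0;
        boolean processed = false;

        while (!processed) {
            cycles++; // one trigger firing (e.g. one fixedRate tick)
            int received = 0;
            while (maxMessagesPerPoll < 0 || received < maxMessagesPerPoll) {
                String message = queue.poll();
                if (message == null) {
                    break; // queue is empty, this cycle ends
                }
                received++;
                if (attempts++ < failuresBeforeSuccess) {
                    // Simplification: the failed message is back in the
                    // queue before the current cycle has finished.
                    queue.add(message);
                } else {
                    processed = true;
                }
            }
        }
        return cycles;
    }

    public static void main(String[] args) {
        System.out.println(cyclesUntilSuccess(-1, 5)); // prints 1
        System.out.println(cyclesUntilSuccess(1, 5));  // prints 6
    }
}
```

With no limit, all six attempts happen inside a single cycle, which matches the near-zero gaps in the question. With a limit of one, each retry has to wait for the next trigger.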

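Also, the poller's receive timeout on a QueueChannel defaults to 1 second (1000 ms), so a poll blocks for up to a second and picks up a message as soon as one arrives.

This means the first poll may happen sooner than you think; set the timeout to 0 so that a poll exits immediately when there are no messages in the queue.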
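Applied to the annotations in the question, both settings might look like the fragment below. Treat it as a sketch rather than a tested configuration: it assumes a Spring Integration version whose @Poller annotation exposes receiveTimeout (5.1 or later, as far as I know), the class name is hypothetical, and the router method takes the message as a parameter, unlike the original.

```java
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.messaging.Message;

@MessageEndpoint
class OneMessagePerPollEndpoint { // hypothetical name

    @ServiceActivator(
            inputChannel = "queueChannelRegular",
            poller = @Poller(
                    fixedRate = "500",
                    maxMessagesPerPoll = "1", // do not drain the queue
                    receiveTimeout = "0"      // do not block waiting for a message
            )
    )
    public void handleMessage(Message<String> message) {
        // processing logic from the question goes here
    }

    @Router(
            inputChannel = "queueChannelError",
            poller = @Poller(
                    fixedRate = "3000",
                    maxMessagesPerPoll = "1",
                    receiveTimeout = "0"
            )
    )
    public String retryProcessing(Message<?> message) {
        // send failed messages back to the regular channel
        return "queueChannelRegular";
    }
}
```

With these attributes a failed message should stay in the error channel until that channel's next scheduled poll, so the gaps between attempts should come out close to the error poller's period instead of near zero.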

Upvotes: 2
