Reputation: 721
I am building an implementation of Sprint Integration with two PollableChannel
s:
Messages are polled from the regular channel and processed. If there is an error during processing (e.g., an external service is unavailable), the message is sent into the error channel. From the error channel it is re-queued onto the regular channel, and the cycle continues until the message is successfully processed.
The idea is to poll the error channel infrequently, to give the processor some time to (hopefully) recover.
I have simulated this workflow in the following test:
package com.stackoverflow.questions.sipoller;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Import;
import org.springframework.integration.annotation.MessageEndpoint;
import org.springframework.integration.annotation.Poller;
import org.springframework.integration.annotation.Router;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.MessageChannels;
import org.springframework.messaging.Message;
import org.springframework.messaging.PollableChannel;
import org.springframework.messaging.support.MessageBuilder;
import static org.awaitility.Awaitility.await;
import static org.awaitility.Durations.FIVE_MINUTES;
import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
@SpringBootTest
class SiPollerApplicationTests {
private final static Logger LOG = LoggerFactory.getLogger(SiPollerApplicationTests.class);
private final static String QUEUE_CHANNEL_REGULAR = "queueChannelRegular";
private final static String QUEUE_CHANNEL_ERROR = "queueChannelError";
private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds
private final static AtomicInteger NUMBER_OF_ATTEMPTS = new AtomicInteger();
private final static AtomicInteger NUMBER_OF_SUCCESSES = new AtomicInteger();
private final static List<Instant> ATTEMPT_INSTANTS = Collections.synchronizedList(new ArrayList<>());
@Autowired
@Qualifier(QUEUE_CHANNEL_REGULAR)
private PollableChannel channelRegular;
@Test
void testTimingOfMessageProcessing() {
channelRegular.send(MessageBuilder.withPayload("Test message").build());
await()
.atMost(FIVE_MINUTES)
.with()
.pollInterval(ONE_HUNDRED_MILLISECONDS)
.until(
() -> {
if (NUMBER_OF_SUCCESSES.intValue() == 1) {
reportGaps();
return true;
}
return false;
}
);
}
private void reportGaps() {
List<Long> gaps = IntStream
.range(1, ATTEMPT_INSTANTS.size())
.mapToObj(
i -> Duration
.between(
ATTEMPT_INSTANTS.get(i - 1),
ATTEMPT_INSTANTS.get(i)
)
.toMillis()
)
.collect(Collectors.toList());
LOG.info("Gaps between attempts (in ms): {}", gaps);
}
@Configuration
@EnableIntegration
@Import(SiPollerApplicationTestEndpoint.class)
static class SiPollerApplicationTestConfig {
@Bean(name = QUEUE_CHANNEL_REGULAR)
public PollableChannel queueChannelRegular() {
return MessageChannels.queue(QUEUE_CHANNEL_REGULAR).get();
}
@Bean(name = QUEUE_CHANNEL_ERROR)
public PollableChannel queueChannelError() {
return MessageChannels.queue(QUEUE_CHANNEL_ERROR).get();
}
@Router(
inputChannel = QUEUE_CHANNEL_ERROR,
poller = @Poller(fixedRate = POLLER_PERIOD_ERROR)
)
public String retryProcessing() {
return QUEUE_CHANNEL_REGULAR;
}
}
@MessageEndpoint
static class SiPollerApplicationTestEndpoint {
@Autowired
@Qualifier(QUEUE_CHANNEL_ERROR)
private PollableChannel channelError;
@ServiceActivator(
inputChannel = QUEUE_CHANNEL_REGULAR,
poller = @Poller(fixedRate = POLLER_PERIOD_REGULAR)
)
public void handleMessage(Message<String> message) {
// Count and time attempts
int numberOfAttempts = NUMBER_OF_ATTEMPTS.getAndIncrement();
ATTEMPT_INSTANTS.add(Instant.now());
// First few times - refuse to process message and bounce it into
// error channel
if (numberOfAttempts < 5) {
channelError.send(message);
return;
}
// After that - process message
NUMBER_OF_SUCCESSES.getAndIncrement();
}
}
}
The pom.xml
dependencies are:
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-core</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
<exclusions>
<exclusion>
<groupId>org.junit.vintage</groupId>
<artifactId>junit-vintage-engine</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.awaitility</groupId>
<artifactId>awaitility</artifactId>
<version>4.0.2</version>
<scope>test</scope>
</dependency>
</dependencies>
Note the configuration for Poller
s:
private final static String POLLER_PERIOD_REGULAR = "500"; // 0.5 second
private final static String POLLER_PERIOD_ERROR = "3000"; // 3 seconds
The regular channel is supposed to be polled once in half a second, and the error channel — once in three seconds.
The test simulates outages during message processing: the first five attempts to process the message are rejected. Also, the test records the Instant
of every processing attempt. In the end, on my machine, the test outputs:
Gaps between attempts (in ms): [1, 0, 0, 0, 0]
In other words, the message is re-tried almost immediately after each failure.
It seems to me that I fundamentally misunderstand how Poller
s work in Spring Integration. So my questions are:
Upvotes: 0
Views: 3674
Reputation: 174554
There are two settings that can affect this behavior.
QueueChannel
pollers will drain the queue by default; setMaxMessagesPerPoll(1)
to only receive one message each poll.
Also, by default, the QueueChannel
default timeout is 1 second (1000ms).
So the first poll may be sooner than you think; set it to 0
to immediately exit if there are no messages present in the queue.
Upvotes: 2