Reputation: 19906
I have a Spring Boot (2.6.6) @Scheduled
method, which fetches data from an external service every 10 minutes.
Under normal condition, I am happy with the interval. However I'd like to shorten it on a one-time basis if the data fetch fails (the service is temporarily not available) and force the method to be scheduled earlier.
Something like this:
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
try {
this.data = myServiceConnector.fetchData();
}
catch (MyServiceNotAvailableException ex) {
// temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
}
}
N.B. If it is relevant for the problem, the act of data fetching is a reactive code, so in fact it looks like
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
myServiceConnector.fetchData()
.doOnSuccess(fetchedData -> this.data = fetchedData)
.doOnError(throwable ->
// temporarily set the scheduling delay so it will happen again in 5 seconds - HOWTO?
)
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
Upvotes: 2
Views: 3368
Reputation: 6985
I'll leave another option here for completeness sake, but OP's solution looks the cleanest for his reactive case. Spring's Trigger is used to calculate the next execution time. In this case it has to know if the task was successful or not. I did not find any way to notify the trigger through the API, my best idea was to use instance variable.
public class DynamicTrigger implements Trigger {
private boolean isTaskSuccessFul = true;
@Override
public Date nextExecutionTime(TriggerContext triggerContext) {
Date last = triggerContext.lastCompletionTime();
Instant lastCompleted = last != null ? last.toInstant() : Instant.now();
int seconds = this.isTaskSuccessFul ? 10 : 3;
return Date.from(lastCompleted.plusSeconds(seconds));
}
public void setTaskSuccessFul(boolean taskSuccessFul) {
this.isTaskSuccessFul = taskSuccessFul;
}
}
The task, with extremely simple simulation of every second task failing:
public class ThrowingRunnable implements Runnable {
private int count = 1;
@Override
public void run() {
int current = this.count;
if (this.count++ % 2 == 0) {
System.out.println("Task failed - " + LocalDateTime.now() + " - " + current);
throw new RuntimeException();
}
System.out.println("Task success - " + LocalDateTime.now() + " - " + current);
}
}
Another runnable, to wrap the task itself, handle the exception and notify trigger about success or failure:
public class WrappingRunnable implements Runnable {
private final Runnable wrappedRunnable;
private final DynamicTrigger trigger;
public WrappingRunnable(Runnable wrappedRunnable, DynamicTrigger trigger) {
this.wrappedRunnable = wrappedRunnable;
this.trigger = trigger;
}
@Override
public void run() {
try {
this.wrappedRunnable.run();
this.trigger.setTaskSuccessFul(true);
} catch (RuntimeException exc) {
this.trigger.setTaskSuccessFul(false);
}
}
}
Another possibility is to notify the trigger for failure through the error handler, but that would require transferring it with a custom exception and casting, i don't think it's worth it.
And scheduling the trigger manually:
@Configuration
@EnableScheduling
public class SchedulerConfig implements SchedulingConfigurer {
@Override
public void configureTasks(ScheduledTaskRegistrar taskRegistrar) {
ConcurrentTaskScheduler taskScheduler = new ConcurrentTaskScheduler(Executors.newSingleThreadScheduledExecutor());
taskScheduler.setErrorHandler(Throwable::printStackTrace);
taskRegistrar.setTaskScheduler(taskScheduler);
DynamicTrigger trigger = new DynamicTrigger();
WrappingRunnable runnable = new WrappingRunnable(new ThrowingRunnable(), trigger);
taskRegistrar.addTriggerTask(runnable, trigger);
}
}
Edit: Wrapping reactive code into Runnable
can be like this:
public class MonoRunnable implements Runnable {
private final MyService myService;
private final DynamicTrigger trigger;
public MonoRunnable(MyService myService, DynamicTrigger trigger) {
this.myService = myService;
this.trigger = trigger;
}
@Override
public void run() {
this.myService.fetchData().doOnSuccess(data -> {
//do stuff with data
this.trigger.setTaskSuccessFul(true);
}).doOnError(throwable -> {
//do stuff with error
this.trigger.setTaskSuccessFul(false);
}).subscribe();
}
}
I skipped subscribeOn()
, cause i am not sure of the effects the Worker
will have, considering the task will be scheduled with spring scheduler. On success and on error functions will take care of notifying the Trigger
, and WrappingRunnable
won't be needed, and we can directly register the mono task:
MonoRunnable runnable = new MonoRunnable(this.myService, trigger);
taskRegistrar.addTriggerTask(runnable, trigger);
Upvotes: 1
Reputation: 19906
I have come with the following solution.
fetchData()
task to run immediately.fetchData()
Mono
, based on the result of the fetch operation, I manually schedule the next run. As this happens at the end of the operation, it is an end-to-start interval and I don't have to solve potential problems of overlapped method runs.@Service
@Slf4j
@RequiredArgsConstructor
public class DataUpdaterService {
@Autowired
private MyServiceConnector myServiceConnector;
private TaskScheduler scheduler;
private final Clock clock = Clock.systemUTC();
@Getter
private volatile Data data = null;
@PostConstruct
void init() {
final ScheduledExecutorService localExecutor = Executors.newSingleThreadScheduledExecutor();
scheduler = new ConcurrentTaskScheduler(localExecutor);
// Schedule the task to run for the first time right now:
scheduleFetchTask(Duration.ZERO);
}
private void scheduleFetchTask(Duration interval) {
scheduler.schedule(this::fetchData, clock.instant().plus(interval));
}
private void fetchData() {
myServiceConnector.fetchData()
.doOnSuccess(fetchedData -> {
this.data = fetchedData;
// Schedule the task in the normal mode:
scheduleFetchTask(Duration.ofMinutes(10));
})
.doOnError(throwable -> {
log.error("Error fetching data from myService", throwable);
// Schedule the task in the retry mode:
scheduleFetchTask(Duration.ofSeconds(5));
})
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
The signature of the myServiceConnector.fetchData()
is
Mono<Data> fetchData()
Upvotes: 0
Reputation: 131
Edit: Updated to show use of always AlwaysRetryPolicy (retry until successful).
Seems like what you really need is just retry logic integrated into your @Scheduled
method. Spring Retry is a Spring AOP project that could help you here. You can configure things like a number of retries, and a delay to wait before retrying. Broadly it would look like this (there is additional configuration necessary to enable this):
@Bean
public RetryTemplate retryTemplate() {
RetryTemplate retryTemplate = new RetryTemplate();
AlwaysRetryPolicy policy = new AlwaysRetryPolicy();
retryTemplate.setRetryPolicy(retryPolicy);
return retryTemplate;
}
...
@Scheduled(fixedDelay = 10, timeUnit = TimeUnit.MINUTES)
public void fetchData() {
retryTemplate.execute(context -> {
this.data = myServiceConnector.fetchData();
});
}
The project has more information on Github and here is a guide from Baeldung.
Upvotes: 2