cstack

Reputation: 405

Spring State Machine Sometimes Spams Retry Exhausted Error

I have set up what I think is a relatively simple Spring State Machine. Sometimes (maybe every time) when something triggers an external event, I get a flood of these errors. Any idea what I am doing wrong?

I get the following error about 10 times in a row when it happens.

[parallel-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
    at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
    at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
    at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:56)
    at org.springframework.statemachine.support.ReactiveStateMachineExecutor$1.lambda$null$0(ReactiveStateMachineExecutor.java:461)
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:56)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:215)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:268)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)

My state machine looks like this:


public abstract class DBStateMachine extends StateMachineConfigurerAdapter<DBStates, DBStateChanges> {

    public abstract Guard<DBStates, DBStateChanges> isDown();

    public abstract Guard<DBStates, DBStateChanges> isReady();

    @Override
    public void configure(StateMachineStateConfigurer<DBStates, DBStateChanges> states)
            throws Exception {

        states
                .withStates()
                .initial(DBStates.INITIALIZING)
                .junction(DBStates.TESTING)
                // .end(DBStates.READY)
                .states(EnumSet.allOf(DBStates.class));

    }

    @Override
    public void configure(
            StateMachineTransitionConfigurer<DBStates, DBStateChanges> transitions)
            throws Exception {
        //@formatter:off
        transitions.withInternal()
                        .source(DBStates.INITIALIZING)
                        .action(timerAction())
                        .timer(100).and()
                    .withJunction()
                        .source(DBStates.TESTING)
                        .first(DBStates.DOWN, isDown())
                        .then(DBStates.READY, isReady())
                        .last(DBStates.BAD_PASSWORD).and()
                    .withExternal()
                        .source(DBStates.BAD_PASSWORD)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.PASSWORD_RECEIVED).and()
                    .withInternal()
                        .source(DBStates.DOWN)
                        .action(timerAction())
                        .timer(10000).and()
                    .withExternal()
                        .source(DBStates.DOWN)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.RETRY).and()
                    .withExternal()
                        .source(DBStates.INITIALIZING)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.RETRY).and()
                    .withExternal()
                        .source(DBStates.READY)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.ERROR);
            //@formatter:on
    }

    @Bean
    public TimerAction timerAction() {
        return new TimerAction();
    }

    public class TimerAction implements Action<DBStates, DBStateChanges> {

        @Override
        public void execute(StateContext<DBStates, DBStateChanges> context) {
            // runs on every timer tick (100 ms in INITIALIZING, 10 s in DOWN)
            // context.getStateMachine().
            // TODO make this reactive?
            context.getStateMachine().sendEvent(DBStateChanges.RETRY);
        }
    }
    
    
}

I think the code that is triggering the error is this:

@Autowired
private StateMachine<DBStates, DBStateChanges> statemachine;

public void triggerStateChange() {
    statemachine.sendEvent(DBStateChanges.ERROR);
}

Edit

I also changed the triggerStateChange method to look like this and still get the error.

public void triggerStateChange() {
    Message<DBStateChanges> m = new GenericMessage<>(DBStateChanges.ERROR);
    statemachine.sendEvent(Mono.just(m)).subscribe();
    // statemachine.sendEvent(DBStateChanges.ERROR);
}

I should mention that I'm on Java 16.0.1, Spring Boot 2.5, Spring Framework 5.3.7, and Spring State Machine 3.0.1.

Upvotes: 2

Views: 1287

Answers (2)

Kalganov Roman

Reputation: 1

I ran into the same problem and found that the cause was timer(). If the value is too small (in your case, only 100 milliseconds), ReactiveStateMachineExecutor.registerTriggerListener causes this error. See also this GitHub ticket.

In my case, raising the timer values to a few seconds made the errors go away.

Upvotes: 0

Jun Tang

Reputation: 11

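The sinks created by Sinks.many() do not support concurrent emission: the subscriber must be signaled serially, so an emitNext call that overlaps another one on a different thread fails with this EmissionException. A similar error is discussed in "How to call Sinks.Many<T>.tryEmitNext from multiple threads?"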
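
To illustrate what "signaled serially" means, here is a minimal sketch using only the JDK. It is not Reactor's or Spring State Machine's API: SerializedEmitter, emit, snapshot and runDemo are hypothetical names, and the ArrayList merely stands in for a consumer that cannot tolerate concurrent calls. Several producer threads (think of the timer thread and the thread sending ERROR) hand their events to a single-thread executor, so the consumer only ever sees one signal at a time.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class SerializedEmitter {
    // Hypothetical stand-in for a consumer that must not be called concurrently.
    private final List<String> received = new ArrayList<>();
    // Every emission is funneled through this one thread, which serializes them.
    private final ExecutorService serializer = Executors.newSingleThreadExecutor();

    // Safe to call from any thread: the actual write happens on the serializer thread.
    public void emit(String event) {
        serializer.execute(() -> received.add(event));
    }

    // Reads the list on the serializer thread too, after all earlier emissions.
    public List<String> snapshot() throws Exception {
        Callable<List<String>> copy = () -> new ArrayList<>(received);
        return serializer.submit(copy).get();
    }

    public void shutdown() {
        serializer.shutdown();
    }

    // Starts several producers that emit concurrently and returns how many events arrived.
    public static int runDemo(int producers, int eventsPerProducer) throws Exception {
        SerializedEmitter emitter = new SerializedEmitter();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++) {
            final int id = p;
            threads[p] = new Thread(() -> {
                for (int i = 0; i < eventsPerProducer; i++) {
                    emitter.emit("producer-" + id + "-event-" + i);
                }
            });
            threads[p].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        try {
            return emitter.snapshot().size();
        } finally {
            emitter.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("events received: " + runDemo(4, 1000));
    }
}
```

In the question's setup the same idea would mean making sure the timer action and triggerStateChange() do not push events into the state machine at the same moment, for example by routing both through one thread or one reactive chain; whether that fully removes the error with Spring State Machine 3.0.1 is an assumption I have not verified.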

Upvotes: 1
