Reputation: 405
I have set up what I think is a relatively simple Spring State Machine. Sometimes (maybe every time) when something triggers an external event, I get a ton of these errors. Any idea what I am doing wrong?
I get the following error about 10 times in a row when it happens.
[parallel-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:56)
at org.springframework.statemachine.support.ReactiveStateMachineExecutor$1.lambda$null$0(ReactiveStateMachineExecutor.java:461)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:56)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:215)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:268)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
My state machine looks like this:
public abstract class DBStateMachine extends StateMachineConfigurerAdapter<DBStates, DBStateChanges> {
public abstract Guard<DBStates, DBStateChanges> isDown();
public abstract Guard<DBStates, DBStateChanges> isReady();
@Override
public void configure(StateMachineStateConfigurer<DBStates, DBStateChanges> states)
throws Exception {
states
.withStates()
.initial(DBStates.INITIALIZING)
.junction(DBStates.TESTING)
// .end(DBStates.READY)
.states(EnumSet.allOf(DBStates.class));
}
@Override
public void configure(
StateMachineTransitionConfigurer<DBStates, DBStateChanges> transitions)
throws Exception {
//@formatter:off
transitions.withInternal()
.source(DBStates.INITIALIZING)
.action(timerAction())
.timer(100).and()
.withJunction()
.source(DBStates.TESTING)
.first(DBStates.DOWN, isDown())
.then(DBStates.READY, isReady())
.last(DBStates.BAD_PASSWORD).and()
.withExternal()
.source(DBStates.BAD_PASSWORD)
.target(DBStates.TESTING)
.event(DBStateChanges.PASSWORD_RECEIVED).and()
.withInternal()
.source(DBStates.DOWN)
.action(timerAction())
.timer(10000).and()
.withExternal()
.source(DBStates.DOWN)
.target(DBStates.TESTING)
.event(DBStateChanges.RETRY).and()
.withExternal()
.source(DBStates.INITIALIZING)
.target(DBStates.TESTING)
.event(DBStateChanges.RETRY).and()
.withExternal()
.source(DBStates.READY)
.target(DBStates.TESTING)
.event(DBStateChanges.ERROR);
//@formatter:on
}
@Bean
public TimerAction timerAction() {
return new TimerAction();
}
public class TimerAction implements Action<DBStates, DBStateChanges> {
@Override
public void execute(StateContext<DBStates, DBStateChanges> context) {
// runs on every timer tick (100 ms in INITIALIZING, 10 s in DOWN)
// context.getStateMachine().
// TODO make this reactive?
context.getStateMachine().sendEvent(DBStateChanges.RETRY);
}
}
}
I think the code that is triggering the error is this:
@Autowired
private StateMachine<DBStates, DBStateChanges> statemachine;
public void triggerStateChange() {
statemachine.sendEvent(DBStateChanges.ERROR);
}
Edit
I also changed the triggerStateChange method to look like this and still get the error.
public void triggerStateChange() {
Message<DBStateChanges> m = new GenericMessage<>(DBStateChanges.ERROR);
statemachine.sendEvent(Mono.just(m)).subscribe();
// statemachine.sendEvent(DBStateChanges.ERROR);
}
I should mention that I'm on Java 16.0.1, Spring Boot 2.5, Spring Framework 5.3.7, and Spring State Machine 3.0.1.
Upvotes: 2
Views: 1287
Reputation: 1
I ran into the same problem and found that the cause was timer(). If the value is too small (in your case, only 100 milliseconds), then ReactiveStateMachineExecutor.registerTriggerListener causes this error. See also this GitHub ticket.
In my case, raising the timer values to a few seconds made the errors go away.
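To illustrate, here is a rough sketch of the two timer-driven transitions from the question with the short period raised. It is a configuration fragment, not a full fix: the class name SlowerTimerConfig and the constant INITIAL_POLL_MS are made up for this example, the 2-second value is only an assumption standing in for "a few seconds", and the enums are reconstructed from the names used in the question.

```java
import org.springframework.statemachine.action.Action;
import org.springframework.statemachine.config.StateMachineConfigurerAdapter;
import org.springframework.statemachine.config.builders.StateMachineTransitionConfigurer;

// Fragment: only the timer-driven internal transitions from the question are shown.
abstract class SlowerTimerConfig extends StateMachineConfigurerAdapter<DBStates, DBStateChanges> {

    // Assumed value ("a few seconds"); tune it for your own setup.
    private static final long INITIAL_POLL_MS = 2_000;

    // Supplied elsewhere, e.g. the TimerAction bean from the question.
    abstract Action<DBStates, DBStateChanges> timerAction();

    @Override
    public void configure(StateMachineTransitionConfigurer<DBStates, DBStateChanges> transitions)
            throws Exception {
        transitions
            .withInternal()
                .source(DBStates.INITIALIZING)
                .action(timerAction())
                .timer(INITIAL_POLL_MS)   // was timer(100) in the question
                .and()
            .withInternal()
                .source(DBStates.DOWN)
                .action(timerAction())
                .timer(10_000);           // unchanged from the question
    }
}

// Reconstructed from the identifiers used in the question.
enum DBStates { INITIALIZING, TESTING, DOWN, READY, BAD_PASSWORD }
enum DBStateChanges { PASSWORD_RECEIVED, RETRY, ERROR }
```

If the action only needs to fire once after entering the state, timerOnce(...) may be worth a look instead of a repeating timer(...), though I have not verified that it avoids the error.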
Upvotes: 0
Reputation: 11
Sinks.many() is not thread-safe; the subscriber must be signaled serially. A similar error is discussed in "How to call Sinks.Many<T>.tryEmitNext from multiple threads?"
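One general way to meet the "signaled serially" requirement is to funnel every event through a single thread before it reaches the consumer. Below is a minimal JDK-only sketch of that idea. SerialEventGate, submit and demo are hypothetical names invented for this example (they are not part of Reactor or Spring State Machine), and the consumer is just a stand-in for whatever forwards events to the state machine.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Hypothetical helper: accepts events from any thread, delivers them one at a time. */
final class SerialEventGate<E> {
    // A single worker thread guarantees that the consumer is never called concurrently.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();
    private final Consumer<E> consumer;

    SerialEventGate(Consumer<E> consumer) {
        this.consumer = consumer;
    }

    /** Safe to call from multiple threads; events are queued and delivered in order. */
    void submit(E event) {
        worker.execute(() -> consumer.accept(event));
    }

    /** Stops accepting events and waits for the queued ones to be delivered. */
    void close() {
        worker.shutdown();
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Demo: 4 producers x 100 events. Returns the delivered count, or -1 on overlapping delivery. */
    static int demo() {
        AtomicBoolean busy = new AtomicBoolean(false);
        AtomicBoolean overlapped = new AtomicBoolean(false);
        AtomicInteger delivered = new AtomicInteger();
        SerialEventGate<String> gate = new SerialEventGate<>(event -> {
            if (!busy.compareAndSet(false, true)) {
                overlapped.set(true); // another delivery was in progress
            }
            delivered.incrementAndGet();
            busy.set(false);
        });
        Thread[] producers = new Thread[4];
        for (int t = 0; t < producers.length; t++) {
            producers[t] = new Thread(() -> {
                for (int i = 0; i < 100; i++) {
                    gate.submit("RETRY");
                }
            });
            producers[t].start();
        }
        for (Thread producer : producers) {
            try {
                producer.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        gate.close();
        return overlapped.get() ? -1 : delivered.get();
    }

    public static void main(String[] args) {
        System.out.println("delivered serially: " + demo());
    }
}
```

In the question's setup the consumer could be something like event -> statemachine.sendEvent(event), but note that the stack trace shows the failing emission inside ReactiveStateMachineExecutor itself, so serializing only your own calls may not be enough if the timer trigger is the other emitter.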
Upvotes: 1