Reputation: 73
I'm working with the Spring State Machine and am trying to configure some nested functionality. Essentially, I'm trying to run two processes as individual machines nested within a state. I have the following code for states and transitions:
public enum States {
READY, FORK, JOIN, TASKS, TERMINATE,
T1_INIT, C1_INIT, T1_READY, T1_POLL, T1_PROCESS, T1_STORE, T1_DELAY, C1_CONTINUE, T1_TERMINATE,
T2_INIT, C2_INIT, T2_READY, T2_POLL, T2_PROCESS, T2_STORE, T2_DELAY, C2_CONTINUE, T2_TERMINATE }
public enum Events {INITIALIZE, RUN, STOP, FALLBACK, CONTINUE, FIX, PROC_COMPLETE}
@Override
public void configure(StateMachineStateConfigurer<States, Events> states)
throws Exception {
states
.withStates()
.initial(States.READY)
.fork(States.FORK)
.state(States.TASKS)
.join(States.JOIN)
.end(States.TERMINATE)
.and()
.withStates()
.parent(States.TASKS)
.initial(States.T1_INIT, initT1Action())
.state(States.T1_READY)
.state(States.T1_POLL, pollT1Action(), null)
.state(States.T1_PROCESS, processT1Action(), null)
.state(States.T1_STORE, storeT1Action(), null)
.state(States.T1_DELAY)
.choice(States.C1_CONTINUE)
.end(States.T1_TERMINATE)
.and()
.withStates()
.parent(States.TASKS)
.initial(States.T2_INIT, initT2Action())
.state(States.T2_READY)
.state(States.T2_POLL, pollT2Action(), null)
.state(States.T2_PROCESS, processT2Action(), null)
.state(States.T2_STORE, storeT2Action(), null)
.state(States.T2_DELAY)
.choice(States.C2_CONTINUE)
.end(States.T2_TERMINATE);
}
@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
throws Exception {
transitions
.withExternal()
.source(States.READY).target(States.FORK)
.and()
.withFork()
.source(States.FORK).target(States.TASKS)
.and()
.withExternal()
.source(States.T1_INIT).target(States.T1_READY)
.and()
.withExternal()
.source(States.T1_READY).target(States.T1_POLL)
.and()
.withExternal()
.source(States.T1_POLL).target(States.T1_PROCESS).guard(pollT1Guard())
.and()
.withExternal()
.source(States.T1_PROCESS).target(States.T1_STORE).guard(processT1Guard())
.and()
.withExternal()
.source(States.T1_STORE).target(States.T1_DELAY).guard(storeT1Guard())
.and()
.withExternal()
.source(States.T1_DELAY).target(States.C1_CONTINUE)
.and()
.withChoice()
.source(States.C1_CONTINUE)
.first(States.T1_POLL, continueChoiceT1Guard())
.last(States.T1_TERMINATE)
.and()
.withExternal()
.source(States.T2_INIT).target(States.T2_READY)
.and()
.withExternal()
.source(States.T2_READY).target(States.T2_POLL)
.and()
.withExternal()
.source(States.T2_POLL).target(States.T2_PROCESS).guard(pollT2Guard())
.and()
.withExternal()
.source(States.T2_PROCESS).target(States.T2_STORE).guard(processT2Guard())
.and()
.withExternal()
.source(States.T2_STORE).target(States.T2_DELAY).guard(storeT2Guard())
.and()
.withExternal()
.source(States.T2_DELAY).target(States.C2_CONTINUE)
.and()
.withChoice()
.source(States.C2_CONTINUE)
.first(States.T2_POLL, continueChoiceT2Guard())
.last(States.T2_TERMINATE)
.and()
.withJoin()
.source(States.TASKS).target(States.JOIN)
.and()
.withExternal()
.source(States.JOIN).target(States.TERMINATE);
}
I was expecting the machine to fork the two machines identified in the Task state and run them as individual machines. T1 and T2 actions are set up to have a delay in each state within Task. T1 is delayed 1 second, and T2 is delayed half a second.
What I find is T1 runs through all of it's states and ends before T2 even starts. Any thoughts on making them run as they should with a fork/join??
Upvotes: 2
Views: 3493
Reputation: 73
In fact, here's the code for the entire FSM configuration class:
@Configuration
@EnableStateMachine
@WithStateMachine
public class FSMFactoryConfig extends EnumStateMachineConfigurerAdapter<States, Events> {
@Autowired
StateMachine<States, Events> stateMachine;
ArrayList<IDataProcessor> dp = new ArrayList<>();
private void createProcessors() {
dp.add(0, (IDataProcessor) new DataProcessor());
dp.add(1, (IDataProcessor) new DataProcessor2());
}
@Override
public void configure(StateMachineConfigurationConfigurer<States, Events> config)
throws Exception {
createProcessors();
config
.withConfiguration()
.autoStartup(false)
.taskExecutor(taskExecutor())
.taskScheduler(new ConcurrentTaskScheduler())
.listener(new StateMachineEventListener());
}
@Override
public void configure(StateMachineStateConfigurer<States, Events> states)
throws Exception {
states
.withStates()
.initial(States.READY)
.fork(States.FORK)
.state(States.TASKS)
.join(States.JOIN)
.end(States.TERMINATE)
.and()
.withStates()
.parent(States.TASKS)
.initial(States.T1_INIT, initT1Action())
.state(States.T1_READY)
.state(States.T1_POLL, pollT1Action(), null)
.state(States.T1_PROCESS, processT1Action(), null)
.state(States.T1_STORE, storeT1Action(), null)
.state(States.T1_DELAY)
.choice(States.C1_CONTINUE)
.end(States.T1_TERMINATE)
.and()
.withStates()
.parent(States.TASKS)
.initial(States.T2_INIT, initT2Action())
.state(States.T2_READY)
.state(States.T2_POLL, pollT2Action(), null)
.state(States.T2_PROCESS, processT2Action(), null)
.state(States.T2_STORE, storeT2Action(), null)
.state(States.T2_DELAY)
.choice(States.C2_CONTINUE)
.end(States.T2_TERMINATE);
}
@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
throws Exception {
transitions
.withExternal()
.source(States.READY).target(States.FORK)
.and()
.withFork()
.source(States.FORK).target(States.TASKS)
.and()
.withExternal()
.source(States.T1_INIT).target(States.T1_READY)
.and()
.withExternal()
.source(States.T1_READY).target(States.T1_POLL)
.and()
.withExternal()
.source(States.T1_POLL).target(States.T1_PROCESS).guard(pollT1Guard())
.and()
.withExternal()
.source(States.T1_PROCESS).target(States.T1_STORE).guard(processT1Guard())
.and()
.withExternal()
.source(States.T1_STORE).target(States.T1_DELAY).guard(storeT1Guard())
.and()
.withExternal()
.source(States.T1_DELAY).target(States.C1_CONTINUE)
.and()
.withChoice()
.source(States.C1_CONTINUE)
.first(States.T1_POLL, continueChoiceT1Guard())
.last(States.T1_TERMINATE)
.and()
.withExternal()
.source(States.T2_INIT).target(States.T2_READY)
.and()
.withExternal()
.source(States.T2_READY).target(States.T2_POLL)
.and()
.withExternal()
.source(States.T2_POLL).target(States.T2_PROCESS).guard(pollT2Guard())
.and()
.withExternal()
.source(States.T2_PROCESS).target(States.T2_STORE).guard(processT2Guard())
.and()
.withExternal()
.source(States.T2_STORE).target(States.T2_DELAY).guard(storeT2Guard())
.and()
.withExternal()
.source(States.T2_DELAY).target(States.C2_CONTINUE)
.and()
.withChoice()
.source(States.C2_CONTINUE)
.first(States.T2_POLL, continueChoiceT2Guard())
.last(States.T2_TERMINATE)
.and()
.withJoin()
.source(States.TASKS).target(States.JOIN)
.and()
.withExternal()
.source(States.JOIN).target(States.TERMINATE);
}
@Bean
public Guard<States, Events> initChoiceGuard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
System.out.println("in initChoiceGuard...");
return true;
}
};
}
int i1 = 0;
int i2 = 0;
@Bean
public Guard<States, Events> continueChoiceT1Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
System.out.println("in continueChoiceT1Guard - count: " + i1);
return (i1++ < 5);
}
};
}
@Bean
public Guard<States, Events> continueChoiceT2Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
System.out.println("in continueChoiceT2Guard - count: " + i2);
return (i2++ < 10);
}
};
}
@Bean
public Guard<States, Events> tasksChoiceGuard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
System.out.println("in tasksChoiceGuard");
return true;
}
};
}
@Bean
public Action<States, Events> initT1Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in initAction");
((IDataProcessor) dp.get(0)).init();
}
};
}
@Bean
public Action<States, Events> initT2Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in initAction");
((IDataProcessor) dp.get(1)).init();
}
};
}
@Bean
public Action<States, Events> pollT1Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(0)).poll();
}
};
}
@Bean
public Action<States, Events> pollT2Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(1)).poll();
}
};
}
@Bean
public Action<States, Events> processT1Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(0)).process();
}
};
}
@Bean
public Action<States, Events> processT2Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(1)).process();
}
};
}
@Bean
public Action<States, Events> storeT1Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(0)).store();
}
};
}
@Bean
public Action<States, Events> storeT2Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(1)).store();
}
};
}
@Bean
public Action<States, Events> resetT1Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(0)).reset();
}
};
}
@Bean
public Action<States, Events> resetT2Action() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in pollAction");
((IDataProcessor) dp.get(1)).reset();
}
};
}
@Bean
public Guard<States, Events> pollT1Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
return ((IDataProcessor) dp.get(0)).pollComplete();
}
};
}
@Bean
public Guard<States, Events> pollT2Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
return ((IDataProcessor) dp.get(1)).pollComplete();
}
};
}
@Bean
public Guard<States, Events> processT1Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
return ((IDataProcessor) dp.get(0)).processComplete();
}
};
}
@Bean
public Guard<States, Events> processT2Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
return ((IDataProcessor) dp.get(1)).processComplete();
}
};
}
@Bean
public Guard<States, Events> storeT1Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
return ((IDataProcessor) dp.get(0)).storeComplete();
}
};
}
@Bean
public Guard<States, Events> storeT2Guard() {
return new Guard<States, Events>() {
@Override
public boolean evaluate(StateContext<States, Events> context) {
return ((IDataProcessor) dp.get(1)).storeComplete();
}
};
}
@Bean
public Action<States, Events> fixAction() {
return new Action<States, Events>() {
@Override
public void execute(StateContext<States, Events> context) {
System.out.println("in fixAction");
}
};
}
@Bean
public StateMachine<States, Events> stateMachine() {
return stateMachine;
}
@Bean(name = StateMachineSystemConstants.TASK_EXECUTOR_BEAN_NAME)
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
te.setMaxPoolSize(50);
te.setThreadNamePrefix("LULExecutor-");
te.setCorePoolSize(25);
te.initialize();
return te;
}
//@StatesOnTransition(target = States.AUTOMATIC)
public void automaticFix(ExtendedState extendedState) {
}
Upvotes: -1
Reputation: 2646
You didn't show full configuration, from you code sample at least taskExecutor
is missing. For example tasks
sample doing fork/join is using ThreadPoolTaskExecutor
. Default executor is SyncTaskExecutor
. Docs have some notes how it is configured.
One other thing in the code example is that you have zero events defined meaning you would not be able to drive
a machine. You have defined transition from READY
to FORK
without an event, meaning it's a triggerless transition. When you start a state machine, it's then changed into its initial state READY
and anonymous transition takes it to FORK
. This starting sequence happens during a start and is outside of an actual executor.
Try to define transition from READY
to FORK
with an event, then send that event to machine after it has been started and parallel execution should work(assuming taskExecutor is properly set).
Upvotes: 0