Kurt Clauss

Reputation: 73

Spring state machine nested machines running synchronously

I'm working with the Spring State Machine and am trying to configure some nested functionality. Essentially, I'm trying to run two processes as individual machines nested within a state. I have the following code for states and transitions:

public enum States {
    READY, FORK, JOIN, TASKS, TERMINATE,

    T1_INIT, C1_INIT, T1_READY, T1_POLL, T1_PROCESS, T1_STORE, T1_DELAY, C1_CONTINUE, T1_TERMINATE,
    T2_INIT, C2_INIT, T2_READY, T2_POLL, T2_PROCESS, T2_STORE, T2_DELAY, C2_CONTINUE, T2_TERMINATE
}

public enum Events { INITIALIZE, RUN, STOP, FALLBACK, CONTINUE, FIX, PROC_COMPLETE }

@Override
public void configure(StateMachineStateConfigurer<States, Events> states)
        throws Exception {
    states
            .withStates()
            .initial(States.READY)
            .fork(States.FORK)
            .state(States.TASKS)
            .join(States.JOIN)
            .end(States.TERMINATE)
            .and()
            .withStates()
            .parent(States.TASKS)
            .initial(States.T1_INIT, initT1Action())
            .state(States.T1_READY)
            .state(States.T1_POLL, pollT1Action(), null)
            .state(States.T1_PROCESS, processT1Action(), null)
            .state(States.T1_STORE, storeT1Action(), null)
            .state(States.T1_DELAY)
            .choice(States.C1_CONTINUE)
            .end(States.T1_TERMINATE)
            .and()
            .withStates()
            .parent(States.TASKS)
            .initial(States.T2_INIT, initT2Action())
            .state(States.T2_READY)
            .state(States.T2_POLL, pollT2Action(), null)
            .state(States.T2_PROCESS, processT2Action(), null)
            .state(States.T2_STORE, storeT2Action(), null)
            .state(States.T2_DELAY)
            .choice(States.C2_CONTINUE)
            .end(States.T2_TERMINATE);

}

@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
        throws Exception {
    transitions
            .withExternal()
            .source(States.READY).target(States.FORK)
            .and()
            .withFork()
            .source(States.FORK).target(States.TASKS)
            .and()
            .withExternal()
            .source(States.T1_INIT).target(States.T1_READY)
            .and()
            .withExternal()
            .source(States.T1_READY).target(States.T1_POLL)
            .and()
            .withExternal()
            .source(States.T1_POLL).target(States.T1_PROCESS).guard(pollT1Guard())
            .and()
            .withExternal()
            .source(States.T1_PROCESS).target(States.T1_STORE).guard(processT1Guard())
            .and()
            .withExternal()
            .source(States.T1_STORE).target(States.T1_DELAY).guard(storeT1Guard())
            .and()
            .withExternal()
            .source(States.T1_DELAY).target(States.C1_CONTINUE)
            .and()
            .withChoice()
            .source(States.C1_CONTINUE)
            .first(States.T1_POLL, continueChoiceT1Guard())
            .last(States.T1_TERMINATE)
            .and()
            .withExternal()
            .source(States.T2_INIT).target(States.T2_READY)
            .and()
            .withExternal()
            .source(States.T2_READY).target(States.T2_POLL)
            .and()
            .withExternal()
            .source(States.T2_POLL).target(States.T2_PROCESS).guard(pollT2Guard())
            .and()
            .withExternal()
            .source(States.T2_PROCESS).target(States.T2_STORE).guard(processT2Guard())
            .and()
            .withExternal()
            .source(States.T2_STORE).target(States.T2_DELAY).guard(storeT2Guard())
            .and()
            .withExternal()
            .source(States.T2_DELAY).target(States.C2_CONTINUE)
            .and()
            .withChoice()
            .source(States.C2_CONTINUE)
            .first(States.T2_POLL, continueChoiceT2Guard())
            .last(States.T2_TERMINATE)
            .and()
            .withJoin()
            .source(States.TASKS).target(States.JOIN)
            .and()
            .withExternal()
            .source(States.JOIN).target(States.TERMINATE);

}

I was expecting the machine to fork the two regions defined under the TASKS state and run them as individual machines. The T1 and T2 actions are set up to have a delay in each state within TASKS: T1 is delayed 1 second, and T2 is delayed half a second.

What I find is that T1 runs through all of its states and ends before T2 even starts. Any thoughts on making them run in parallel, as they should with a fork/join?

Upvotes: 2

Views: 3493

Answers (2)

Kurt Clauss

Reputation: 73

In fact, here's the code for the entire FSM configuration class:

@Configuration
@EnableStateMachine
@WithStateMachine
public class FSMFactoryConfig extends EnumStateMachineConfigurerAdapter<States, Events> {

@Autowired
StateMachine<States, Events> stateMachine;

ArrayList<IDataProcessor> dp = new ArrayList<>();

private void createProcessors() {
    dp.add(0, (IDataProcessor) new DataProcessor());
    dp.add(1, (IDataProcessor) new DataProcessor2());
}

@Override
public void configure(StateMachineConfigurationConfigurer<States, Events> config)
        throws Exception {

    createProcessors();

    config
            .withConfiguration()
            .autoStartup(false)
            .taskExecutor(taskExecutor())
            .taskScheduler(new ConcurrentTaskScheduler())
            .listener(new StateMachineEventListener());
}

@Override
public void configure(StateMachineStateConfigurer<States, Events> states)
        throws Exception {
    states
            .withStates()
            .initial(States.READY)
            .fork(States.FORK)
            .state(States.TASKS)
            .join(States.JOIN)
            .end(States.TERMINATE)
            .and()
            .withStates()
            .parent(States.TASKS)
            .initial(States.T1_INIT, initT1Action())
            .state(States.T1_READY)
            .state(States.T1_POLL, pollT1Action(), null)
            .state(States.T1_PROCESS, processT1Action(), null)
            .state(States.T1_STORE, storeT1Action(), null)
            .state(States.T1_DELAY)
            .choice(States.C1_CONTINUE)
            .end(States.T1_TERMINATE)
            .and()
            .withStates()
            .parent(States.TASKS)
            .initial(States.T2_INIT, initT2Action())
            .state(States.T2_READY)
            .state(States.T2_POLL, pollT2Action(), null)
            .state(States.T2_PROCESS, processT2Action(), null)
            .state(States.T2_STORE, storeT2Action(), null)
            .state(States.T2_DELAY)
            .choice(States.C2_CONTINUE)
            .end(States.T2_TERMINATE);

}

@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
        throws Exception {
    transitions
            .withExternal()
            .source(States.READY).target(States.FORK)
            .and()
            .withFork()
            .source(States.FORK).target(States.TASKS)
            .and()
            .withExternal()
            .source(States.T1_INIT).target(States.T1_READY)
            .and()
            .withExternal()
            .source(States.T1_READY).target(States.T1_POLL)
            .and()
            .withExternal()
            .source(States.T1_POLL).target(States.T1_PROCESS).guard(pollT1Guard())
            .and()
            .withExternal()
            .source(States.T1_PROCESS).target(States.T1_STORE).guard(processT1Guard())
            .and()
            .withExternal()
            .source(States.T1_STORE).target(States.T1_DELAY).guard(storeT1Guard())
            .and()
            .withExternal()
            .source(States.T1_DELAY).target(States.C1_CONTINUE)
            .and()
            .withChoice()
            .source(States.C1_CONTINUE)
            .first(States.T1_POLL, continueChoiceT1Guard())
            .last(States.T1_TERMINATE)
            .and()
            .withExternal()
            .source(States.T2_INIT).target(States.T2_READY)
            .and()
            .withExternal()
            .source(States.T2_READY).target(States.T2_POLL)
            .and()
            .withExternal()
            .source(States.T2_POLL).target(States.T2_PROCESS).guard(pollT2Guard())
            .and()
            .withExternal()
            .source(States.T2_PROCESS).target(States.T2_STORE).guard(processT2Guard())
            .and()
            .withExternal()
            .source(States.T2_STORE).target(States.T2_DELAY).guard(storeT2Guard())
            .and()
            .withExternal()
            .source(States.T2_DELAY).target(States.C2_CONTINUE)
            .and()
            .withChoice()
            .source(States.C2_CONTINUE)
            .first(States.T2_POLL, continueChoiceT2Guard())
            .last(States.T2_TERMINATE)
            .and()
            .withJoin()
            .source(States.TASKS).target(States.JOIN)
            .and()
            .withExternal()
            .source(States.JOIN).target(States.TERMINATE);

}



@Bean
public Guard<States, Events> initChoiceGuard() {
    return new Guard<States, Events>() {

        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            System.out.println("in initChoiceGuard...");
            return true;
        }
    };
}

int i1 = 0;
int i2 = 0;

@Bean
public Guard<States, Events> continueChoiceT1Guard() {

    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            System.out.println("in continueChoiceT1Guard - count: " + i1);
            return (i1++ < 5);
        }
    };
}

@Bean
public Guard<States, Events> continueChoiceT2Guard() {

    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            System.out.println("in continueChoiceT2Guard - count: " + i2);
            return (i2++ < 10);
        }
    };
}

@Bean
public Guard<States, Events> tasksChoiceGuard() {
    return new Guard<States, Events>() {

        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            System.out.println("in tasksChoiceGuard");
            return true;
        }
    };
}

@Bean
public Action<States, Events> initT1Action() {
    return new Action<States, Events>() {
        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in initAction");
            ((IDataProcessor) dp.get(0)).init();
        }
    };
}

@Bean
public Action<States, Events> initT2Action() {
    return new Action<States, Events>() {
        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in initAction");
            ((IDataProcessor) dp.get(1)).init();
        }
    };
}

@Bean
public Action<States, Events> pollT1Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in pollAction");
            ((IDataProcessor) dp.get(0)).poll();

        }
    };
}

@Bean
public Action<States, Events> pollT2Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in pollAction");
            ((IDataProcessor) dp.get(1)).poll();

        }
    };
}

@Bean
public Action<States, Events> processT1Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in processAction");
            ((IDataProcessor) dp.get(0)).process();

        }
    };
}

@Bean
public Action<States, Events> processT2Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in processAction");
            ((IDataProcessor) dp.get(1)).process();

        }
    };
}

@Bean
public Action<States, Events> storeT1Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in storeAction");
            ((IDataProcessor) dp.get(0)).store();

        }
    };
}

@Bean
public Action<States, Events> storeT2Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in storeAction");
            ((IDataProcessor) dp.get(1)).store();

        }
    };
}

@Bean
public Action<States, Events> resetT1Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in resetAction");
            ((IDataProcessor) dp.get(0)).reset();

        }
    };
}

@Bean
public Action<States, Events> resetT2Action() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in resetAction");
            ((IDataProcessor) dp.get(1)).reset();

        }
    };
}

@Bean
public Guard<States, Events> pollT1Guard() {
    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            return ((IDataProcessor) dp.get(0)).pollComplete();
        }
    };
}

@Bean
public Guard<States, Events> pollT2Guard() {
    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            return ((IDataProcessor) dp.get(1)).pollComplete();
        }
    };
}

@Bean
public Guard<States, Events> processT1Guard() {
    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            return ((IDataProcessor) dp.get(0)).processComplete();
        }
    };
}

@Bean
public Guard<States, Events> processT2Guard() {
    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            return ((IDataProcessor) dp.get(1)).processComplete();
        }
    };
}

@Bean
public Guard<States, Events> storeT1Guard() {
    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            return ((IDataProcessor) dp.get(0)).storeComplete();
        }
    };
}

@Bean
public Guard<States, Events> storeT2Guard() {
    return new Guard<States, Events>() {
        @Override
        public boolean evaluate(StateContext<States, Events> context) {
            return ((IDataProcessor) dp.get(1)).storeComplete();
        }
    };
}

@Bean
public Action<States, Events> fixAction() {
    return new Action<States, Events>() {

        @Override
        public void execute(StateContext<States, Events> context) {
            System.out.println("in fixAction");
        }
    };
}

@Bean
public StateMachine<States, Events> stateMachine() {
    return stateMachine;
}

@Bean(name = StateMachineSystemConstants.TASK_EXECUTOR_BEAN_NAME)
public TaskExecutor taskExecutor() {
    ThreadPoolTaskExecutor te = new ThreadPoolTaskExecutor();
    te.setMaxPoolSize(50);
    te.setThreadNamePrefix("LULExecutor-");
    te.setCorePoolSize(25);
    te.initialize();
    return te;
}

//@StatesOnTransition(target = States.AUTOMATIC)
public void automaticFix(ExtendedState extendedState) {

}
}

Upvotes: -1

Janne Valkealahti

Reputation: 2646

You didn't show the full configuration; in your code sample, at least the taskExecutor is missing. For example, the tasks sample, which does fork/join, uses a ThreadPoolTaskExecutor. The default executor is SyncTaskExecutor. The docs have some notes on how it is configured.

https://github.com/spring-projects/spring-statemachine/blob/master/spring-statemachine-samples/tasks/src/main/java/demo/tasks/Application.java#L189
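To see why a synchronous executor gives the ordering described in the question, here is a minimal plain-JDK sketch (no Spring involved). The class ExecutorDemo and its methods are hypothetical names for illustration; the two Runnables only simulate regions with a delay, and `Runnable::run` stands in for an executor that runs each task on the calling thread, which is the behaviour of a synchronous executor.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ExecutorDemo {

    // Submits two simulated regions to the executor and returns the order
    // in which they logged their start and end.
    static List<String> runRegions(Executor executor) throws InterruptedException {
        List<String> log = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        executor.execute(region("T1", 300, log, done));
        executor.execute(region("T2", 150, log, done));
        done.await(); // wait until both regions have finished
        return log;
    }

    // A simulated region: log the start, sleep for the delay, log the end.
    static Runnable region(String name, long delayMillis,
                           List<String> log, CountDownLatch done) {
        return () -> {
            log.add(name + " start");
            try {
                Thread.sleep(delayMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            log.add(name + " end");
            done.countDown();
        };
    }

    public static void main(String[] args) throws InterruptedException {
        // Synchronous executor: execute() runs the task on the calling thread,
        // so T2 cannot be submitted until T1 has returned.
        System.out.println("sync: " + runRegions(Runnable::run));

        // Thread pool: each task gets its own worker thread, so they overlap.
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            System.out.println("pool: " + runRegions(pool));
        } finally {
            pool.shutdown();
        }
    }
}
```

With the synchronous executor the log is always T1 start, T1 end, T2 start, T2 end, which matches what the question reports. With the pool, T2 typically starts while T1 is still sleeping.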

One other thing in the code example is that none of your transitions is tied to an event, meaning you would not be able to drive the machine. You have defined the transition from READY to FORK without an event, which makes it a triggerless transition. When you start the state machine, it enters its initial state READY, and the anonymous transition then takes it to FORK. This startup sequence happens during start and runs outside of the actual executor.

Try defining the transition from READY to FORK with an event, then send that event to the machine after it has been started; parallel execution should then work (assuming the taskExecutor is properly set).
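As a rough sketch of that suggestion, the fragment below changes only the first transition of the configuration from the question. Using Events.RUN as the trigger is an assumption (any value of the Events enum would do), and the fragment is meant to sit inside the existing FSMFactoryConfig class rather than compile on its own.

```java
// Fragment of FSMFactoryConfig; needs spring-statemachine on the classpath.
@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions)
        throws Exception {
    transitions
            .withExternal()
            .source(States.READY).target(States.FORK)
            .event(Events.RUN) // assumption: RUN is the trigger; previously triggerless
            .and()
            .withFork()
            .source(States.FORK).target(States.TASKS)
            // followed by .and() and the remaining transitions from the question, unchanged
            ;
}
```

Since the configuration sets autoStartup(false), the machine then has to be started and driven explicitly, for example:

```java
// Somewhere with access to the StateMachine<States, Events> bean:
stateMachine.start();               // enters READY and stays there
stateMachine.sendEvent(Events.RUN); // READY -> FORK now goes through the configured executor
```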

Upvotes: 0
