Reputation: 1
I am implementing a transaction process using Spring State Machine in a Spring Boot application. The process involves a series of authentication steps (AUTH_1 to AUTH_4) that need to be executed in parallel and async since they are independent of each other. Below is a simplified version of my state machine configuration and the relevant actions:
public class SSMTxn {
public static enum States {
TXN_START,
INITIAL_AUTH,
FORK,
JOIN,
AUTH_1,
AUTH_2,
AUTH_3,
AUTH_4,
AUTH_DONE,
AUTH_SUCCESS,
AUTH_FAILED,
SUCCESSFUL_TXN,
FAILED_TXN,
COMPLETED,
}
public enum Events {
INITIATE_TXN,
AUTHENTICATION_STARTED,
AUTHENTICATION_DONE,
AUTHENTICATION_FAILED,
AUTHENTICATION_SUCCESSFUL,
CREATED_SUCCESSFUL_TXN,
CREATED_FAILED_TXN,
END_TXN,
}
}
@Bean
public Action<States, Events> initialAuthAction() {
return context -> {
try {
log.info(
"createTransaction-thread " +
System.currentTimeMillis() +
"_" +
Thread.currentThread().toString() +
"initialAuthAction started"
);
// perform auth
} catch (Exception ex) {
context.getStateMachine().setStateMachineError(ex);
}
};
}
@Bean
public Action<States, Events> auth1Action() {
return context -> {
try {
log.info(
"createTransaction-thread " +
System.currentTimeMillis() +
"_" +
Thread.currentThread().toString() +
"auth1Action started"
);
// perform auth
} catch (Exception ex) {
context.getStateMachine().setStateMachineError(ex);
// apend the txnResponse
}
};
}
public Action<States, Events> auth2Action() {..}
public Action<States, Events> auth3Action() {..}
public Action<States, Events> auth4Action() {
//..
context.getStateMachine().sendEvent(Events.AUTHENTICATION_DONE);
}
public Action<States, Events> authDoneAction() {
return context -> {
log.info("createTransaction-thread " + System.currentTimeMillis() + "_" + Thread.currentThread().toString() +
": authCompleteAction -started ");
TxnResponse txnResponse = context.getExtendedState().get(RESPONSE, TxnResponse.class);
if (ObjectUtils.isEmpty(txnResponse)) {
context.getStateMachine().sendEvent(Events.AUTHENTICATION_SUCCESSFUL);
}
else {
context.getStateMachine().sendEvent(Events.AUTHENTICATION_FAILED);
}
};
}
public Action<States, Events> authSuccessAction() {..}
public Action<States, Events> authFailedAction() {..}
public Action<States, Events> txnSuccessFullAction() {..}
public Action<States, Events> txnFailedAction() {..}
My Objective: I need to execute the AUTH_1, AUTH_2, AUTH_3, and AUTH_4 actions asynchronously. After all these actions complete, the state should transition based on their collective results (e.g., to AUTH_SUCCESS or AUTH_FAILED).
Requirements:
All actions should start processing simultaneously after the initial authentication. I need to handle the results of these actions in a single endpoint(one controller method), both sending the initial event and retrieving the final resulting state.
I am looking for advice on the best practices to set up these parallel executions within the Spring State Machine framework and how to orchestrate their results effectively. Additionally, how can I ensure that the final state reflects the outcome of all parallel processes?
Any examples or guidance on configuring the state machine for parallel processing of states would be greatly appreciated!
Below is my current config .Currently the auth actions are being executed sequentially.
@Slf4j
@Configuration
@EnableStateMachineFactory
public class StateMachineConfig extends StateMachineConfigurerAdapter<States, Events> {
private static final String REQUEST = "request";
private static final String RESPONSE = "response";
private static final String OUTPUT = "output";
private static final Logger logger = LoggerFactory.getLogger(StateMachineConfigV1.class);
@Autowired
TransactionService transactionService;
@Bean(name = StateMachineSystemConstants.TASK_EXECUTOR_BEAN_NAME)
public TaskExecutor taskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(40);
executor.setThreadNamePrefix("txn_ssm_executor");
executor.initialize();
return executor;
}
@Bean
public ThreadPoolTaskScheduler scheduler(){
ThreadPoolTaskScheduler threadPoolTaskScheduler
= new ThreadPoolTaskScheduler();
threadPoolTaskScheduler.setPoolSize(40);
threadPoolTaskScheduler.setThreadNamePrefix(
"txn_ssm_scheduler_");
return threadPoolTaskScheduler;
}
@Override
public void configure(StateMachineConfigurationConfigurer<States, Events> config) throws Exception {
config.withPersistence()
.runtimePersister(stateMachineRuntimePersisterV0)
.and()
.withConfiguration()
.taskExecutor(taskExecutor())
.taskScheduler(scheduler())
.autoStartup(true)
;
}
@Override
public void configure(StateMachineStateConfigurer<States, Events> states) throws Exception {
states
.withStates()
.initial(States.INITIAL)
.state(States.INITIAL)
.state(States.INITIAL_AUTH, initialAuthAction())
.fork(States.FORK)
.join(States.JOIN)
.state(States.AUTH_DONE, authCompleteAction())
.state(States.AUTH_SUCCESS, authSuccessAction())
.state(States.AUTH_FAILED, authFailedAction())
.state(States.SUCCESSFUL_TXN, handleSuccessfulTxn())
.state(States.FAILED_TXN, authFailedAction())
.state(States.COMPLETED)
.and()
.withStates()
.parent(States.FORK)
.initial(States.AUTH_1)
.state(States.AUTH_1, auth1Action())
.and()
.withStates()
.parent(States.FORK)
.initial(States.AUTH_2)
.state(States.AUTH_2, auth2Action())
.and()
.withStates()
.parent(States.FORK)
.initial(States.AUTH_3)
.state(States.AUTH_3, auth3Action())
.and()
.withStates()
.parent(States.FORK)
.initial(States.AUTH_4)
.state(States.AUTH_4, auth4Action())
;
}
@Override
public void configure(StateMachineTransitionConfigurer<States, Events> transitions) throws Exception {
transitions
.withExternal().source(States.INITIAL).target(States.INITIAL_AUTH).event(Events.INITIATE_TXN)
.and().withExternal().source(States.INITIAL_AUTH).target(States.AUTH_FAILED).event(Events.AUTHENTICATION_FAILED)
.and().withExternal().source(States.INITIAL_AUTH).target(States.FORK).event(Events.AUTHENTICATION_STARTED)
.and().withFork().source(States.FORK).target(States.AUTH_CUSTOMER)
.target(States.AUTH_CARD).target(States.AUTH_MCC).target(States.AUTH_FRM)
.and().withJoin().source(States.AUTH_CUSTOMER).source(States.AUTH_CARD)
.source(States.AUTH_MCC).source(States.AUTH_FRM).target(States.JOIN)
.and().withExternal().source(States.JOIN).target(States.AUTH_DONE).event(Events.AUTHENTICATION_DONE)
.and().withExternal().source(States.AUTH_DONE).target(States.AUTH_SUCCESS).event(Events.AUTHENTICATION_SUCCESSFUL)
.and().withExternal().source(States.AUTH_DONE).target(States.AUTH_FAILED).event(Events.AUTHENTICATION_FAILED)
.and().withExternal().source(States.AUTH_SUCCESS).target(States.SUCCESSFUL_TXN).event(Events.CREATED_SUCCESSFUL_TXN)
.and().withExternal().source(States.AUTH_SUCCESS).target(States.FAILED_TXN).event(Events.CREATED_FAILED_TXN)
.and().withExternal().source(States.AUTH_FAILED).target(States.COMPLETED).event(Events.END_TXN)
.and().withExternal().source(States.SUCCESSFUL_TXN).target(States.COMPLETED).event(Events.END_TXN)
.and().withExternal().source(States.FAILED_TXN).target(States.COMPLETED).event(Events.END_TXN)
;
}
}
Upvotes: 0
Views: 63