gtludwig

Reputation: 5601

How to best implement a multi-thread approach in Spring Boot

I have been struggling to implement a multi-threaded approach in the application I am working on.

The part I want to run in parallel threads was originally written as a for loop over a list.

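import java.util.List;

import org.springframework.http.ResponseEntity;
import org.springframework.stereotype.Service;

@Service
public class ApplicationServiceImpl implements ApplicationService {

    @Override
    public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
        // Sequential version: each element is fully processed before the next one starts.
        for (MyObject myObject : myObjectList) {
            AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
            YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
            runMethodC(yetAnotherTypeOfObject, myObject.getAStringValue(), myObject.getAnotherStringValue());
            runMethodD(yetAnotherTypeOfObject);
        }
        return ResponseEntity.ok().build();
    }
}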

The methods private AnotherTypeOfObject runMethodA(MyObject myObject) {...}, private YetAnotherTypeOfObject runMethodB(AnotherTypeOfObject anotherTypeOfObject) {...}, private void runMethodC(YetAnotherTypeOfObject yetAnotherTypeOfObject, String aStringValue, String anotherStringValue) {...} and private void runMethodD(YetAnotherTypeOfObject yetAnotherTypeOfObject) {...} only use local variables.

I have searched quite a bit for a solution that would process a list of hundreds of MyObject instances in parallel threads instead of one after the other.

What I have done is create a configuration class:

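import java.util.concurrent.Executor;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
@EnableAsync
public class AsyncConfiguration {

    @Bean(name = "threadPoolTaskExecutor")
    public Executor aSyncExecutor() {
        final ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
        threadPoolTaskExecutor.setCorePoolSize(4);
        threadPoolTaskExecutor.setMaxPoolSize(4);
        threadPoolTaskExecutor.setQueueCapacity(50);
        threadPoolTaskExecutor.setThreadNamePrefix("threadNamePrefix");
        threadPoolTaskExecutor.initialize();
        return threadPoolTaskExecutor;
    }
}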

I have plenty of log.info("some recognizable text") calls throughout methods A, B, C and D, so I can see what is going on. I also aggregated these methods into one:

private void runThreads(MyObject myObject, String aStringValue, String anotherStringValue) {
    AnotherTypeOfObject anotherTypeOfObject = runMethodA(myObject);
    YetAnotherTypeOfObject yetAnotherTypeOfObject = runMethodB(anotherTypeOfObject);
    runMethodC(yetAnotherTypeOfObject, aStringValue, anotherStringValue);
    runMethodD(yetAnotherTypeOfObject);
}

And I have tried to run the main method as:

@Override
@Async("threadPoolTaskExecutor")
public ResponseEntity<Void> startProcess(List<MyObject> myObjectList) throws Exception {
    // The string values are read per element, inside the lambda where myObject is in scope.
    myObjectList.forEach(myObject -> runThreads(
            myObject, myObject.getAStringValue(), myObject.getAnotherStringValue()));
    return ResponseEntity.ok().build();
}

I still don't get the intended result: several threads running runThreads(...) so that the processing is done in parallel.

What am I missing here?

Upvotes: 1

Views: 18768

Answers (2)

Kais Neffati

Reputation: 45

Actually, you are not parallelising the for loop but the method that executes it. In this case a single thread executes the whole loop.

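You need to put @Async on runThreads() instead. Because Spring applies @Async through a proxy, the method must be public and called from another bean; a call from within the same class bypasses the proxy and runs synchronously.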
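The difference between handing the whole loop to the executor and handing it each element can be seen without Spring. The following is a minimal sketch in plain Java, assuming a fixed pool of 4 threads to mirror the configuration in the question; the class name and the process(...) helper are hypothetical stand-ins for runThreads(...), not part of the original code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class LoopVersusElementTasks {

    // Hypothetical stand-in for runThreads(...): records which thread handled the item.
    static void process(int item, Set<String> threadNames) {
        threadNames.add(Thread.currentThread().getName());
    }

    // The whole loop is one task, so a single worker thread handles every item.
    static Set<String> wholeLoopAsOneTask(List<Integer> items, ExecutorService pool) throws Exception {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        pool.submit(() -> items.forEach(item -> process(item, threadNames))).get();
        return threadNames;
    }

    // One task per item, so the pool can spread the items over its workers.
    static Set<String> oneTaskPerItem(List<Integer> items, ExecutorService pool) throws Exception {
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        List<Future<?>> futures = new ArrayList<>();
        for (int item : items) {
            futures.add(pool.submit(() -> process(item, threadNames)));
        }
        for (Future<?> future : futures) {
            future.get(); // wait for completion and surface any failure
        }
        return threadNames;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> items = List.of(1, 2, 3, 4, 5, 6, 7, 8);
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println("whole loop as one task: "
                    + wholeLoopAsOneTask(items, pool).size() + " thread(s)");
            System.out.println("one task per item:      "
                    + oneTaskPerItem(items, pool).size() + " thread(s)");
        } finally {
            pool.shutdown();
        }
    }
}
```

The first variant corresponds to annotating startProcess(...) with @Async and always reports a single thread; the second corresponds to making each runThreads(...) call its own task.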

That said, creating the executor with static configuration is not recommended. Consider using the CompletableFuture API instead: https://www.baeldung.com/java-completablefuture
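As a rough illustration of the CompletableFuture approach, the sketch below starts one asynchronous task per element and then waits for all of them. It assumes a plain fixed thread pool of 4 threads; the class name, processAll(...) and process(...) are hypothetical, with process(...) standing in for the runMethodA to runMethodD pipeline.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

public class CompletableFutureFanOut {

    // Hypothetical stand-in for the per-element work done in runThreads(...).
    static String process(String item) {
        return item.toUpperCase();
    }

    static List<String> processAll(List<String> items, ExecutorService pool) {
        // Start one asynchronous task per element on the given pool.
        List<CompletableFuture<String>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> process(item), pool))
                .collect(Collectors.toList());

        // join() blocks until each task has finished; results keep the input order.
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            System.out.println(processAll(List.of("a", "b", "c"), pool)); // prints [A, B, C]
        } finally {
            pool.shutdown();
        }
    }
}
```

Collecting all futures before joining any of them matters here: joining inside the first stream would wait for each task before starting the next and make the processing sequential again.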

Upvotes: 4

GeertPt

Reputation: 17846

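If you only need to process all elements of a collection in parallel, you can use Stream.parallel(). It runs on the common ForkJoinPool, which by default has one worker thread fewer than the number of CPU cores, with the calling thread also taking part in the work. This is the simplest approach, available since Java 8.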

myObjectList.stream()
        .parallel()
        .forEach(myObject -> runThreads(myObject, myObject.getAStringValue(), myObject.getAnotherStringValue()));

For this you don't need any @Async or Spring-provided Executor.

You can run the stream inside a custom ForkJoinPool to control the number of threads, although the default may work well, too.

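ForkJoinPool customThreadPool = new ForkJoinPool(4);
try {
    // submit(Runnable) accepts a lambda, whereas invoke(...) only takes a ForkJoinTask.
    customThreadPool.submit(
            () -> myObjectList.stream()
                    .parallel()
                    .forEach(myObject -> runThreads(myObject, myObject.getAStringValue(), myObject.getAnotherStringValue())))
            .join(); // wait until every element has been processed
} finally {
    customThreadPool.shutdown();
}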

Upvotes: 8
