Alwin G
Alwin G

Reputation: 115

Spring boot multithreaded async not working

The task is to call a database, retrieve certain records update and save them. As the amount of records if fairly large we want to do this Async, however, this doesn't seem to be implemented correctly.

The main class:

@SpringBootApplication
@EnableAsync
MainApplication() {

    @Bean("threadPoolExecutor")
    public TaskExecutor getAsyncExecutor(){
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
            executor.setCorePoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
            executor.setMaxPoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
            executor.setWaitForTasksToCompleteOnShutdown(true);
            executor.setThreadNamePrefix("RetryEnhancement-");
            return executor;
        }
    }

Method in the first service:

@Service
public class FirstService() {
    @Transactional
    public void fullProcess() {
        for(int counter = 0; counter < ConfigFile.getTHREADS(); counter++){
            secondaryService.threads();
         }
    }
}

Method in the second service:

@Service
public class SecondService () {
    @Async("threadPoolExecutor")
    public void threads() {
        while(thirdService.threadMethod()) {
            //doNothing
        }
    }
}

Method in the third service:

@Service
public class ThirdService() {
    @Transactional
    public boolean threads() {
        Record record = repository.fetchRecord();
        if(record!=null) {
            updateRecord(record);
            saveRecord(record);
            return true;
        } else {
            return false;
        }
    }
}

Repository:

public interface repository extends CrudRepository<Record, long> {
    @Lock(LockModeType.PESSIMISTIC_WRITE)
    Record fetchRecord();
}

The issue I'm finding is that, while the code executes perfectly fine, it seems to have a Synchronous execution (found by adding a .sleep and watching the execution in the logger). The seperate threads seem to be waiting until the other is executed. I'm probably doing something wrong and if another thread already explains the issue, than please refer it, though I have not been able to find this issue in a different thread.

Upvotes: 0

Views: 952

Answers (1)

M. Deinum
M. Deinum

Reputation: 125252

Your solution is way to complex. Ditch all of that and just inject the TaskExecutor and do the updateRecord in a separate thread (you might need to retrieve it again as you are now using a different thread and thus connection.

Something like this should do the trick

private final TaskExecutor executor; // injected through constructor

public void process() {
  Stream<Record> records = repository.fetchRecords(); // Using a stream gives you a lazy cursor!
  records.forEach(this::processRecord);
}

private void processRecord(Record record) {
  executor.submit({
    updateRecord(record);
    saveRecord(record);
  });
}

You might want to put the processRecord into another object and make it @Transactional or wrap it in a TransactionTemplate to get that behavior.

Upvotes: 1

Related Questions