Reputation: 115
The task is to call a database, retrieve certain records, update them and save them. As the number of records is fairly large we want to do this asynchronously; however, this doesn't seem to be implemented correctly.
The main class:
@SpringBootApplication
@EnableAsync
public class MainApplication {

    // Named executor that @Async("threadPoolExecutor") refers to
    @Bean("threadPoolExecutor")
    public TaskExecutor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
        executor.setMaxPoolSize(DataSourceConfig.getTHREAD_POOL_SIZE());
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setThreadNamePrefix("RetryEnhancement-");
        return executor;
    }
}
Method in the first service:
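@Service
public class FirstService {

    // secondaryService is an injected SecondService
    @Transactional
    public void fullProcess() {
        // Start one asynchronous worker per configured thread
        for (int counter = 0; counter < ConfigFile.getTHREADS(); counter++) {
            secondaryService.threads();
        }
    }
}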
Method in the second service:
@Service
public class SecondService {

    // thirdService is an injected ThirdService
    @Async("threadPoolExecutor")
    public void threads() {
        // Keep processing until no record is left
        while (thirdService.threadMethod()) {
            // do nothing
        }
    }
}
Method in the third service:
@Service
public class ThirdService {

    // Named threadMethod to match the call in SecondService
    @Transactional
    public boolean threadMethod() {
        Record record = repository.fetchRecord();
        if (record != null) {
            updateRecord(record);
            saveRecord(record);
            return true;
        } else {
            return false;
        }
    }
}
Repository:
public interface repository extends CrudRepository<Record, Long> {
@Lock(LockModeType.PESSIMISTIC_WRITE)
Record fetchRecord();
}
The issue I'm finding is that, while the code executes perfectly fine, it seems to execute synchronously (found by adding a .sleep and watching the execution in the logger). The separate threads seem to wait until the other one has finished. I'm probably doing something wrong, and if another thread already explains the issue, then please refer me to it, though I have not been able to find this issue in a different thread.
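A check like the one described can be made more objective than reading log timestamps by counting how many tasks are inside their work section at the same moment. The following is only a minimal sketch in plain Java. It assumes a java.util.concurrent fixed pool in place of the Spring-managed executor, and OverlapCheck, maxConcurrent and the sleep that stands in for the record update are hypothetical names, not part of the code above.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class OverlapCheck {

    // Runs `tasks` sleeping tasks on a pool of `poolSize` threads and returns
    // the highest number of tasks observed inside the work section at once.
    static int maxConcurrent(int poolSize, int tasks, long sleepMillis) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        AtomicInteger running = new AtomicInteger();
        AtomicInteger peak = new AtomicInteger();
        for (int i = 0; i < tasks; i++) {
            pool.execute(() -> {
                int now = running.incrementAndGet();
                peak.accumulateAndGet(now, Math::max);
                try {
                    Thread.sleep(sleepMillis); // stands in for the real unit of work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    running.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return peak.get();
    }

    public static void main(String[] args) {
        System.out.println("peak with 4 threads: " + maxConcurrent(4, 8, 200));
        System.out.println("peak with 1 thread:  " + maxConcurrent(1, 8, 50));
    }
}
```

If the same kind of counter were placed around the real work and the peak stayed at 1 with a pool larger than one, that would suggest something other than the pool (for example a shared lock) is serializing the tasks.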
Upvotes: 0
Views: 952
Reputation: 125252
Your solution is way too complex. Ditch all of that and just inject the TaskExecutor and do the updateRecord in a separate thread (you might need to retrieve the record again, as you are now using a different thread and thus a different connection).
Something like this should do the trick:
private final TaskExecutor executor; // injected through constructor

public void process() {
    // Using a stream gives you a lazy cursor! Close it when you are done.
    try (Stream<Record> records = repository.fetchRecords()) {
        records.forEach(this::processRecord);
    }
}

private void processRecord(Record record) {
    // TaskExecutor only offers execute(Runnable), so pass the work as a lambda
    executor.execute(() -> {
        updateRecord(record);
        saveRecord(record);
    });
}
You might want to put the processRecord into another object and make it @Transactional, or wrap it in a TransactionTemplate, to get that behavior.
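The one-task-per-record shape can be tried outside Spring first. What follows is a rough plain-Java sketch, not the Spring code itself: it assumes an ExecutorService in place of the injected TaskExecutor, and PerRecordDispatch, Item, sampleItems and processAll are hypothetical names made up for illustration. The body of each task marks the place where the transactional update-and-save of a single record would go.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class PerRecordDispatch {

    // Hypothetical stand-in for the entity in the question.
    static final class Item {
        final int id;
        volatile boolean processed;
        Item(int id) { this.id = id; }
    }

    // Builds `count` unprocessed items with ids 0..count-1.
    static List<Item> sampleItems(int count) {
        return IntStream.range(0, count).mapToObj(Item::new).collect(Collectors.toList());
    }

    // Submits one task per item and waits for all of them to finish.
    // Returns the names of the threads that actually did the work.
    static Set<String> processAll(List<Item> items, int poolSize) {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> workers = ConcurrentHashMap.newKeySet();
        for (Item item : items) {
            pool.execute(() -> {
                // In the Spring version, the transactional update-and-save
                // of this single record would happen here.
                item.processed = true;
                workers.add(Thread.currentThread().getName());
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return workers;
    }

    public static void main(String[] args) {
        List<Item> items = sampleItems(20);
        Set<String> workers = processAll(items, 4);
        System.out.println("worker threads used: " + workers);
    }
}
```

Because every task handles exactly one record, each task can own its own transaction, which is the behavior the TransactionTemplate or separate @Transactional object is meant to provide.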
Upvotes: 1