Reputation: 793
I am trying to call a long-running method from a Spring application. This method reads the database through JPA and performs its task while the caller method finishes and returns. The problem is that Spring application needs this method to be Transactional, but I just can't make it so. What I'm getting is
Exception in thread "bulk_task_executor_thread1" org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.
at org.springframework.data.jpa.repository.query.JpaQueryExecution$StreamExecution.doExecute(JpaQueryExecution.java:343)
at org.springframework.data.jpa.repository.query.JpaQueryExecution.execute(JpaQueryExecution.java:87)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.doExecute(AbstractJpaQuery.java:116)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.execute(AbstractJpaQuery.java:106)
Here are the snippets of my code:
@Component
public class BulkLoadingService {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
@Autowired
private LoadingWorkerFactory loadingWorkerFactory;
@Autowired
@Qualifier("bulkTaskExecutor")
private TaskExecutor taskExecutor;
@Autowired
ModelMapper modelMapper;
@Transactional(readOnly = true)
public void loadAllData() {
LOG.info("loadAllData() started.");
LoadingWorker loadingWorker = loadingWorkerFactory.getLoadingWorker();
taskExecutor.execute(loadingWorker);
LOG.info("loadAllData() finished.");
}
}
-
@Configuration
public class BulkFrameworkConfiguration {
public static final int NTHREADS = 10;
@Bean
@Qualifier("bulkTaskExecutor")
public TaskExecutor threadPoolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(4);
executor.setMaxPoolSize(NTHREADS);
executor.setThreadNamePrefix("bulk_task_executor_thread");
executor.initialize();
return executor;
}
}
-
import org.modelmapper.ModelMapper;
import org.springframework.messaging.MessageChannel;
@Autowired
private MessageChannel actionBulkOutboundChannel;
@Autowired
private ActionRepository actionRepository;
@Component
public class LoadingWorkerFactory {
@Autowired
ModelMapper modelMapper;
public LoadingWorker getLoadingWorker() {
return new LoadingWorker(actionRepository, channel, modelMapper);
}
}
-
public class LoadingWorker implements Runnable {
private static final Logger LOG = LoggerFactory.getLogger(MethodHandles.lookup().lookupClass());
private ModelMapper modelMapper;
private ActionRepository dataRepository;
private MessageChannel outboundChannel;
public LoadingWorker(ActionRepository repository, MessageChannel channel, ModelMapper modelMapper) {
this.modelMapper = modelMapper;
this.dataRepository = repository;
this.outboundChannel = channel;
}
@Override
@Transactional(readOnly = true)
public void run() {
LOG.info("LoadingWorker.run() started.");
long startTime = System.currentTimeMillis();
long counter = 0;
try(Stream<ActionEntity> entityStream = dataRepository.getAll()) {
counter = entityStream.peek(entity -> {
ActionDocument ad = modelMapper.map(entity, ActionDocument.class);
LOG.debug("About to build a message '{}'", ad);
Message<ActionDocument> message = MessageBuilder.withPayload(ad).build();
try {
outboundChannel.send(message);
} catch (MessagingException me) {
LOG.error("Exception encountered while writing request message to queue: {}", me.getRootCause());
LOG.debug("Exception encountered while writing request message to queue", me);
} catch (Exception e) {
LOG.error("Some exception encountered while writing request message to queue", e);
}
}).count();
}
LOG.info("LoadingWorker.run() finished: {} Documents ({} ms)", counter, System.currentTimeMillis() - startTime);
}
protected Stream<ActionEntity> getEntityStream() {
return dataRepository.getAll();
}
}
-
@Repository
public interface ActionRepository extends JpaRepository<ActionEntity, UUID> {
@Query("SELECT oa FROM OfficeAction oa ORDER By oa.id")
Stream<OfficeAction> getAll();
...
-
@Entity
@Table(
name = "oa_office_action")
public class ActionEntity implements Serializable {
...
In a nutshell, BulkLoadingService uses LoadingWorkerFactory to create a new worker (LoadingWorker implements Runnable) and uses TaskExecutor to run this worker. LoadingWorker tries to access ActionRepository to get a Stream of all content, and this is when things are breaking up.
How is it possible in Spring to make a method (a method that does not belong to a @Bean or @Component) which is running in another Thread @Transactional? Is declarative transaction support impossible in my case?
P.S. I was able to get this to work by using TransactionTemplate directly (rather than trying to use @Transactional annotation). Here is what different:
public class LoadingWorker implements Runnable {
...
private PlatformTransactionManager transactionManager;
...
@Override
public void run() {
LOG.info("LoadingWorker.run() started.");
TransactionTemplate transactionTemplate = new TransactionTemplate(transactionManager);
transactionTemplate.setReadOnly(true);
transactionTemplate.execute(status -> {
LOG.info("Anonymous TransactionCallback started.");
inner();
return null;
});
}
protected void inner() {
long startTime = System.currentTimeMillis();
...
Although I am still wondering if this can be done using annotations.
Upvotes: 2
Views: 2836
Reputation: 3416
Without restructuring the whole part of the application. I'd say as you are already passing beans to the Runnable
implementation, you can pass TransactionTemplate and execute your code within a transaction with it.
You have to use the execute
method and put the implementation inside of it.
Upvotes: 4