user3465269

Reputation: 11

Propagate transaction to ForkJoinPool submit

I'm creating a ForkJoinPool with a fixed number of threads to run a parallel stream that comes from a JPA query, but I'm having trouble propagating the @Transactional context into the task passed to the submit method of ForkJoinPool.

@Transactional(readOnly = true)
public void streamTest() {
    ForkJoinPool customThreadPool = new ForkJoinPool(20);
    try {
        customThreadPool.submit(() ->
            priceRepository.streamAll()
                .parallel()
                .map(p -> this.transform(p))
                .forEach(System.out::println)
        ).get();
    } catch (InterruptedException | ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
}

I'm getting the error: "You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction."

If I remove the ForkJoinPool and run the stream directly, it works fine. Is there a way to propagate the read-only transaction to the task executed by the submit method of ForkJoinPool?

Upvotes: 1

Views: 1629

Answers (1)

user3465269

Reputation: 11

I found out how to make the task running inside the ForkJoinPool transactional. I only had to use TransactionSynchronizationManager, as shown below.

@Transactional(readOnly = true)
public void streamTest() {
    ForkJoinPool customThreadPool = new ForkJoinPool(20);
    try {
        customThreadPool.submit(() -> {
            // Transaction state is thread-bound, so mark the worker thread that
            // runs this task as being inside an active read-only transaction.
            TransactionSynchronizationManager.setActualTransactionActive(true);
            TransactionSynchronizationManager.setCurrentTransactionReadOnly(true);
            TransactionSynchronizationManager.initSynchronization();
            priceRepository.streamAll()
                .parallel()
                .map(p -> this.transform(p))
                .forEach(System.out::println);
        }).get();
    } catch (InterruptedException | ExecutionException e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    } finally {
        // Release the pool's worker threads once the task has finished.
        customThreadPool.shutdown();
    }
}
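
For anyone wondering why the flags have to be set inside the task: TransactionSynchronizationManager keeps its state in thread-bound (ThreadLocal) storage, so whatever @Transactional set up on the calling thread is not visible to the pool's worker threads. The sketch below shows the same effect in plain Java, without Spring. TX_ACTIVE, readFlagInPool and setAndReadFlagInPool are hypothetical names made up for this illustration, not part of any library.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;

public class ThreadLocalVisibilityDemo {

    // Hypothetical stand-in for a thread-bound "transaction active" flag.
    public static final ThreadLocal<Boolean> TX_ACTIVE =
            ThreadLocal.withInitial(() -> Boolean.FALSE);

    // Reads the flag from a worker thread of a fresh pool,
    // without setting anything inside the task.
    public static boolean readFlagInPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Callable<Boolean> task = () -> TX_ACTIVE.get();
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    // Sets the flag inside the task itself, reads it on the same worker,
    // and clears it again so the pooled thread is left clean.
    public static boolean setAndReadFlagInPool() {
        ForkJoinPool pool = new ForkJoinPool(2);
        try {
            Callable<Boolean> task = () -> {
                TX_ACTIVE.set(Boolean.TRUE);
                try {
                    return TX_ACTIVE.get();
                } finally {
                    TX_ACTIVE.remove();
                }
            };
            return pool.submit(task).join();
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        TX_ACTIVE.set(Boolean.TRUE); // set on the calling thread only
        System.out.println("caller: " + TX_ACTIVE.get());
        System.out.println("worker, flag not set in task: " + readFlagInPool());
        System.out.println("worker, flag set in task: " + setAndReadFlagInPool());
        TX_ACTIVE.remove();
    }
}
```

The value set on the calling thread does not reach the worker, while a value set inside the task is visible on the thread that runs it. Note that the value is still per thread: other workers in the same pool do not see it either.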

Upvotes: 0
