PeMa

Reputation: 1716

Transaction handling when wrapping Stream into Flux

I'm having trouble understanding what goes on behind the scenes when I manually wrap a Stream, received as a query result from Spring Data JPA, into a Flux.

Consider the following:

Entity:

@NoArgsConstructor
@AllArgsConstructor
@Data
@Entity
public class TestEntity {
    @Id
    private Integer a;
    private Integer b;
}

Repository:

public interface TestEntityRepository extends JpaRepository<TestEntity, Integer> {
    Stream<TestEntity> findByBBetween(int b1, int b2);
}

Simple test code:

@Test
@SneakyThrows
@Transactional
public void dbStreamToFluxTest() {
    testEntityRepository.save(new TestEntity(2, 6));
    testEntityRepository.save(new TestEntity(3, 8));
    testEntityRepository.save(new TestEntity(4, 10));

    testEntityFlux(testEntityStream()).subscribe(System.out::println);
    testEntityFlux().subscribe(System.out::println);
    Thread.sleep(200);
}

private Flux<TestEntity> testEntityFlux() {
    return fromStream(this::testEntityStream);
}

private Flux<TestEntity> testEntityFlux(Stream<TestEntity> testEntityStream) {
    return fromStream(() -> testEntityStream);
}

private Stream<TestEntity> testEntityStream() {
    return testEntityRepository.findByBBetween(1, 9);
}

static <T> Flux<T> fromStream(final Supplier<Stream<? extends T>> streamSupplier) {
    return Flux
            .defer(() -> Flux.fromStream(streamSupplier))
            .subscribeOn(Schedulers.elastic());
}

Questions:

  1. Is this the correct way to do what I do, especially regarding the static fromStream method?
  2. While the call to testEntityFlux(testEntityStream()) does what I expect, for reasons I really don't understand, the call to testEntityFlux() runs into an error:

reactor.core.Exceptions$ErrorCallbackNotImplemented: org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.
Caused by: org.springframework.dao.InvalidDataAccessApiUsageException: You're trying to execute a streaming query method without a surrounding transaction that keeps the connection open so that the Stream can actually be consumed. Make sure the code consuming the stream uses @Transactional or any other way of declaring a (read-only) transaction.

... which is the error I usually get when I forget the @Transactional, and I didn't forget it here.

EDIT

Note: The code was inspired by: https://github.com/chang-chao/spring-webflux-reactive-jdbc-sample/blob/master/src/main/java/me/changchao/spring/springwebfluxasyncjdbcsample/service/CityServiceImpl.java which in turn was inspired by https://spring.io/blog/2016/07/20/notes-on-reactive-programming-part-iii-a-simple-http-server-application. However, the Mono version has the same "issue".

EDIT 2

An example using Optional. Note that replacing testEntityOptional() with testEntityOptionalManual() in testEntityMono() leads to working code, so it all seems to be directly related to how JPA does the data fetching:

@SneakyThrows
@Transactional
public void dbOptionalToMonoTest() {
    testEntityRepository.save(new TestEntity(2, 6));
    testEntityRepository.save(new TestEntity(3, 8));
    testEntityRepository.save(new TestEntity(4, 10));

    testEntityMono(testEntityOptional()).subscribe(System.out::println);
    testEntityMono().subscribe(System.out::println);

    Thread.sleep(1200);
}

private Mono<TestEntity> testEntityMono() {
    return fromSingle(() -> testEntityOptional().get());
}

private Mono<TestEntity> testEntityMono(Optional<TestEntity> testEntity) {
    return fromSingle(() -> testEntity.get());
}

private Optional<TestEntity> testEntityOptional() {
    return testEntityRepository.findById(4);
}

@SneakyThrows
private Optional<TestEntity> testEntityOptionalManual() {
    Thread.sleep(1000);
    return Optional.of(new TestEntity(20, 20));
}

static <T> Mono<T> fromSingle(final Supplier<T> tSupplier) {
    return Mono
            .defer(() -> Mono.fromSupplier(tSupplier))
            .subscribeOn(Schedulers.elastic());
}

Upvotes: 3

Views: 3957

Answers (2)

mp911de

Reputation: 18127

TL;DR:

It boils down to the differences between imperative and reactive programming assumptions and Thread affinity.

Details

We first need to understand what happens with transaction management to understand why your arrangement ends with a failure.

Using a @Transactional method creates a transactional scope for all code within the method. Transactional methods returning scalar values, Stream, collection-like types, or void (basically non-reactive types) are considered imperative transactional methods.

In imperative programming, flows stick to their carrier Thread. The code is expected to remain on the same Thread and not to switch threads. Therefore, transaction management associates transactional state and resources with the carrier Thread in a ThreadLocal storage. As soon as code within a transactional method switches threads (e.g. spinning up a new Thread or using a Thread pool), the unit of work that gets executed on a different Thread leaves the transactional scope and potentially runs in its own transaction. In the worst case, the transaction is left open on an external Thread because there is no transaction manager monitoring entry/exit of the transactional unit of work.
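The thread affinity described above can be reproduced with nothing but the JDK. The following is a minimal sketch, not Spring's actual implementation: `CURRENT_TX`, `bind`, `unbind` and the two `read…` methods are hypothetical names, and a plain `ThreadLocal<String>` stands in for the transactional state that the transaction manager binds to the carrier Thread.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadBoundStateDemo {
    // Hypothetical stand-in for the transactional state that is bound to a thread.
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();

    /** Binds a value to the calling thread only. */
    static void bind(String txName) {
        CURRENT_TX.set(txName);
    }

    /** Removes the value bound to the calling thread. */
    static void unbind() {
        CURRENT_TX.remove();
    }

    /** Reads the value on the calling thread. */
    static String readOnCallerThread() {
        return CURRENT_TX.get();
    }

    /** Reads the value from a pool thread and returns what that thread sees. */
    static String readOnWorkerThread() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = CURRENT_TX::get;
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        bind("tx-1");
        System.out.println("caller thread sees: " + readOnCallerThread()); // tx-1
        System.out.println("worker thread sees: " + readOnWorkerThread()); // null
        unbind();
    }
}
```

The worker thread sees `null` because a `ThreadLocal` value is stored per thread. Work handed to a pool thread therefore cannot find state that was bound on the thread that entered the transactional method.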

@Transactional methods returning a reactive type (such as Mono or Flux) are subject to reactive transaction management. Reactive transaction management is different from imperative transaction management as the transactional state is attached to a Subscription, specifically the subscriber Context. The context is only available with reactive types, not with scalar types as there are no means to attach data to void or a String.

Looking at the code:

@Test
@Transactional
public void dbStreamToFluxTest() {
    // …
}

we see that this method is a @Transactional test method. Here we have two things to consider:

  1. The method returns void so it is subject to imperative transaction management associating the transactional state with a ThreadLocal.
  2. There's no reactive transaction support for @Test methods, because reactive transaction management expects the method to return a Publisher, and a test that hands its Publisher back to the caller has no way to assert the outcome of the stream.

    @Test
    @Transactional
    public Publisher<Object> thisDoesNotWork() {
        return myRepository.findAll(); // Where did my assertions go?
    }
    

Let's take a closer look at the fromStream(…) method:

static <T> Flux<T> fromStream(final Supplier<Stream<? extends T>> streamSupplier) {
    return Flux
        .defer(() -> Flux.fromStream(streamSupplier))
        .subscribeOn(Schedulers.elastic());
}

The code accepts a Supplier that returns a Stream. Next, subscribeOn(…) instructs the subscription signals (subscribe(…), request(…)) to happen on the elastic Scheduler, which effectively changes the Thread on which the Stream gets created and consumed. Therefore, subscribeOn causes the Stream creation (the call to findByBBetween(…)) to happen on a different Thread than your carrier Thread, and that Thread has no transaction bound to it.

Removing subscribeOn(…) will fix your issue.
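To see why the two calls in the question behave differently, here is a rough JDK-only model of the situation. It is a sketch under stated assumptions and involves neither Reactor nor Spring: a single-thread `ExecutorService` stands in for `Schedulers.elastic()`, the `TX_ACTIVE` flag stands in for the ThreadLocal-bound transaction, and `query()` stands in for `findByBBetween(…)`, assumed to check for a transaction at the moment it is called. All of these names are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SupplierThreadDemo {
    // Hypothetical stand-in for the transaction bound to the carrier thread.
    private static final ThreadLocal<Boolean> TX_ACTIVE = ThreadLocal.withInitial(() -> false);

    /** Hypothetical stand-in for the repository call: refuses to run without a "transaction". */
    static Stream<Integer> query() {
        if (!TX_ACTIVE.get()) {
            throw new IllegalStateException("no surrounding transaction");
        }
        return Stream.of(6, 8);
    }

    /** Models subscribeOn(...): the supplier is invoked, and the stream consumed, on a worker thread. */
    static String runOnWorker(Supplier<Stream<Integer>> supplier) {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<List<Integer>> task = () -> supplier.get().collect(Collectors.toList());
            return "ok " + worker.submit(task).get();
        } catch (ExecutionException e) {
            return "failed: " + e.getCause().getMessage();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return "interrupted";
        } finally {
            worker.shutdown();
        }
    }

    /** Like testEntityFlux(testEntityStream()): query() runs on the caller thread. */
    static String eagerStream() {
        TX_ACTIVE.set(true);
        try {
            Stream<Integer> stream = query();
            return runOnWorker(() -> stream);
        } finally {
            TX_ACTIVE.remove();
        }
    }

    /** Like testEntityFlux(): query() itself runs on the worker thread. */
    static String deferredStream() {
        TX_ACTIVE.set(true);
        try {
            return runOnWorker(SupplierThreadDemo::query);
        } finally {
            TX_ACTIVE.remove();
        }
    }

    /** Models dropping subscribeOn(...): everything stays on the caller thread. */
    static String withoutThreadSwitch() {
        TX_ACTIVE.set(true);
        try {
            return "ok " + query().collect(Collectors.toList());
        } finally {
            TX_ACTIVE.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(eagerStream());         // ok [6, 8]
        System.out.println(deferredStream());      // failed: no surrounding transaction
        System.out.println(withoutThreadSwitch()); // ok [6, 8]
    }
}
```

In this model only the deferred variant fails, which mirrors the behavior reported in the question: the supplier, and with it the query call, runs on a thread that never had the flag set.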

There is a bit more to explain why you want to refrain from using reactive types with JPA. Reactive programming has no strong Thread affinity. Thread switching may occur at any time. Depending on how you use the resulting Flux and how you have designed your entities, you might experience visibility issues as entities are passed across threads. Ideally, data in a reactive context remains immutable. Such an approach does not always comply with JPA rules.

Another aspect is lazy loading. When JPA entities are used from threads other than the carrier Thread, an entity may not be able to correlate its context back to the JPA transaction. You can easily run into a LazyInitializationException without understanding why, because Thread switching can be opaque to you.

The recommendation is: Do not use reactive types with JPA or any other transactional resources. Stay with Java 8 Stream instead.

Upvotes: 13

JB Nizet

Reputation: 692121

The Stream returned by the repository is lazy. It uses the connection to the database in order to get the rows when the stream is being consumed by a terminal operation.
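The laziness of a Java Stream is easy to observe in isolation. The sketch below is plain JDK code and does not touch JPA: the `fetched` list and the `rows()` method are hypothetical stand-ins that record when an element is actually pulled from the source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class LazyStreamDemo {
    // Records every element pulled from the source; a stand-in for rows read over a connection.
    static final List<Integer> fetched = new ArrayList<>();

    /** Builds the pipeline only; no element is pulled until a terminal operation runs. */
    static Stream<Integer> rows() {
        return Stream.of(1, 2, 3).peek(fetched::add);
    }

    /** Returns how many elements were pulled before and after the terminal operation. */
    static int[] fetchCounts() {
        fetched.clear();
        Stream<Integer> stream = rows();
        int before = fetched.size();
        stream.collect(Collectors.toList()); // terminal operation: elements are pulled now
        int after = fetched.size();
        return new int[] {before, after};
    }

    public static void main(String[] args) {
        int[] counts = fetchCounts();
        System.out.println("pulled before terminal operation: " + counts[0]); // 0
        System.out.println("pulled after terminal operation: " + counts[1]);  // 3
    }
}
```

Whichever thread runs the terminal operation is the thread that does the reading, so with a repository stream that thread is the one that needs access to the connection.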

The connection is bound to the current transaction, and the current transaction is stored in a ThreadLocal variable, i.e. it is bound to the thread that is executing your test method.

But the consumption of the stream is done on a separate thread, belonging to the thread pool used by the elastic scheduler of Reactor. So you create the lazy stream on the main thread, which has the transaction bound to it, but you consume the stream on a separate thread, which doesn't have the transaction bound to it.

Don't use reactor with JPA transactions and entities. They're incompatible.

Upvotes: 1
