DMaxter

Reputation: 178

Spring Boot Async ResultSet is closed

I'm implementing notifications using Spring Boot and notifying users in a different thread using @Async.

Without this annotation, everything works well. But when I put it on the method I use to notify, the observers of one particular observable entity don't get notified, and I get this stack trace:

Unexpected exception occurred invoking async method: public void pt.ulisboa.tecnico.socialsoftware.tutor.notifications.NotificationService.notifyObservers(package.notifications.Observable,package.notifications.domain.Notification,ppackage.user.User)
                                                                                                                                                                                                                                               
org.hibernate.exception.GenericJDBCException: could not initialize a collection: [package.course.CourseExecution.users#11]
        at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:47) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.loader.collection.plan.AbstractLoadPlanBasedCollectionInitializer.initialize(AbstractLoadPlanBasedCollectionInitializer.java:97) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.persister.collection.AbstractCollectionPersister.initialize(AbstractCollectionPersister.java:707) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.event.internal.DefaultInitializeCollectionEventListener.onInitializeCollection(DefaultInitializeCollectionEventListener.java:76) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:108) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.internal.SessionImpl.initializeCollection(SessionImpl.java:2145) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.collection.internal.AbstractPersistentCollection$4.doWork(AbstractPersistentCollection.java:589) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.collection.internal.AbstractPersistentCollection.withTemporarySessionIfNeeded(AbstractPersistentCollection.java:264) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.collection.internal.AbstractPersistentCollection.initialize(AbstractPersistentCollection.java:585) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.collection.internal.AbstractPersistentCollection.read(AbstractPersistentCollection.java:149) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.collection.internal.PersistentSet.toString(PersistentSet.java:327) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at java.base/java.lang.String.valueOf(String.java:2951) ~[na:na]
        at java.base/java.io.PrintStream.println(PrintStream.java:897) ~[na:na]
        at package.course.CourseExecution.Notify(CourseExecution.java:210) ~[classes/:na]
        at package.notifications.NotificationService.notifyObservers(NotificationService.java:82) ~[classes/:na]
        at package.notifications.NotificationService$$FastClassBySpringCGLIB$$d43e740c.invoke(<generated>) ~[classes/:na]
        at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218) ~[spring-core-5.2.2.RELEASE.jar:5.2.2.RELEASE]
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:769) ~[spring-aop-5.2.2.RELEASE.jar:5.2.2.RELEASE]
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163) ~[spring-aop-5.2.2.RELEASE.jar:5.2.2.RELEASE]                                                                             
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747) ~[spring-aop-5.2.2.RELEASE.jar:5.2.2.RELEASE]                                                                                 
        at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:366) ~[spring-tx-5.2.2.RELEASE.jar:5.2.2.RELEASE]                                                        
        at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:99) ~[spring-tx-5.2.2.RELEASE.jar:5.2.2.RELEASE]                                                                              
        at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186) ~[spring-aop-5.2.2.RELEASE.jar:5.2.2.RELEASE]                                                                             
        at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:747) ~[spring-aop-5.2.2.RELEASE.jar:5.2.2.RELEASE]
        at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115) ~[spring-aop-5.2.2.RELEASE.jar:5.2.2.RELEASE]
        at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) ~[na:na]                                                                                                                                                         
        at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]                                                                                                                                                                            
Caused by: org.postgresql.util.PSQLException: This statement has been closed.                                                                                                                                                                  
        at org.postgresql.jdbc.PgStatement.checkClosed(PgStatement.java:705) ~[postgresql-42.2.8.jar:42.2.8]                                                                                                                                   
        at org.postgresql.jdbc.PgPreparedStatement.setInt(PgPreparedStatement.java:270) ~[postgresql-42.2.8.jar:42.2.8]                                                                                                                        
        at com.zaxxer.hikari.pool.HikariProxyPreparedStatement.setInt(HikariProxyPreparedStatement.java) ~[HikariCP-3.4.1.jar:na]                                                                                                              
        at org.hibernate.type.descriptor.sql.IntegerTypeDescriptor$1.doBind(IntegerTypeDescriptor.java:46) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.type.descriptor.sql.BasicBinder.bind(BasicBinder.java:73) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.type.AbstractStandardBasicType.nullSafeSet(AbstractStandardBasicType.java:276) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.type.AbstractStandardBasicType.nullSafeSet(AbstractStandardBasicType.java:271) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.bindPositionalParameters(AbstractLoadPlanBasedLoader.java:320) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]
        at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.bindParameterValues(AbstractLoadPlanBasedLoader.java:291) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]                                                         
        at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.prepareQueryStatement(AbstractLoadPlanBasedLoader.java:210) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]                                                       
        at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeQueryStatement(AbstractLoadPlanBasedLoader.java:162) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]                                                       
        at org.hibernate.loader.plan.exec.internal.AbstractLoadPlanBasedLoader.executeLoad(AbstractLoadPlanBasedLoader.java:104) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]                                                                 
        at org.hibernate.loader.collection.plan.AbstractLoadPlanBasedCollectionInitializer.initialize(AbstractLoadPlanBasedCollectionInitializer.java:87) ~[hibernate-core-5.4.9.Final.jar:5.4.9.Final]                                        
        ... 25 common frames omitted

The "Caused by" exception message is always the same; the outer exception is not always the same.

The methods mentioned in the trace are:

    @Async("notifyExecutor") // Even with the default executor, the error occurs
    @Retryable(
            value = { SQLException.class },
            backoff = @Backoff(delay = 5000))
    @Transactional(isolation = Isolation.REPEATABLE_READ)
    public void notifyObservers(Observable observable, Notification notification, User exclude) {
        observable.Notify(notification, exclude);
    }

    @Retryable(value = { SQLException.class }, backoff = @Backoff(delay = 5000))
    @Transactional(isolation = Isolation.REPEATABLE_READ)
    public AnnouncementDto createAnnouncement(AnnouncementDto announcementDto) {

        checkIfConsistentAnnouncement(announcementDto);

        User user = getTeacher(announcementDto.getUserId());

        CourseExecution courseExecution = getCourseExecution(announcementDto.getCourseExecutionId());

        if (announcementDto.getCreationDate() == null) {
            announcementDto
                    .setCreationDate(LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm")));
        }

        // Announcement has a CourseExecution as attribute
        Announcement announcement = new Announcement(user, courseExecution, announcementDto);
        entityManager.persist(announcement);

        NotificationDto notification = NotificationsCreation.create(ADD_ANNOUNCEMENT_TITLE,
                List.of(announcement.getUser().getName()), ADD_ANNOUNCEMENT_CONTENT,
                List.of(announcement.getTitle(), user.getName()), Notification.Type.ANNOUNCEMENT);

        this.notify(courseExecution, notification, user);

        return new AnnouncementDto(announcement);
    }

    // Calls the Async method
    private void notify(CourseExecution course, NotificationDto notification, User user) {
        notificationService.notifyObservers(course, notificationService.createNotification(notification), user);
    }

    @Override
    public void Notify(Notification notification, User user) {
        for (Observer observer : this.users) { // Error occurs here, doesn't get inside the loop
            if (((User) observer).getId() == user.getId()) {
                continue;
            }

            observer.update(this, notification);
        }
    }

I've seen answers for similar errors, but they don't apply here. What I find very weird is that this only occurs with Announcement and not with the other observables. Since I want announcements to reach all observers of a CourseExecution, I made CourseExecution observable, and when there is a new announcement we notify all of its observers.

Can someone help me please?

Upvotes: 6

Views: 2140

Answers (4)

Federico Piazza

Reputation: 30995

The problem is that the transaction is not propagated to the new thread when using @Async.

It should be working properly with:

    @Async("notifyExecutor")
    @Retryable(
            value = { SQLException.class },
            backoff = @Backoff(delay = 5000))
    @Transactional(propagation = Propagation.REQUIRES_NEW) // Make separate transaction
    public void notifyObservers(Observable observable, Notification notification, User exclude) {
        observable.Notify(notification, exclude);
    }

Upvotes: 2

benbenw

Reputation: 753

Transactions managed by Spring's @Transactional do not span multiple threads.

The problem you face here is:

  • CourseExecution has a lazy, uninitialized collection (users)
  • the transaction wrapping createAnnouncement ends before notificationService.notifyObservers executes (as it's async)
  • the connection / EntityManager associated with the CourseExecution instance is closed at the end of the createAnnouncement transaction
  • when Hibernate tries to initialize the users collection, the connection / statement is already closed

It's usually a bad idea to pass managed entities between threads, especially if you want to lazy-load collections in a different thread.
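
To make the sequence above concrete, here is a minimal plain-Java sketch with no Spring or Hibernate involved. Session, Course, and both notify methods are hypothetical stand-ins, and the caller's session is closed before the worker starts so the outcome is deterministic (in the real application it is a race). It contrasts handing the entity itself to the other thread with handing over only its id and reloading it there.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

class DetachedEntityDemo {
    /** Hypothetical stand-in for a persistence session tied to one transaction. */
    static final class Session {
        private static final Map<Integer, List<String>> USERS_BY_COURSE =
                Map.of(11, List.of("alice", "bob"));
        private boolean open = true;

        void close() { open = false; }

        /** Returns a course whose users are fetched only on first access. */
        Course load(int courseId) { return new Course(courseId, this); }

        List<String> fetchUsers(int courseId) {
            if (!open) throw new IllegalStateException("session is closed");
            return USERS_BY_COURSE.get(courseId);
        }
    }

    /** Hypothetical entity with a lazily loaded collection. */
    static final class Course {
        final int id;
        private final Session session;
        private List<String> users; // stays null until first read

        Course(int id, Session session) {
            this.id = id;
            this.session = session;
        }

        List<String> getUsers() {
            if (users == null) users = session.fetchUsers(id);
            return users;
        }
    }

    /** Passes the entity: the worker reads the lazy collection after the caller's session ended. */
    static String notifyWithEntity() {
        Session callerSession = new Session();
        Course course = callerSession.load(11);
        callerSession.close(); // the caller's transaction has finished
        return CompletableFuture.supplyAsync(() -> describe(course::getUsers)).join();
    }

    /** Passes only the id: the worker opens its own session and reloads the entity. */
    static String notifyWithId() {
        Session callerSession = new Session();
        int courseId = callerSession.load(11).id;
        callerSession.close();
        return CompletableFuture.supplyAsync(() -> {
            Session workerSession = new Session();
            try {
                return describe(workerSession.load(courseId)::getUsers);
            } finally {
                workerSession.close();
            }
        }).join();
    }

    private static String describe(Supplier<List<String>> users) {
        try {
            return "notified " + users.get();
        } catch (IllegalStateException e) {
            return "failed: " + e.getMessage();
        }
    }

    public static void main(String[] args) {
        System.out.println(notifyWithEntity());
        System.out.println(notifyWithId());
    }
}
```

In the question's code, the equivalent change would probably be to pass the CourseExecution id to notifyObservers and load the entity inside the async method's own transaction, though I haven't tried that against your project.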

Upvotes: 1

filpa

Reputation: 3614

I imagine someone on the Spring tracker may be able to answer the question in more technical detail.

As Deadron mentions in his answer, this seems to be an issue with transaction management. @Async makes control flow return immediately to the calling function, which (from the point of view of the transaction manager) means that the statement executed successfully and a commit may take place. The method marked @Async will not share the caller's transactional context, even if marked with propagation=REQUIRED (which will just start a new transaction).

@Transactional(isolation = Isolation.REPEATABLE_READ) attempts to check for an existing transaction (as the default propagation level is REQUIRED, which supports an existing transaction if it exists or creates a new one if not). This transaction does exist at the time of the method invocation, but not at the time where the @Async method has to perform any work within the transaction (i.e. binding parameters, as seen in your stacktrace). At that point, accessing that original transaction is not possible since it was local to the original thread.

According to this answer:

under REPEATABLE READ the second SELECT is guaranteed to display at least the rows that were returned from the first SELECT unchanged. New rows may be added by a concurrent transaction in that one minute, but the existing rows cannot be deleted nor changed.

In addition, the following section of the Isolation Javadocs:

/**
 * A constant indicating that dirty reads and non-repeatable reads are
 * prevented; phantom reads can occur. This level prohibits a transaction
 * from reading a row with uncommitted changes in it, and it also prohibits
 * the situation where one transaction reads a row, a second transaction
 * alters the row, and the first transaction rereads the row, getting
 * different values the second time (a "non-repeatable read").
 * @see java.sql.Connection#TRANSACTION_REPEATABLE_READ
 */
REPEATABLE_READ(TransactionDefinition.ISOLATION_REPEATABLE_READ),

would, given the @Transactional and @Async interaction described above, help explain this behaviour, especially if your Observable holds a reference to the User that in turn has some kind of reference to the row (i.e. new transaction is created for the @Async method, but the original transaction has not yet been fully committed). You might try the following:

  1. Try to set the propagation level to MANDATORY. Does the notifyObservers method get invoked successfully, or do you get an error that a Transaction is not active? If you get an error, that would confirm the previous paragraph.
  2. If you do get an error, does the behaviour change when you set the propagation level to REQUIRES_NEW?
  3. What happens if you set the isolation level to the default level?
  4. What happens if you artificially introduce a slowdown in your createAnnouncement method prior to returning, e.g. Thread.sleep(5000), while changing nothing else? Does the error persist? If not, that would lend credence to the timing issue mentioned in the last paragraph in conjunction with REPEATABLE_READ.
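
To illustrate what I would expect from steps 1 and 2, here is a toy plain-Java model of the three propagation levels. It is not Spring's implementation: CURRENT_TX, resolve, and resolveOnWorker are made-up names, and the only assumption is that the "current transaction" is bound to the thread that opened it.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

class PropagationDemo {
    enum Propagation { REQUIRED, REQUIRES_NEW, MANDATORY }

    // Toy stand-in for the thread-bound "current transaction".
    private static final ThreadLocal<String> CURRENT_TX = new ThreadLocal<>();
    private static final AtomicInteger COUNTER = new AtomicInteger();

    /** Decides which transaction a method would run in on the current thread. */
    static String resolve(Propagation propagation) {
        String existing = CURRENT_TX.get();
        switch (propagation) {
            case MANDATORY: // must join an existing transaction
                if (existing == null) {
                    throw new IllegalStateException("no existing transaction");
                }
                return existing;
            case REQUIRED: // join if present, otherwise start one
                return existing != null ? existing : "tx-" + COUNTER.incrementAndGet();
            default: // REQUIRES_NEW always starts one
                return "tx-" + COUNTER.incrementAndGet();
        }
    }

    /** Opens a transaction on the calling thread, then resolves on a worker thread. */
    static String resolveOnWorker(Propagation propagation) {
        CURRENT_TX.set("caller-tx");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> {
                try {
                    return resolve(propagation);
                } catch (IllegalStateException e) {
                    return "error: " + e.getMessage();
                }
            };
            return worker.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            CURRENT_TX.remove();
        }
    }

    public static void main(String[] args) {
        for (Propagation p : Propagation.values()) {
            System.out.println(p + " -> " + resolveOnWorker(p));
        }
    }
}
```

In this model MANDATORY fails on the worker thread, while REQUIRED and REQUIRES_NEW both end up in a fresh transaction rather than "caller-tx". In Spring, the MANDATORY failure should show up as an IllegalTransactionStateException.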

Upvotes: 4

Deadron

Reputation: 5289

I would wager this is an issue with the db connection and async. The database connection is held in a thread local, and an async method is not going to execute in the same thread AND, due to the potential for parallel execution, can't really share the same connection/transaction. Have you tried propagation REQUIRES_NEW on your async method?

A more experienced member may be able to clarify the interaction of async and outstanding database connections but it is the likely culprit.
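
As a rough illustration of the thread-local point, here is a plain-Java sketch. CURRENT_CONNECTION is a hypothetical stand-in for wherever the framework keeps the connection; nothing here is Spring API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadLocalDemo {
    // Hypothetical stand-in for the per-thread slot holding the current connection.
    private static final ThreadLocal<String> CURRENT_CONNECTION = new ThreadLocal<>();

    /** Binds a connection on the calling thread, then reads the slot from a worker thread. */
    static String connectionSeenByWorker() {
        CURRENT_CONNECTION.set("connection-1");
        ExecutorService worker = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = CURRENT_CONNECTION::get;
            return worker.submit(task).get(); // the worker thread has its own, empty slot
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown();
            CURRENT_CONNECTION.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("worker sees: " + connectionSeenByWorker());
    }
}
```

The worker reads null even though the calling thread bound a value, which is the same reason an async method cannot see the caller's connection.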

Upvotes: 4
