Reputation: 21
I have a Java application that reads data from a CSV file, turns them into business objects, and then saves them to a db. Most of the work is done single-threaded, but some of it is done using multiple threads. The application uses Spring framework, Hibernate as the ORM provider, c3p0 as the connection pool provider, and MySQL as the database. The parts of the application which are multi-threaded are annotated with @Async, using a ThreadPoolTaskExecutor created as a bean for the TaskExecutor that will do this work. The @Aysnc annotated methods return CompletableFuture.
With a small CSV file (under 10k records) everything is fine. However, some of our clients have much larger files (150k records) and this increase in load produces issues during the multi-threaded portions of the application. Here is one of the @Async annotated methods:
public class AppointmentTreeDAO {
@Autowired
private AppointmentRepository appointmentRepository;
private static final Logger logger = LoggerFactory.getLogger(AppointmentTreeDAO.class);
@Async
public CompletableFuture<Appointment> getObjectTree(Long oldApptId) {
logger.info("getting appointment thread");
return CompletableFuture.completedFuture(appointmentRepository.findByApptId(oldApptId));
}
}
This is the AppointmentRepository class (the last method is the one the code in question is calling):
public interface AppointmentRepository extends JpaRepository<Appointment, Long> {
List<Appointment> findByAccountId(String accountId);
List<Appointment> findByAccountIdAndProductIdAndApptStartDatetimeBetween(String accountId,
String productId, Date startDate, Date endDte);
Slice<Appointment> findByAccountIdAndProductIdAndApptStartDatetimeBetween(String accountId,
String productId, Date startDate, Date endDte, Pageable pageable);
@Query(value = "select a.appt_id from appointment a where a.appt_start_datetime " +
"between :startDate AND :endDate and a.account_id = :accountId and a.product_id = :productId",
nativeQuery = true)
List<Long> findByApptIdBetween(@Param("startDate") String startDate, @Param("endDate") String endDate,
@Param("accountId") String accountId, @Param("productId") String productId);
Appointment findByApptId(Long apptId);
}
Since I have that log line in the AppointmentTreeDAO class, I am able to see in the logs that each thread is doing work:
2:58:12.771 PM
2021-11-11 14:58:12.771 INFO 1 --- [mtSave-38] c.t.etl.repository.AppointmentTreeDAO : getting appointment thread
Eventually, I start seeing exceptions like this from the threads (same thread as above in this case):
2:58:16.790 PM
2021-11-11 14:58:16.790 WARN 1 --- [mtSave-38] o.h.engine.jdbc.spi.SqlExceptionHelper : SQL Error: 0, SQLState: 08001
2:58:16.803 PM
2021-11-11 14:58:16.803 ERROR 1 --- [mtSave-38] o.h.engine.jdbc.spi.SqlExceptionHelper : Could not create connection to database server. Attempted reconnect 3 times. Giving up.
2:58:16.804 PM
2021-11-11 14:58:16.804 WARN 1 --- [mtSave-38] c.t.etl.pipeline.load.Loader : java.util.concurrent.CompletionException: org.springframework.dao.DataAccessResourceFailureException: Unable to acquire JDBC Connection; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:279)
at java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.springframework.dao.DataAccessResourceFailureException: Unable to acquire JDBC Connection; nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.convertHibernateAccessException(HibernateJpaDialect.java:255)
at org.springframework.orm.jpa.vendor.HibernateJpaDialect.translateExceptionIfPossible(HibernateJpaDialect.java:233)
at org.springframework.orm.jpa.AbstractEntityManagerFactoryBean.translateExceptionIfPossible(AbstractEntityManagerFactoryBean.java:551)
at org.springframework.dao.support.ChainedPersistenceExceptionTranslator.translateExceptionIfPossible(ChainedPersistenceExceptionTranslator.java:61)
at org.springframework.dao.support.DataAccessUtils.translateIfNecessary(DataAccessUtils.java:242)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:152)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.jpa.repository.support.CrudMethodMetadataPostProcessor$CrudMethodMetadataPopulatingMethodInterceptor.invoke(CrudMethodMetadataPostProcessor.java:145)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.interceptor.ExposeInvocationInterceptor.invoke(ExposeInvocationInterceptor.java:97)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:215)
at com.sun.proxy.$Proxy148.findByApptId(Unknown Source)
at com.etl.repository.AppointmentTreeDAO.getObjectTree(AppointmentTreeDAO.java:25)
at com.etl.repository.AppointmentTreeDAO$$FastClassBySpringCGLIB$$fb95149d.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invoke(MethodProxy.java:218)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.invokeJoinpoint(CglibAopProxy.java:779)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:163)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.aop.framework.CglibAopProxy$CglibMethodInvocation.proceed(CglibAopProxy.java:750)
at org.springframework.aop.interceptor.AsyncExecutionInterceptor.lambda$invoke$0(AsyncExecutionInterceptor.java:115)
at org.springframework.aop.interceptor.AsyncExecutionAspectSupport.lambda$doSubmit$3(AsyncExecutionAspectSupport.java:276)
... 4 more
Caused by: org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
at org.hibernate.exception.internal.SQLExceptionTypeDelegate.convert(SQLExceptionTypeDelegate.java:48)
at org.hibernate.exception.internal.StandardSQLExceptionConverter.convert(StandardSQLExceptionConverter.java:42)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:113)
at org.hibernate.engine.jdbc.spi.SqlExceptionHelper.convert(SqlExceptionHelper.java:99)
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:111)
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.getPhysicalConnection(LogicalConnectionManagedImpl.java:138)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl.connection(StatementPreparerImpl.java:50)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl$5.doPrepare(StatementPreparerImpl.java:149)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl$StatementPreparationTemplate.prepareStatement(StatementPreparerImpl.java:176)
at org.hibernate.engine.jdbc.internal.StatementPreparerImpl.prepareQueryStatement(StatementPreparerImpl.java:151)
at org.hibernate.loader.Loader.prepareQueryStatement(Loader.java:2103)
at org.hibernate.loader.Loader.executeQueryStatement(Loader.java:2040)
at org.hibernate.loader.Loader.executeQueryStatement(Loader.java:2018)
at org.hibernate.loader.Loader.doQuery(Loader.java:948)
at org.hibernate.loader.Loader.doQueryAndInitializeNonLazyCollections(Loader.java:349)
at org.hibernate.loader.Loader.doList(Loader.java:2849)
at org.hibernate.loader.Loader.doList(Loader.java:2831)
at org.hibernate.loader.Loader.listIgnoreQueryCache(Loader.java:2663)
at org.hibernate.loader.Loader.list(Loader.java:2658)
at org.hibernate.loader.hql.QueryLoader.list(QueryLoader.java:506)
at org.hibernate.hql.internal.ast.QueryTranslatorImpl.list(QueryTranslatorImpl.java:400)
at org.hibernate.engine.query.spi.HQLQueryPlan.performList(HQLQueryPlan.java:219)
at org.hibernate.internal.SessionImpl.list(SessionImpl.java:1414)
at org.hibernate.query.internal.AbstractProducedQuery.doList(AbstractProducedQuery.java:1625)
at org.hibernate.query.internal.AbstractProducedQuery.list(AbstractProducedQuery.java:1593)
at org.hibernate.query.internal.AbstractProducedQuery.getSingleResult(AbstractProducedQuery.java:1641)
at org.hibernate.query.criteria.internal.compile.CriteriaQueryTypeQueryAdapter.getSingleResult(CriteriaQueryTypeQueryAdapter.java:111)
at org.springframework.data.jpa.repository.query.JpaQueryExecution$SingleEntityExecution.doExecute(JpaQueryExecution.java:196)
at org.springframework.data.jpa.repository.query.JpaQueryExecution.execute(JpaQueryExecution.java:88)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.doExecute(AbstractJpaQuery.java:155)
at org.springframework.data.jpa.repository.query.AbstractJpaQuery.execute(AbstractJpaQuery.java:143)
at org.springframework.data.repository.core.support.RepositoryMethodInvoker.doInvoke(RepositoryMethodInvoker.java:137)
at org.springframework.data.repository.core.support.RepositoryMethodInvoker.invoke(RepositoryMethodInvoker.java:121)
at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.doInvoke(QueryExecutorMethodInterceptor.java:152)
at org.springframework.data.repository.core.support.QueryExecutorMethodInterceptor.invoke(QueryExecutorMethodInterceptor.java:131)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.data.projection.DefaultMethodInvokingMethodInterceptor.invoke(DefaultMethodInvokingMethodInterceptor.java:80)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.transaction.interceptor.TransactionInterceptor$1.proceedWithInvocation(TransactionInterceptor.java:123)
at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:388)
at org.springframework.transaction.interceptor.TransactionInterceptor.invoke(TransactionInterceptor.java:119)
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:186)
at org.springframework.dao.support.PersistenceExceptionTranslationInterceptor.invoke(PersistenceExceptionTranslationInterceptor.java:137)
... 22 more
Caused by: java.sql.SQLNonTransientConnectionException: Could not create connection to database server. Attempted reconnect 3 times. Giving up.
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:110)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:73)
at com.mysql.cj.jdbc.ConnectionImpl.connectWithRetries(ConnectionImpl.java:903)
at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:828)
at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:453)
at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:246)
at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:198)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.springframework.jdbc.datasource.DriverManagerDataSource.getConnectionFromDriverManager(DriverManagerDataSource.java:155)
at org.springframework.jdbc.datasource.DriverManagerDataSource.getConnectionFromDriver(DriverManagerDataSource.java:146)
at org.springframework.jdbc.datasource.AbstractDriverBasedDataSource.getConnectionFromDriver(AbstractDriverBasedDataSource.java:205)
at org.springframework.jdbc.datasource.AbstractDriverBasedDataSource.getConnection(AbstractDriverBasedDataSource.java:169)
at org.hibernate.engine.jdbc.connections.internal.DatasourceConnectionProviderImpl.getConnection(DatasourceConnectionProviderImpl.java:122)
at org.hibernate.internal.NonContextualJdbcConnectionAccess.obtainConnection(NonContextualJdbcConnectionAccess.java:38)
at org.hibernate.resource.jdbc.internal.LogicalConnectionManagedImpl.acquireConnectionIfNeeded(LogicalConnectionManagedImpl.java:108)
... 60 more
Caused by: com.mysql.cj.exceptions.CJCommunicationsException: Communications link failure
The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.
at sun.reflect.GeneratedConstructorAccessor129.newInstance(Unknown Source)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:61)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:105)
at com.mysql.cj.exceptions.ExceptionFactory.createException(ExceptionFactory.java:151)
at com.mysql.cj.exceptions.ExceptionFactory.createCommunicationsException(ExceptionFactory.java:167)
at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:89)
at com.mysql.cj.NativeSession.connect(NativeSession.java:144)
at com.mysql.cj.jdbc.ConnectionImpl.connectWithRetries(ConnectionImpl.java:847)
... 73 more
Caused by: java.net.NoRouteToHostException: Cannot assign requested address (Address not available)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
at java.net.Socket.connect(Socket.java:589)
at com.mysql.cj.protocol.StandardSocketFactory.connect(StandardSocketFactory.java:155)
at com.mysql.cj.protocol.a.NativeSocketConnection.connect(NativeSocketConnection.java:63)
... 75 more
2:58:16.804 PM
2021-11-11 14:58:16.804 WARN 1 --- [mtSave-38] c.t.etl.pipeline.load.Loader : Thread 77 ProcessFutures Loop Exception: org.springframework.dao.DataAccessResourceFailureException: Unable to acquire JDBC Connection;
nested exception is org.hibernate.exception.JDBCConnectionException: Unable to acquire JDBC Connection
Soon after throwing the above exception it starts doing work again and its now able to call the db when it couldn't just a short time ago:
2:58:16.807 PM
2021-11-11 14:58:16.807 INFO 1 --- [mtSave-38] c.t.etl.repository.AppointmentTreeDAO : getting appointment thread
This happens intermittently with many different threads, if not all. But eventually all the work is complete and all futures are returned from that method. Then the application returns to being single threaded and the exceptions stop. Once the application returns to using multiple threads the same exceptions as above occur again.
This is my persistence.xml file:
<properties>
<property name="javax.persistence.jdbc.driver" value="com.mysql.jdbc.Driver" />
<property name="hibernate.dialect" value="org.hibernate.dialect.MySQLDialect" />
<property name="hibernate.show_sql" value="false" />
<property name="hibernate.id.new_generator_mappings" value="false" />
<property name="org.hibernate.flushMode" value="ALWAYS" />
<property name="hibernate.c3p0.acquire_increment" value="1" />
<property name="hibernate.c3p0.idle_test_period" value="60"/>
<property name="hibernate.c3p0.max_size" value="54"/>
<property name="hibernate.c3p0.max_statements" value="50"/>
<property name="hibernate.c3p0.min_size" value="5"/>
</properties>
This is how the TaskExecutor is created:
public Executor taskExecutor() {
logger.info("Creating Async Task Executor");
final ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(45);
executor.setMaxPoolSize(45);
executor.setThreadNamePrefix("mtSave-");
executor.initialize();
executor.setKeepAliveSeconds(120);
return executor;
}
I have autoReconnect=true&maxReconnects=10 on my db connection string.
It seems like the threads are being starved for a connection, but I don't understand why. There will always be more work items than there are threads. 150k records of work compared to 45 threads on the high end, and under 10k records of work on the low end. The application never threw these exceptions until multi threading was introduced. I have the connection pool max size (54) set higher than the number of threads I have (45).
Ive tried changing the number of threads, the max size of the connection pool, different size files and did tons of research. Please advise.
Upvotes: 2
Views: 922
Reputation: 108796
Java / JDBC connections aren't thread safe so each thread creates its own connections.
And, MySQL servers limit the number of simultaneous connections they allow. That's a hard limit set at MySQL server startup time. So it's possible your JDBC code in all your threads is attempting to make more connections than the server allows, and so the server rejects them. The app responds with the exceptions you saw.
org.springframework.dao.DataAccessResourceFailureException: Unable to acquire JDBC Connection;
There's another factor to keep in mind. Concurrent queries to the same table (INSERTs or anything else) can contend with one another. Paradoxically, you may get much better throughput with a much smaller number of threads. Try four threads.
If you give the command SHOW GLOBAL STATUS LIKE 'Connection%'
you'll see a count of connection errors. Give the command before and after you run your program and take the difference between the values. The values look like this.
Connection_errors_accept errors that occurred during calls to accept() on the listening port.
Connection_errors_internal connections refused due to internal errors in the server, such as failure to start a new thread or an out-of-memory condition.
Connection_errors_max_connections connections refused because the server max_connections limit was reached.
Connection_errors_peer_address errors while searching for connecting client IP addresses.
Connection_errors_select errors during calls to OS select() or poll() on the listening port. (Failure of this operation does not necessarily means a client connection was rejected.)
Connection_errors_tcpwrap connections refused by the libwrap library.
Connections connection attempts (successful or not) to the MySQL server.
Upvotes: 1