Reputation: 439
I am betting there is something fundamental I am missing here, but for the life of me I have not been able to figure it out. I am attempting to use Spring Integration to receive from an amqp:inbound-channel-adapter, which ends up calling a service that invokes an Oracle stored procedure using "jdbcTemplate.getDataSource().getConnection().prepareStatement(sql).execute();"
All of this runs in a Spring Boot application that includes spring-boot-starter-web (so it pulls in embedded Tomcat) along with spring-boot-starter-amqp, spring-boot-starter-integration and spring-boot-starter-jdbc, so as to let Spring Boot do all the magic for me... SB ROCKS!
I also had to explicitly include spring-integration-amqp in my pom.xml file in order to use the amqp XML namespace:
<!-- Required in order to expose amqp xml schema namespace -->
<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-amqp</artifactId>
</dependency>
Everything appears to work fine; however, the channel does not seem to let go of the JDBC connection. I eventually get an org.apache.tomcat.jdbc.pool.PoolExhaustedException. This happens as soon as the channel has serviced 'spring.datasource.max-active' messages from the queue. I confirmed this by changing that property: the failure always occurs at whatever number I set it to. I log from the service method called by the service-activator after it is done with the db call, so I know the calls complete, but the db connection apparently never gets handed back to the pool... Any ideas? Here are the relevant snippets from my project:
application.properties:
spring.datasource.url = jdbc:oracle:thin:@[myserver].com:1521:[SID]
spring.datasource.username = scrubbed
spring.datasource.password = scrubbed
spring.datasource.driver-class-name = oracle.jdbc.driver.OracleDriver
spring.datasource.max-active=50
...
spring.rabbitmq.port = 5672
spring.rabbitmq.addresses = clusered.server1, clusered.server1
spring.rabbitmq.username = scrubbed
spring.rabbitmq.password = scrubbed
Service:
@Autowired
private JdbcTemplate jdbcTemplate;

@Override
public void executeDbJob(String dbProcedureCall) {
    log.info("Executing Job id: {} Database Call: {}", jobId, dbProcedureCall);
    executeStoredProcedure(dbProcedureCall);
    log.info("Finished DB call for job id: {} Database Call: {}", jobId, dbProcedureCall);
}

/**
 * This wraps a stored procedure call just as it is, parameter values included into an anonymous
 * pl/sql block. This could be a pretty big security issue so we will want to scrub the
 * procedure coming in to ensure it doesn't have additional embedded SQL.
 */
private void executeStoredProcedure(final String procedure) {
    final String sql = "begin ".concat(procedure).concat(" end;");
    try {
        jdbcTemplate.getDataSource().getConnection().prepareStatement(sql).execute();
    } catch (SQLException e) {
        log.error("Could not execute procedure call: {} raised: {}", sql, e);
    }
}
XML:
<?xml version="1.0" encoding="UTF-8"?>
<beans:beans xmlns="http://www.springframework.org/schema/integration"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:stream="http://www.springframework.org/schema/integration/stream"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:beans="http://www.springframework.org/schema/beans"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:util="http://www.springframework.org/schema/util"
xmlns:amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/util http://www.springframework.org/schema/util/spring-util.xsd
http://www.springframework.org/schema/rabbit http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/integration/amqp http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd"
default-lazy-init="false">
<beans:description>Recommendation Engine provider account event flow</beans:description>
<!-- ********** Consumer flows ********** -->
<!-- Note: The rabbitConnectionFactory is setup automagically just like rabbitTemplate by spring-boot-starter-amqp -->
<amqp:inbound-channel-adapter id="initiateJobRequest" channel="initiateJobRequestChannel"
message-converter="jsonMessageConverter" queue-names="bc.initiate" connection-factory="rabbitConnectionFactory"
auto-startup="true" concurrent-consumers="1" acknowledge-mode="AUTO" error-handler="loggingErrorHandler" task-executor="amqpClientExecutor"/>
<!-- Add queue capacity to make channel pollable -->
<channel id="initiateJobRequestChannel">
<queue capacity="32" />
<interceptors>
<wire-tap channel="amqpDebugLogger" />
</interceptors>
</channel>
<service-activator id="initiateJobRequestExtractor" input-channel="initiateJobRequestChannel"
output-channel="nullChannel" ref="jobExecutorService" method="executeDbJob">
<poller ref="initiateJobPoller" />
</service-activator>
<!-- ********** Loggers ********** -->
<logging-channel-adapter id="amqpDebugLogger" level="DEBUG" auto-startup="true"
log-full-message="true" />
<!-- ********** Executors ********** -->
<task:executor id="amqpClientExecutor" pool-size="8" queue-capacity="0" rejection-policy="CALLER_RUNS" />
<poller id="initiateJobPoller" task-executor="initiateJobTaskExecutor" fixed-rate="50" receive-timeout="1000" />
<task:executor id="initiateJobTaskExecutor" pool-size="16" queue-capacity="0" rejection-policy="CALLER_RUNS" />
</beans:beans>
Adding the exception in case it is helpful:
org.apache.tomcat.jdbc.pool.PoolExhaustedException: [initiateJobTaskExecutor-5] Timeout: Pool empty. Unable to fetch a connection in 30 seconds, none available[size:50; busy:50; idle:0; lastwait:30000].
at org.apache.tomcat.jdbc.pool.ConnectionPool.borrowConnection(ConnectionPool.java:672)
at org.apache.tomcat.jdbc.pool.ConnectionPool.getConnection(ConnectionPool.java:186)
at org.apache.tomcat.jdbc.pool.DataSourceProxy.getConnection(DataSourceProxy.java:127)
at com.everbridge.bce.jobexecutor.service.impl.JobExecutorServiceImpl.executeStoredProcedure(JobExecutorServiceImpl.java:90)
at com.everbridge.bce.jobexecutor.service.impl.JobExecutorServiceImpl.executeDbJob(JobExecutorServiceImpl.java:55)
at sun.reflect.GeneratedMethodAccessor58.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.springframework.expression.spel.support.ReflectiveMethodExecutor.execute(ReflectiveMethodExecutor.java:112)
at org.springframework.expression.spel.ast.MethodReference.getValueInternal(MethodReference.java:102)
at org.springframework.expression.spel.ast.MethodReference.access$000(MethodReference.java:49)
at org.springframework.expression.spel.ast.MethodReference$MethodValueRef.getValue(MethodReference.java:342)
at org.springframework.expression.spel.ast.CompoundExpression.getValueInternal(CompoundExpression.java:88)
at org.springframework.expression.spel.ast.SpelNodeImpl.getTypedValue(SpelNodeImpl.java:131)
at org.springframework.expression.spel.standard.SpelExpression.getValue(SpelExpression.java:330)
at org.springframework.integration.util.AbstractExpressionEvaluator.evaluateExpression(AbstractExpressionEvaluator.java:164)
at org.springframework.integration.util.MessagingMethodInvokerHelper.processInternal(MessagingMethodInvokerHelper.java:276)
at org.springframework.integration.util.MessagingMethodInvokerHelper.process(MessagingMethodInvokerHelper.java:142)
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:75)
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:71)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:99)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:78)
at org.springframework.integration.endpoint.PollingConsumer.handleMessage(PollingConsumer.java:74)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:219)
at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:55)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:149)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:146)
at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:298)
at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:52)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:745)
Upvotes: 2
Views: 4650
Reputation: 439
I figured this one out... Kudos go out to Pavel Horal for his response to Mark's question about "C3PO connection pooling - connections not being released" (and to Mark for asking it), which gave me a clue about what my issue was and how to fix it. It turns out the problem was that I was asking the DataSource behind JdbcTemplate for a connection myself, with jdbcTemplate.getDataSource().getConnection().prepareStatement(sql).execute(); and never closing it, so each call kept one connection checked out of the pool. I replaced that line with jdbcTemplate.update(sql); which borrows the connection and releases it again on its own, and it worked!
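For anyone wondering why the chained call leaks, here is a minimal JDK-only sketch of the mechanism. It is not my actual service code: the class ConnectionLeakDemo, the busy counter and the borrow() helper are hypothetical stand-ins for a pool, built with java.lang.reflect.Proxy so the example runs without a database or Spring on the classpath. The point is only that a connection you fetch yourself stays "busy" until someone calls close() on it.

```java
import java.lang.reflect.Proxy;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.concurrent.atomic.AtomicInteger;

public class ConnectionLeakDemo {
    // Hypothetical stand-in for the pool's "busy" count seen in the exception message.
    public static final AtomicInteger busy = new AtomicInteger();

    // Hands out a stub Connection; close() is the only thing that frees a slot.
    public static Connection borrow() {
        busy.incrementAndGet();
        return (Connection) Proxy.newProxyInstance(
                ConnectionLeakDemo.class.getClassLoader(),
                new Class<?>[] {Connection.class},
                (proxy, method, args) -> {
                    switch (method.getName()) {
                        case "close":
                            busy.decrementAndGet();
                            return null;
                        case "prepareStatement":
                            return stubStatement();
                        default:
                            throw new UnsupportedOperationException(method.getName());
                    }
                });
    }

    // Stub statement: execute() reports "no result set", every other call is a no-op.
    private static PreparedStatement stubStatement() {
        return (PreparedStatement) Proxy.newProxyInstance(
                ConnectionLeakDemo.class.getClassLoader(),
                new Class<?>[] {PreparedStatement.class},
                (proxy, method, args) ->
                        method.getName().equals("execute") ? Boolean.FALSE : null);
    }

    // Same shape as the original chained call: nothing ever closes the connection.
    public static void leaky(String sql) throws SQLException {
        borrow().prepareStatement(sql).execute();
    }

    // try-with-resources closes the statement and then the connection,
    // which is what returns the connection to a real pool.
    public static void safe(String sql) throws SQLException {
        try (Connection c = borrow();
             PreparedStatement ps = c.prepareStatement(sql)) {
            ps.execute();
        }
    }

    public static void main(String[] args) throws SQLException {
        for (int i = 0; i < 3; i++) {
            leaky("begin null; end;");
        }
        System.out.println("busy after leaky calls: " + busy.get()); // prints 3
        for (int i = 0; i < 3; i++) {
            safe("begin null; end;");
        }
        System.out.println("busy after safe calls: " + busy.get()); // still 3, nothing new leaked
    }
}
```

In the real service there is no need to hand-roll the try-with-resources block: jdbcTemplate.update(sql) or jdbcTemplate.execute(sql) obtains the connection and releases it for you, which is why swapping the one line was enough.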
Upvotes: 1