Tan

Reputation: 1585

Trying to release connection to avoid maximum client connection error

I am trying to figure out a solution for the following error message without increasing the default connection size from 1000 to 2000 or more.

Recently I came across the following error when around 1000 messages were sent to the broker with a delay of 5 minutes as shown in the code below.

WARN  | Could not accept connection  : Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml) | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600

The following code listens to ActiveMQ continuously. As soon as it sees COMPLETE, it generates a file and sends an email to the user. Otherwise, it goes into the else block and sends the message to the broker again.

Inside the else block, I want to test by closing the connection after I am done sending the message. So I have closed the connection inside a finally block as shown below. Is this a correct way to go about it?

@Component
public class DownloadConsumer {
    
    @Autowired
    private JavaMailSender javaMailSender;
    
    // one instance, reuse
    private final CloseableHttpClient httpClient = HttpClients.createDefault();
    
    Connection connection;
            
    
    // Working Code with JMS 2.0
    @JmsListener(destination = "MessageProducerJMSV1")
        public void processBrokerQueues(String message) throws DaoException, JMSException {
            
        
         try {
            
            RequestDao requestDao = (RequestDao) context.getBean("requestDao");
            
            String receivedStatus = requestDao.getRequestStatus(message);
            
            
             
            //Retrieve Username from the message to include in an email
             String[] parts = message.split("#");
             String userName = parts[1].trim();
             
            //Retrieve personnelID from the message to include in the webservice calls
            
             String personnelID = parts[3].trim();
            
            
            
            
            //Before sending this message, do the check for COMPLETE or ERROR etc
            if(receivedStatus.equals("COMPLETE")) {
                
                
                
                String latestUUID = requestDao.getUUID();
                
                logger.info("Received UUID in Controller is as follows! ");
                logger.info(latestUUID);
                
                requestDao.sendMessage(message,latestUUID);
                logger.info("Received status is COMPLETE! ");
                logger.info("Sending email to the user! ");
                String emailMessage = "Dear "+userName+",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
                String recipientEmail = userName+"@organization.com";
                
                
                
                
                /*****************************************************\
                // START: EMAIL Related Code
                
                 *******************************************************/
                
                MimeMessage msg = javaMailSender.createMimeMessage();
                 MimeMessageHelper helper = new MimeMessageHelper(msg, true);
                 helper.setFrom("[email protected]");
                 helper.setTo(recipientEmail);
                 helper.setSubject("Requested Files !");
                 helper.setText(emailMessage,true);
                 
                 javaMailSender.send(msg);
                 
                
                    
                                
            }
            else {
                
                
                // Getting JMS connection from the server and starting it
                ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
                connection = connectionFactory.createConnection();
                connection.start();
                
                Session session = connection.createSession(false,
                        Session.AUTO_ACKNOWLEDGE);
                
                // Destination represents here our queue 'MessageProducerJMSV1' on the  JMS server
                Destination destination = session.createQueue(subject);
                
                
                MessageProducer producer = session.createProducer(destination);
                
                //Sending message to the queue
                TextMessage toSendMessage = session.createTextMessage(message);
                
                long delay = 300 * 1000;
                long period =300 * 1000;
                
                toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                
                producer.send(toSendMessage);
                
                
                
                
            }
            
            }
            catch(Throwable th){
                th.printStackTrace();   
                
            }
            finally {
            
                connection.close();
                
            }
            
         }
   // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
    private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
    private static String subject = "MessageProducerJMSV1"; //Queue Name
    // default broker URL is : tcp://localhost:61616"
    
    private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
    private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
    
    

}

Upvotes: 0

Views: 707

Answers (2)

Justin Bertram

Reputation: 35028

You're getting "Exceeded the maximum number of allowed client connections" because you're creating connections and not closing them. In other words, your application is "leaking" connections. To fix the leak you need to close the connection. Closing a JMS connection in a finally block is the generally accepted practice, so your code looks good there. However, you need to check for null in case there is a problem before the connection is actually created, e.g.:

finally {
    if (connection != null) {
        connection.close();
    }                
}
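
As an alternative to the explicit null check: in the JMS 2.0 API, Connection extends AutoCloseable, so try-with-resources can do the closing for you. The sketch below only illustrates the language mechanism. FakeConnection, sendOnce and openConnections are hypothetical stand-ins (not JMS or ActiveMQ classes) so that it runs without a broker; with a real JMS 2.0 connection the try line would create the connection from the factory instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class TryWithResourcesSketch {

    // Counts connections that are currently open, to make leaks visible.
    static final AtomicInteger openConnections = new AtomicInteger();

    // Hypothetical stand-in for a JMS connection; not part of any real API.
    static class FakeConnection implements AutoCloseable {
        FakeConnection() {
            openConnections.incrementAndGet();
        }

        void send(String message) {
            if (message == null) {
                throw new IllegalArgumentException("message must not be null");
            }
        }

        @Override
        public void close() {
            openConnections.decrementAndGet();
        }
    }

    // Opens a connection, sends one message, and always closes the connection.
    static boolean sendOnce(String message) {
        try (FakeConnection connection = new FakeConnection()) {
            connection.send(message);
            return true;
        } catch (IllegalArgumentException e) {
            // close() has already run by the time this handler executes
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(sendOnce("hello"));
        System.out.println(sendOnce(null));
        System.out.println(openConnections.get());
    }
}
```

If creating the resource itself fails, there is nothing to close and no null check is needed, because the variable is scoped to the try statement rather than being a field.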

That said, it's worth noting that creating & closing a JMS connection & session & producer to send a single message is a well-known anti-pattern. It would be better if you cached the connection (e.g. in a static variable) and re-used it. For example:

@Component
public class DownloadConsumer {

   @Autowired
   private JavaMailSender javaMailSender;

   // one instance, reuse
   private final CloseableHttpClient httpClient = HttpClients.createDefault();

   private static Connection connection;

   private static Object connectionLock = new Object();

   // URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
   private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
   private static String subject = "MessageProducerJMSV1"; //Queue Name
   // default broker URL is : tcp://localhost:61616"

   private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
   private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);

   // Working Code with JMS 2.0
   @JmsListener(destination = "MessageProducerJMSV1")
   public void processBrokerQueues(String message) throws DaoException, JMSException {

      try {

         RequestDao requestDao = (RequestDao) context.getBean("requestDao");

         String receivedStatus = requestDao.getRequestStatus(message);

         //Retrieve Username from the message to include in an email
         String[] parts = message.split("#");
         String userName = parts[1].trim();

         //Retrieve personnelID from the message to include in the webservice calls

         String personnelID = parts[3].trim();

         //Before sending this message, do the check for COMPLETE or ERROR etc
         if (receivedStatus.equals("COMPLETE")) {

            String latestUUID = requestDao.getUUID();

            logger.info("Received UUID in Controller is as follows! ");
            logger.info(latestUUID);

            requestDao.sendMessage(message, latestUUID);
            logger.info("Received status is COMPLETE! ");
            logger.info("Sending email to the user! ");
            String emailMessage = "Dear " + userName + ",<p>Your files are ready. </p><p> Thanks,<br/> Jack</p>";
            String recipientEmail = userName + "@organization.com";

            /*****************************************************\
             // START: EMAIL Related Code
             *******************************************************/

            MimeMessage msg = javaMailSender.createMimeMessage();
            MimeMessageHelper helper = new MimeMessageHelper(msg, true);
            helper.setFrom("[email protected]");
            helper.setTo(recipientEmail);
            helper.setSubject("Requested Files !");
            helper.setText(emailMessage, true);

            javaMailSender.send(msg);

         } else {
            try {
               createConnection();

               Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

               // Destination represents here our queue 'MessageProducerJMSV1' on the  JMS server
               Destination destination = session.createQueue(subject);

               MessageProducer producer = session.createProducer(destination);

               //Sending message to the queue
               TextMessage toSendMessage = session.createTextMessage(message);

               long delay = 300 * 1000;
               long period = 300 * 1000;

               toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);

               producer.send(toSendMessage);
            } catch (Throwable th) {
               th.printStackTrace();
               
               synchronized (connectionLock) {
                  // if there are any problems close the connection and it will be re-created next time
                  if (connection != null) {
                     connection.close();
                     connection = null;
                  }
               }
            }
         }
      } catch (Throwable th) {
         th.printStackTrace();
      }
   }
   
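   // createConnection() on the factory throws the checked JMSException, so it must be declared here
   private void createConnection() throws JMSException {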
      synchronized (connectionLock) {
         if (connection == null) {
            // Getting JMS connection from the server
            ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
            connection = connectionFactory.createConnection();
         }
      }
   }
}

You'll note that in this code there is no finally block to close the connection. That's intentional because the whole point of this code is to keep the connection open so that it's not opening & closing the connection to send a single message. The connection is being re-used between invocations. The only time the connection is closed is when a Throwable is caught.

Also, keep in mind that there's no reason to call start() on the JMS connection if it's just sending a message. The start() method only impacts consumers.

Upvotes: 2

PrasadU

Reputation: 2438

The issue is the code below, which opens a new connection for each message. Ideally, you should invoke it only once (and again if the connection expires).

// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();

For each message, get a session from that connection and close the session when you are done. Depending on usage, you can even keep the session open for longer.
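
A minimal sketch of that shape, assuming one lazily created connection shared by all calls and one short-lived session per message. FakeConnection, FakeSession and the counters are hypothetical stand-ins (not JMS or ActiveMQ classes) so that the example runs without a broker; in real code these would be the JMS Connection and Session.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedConnectionSketch {

    static final AtomicInteger connectionsCreated = new AtomicInteger();
    static final AtomicInteger openSessions = new AtomicInteger();

    // Hypothetical stand-in for a JMS session.
    static class FakeSession implements AutoCloseable {
        FakeSession() {
            openSessions.incrementAndGet();
        }

        void send(String message) {
            // a real session would create a producer and send here
        }

        @Override
        public void close() {
            openSessions.decrementAndGet();
        }
    }

    // Hypothetical stand-in for a JMS connection.
    static class FakeConnection {
        FakeConnection() {
            connectionsCreated.incrementAndGet();
        }

        FakeSession createSession() {
            return new FakeSession();
        }
    }

    private static FakeConnection connection;

    // Creates the connection on first use and returns the same one afterwards.
    static synchronized FakeConnection getConnection() {
        if (connection == null) {
            connection = new FakeConnection();
        }
        return connection;
    }

    // Uses a fresh session per message; the session is closed, the connection is not.
    static void send(String message) {
        try (FakeSession session = getConnection().createSession()) {
            session.send(message);
        }
    }

    public static void main(String[] args) {
        for (int i = 0; i < 1000; i++) {
            send("message " + i);
        }
        System.out.println(connectionsCreated.get() + " connection(s), "
                + openSessions.get() + " open session(s)");
    }
}
```

The point of the split is that the expensive, limited resource (the connection, which counts against maximumConnections) is created once, while the cheap per-message resource (the session) is opened and closed each time.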

Upvotes: 0
