Reputation: 1585
I am trying to figure out a solution for the following error message without increasing the default connection size from 1000 to 2000 or more.
Recently I came across the following error when around 1000 messages were sent to the broker with a delay of 5 minutes as shown in the code below.
WARN | Could not accept connection : Exceeded the maximum number of allowed client connections. See the 'maximumConnections' property on the TCP transport configuration URI in the ActiveMQ configuration file (e.g., activemq.xml) | org.apache.activemq.broker.TransportConnector | ActiveMQ Transport Server Thread Handler: tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600
The following is the code which is listening to ActiveMQ continously and as soon as it sees COMPLETE
, it sends out an email to the user after generating a file.Otherwise, it goes inside else
block and sends out the message to the broker again.
Inside the else block, I want to test by closing the connection after I am done sending the message. So I have closed the connection inside a finally block as shown below. Is this a correct way to go about it?
@Component
public class DownloadConsumer {
@Autowired
private JavaMailSender javaMailSender;
// one instance, reuse
private final CloseableHttpClient httpClient = HttpClients.createDefault();
Connection connection;
// Working Code with JMS 2.0
@JmsListener(destination = "MessageProducerJMSV1")
public void processBrokerQueues(String message) throws DaoException, JMSException {
try {
RequestDao requestDao = (RequestDao) context.getBean("requestDao");
String receivedStatus = requestDao.getRequestStatus(message);
//Retrieve Username from the message to include in an email
String[] parts = message.split("#");
String userName = parts[1].trim();
//Retrieve personnelID from the message to include in the webservice calls
String personnelID = parts[3].trim();
//Before sending this message, do the check for COMPLETE or ERROR etc
if(receivedStatus.equals("COMPLETE")) {
String latestUUID = requestDao.getUUID();
logger.info("Received UUID in Controller is as follows! ");
logger.info(latestUUID);
requestDao.sendMessage(message,latestUUID);
logger.info("Received status is COMPLETE! ");
logger.info("Sending email to the user! ");
String emailMessage = "Dear "+userName+",<p>Your files are ready. </p><p> Thanks,<br/> Jack/p>";
String recipientEmail = userName+"@organization.com";
/*****************************************************\
// START: EMAIL Related Code
*******************************************************/
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(msg, true);
helper.setFrom("[email protected]");
helper.setTo(recipientEmail);
helper.setSubject("Requested Files !");
helper.setText(emailMessage,true);
javaMailSender.send(msg);
}
else {
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false,
Session.AUTO_ACKNOWLEDGE);
// Destination represents here our queue 'MessageProducerJMSV1' on the JMS server
Destination destination = session.createQueue(subject);
MessageProducer producer = session.createProducer(destination);
//Sending message to the queue
TextMessage toSendMessage = session.createTextMessage(message);
long delay = 300 * 1000;
long period =300 * 1000;
toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
producer.send(toSendMessage);
}
}
catch(Throwable th){
th.printStackTrace();
}
finally {
connection.close();
}
}
// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static String subject = "MessageProducerJMSV1"; //Queue Name
// default broker URL is : tcp://localhost:61616"
private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
}
Upvotes: 0
Views: 707
Reputation: 35028
The reason you're getting "Exceeded the maximum number of allowed client connections" is because you're creating connections and not closing them. In other words, your application is "leaking" connections. In order to fix the leak you need to close the connection. Closing a JMS connection in a finally
block is the generally accepted practice so your code looks good there. However, you need to check for null
in case there is a problem before the connection
is actually created, e.g.:
finally {
if (connection != null) {
connection.close();
}
}
That said, it's worth noting that creating & closing a JMS connection & session & producer to send a single message is a well-known anti-pattern. It would be better if you cached the connection (e.g. in a static
variable) and re-used it. For example:
@Component
public class DownloadConsumer {
@Autowired
private JavaMailSender javaMailSender;
// one instance, reuse
private final CloseableHttpClient httpClient = HttpClients.createDefault();
private static Connection connection;
private static Object connectionLock = new Object();
// URL of the JMS server. DEFAULT_BROKER_URL will just mean that JMS server is on localhost
private static String url = ActiveMQConnection.DEFAULT_BROKER_URL;
private static String subject = "MessageProducerJMSV1"; //Queue Name
// default broker URL is : tcp://localhost:61616"
private static ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext("ApplicationContext.xml");
private static final Logger logger = LoggerFactory.getLogger(DownloadConsumer.class);
// Working Code with JMS 2.0
@JmsListener(destination = "MessageProducerJMSV1")
public void processBrokerQueues(String message) throws DaoException, JMSException {
try {
RequestDao requestDao = (RequestDao) context.getBean("requestDao");
String receivedStatus = requestDao.getRequestStatus(message);
//Retrieve Username from the message to include in an email
String[] parts = message.split("#");
String userName = parts[1].trim();
//Retrieve personnelID from the message to include in the webservice calls
String personnelID = parts[3].trim();
//Before sending this message, do the check for COMPLETE or ERROR etc
if (receivedStatus.equals("COMPLETE")) {
String latestUUID = requestDao.getUUID();
logger.info("Received UUID in Controller is as follows! ");
logger.info(latestUUID);
requestDao.sendMessage(message, latestUUID);
logger.info("Received status is COMPLETE! ");
logger.info("Sending email to the user! ");
String emailMessage = "Dear " + userName + ",<p>Your files are ready. </p><p> Thanks,<br/> Jack/p>";
String recipientEmail = userName + "@organization.com";
/*****************************************************\
// START: EMAIL Related Code
*******************************************************/
MimeMessage msg = javaMailSender.createMimeMessage();
MimeMessageHelper helper = new MimeMessageHelper(msg, true);
helper.setFrom("[email protected]");
helper.setTo(recipientEmail);
helper.setSubject("Requested Files !");
helper.setText(emailMessage, true);
javaMailSender.send(msg);
} else {
try {
createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// Destination represents here our queue 'MessageProducerJMSV1' on the JMS server
Destination destination = session.createQueue(subject);
MessageProducer producer = session.createProducer(destination);
//Sending message to the queue
TextMessage toSendMessage = session.createTextMessage(message);
long delay = 300 * 1000;
long period = 300 * 1000;
toSendMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
producer.send(toSendMessage);
} catch (Throwable th) {
th.printStackTrace();
synchronized (connectionLock) {
// if there are any problems close the connection and it will be re-created next time
if (connection != null) {
connection.close();
connection = null;
}
}
}
}
} catch (Throwable th) {
th.printStackTrace();
}
}
private void createConnection() {
synchronized (connectionLock) {
if (connection == null) {
// Getting JMS connection from the server
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
}
}
}
}
You'll note that in this code there is no finally
block to close the connection. That's intentional because the whole point of this code is keep the connection open so that it's not opening & closing the connection to send a single message. The connection is being re-used between invocations. The only time the connection is closed is when a Throwable
is caught.
Also, keep in mind that there's no reason to call start()
on the JMS connection if it's just sending a message. The start()
method only impacts consumers.
Upvotes: 2
Reputation: 2438
The issue is the below code, which will open new connection each message - you should make this ideally invoke this once (and again if connection expires).
// Getting JMS connection from the server and starting it
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(url);
connection = connectionFactory.createConnection();
connection.start();
get a session from it and close the session. Based on usage you can even keep session for longer.
Upvotes: 0