Reputation: 8224
I am using an embedded ActiveMQ broker. My goal is to find a way to detect when an external producer on a Queue lost it's connection.
I am starting the broker like this:
BrokerService broker = new BrokerService();
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port);
setLastMessagesPersistent(broker);
broker.start();
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
connection.start();
And after that I tried to add a TransportListener:
((ActiveMQConnection) connection).addTransportListener(new TransportListener() {
public void transportResumed() {
System.out.println("resumed");
}
public void transportInterupted() {
System.out.println("interrupted");
}
public void onException(IOException arg0) {
System.out.println("ioexception: " + arg0);
}
public void onCommand(Object arg0) {
System.out.println("command: " + arg0);
}
});
I also register a consumer and ProducerListener like this:
Destination dest = session.createQueue(queuename);
MessageConsumer consumer = session.createConsumer(dest);
ProducerEventSource source = new ProducerEventSource(connection, dest);
System.out.println("Setting Producer Listener");
source.setProducerListener(prodevent -> {
System.out.println("producer status: " + prodevent.isStarted());
});
// Gets called from inside the broker's Thread and somehow causes deadlocks if I don't invoke this from the outside
new Thread(() -> {
try {
consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, gameEventManager, playerID));
} catch (Exception e) {
e.printStackTrace();
}
}).start();
Unfortunately neither the TransportListener, nor the ProducerListener give me any output when I forcefully quit another application that previously was added as a producer (Alt+F4). The broker surely notices though:
WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset
WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset
But I don't find a way to get a callback on those events in Java.
I also tried setting a custom IOExceptionHandler
in the broker and adding an ExceptionListener
to the connection. They also get never called.
Upvotes: 0
Views: 201
Reputation: 574
You can use The Advisory Topics ActiveMQ.Advisory.Connection or even the ActiveMQ.Advisory.Producer.Queue ActiveMQ.Advisory.Producer.Topic , these provide the stats on connections of number of producers , check this link http://activemq.apache.org/advisory-message.html
Upvotes: 1
Reputation: 9422
a possible way is to parse the log-output for Connection reset
WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset
Because the socket is implemented in activemq you would have to add the ExceptionListener there to write your own exception handling routine ...
Upvotes: 0