Felk

Reputation: 8224

How to get notified when a producer's connection drops?

I am using an embedded ActiveMQ broker. My goal is to detect when an external producer on a queue loses its connection.

I am starting the broker like this:

BrokerService broker = new BrokerService();
broker.addConnector("tcp://" + LISTEN_DEVICE_IP + ":" + port);
setLastMessagesPersistent(broker);
broker.start();

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
connection = factory.createConnection();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

connection.start();

And after that I tried to add a TransportListener:

((ActiveMQConnection) connection).addTransportListener(new TransportListener() {
    public void transportResumed() {
        System.out.println("resumed");
    }
    public void transportInterupted() {
        System.out.println("interrupted");
    }
    public void onException(IOException arg0) {
        System.out.println("ioexception: " + arg0);
    }
    public void onCommand(Object arg0) {
        System.out.println("command: " + arg0);
    }
});

I also register a consumer and ProducerListener like this:

Destination dest = session.createQueue(queuename);
MessageConsumer consumer = session.createConsumer(dest);

ProducerEventSource source = new ProducerEventSource(connection, dest);
System.out.println("Setting Producer Listener");
source.setProducerListener(prodevent -> {
    System.out.println("producer status: " + prodevent.isStarted());
});
// Gets called from inside the broker's Thread and somehow causes deadlocks if I don't invoke this from the outside
new Thread(() -> {
    try {
        consumer.setMessageListener(new NetworkEventPlayerAdapter(objectMapper, event, gameEventManager, playerID));
    } catch (Exception e) {
        e.printStackTrace();
    }
}).start();

Unfortunately, neither the TransportListener nor the ProducerListener gives me any output when I forcefully quit (Alt+F4) another application that was previously registered as a producer. The broker certainly notices, though:

 WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset
 WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset

But I can't find a way to get a callback for those events in Java. I also tried setting a custom IOExceptionHandler on the broker and adding an ExceptionListener to the connection. Neither is ever called.

Upvotes: 0

Views: 201

Answers (2)

Sundar

Reputation: 574

You can use the advisory topics: ActiveMQ.Advisory.Connection, or ActiveMQ.Advisory.Producer.Queue and ActiveMQ.Advisory.Producer.Topic. These provide information on connections and on the number of producers. See http://activemq.apache.org/advisory-message.html
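A minimal sketch of subscribing to the connection advisory topic, assuming ActiveMQ 5.x with the javax.jms API and advisory support left enabled on the broker (the default). The class name ConnectionAdvisoryWatcher and the method watch are hypothetical; connection is assumed to be an already started connection to the broker, such as the vm://localhost one from the question. Whether a forcibly killed client produces a RemoveInfo advisory in your setup is something to verify rather than take for granted.

```java
import javax.jms.Connection;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
import javax.jms.Session;

import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.DataStructure;
import org.apache.activemq.command.RemoveInfo;

// Hypothetical helper, not part of ActiveMQ.
public final class ConnectionAdvisoryWatcher {

    // Subscribes to ActiveMQ.Advisory.Connection on a dedicated session
    // and prints one line per connection that is added or removed.
    public static MessageConsumer watch(Connection connection) throws JMSException {
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        MessageConsumer consumer =
                session.createConsumer(AdvisorySupport.getConnectionAdvisoryTopic());

        consumer.setMessageListener(message -> {
            if (!(message instanceof ActiveMQMessage)) {
                return;
            }
            // The advisory payload is carried as a broker command object.
            DataStructure data = ((ActiveMQMessage) message).getDataStructure();
            if (data instanceof ConnectionInfo) {
                System.out.println("connection opened: "
                        + ((ConnectionInfo) data).getConnectionId());
            } else if (data instanceof RemoveInfo) {
                System.out.println("connection closed: "
                        + ((RemoveInfo) data).getObjectId());
            }
        });
        return consumer;
    }
}
```

The same pattern should apply to producers by consuming from AdvisorySupport.getProducerAdvisoryTopic(dest) instead, where the payload is a ProducerInfo when a producer starts and a RemoveInfo when it goes away.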

Upvotes: 1

ralf htp

Reputation: 9422

A possible way is to parse the log output for "Connection reset":

 WARN | Transport Connection to: tcp://127.0.0.1:58988 failed: java.net.SocketException: Connection reset
 WARN | Transport Connection to: tcp://127.0.0.1:58986 failed: java.net.SocketException: Connection reset

Because the socket is handled inside ActiveMQ, you would have to add the ExceptionListener there in order to write your own exception-handling routine ...
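The log-parsing idea could be sketched roughly as follows, using only java.util.regex. The class name ConnectionResetLogParser and the method droppedPeer are hypothetical, and the pattern assumes the exact WARN line format shown in the question; how the log lines reach this method (a custom appender, tailing a file) is left open.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: extracts the remote address from a broker log line
// that reports a transport connection failing with "Connection reset".
public final class ConnectionResetLogParser {

    // Matches e.g. "Transport Connection to: tcp://127.0.0.1:58988 failed: <reason>"
    private static final Pattern FAILED =
            Pattern.compile("Transport Connection to: (\\S+) failed: (.+)$");

    // Returns the peer address if the line reports a connection reset,
    // otherwise an empty Optional.
    public static Optional<String> droppedPeer(String logLine) {
        Matcher m = FAILED.matcher(logLine);
        if (m.find() && m.group(2).contains("Connection reset")) {
            return Optional.of(m.group(1));
        }
        return Optional.empty();
    }
}
```

This approach is brittle: it depends on the log format and log level staying unchanged, so it is probably best treated as a fallback.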

Upvotes: 0
