FloppyDisk

Reputation: 1703

HornetQ Consumer Stops Receiving Messages After N hours

I'm using a standalone HornetQ (v2.2.13) consumer to read from a persistent topic published by a JBoss server (7.1.1 Final). All goes well for several hours (between 2 and 6), and then the consumer simply stops receiving messages from the topic. The server log shows that data keeps being published, but the consumer log indicates that the client has stopped reading it. I deduced this from the timestamps: the client's last logged read from the topic was at 12:00:00, while the server's last logged push to the topic was at 14:00:00.

I've tried adjusting the HornetQ configuration, but none of the changes keep the consumer receiving for a sustained period.

The code I use to communicate with the topic is as follows.

// Netty connector settings for the remote HornetQ server.
private TransportConfiguration getTC(String hostname) {
    Map<String, Object> params = new HashMap<String, Object>();
    params.put(TransportConstants.HOST_PROP_NAME, hostname);
    params.put(TransportConstants.PORT_PROP_NAME, 5445);
    return new TransportConfiguration(NettyConnectorFactory.class.getName(), params);
}

private Topic createDestination(String destinationName) {
    return new HornetQTopic(destinationName);
}

private HornetQConnectionFactory createCF(TransportConfiguration tc) {
    return HornetQJMSClient.createConnectionFactoryWithoutHA(JMSFactoryType.CF, tc);
}

Code snippet that creates the session and starts it:

TransportConfiguration tc = this.getTC(this.hostname);
HornetQConnectionFactory cf = this.createCF(tc);
cf.setRetryInterval(4000);
cf.setReconnectAttempts(10);
cf.setConfirmationWindowSize(1000000);

Destination destination = this.createDestination(this.topicName);
logger.info("Starting Topic Connection");
try {
    this.connection = cf.createConnection();

    connection.start();
    this.session = connection.createSession(transactional, ackMode);
    MessageConsumer consumer = session.createConsumer(destination);
    consumer.setMessageListener(this);

    logger.info("Started topic connection");
} catch (Exception ex) {
    // Log the exception itself so the cause and stack trace end up in the log file.
    logger.error("Failed to start topic connection", ex);
}
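
As an aside, comparing log timestamps by hand is tedious, so a small watchdog in the consumer could flag the stall at the moment it happens. The sketch below is only an illustration under assumptions: `StallWatchdog` and its methods are hypothetical names, not part of HornetQ or JMS. The idea is to record a timestamp each time a message arrives and to check from a separate timer how long the consumer has been silent.

```java
import java.util.concurrent.atomic.AtomicLong;

/**
 * Hypothetical helper (not a HornetQ or JMS class): remembers when the last
 * message arrived so that a long silence can be detected and logged.
 */
final class StallWatchdog {
    private final long maxSilenceMillis;
    private final AtomicLong lastMessageMillis;

    /**
     * @param maxSilenceMillis longest gap between messages that is still considered healthy
     * @param startMillis      time the consumer was started, used until the first message arrives
     */
    StallWatchdog(long maxSilenceMillis, long startMillis) {
        if (maxSilenceMillis <= 0) {
            throw new IllegalArgumentException("maxSilenceMillis must be positive");
        }
        this.maxSilenceMillis = maxSilenceMillis;
        this.lastMessageMillis = new AtomicLong(startMillis);
    }

    /** Record the arrival time of a message; intended to be called from the listener. */
    void recordMessage(long nowMillis) {
        lastMessageMillis.set(nowMillis);
    }

    /** Milliseconds elapsed since the last recorded message (or since start). */
    long silenceMillis(long nowMillis) {
        return nowMillis - lastMessageMillis.get();
    }

    /** True when the silence is strictly longer than the allowed maximum. */
    boolean isStalled(long nowMillis) {
        return silenceMillis(nowMillis) > maxSilenceMillis;
    }
}
```

In the listener, `watchdog.recordMessage(System.currentTimeMillis())` would go at the top of `onMessage`, and a scheduled task could log a warning whenever `watchdog.isStalled(System.currentTimeMillis())` returns true. Passing the time in explicitly keeps the class easy to test.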

Upvotes: 1

Views: 1912

Answers (1)

Clebert Suconic

Reputation: 5383

Do you see anything in the server's log about the connection being dropped?

Have you tried adjusting client-failure-check-period and the other ping parameters, such as connection-ttl?

What about your JVM settings?

How are you acknowledging the messages? I see that you may be creating the session as transacted. Are you sure you are committing the transaction as you receive messages?

Upvotes: 1
