Bogdan Cretu
Bogdan Cretu

Reputation: 73

ActiveMQ master slave failover loses messages

ActiveMQ loses a lot of messages when failover is involved (only on topics). The producer writes 1000 messages to a topic while, at the same time, the consumer reads from the same topic. In the middle of the run I shut down the ActiveMQ master, and processing continues against the ActiveMQ slave. During the transition a lot of messages are lost (~100 messages). The product I'm working on must not lose messages. What can I do to get persistence on topics? Producer:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>
#include <activemq\exceptions\ConnectionFailedException.h>
#include <decaf\lang\Throwable.h>

#include <fstream>
#include <iostream>
#include <memory>
#include <string>
#include <vector>

// Broker URI handed to the connection factory in main().
std::string _amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";
// Credentials passed to createConnection() in main().
const std::string _username = "user";
const std::string _password = "pass";
const std::string _host = "localhost";
// Name of the topic the producer publishes to.
const std::string _destination = "Test.AMQ.bogcretu.Topic";

// Building block that CreateMessage() repeats to fill _bodyMessage.
std::string _garbageMessage = "GARBAGE0_GARBAGE1_GARBAGE2_GARBAGE3_GARBAGE4_GARBAGE5_GARBAGE6_GARBAGE7_GARBAGE8_GARBAGE9";
// Number of messages main() sends.
int _countMessages = 1000;
// Number of times CreateMessage() appends _garbageMessage.
int _multiplyFactor = 100;
// Starts empty; populated by CreateMessage().
std::string _bodyMessage;

// Appends _garbageMessage to _bodyMessage _multiplyFactor times.
// Nothing is appended when _multiplyFactor is zero or negative.
void CreateMessage()
{
    for (int remaining = _multiplyFactor; remaining > 0; --remaining) {
        _bodyMessage.append(_garbageMessage);
    }
}

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    CreateMessage();
    activemq::core::ActiveMQConnectionFactory factory;
    factory.setBrokerURI(_amqURI);
    std::auto_ptr<cms::TextMessage> message;
    std::auto_ptr<cms::Connection> connection(factory.createConnection(_username, _password));

    connection->start();

    std::auto_ptr<cms::Session> session(connection->createSession());
    std::auto_ptr<cms::Destination> destionation(session->createTopic(_destination));
    std::auto_ptr<cms::MessageProducer> producer(session->createProducer(destionation.get()));

    producer->setDeliveryMode(cms::DeliveryMode::PERSISTENT);

    for (int i = 0; i < _countMessages; i++) {
        std::stringstream ss;
        ss << i;
        std::string number = ss.str();
        message.reset(session->createTextMessage(number));
        producer->send(message.get());
        std::cout << i << std::endl;
    }

    //message.reset(session->createTextMessage("DONE"));
    //producer->send(message.get());

    //connection->close();

    //activemq::library::ActiveMQCPP::shutdownLibrary();

    return 0;
}

Consumer:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

// Broker URI that main() passes to the MsgListener constructor.
std::string amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";

class MsgListener : public cms::MessageListener
{
public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        this->_consumer = this->_session->createConsumer(this->_destination);
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onMessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";

            }

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        }
        catch (cms::CMSException& e)
        {
            e.printStackTrace();
        }

        if (this->_sessionTransacted) {
            this->_session->commit();
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();
}

Consumer_durable:

#include <activemq\library\ActiveMQCPP.h>
#include <cms\Connection.h>
#include <cms\Session.h>
#include <activemq\core\ActiveMQConnection.h>
#include <activemq\core\ActiveMQConnectionFactory.h>
#include <activemq\core\ActiveMQSession.h>
#include <activemq\core\ActiveMQConsumer.h>
#include <activemq\core\ActiveMQQueueBrowser.h>
#include <activemq\core\PrefetchPolicy.h>
#include <activemq/core/policies/DefaultPrefetchPolicy.h>
#include <activemq\util\ActiveMQProperties.h>
#include <activemq\util\Config.h>
#include <decaf\util\Properties.h>
#include <cms\Queue.h>
#include <cms\Topic.h>
#include <cms\MapMessage.h>
#include <cms\MessageEnumeration.h>
#include <cms\CMSException.h>
#include <cms\MessageListener.h>
#include <cms\Message.h>
#include <activemq\commands\Command.h>
#include <decaf\lang\Pointer.h>
#include <activemq\transport\Transport.h>
#include <activemq\transport\TransportFilter.h>
#include <activemq\commands\SessionInfo.h>
#include <activemq\commands\ActiveMQMessage.h>
#include <activemq\commands\BrokerInfo.h>

#include <activemq\exceptions\ConnectionFailedException.h>
#include <string>
#include <vector>
#include <iostream>
#include <chrono>
#include <thread>
#include <decaf\lang\Throwable.h>

// Broker URI that main() passes to the MsgListener constructor.
std::string amqURI = "failover:(tcp://host1:61616,tcp://host2:61616)?initialReconnectDelay=5000&maxReconnectAttempts=2";

class MsgListener : public cms::MessageListener
{
public:
    std::string _amqURI;
    cms::Connection *_connection;
    cms::Session* _session;
    cms::Destination* _destination;
    cms::MessageConsumer* _consumer;
    bool _sessionTransacted;
    bool _useTopic;

    MsgListener(std::string amqURI, bool sessionTransacted, bool useTopic = false) : _amqURI(amqURI), _sessionTransacted(sessionTransacted), _useTopic(useTopic), _connection(NULL), _session(NULL), _destination(NULL), _consumer(NULL)
    {
        this->_connection = cms::ConnectionFactory::createCMSConnectionFactory(this->_amqURI)->createConnection();
        this->_connection->start();

        /*if (this->_sessionTransacted == true) {
            this->_session = this->_connection->createSession(cms::Session::SESSION_TRANSACTED);
        }
        else {
            this->_session = this->_connection->createSession(cms::Session::AUTO_ACKNOWLEDGE);
        }*/

        this->_session = this->_connection->createSession(cms::Session::DUPS_OK_ACKNOWLEDGE);

        if (useTopic) {
            this->_destination = this->_session->createTopic("Test.AMQ.bogcretu.Topic");
        }
        else {
            this->_destination = this->_session->createQueue("Test.AMQ.bogcretu.Topic");
        }

        //this->_consumer = this->_session->createConsumer(this->_destination);



        static const cms::Topic * topic = dynamic_cast<const cms::Topic*>(this->_destination);
        this->_consumer = this->_session->createDurableConsumer(topic, "sub_name", "");
        this->_consumer->setMessageListener(this);

        /*std::cout.flush();
        std::cerr.flush();*/


    }

    ~MsgListener()
    {

    }

    void onMessage(const cms::Message* CMSMessage)
    {
        static int count = 0;

        try
        {

            const cms::TextMessage* textMessage = dynamic_cast<const cms::TextMessage*> (CMSMessage);
            std::string text = "";
            if (textMessage != NULL) {
                text = textMessage->getText();
            }
            else {
                text = "NOT A TEXTMESSAGE!";
            }

            std::cout << "(" << count << ", " << text << ")" << std::endl;
            count++;

        }
        catch (cms::CMSException& e)
        {
            e.printStackTrace();
        }

        if (this->_sessionTransacted) {
            this->_session->commit();
        }

        std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
};

int main()
{
    activemq::library::ActiveMQCPP::initializeLibrary();
    MsgListener consumer(amqURI, true, true);
    while (true);
    //activemq::library::ActiveMQCPP::shutdownLibrary();
}

Upvotes: 0

Views: 577

Answers (1)

Tim Bish
Tim Bish

Reputation: 18356

If you want message persistence, you should use either Queues or durable Topic subscriptions. Topics on their own do not persist messages, regardless of the persistent mode of the producer; in fact, if you send to a Topic while no consumers are subscribed, the message is discarded. Likewise, the ActiveMQ configuration that controls the constant pending message limit for Topics will discard old messages on a Topic whose consumers can't keep up, because a Topic has a low service assurance level.

You need to use a Queue and set persistent on the producer, or ensure that you have a pre-existing durable Topic subscription and send the messages using a producer that assigns the persistent flag if you want the message written to the store and recovered on broker failover.

Upvotes: 1

Related Questions