skyman

Reputation: 2335

Reliable Messaging with RabbitMQ

I have an application that sends AMQP messages via RabbitMQ. Message sending is triggered by an HTTP request. Recently I noticed that some messages appear to be getting lost (that is, never delivered). I also noticed that the number of channels managed by the server is steadily increasing. The first thing I corrected was to close channels once they are no longer required. However, I am still not sure my code is structured correctly to ensure delivery. Two sections of code are below: the first is part of a singleton that manages the connection (so it is not recreated on every call), and the second is the sending code. Any advice or guidance would be appreciated.

@Service
public class PersistentConnection {
    private static Connection myConnection = null;
    private Boolean blocked = false;

    @Autowired ApplicationConfiguration applicationConfiguration;
    @Autowired ConfigurationService configurationService;

    @PostConstruct
    private void init() {
    }

    @PreDestroy
    private void destroy() {
        try {
            myConnection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Connection getConnection( ) {
        if (myConnection == null) {
            start();
        }
        else if (!myConnection.isOpen()) {
            log.warn("AMQP Connection closed.  Attempting to start.");
            start();
        }
        return myConnection;
    }


    private void start() {
        log.debug("Building AMQP Connection");

        ConnectionFactory factory = new ConnectionFactory();
        String ipAddress = applicationConfiguration.getAMQPHost();
        String user = applicationConfiguration.getAMQPUser();
        String password = applicationConfiguration.getAMQPPassword();
        String virtualHost = applicationConfiguration.getAMQPVirtualHost();
        String port = applicationConfiguration.getAMQPPort();

        try {
            factory.setUsername(user);
            factory.setPassword(password);
            factory.setVirtualHost(virtualHost);
            factory.setPort(Integer.parseInt(port));
            factory.setHost(ipAddress);
            myConnection = factory.newConnection();
        }
        catch (Exception e) {
            e.printStackTrace();
        }

        myConnection.addBlockedListener(new BlockedListener() {
            public void handleBlocked(String reason) throws IOException {
                // Connection is now blocked
                blocked = true;
            }

            public void handleUnblocked() throws IOException {
                // Connection is now unblocked
                blocked = false;
            }
        });
    }

    public Boolean isBlocked() {
        return blocked;
    }
}

/*
 * Sends ADT message to AMQP server.
 */
private void send(String routingKey, String message) throws Exception { 
    String exchange = applicationConfiguration.getAMQPExchange();  
    String exchangeType = applicationConfiguration.getAMQPExchangeType();

    Connection connection = myConnection.getConnection();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(exchange, exchangeType);
    channel.basicPublish(exchange, routingKey, null, message.getBytes());

    // Close the channel if it is no longer needed in this thread
    channel.close();
}

Upvotes: 1

Views: 1072

Answers (1)

Gabriele Santomaggio

Reputation: 22760

Try this code:

@Service
public class PersistentConnection {
    private Connection myConnection = null;
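    // volatile: the flag is written by the client's listener thread and read by request threads
    private volatile boolean blocked = false;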

    @Autowired ApplicationConfiguration applicationConfiguration;
    @Autowired ConfigurationService configurationService;

    @PostConstruct
    private void init() {
      start(); // Initialize the connection here so that it is created exactly once.
    }

    @PreDestroy
    private void destroy() {
        if (myConnection == null) {
            // start() failed, so there is nothing to close
            return;
        }
        try {
            myConnection.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public Connection getConnection( ) {
        return myConnection;
    }


    private void start() {
        log.debug("Building AMQP Connection");

        ConnectionFactory factory = new ConnectionFactory();
        String ipAddress = applicationConfiguration.getAMQPHost();
        String user = applicationConfiguration.getAMQPUser();
        String password = applicationConfiguration.getAMQPPassword();
        String virtualHost = applicationConfiguration.getAMQPVirtualHost();
        String port = applicationConfiguration.getAMQPPort();

        try {
            factory.setUsername(user);
            factory.setPassword(password);
            factory.setVirtualHost(virtualHost);
            factory.setPort(Integer.parseInt(port));
            factory.setHost(ipAddress);
            myConnection = factory.newConnection();
        }
        catch (Exception e) {
            e.printStackTrace();
            // No connection was created, so there is nothing to attach the listener to.
            return;
        }

        myConnection.addBlockedListener(new BlockedListener() {
            public void handleBlocked(String reason) throws IOException {
                // Connection is now blocked
                blocked = true;
            }

            public void handleUnblocked() throws IOException {
                // Connection is now unblocked
                blocked = false;
            }
        });
    }

    public Boolean isBlocked() {
        return blocked;
    }
}

/*
 * Sends ADT message to AMQP server.
 */
private void send(String routingKey, String message) throws Exception {
    String exchange = applicationConfiguration.getAMQPExchange();
    String exchangeType = applicationConfiguration.getAMQPExchangeType();

    Connection connection = myConnection.getConnection();
    if (connection != null) {
        Channel channel = connection.createChannel();
        try {
            channel.exchangeDeclare(exchange, exchangeType);
            channel.basicPublish(exchange, routingKey, null, message.getBytes());
        } finally {
            // Close the channel if it is no longer needed in this thread
            channel.close();
        }
    }
}

This could be enough: you have a connection to RabbitMQ as soon as the system starts.

If you want a lazy singleton, the code is only slightly different.
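As a rough illustration of the lazy variant, the sketch below creates the shared object on first use and guards the check with synchronized, so that two concurrent requests cannot both build a connection. LazyHolder and the Supplier passed to it are hypothetical names, not part of the original code or of the RabbitMQ client; in the real service the supplier would presumably wrap factory.newConnection().

```java
import java.util.function.Supplier;

// Hypothetical helper: holds one lazily created instance of T.
class LazyHolder<T> {
    private final Supplier<T> factory;
    private T instance; // guarded by "this"

    LazyHolder(Supplier<T> factory) {
        this.factory = factory;
    }

    // synchronized makes the null check and the assignment one atomic step,
    // so concurrent callers cannot both run the factory.
    synchronized T get() {
        if (instance == null) {
            // If the factory returns null (creation failed), the next call retries.
            instance = factory.get();
        }
        return instance;
    }
}
```

The difference from the eager version is only where creation happens: in get() on first use rather than in the @PostConstruct method.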

I suggest not relying on the isOpen() method. Please read its documentation:

isOpen

boolean isOpen()

Determine whether the component is currently open. Will return false if we are currently closing. Checking this method should be only for information, because of the race conditions - state can change after the call. Instead just execute and try to catch ShutdownSignalException and IOException.

Returns: true when component is open, false otherwise
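The "just execute and catch" advice in that documentation can be sketched as follows. To keep the example free of external dependencies, ChannelOperation is a hypothetical stand-in for a call such as channel.basicPublish(...); it is not part of the RabbitMQ client. In the real client, ShutdownSignalException is an unchecked exception, which is why the sketch catches RuntimeException alongside IOException.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for one channel operation, e.g. a wrapper around
// channel.basicPublish(exchange, routingKey, null, body).
interface ChannelOperation {
    void publish(String routingKey, byte[] body) throws IOException;
}

class SafeSender {
    private final ChannelOperation operation;

    SafeSender(ChannelOperation operation) {
        this.operation = operation;
    }

    // No isOpen() check beforehand: the state could change between the check
    // and the call, so we simply try and report whether the call threw.
    boolean trySend(String routingKey, String message) {
        try {
            operation.publish(routingKey, message.getBytes(StandardCharsets.UTF_8));
            return true;
        } catch (IOException | RuntimeException e) {
            // With the real client this branch would also see
            // ShutdownSignalException, which extends RuntimeException.
            return false;
        }
    }
}
```

A false return only tells the caller that the call failed; what to do with the message afterwards is a separate decision.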

EDIT

Question 1:

What you are looking for is an HA (high-availability) client.

By default, the RabbitMQ Java client does not support this feature. Since version 3.3.0 it supports only automatic reconnection; read this:

...allows Java-based clients to reconnect automatically after network failure.

If you want to be sure about your messages, you have to build a robust client that can withstand every kind of failure.

In general, you should plan for failures. For example, what happens if there is an error while a message is being published?

In your case you simply lose the message. You should re-queue the message manually.
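One possible shape for manual re-queuing is sketched below: messages that fail to publish stay in an in-memory queue and are retried in order on the next attempt. This is only a sketch under assumptions. MessageSink and RetryBuffer are hypothetical names introduced for illustration, with MessageSink standing in for the real send(routingKey, message) method.

```java
import java.io.IOException;
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical stand-in for the real send(routingKey, message) method.
interface MessageSink {
    void send(String routingKey, String message) throws IOException;
}

class RetryBuffer {
    // One message that has not been handed to the sink successfully yet.
    private static final class Pending {
        final String routingKey;
        final String message;

        Pending(String routingKey, String message) {
            this.routingKey = routingKey;
            this.message = message;
        }
    }

    private final MessageSink sink;
    private final Deque<Pending> pending = new ArrayDeque<>();

    RetryBuffer(MessageSink sink) {
        this.sink = sink;
    }

    // Queue the message, then try to drain the queue in order.
    synchronized void send(String routingKey, String message) {
        pending.addLast(new Pending(routingKey, message));
        flush();
    }

    // Attempts delivery oldest-first and stops at the first failure.
    // Returns the number of messages still waiting.
    synchronized int flush() {
        while (!pending.isEmpty()) {
            Pending next = pending.peekFirst();
            try {
                sink.send(next.routingKey, next.message);
            } catch (IOException | RuntimeException e) {
                break; // keep this message and everything behind it for later
            }
            pending.removeFirst();
        }
        return pending.size();
    }

    synchronized int pendingCount() {
        return pending.size();
    }
}
```

A timer or the next incoming request could call flush() to retry. The buffer lives only in memory, so anything still queued is lost if the process stops, and holding the lock during the send serializes publishers; both are simplifications accepted here for brevity.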

Question 2:

I don’t know the rest of your code, but connection == null shouldn’t happen, because this method is called first:

@PostConstruct
    private void init() {
      start(); // Initialize the connection here so that it is created exactly once.
    }

In any case, you can throw an exception. The question then becomes: what should I do with the message I was trying to send?

See question 1.

I’d suggest reading more about HA, for example this:

Creating a reliable system with RabbitMQ is not complex, but you need to know some basic concepts.

Anyway, let me know!

Upvotes: 2
