LMK
LMK

Reputation: 2962

How to implement condition based Queue with Apache ActiveMQ in Java

I have implemented a queue with the help of MySql Table and Java Program. I want to implement the following program with Apache ActiveMQ ,any suggestions are very appreciated.

Table name : XXXXX

Col's : id | Msg |  key_id  |Status
----------------------------|------
      |  1 | Msg1|   1      |  1 
      |  2 | Msg2|   2      |  1
      |  3 | Msg3|   1      |  0
      |  4 | Msg4|   1      |  0
      |  5 | Msg5|   4      |  0
      |  6 | Msg6|   3      |  0


while (true) {

        try {
            Thread.sleep(5000);
            Fetch only one record from table XXXX whose key_id not in list and status is 0. Note list contains zero elements .
            add key_id in list.
            update status of fetched record to 1
            process the fetched record ... some business logic
            set status to 2 if business logic works as expected,else status as 3 if exception occured


            } catch (Exception ex) {

}

How do I Implement this with Apache ActiveMQ?
How i tried this

        package com.jms.activemq.examples;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.MessageProducer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

public class Sender {

private ConnectionFactory factory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageProducer producer = null;

public Sender() {

}

public void sendMessage() {

    try {
        factory = new ActiveMQConnectionFactory(
                ActiveMQConnection.DEFAULT_BROKER_URL);
        connection = factory.createConnection();
        connection.start();
        session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        destination = session.createQueue("SAMPLEQUEUE");
        producer = session.createProducer(destination);
        TextMessage message = session.createTextMessage();

        while (true) {

            try {
                Thread.sleep(5000);
            Fetch only one record from table XXXX whose key_id not in list and status is 0. Note list contains zero elements .
            add key_id in list.
            update status of fetched record to 1
                message.setText("some data from Database");
                producer.send(message);

                System.out.println("Sent: " + message.getText());
            } catch (Exception ex) {

            }
        }


        }catch (JMSException e) {
        e.printStackTrace();
    }
    }



    public static void main(String[] args) {
        Sender sender = new Sender();
        sender.sendMessage();

    }

    }

And Receiver

        package com.jms.activemq.examples;

    import javax.jms.Connection;
    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.Session;
    import javax.jms.TextMessage;
    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;

    public class Receiver {

        private ConnectionFactory factory = null;
        private Connection connection = null;
        private Session session = null;
        private Destination destination = null;
        private MessageConsumer consumer = null;

        public Receiver() {

        }

        public void receiveMessage() {
            try {
                factory = new ActiveMQConnectionFactory(
                        ActiveMQConnection.DEFAULT_BROKER_URL);
                connection = factory.createConnection();
                connection = factory.createConnection();
                connection.start();
                session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("SAMPLEQUEUE");
                consumer = session.createConsumer(destination);
                Message message = consumer.receive();

                if (message instanceof TextMessage) {
          process the fetched record ... some business logic
          set status to 2 if business logic works as expected,else status as 3 if exception occured

                    TextMessage text = (TextMessage) message;
                    System.out.println("Message is : " + text.getText());

                }
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }

        public static void main(String[] args) {
            Receiver receiver = new Receiver();
            receiver.receiveMessage();
        }
    }

any suggestions are very appreciated.

Upvotes: 3

Views: 1009

Answers (1)

Dmitry
Dmitry

Reputation: 154

If I correctly understand your needs, you could use the simple JDBC connector like:

public class SimpleJDBCConnector {
    static final String JDBC_DRIVER = "org.postgresql.Driver";
    static final String DB_URL = "jdbc:postgresql://localhost:5432/postgres";
    static final String USER = "postgres";
    static final String PASS = "admin";

    Connection conn = null;
    Statement stmt = null;
    ResultSet rs = null;

    public ResultSet executeQuery(String sql) throws SQLException {
        stmt = conn.createStatement();
        return stmt.executeQuery(sql);
    }

    public void closeConnection() throws SQLException {
        conn.close();
    }

    public boolean connect() throws ClassNotFoundException, SQLException {
            Class.forName(JDBC_DRIVER);
            conn = DriverManager.getConnection(DB_URL, USER, PASS);
        return true;
    }
}

And the following SQL queries:

Set<Integer> set = new HashSet<Integer>();
int status=0;
int id=0;
int key_id=0;

String sql1 = "SELECT id,msg,key_id,status FROM xxxx WHERE key_id NOT IN ("
        +set.toString().replaceAll("[\\]\\[\\ ]","")+") AND status = "+status+ + " LIMIT 1;";

String sql2 = "UPDATE xxxx SET status="+status+" WHERE id="+id;

To extract data from the response, you could do the following:

ResultSet rs = db.executeQuery(sql1);
        while(rs.next()){
            id  = rs.getInt("id");
            String msg = rs.getString("msg");
            key_id = rs.getInt("key_id");
            status = rs.getInt("status");
        }

Upvotes: 1

Related Questions