Reputation: 2962
I have implemented a queue using a MySQL table and a Java program. I want to implement the following program with Apache ActiveMQ; any suggestions would be much appreciated.
Table name : XXXXX
Col's : id | Msg | key_id |Status
----------------------------|------
| 1 | Msg1| 1 | 1
| 2 | Msg2| 2 | 1
| 3 | Msg3| 1 | 0
| 4 | Msg4| 1 | 0
| 5 | Msg5| 4 | 0
| 6 | Msg6| 3 | 0
while (true) {
try {
Thread.sleep(5000);
Fetch only one record from table XXXX whose key_id is not in the list and whose status is 0. Note: the list initially contains zero elements.
add key_id in list.
update status of fetched record to 1
process the fetched record ... some business logic
set status to 2 if the business logic works as expected, else set status to 3 if an exception occurred
} catch (Exception ex) {
}
How do I implement this with Apache ActiveMQ?
Here is what I have tried so far:
package com.jms.activemq.examples;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageProducer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Sender {
private ConnectionFactory factory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageProducer producer = null;
public Sender() {
}
public void sendMessage() {
try {
factory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("SAMPLEQUEUE");
producer = session.createProducer(destination);
TextMessage message = session.createTextMessage();
while (true) {
try {
Thread.sleep(5000);
Fetch only one record from table XXXX whose key_id not in list and status is 0. Note list contains zero elements .
add key_id in list.
update status of fetched record to 1
message.setText("some data from Database");
producer.send(message);
System.out.println("Sent: " + message.getText());
} catch (Exception ex) {
}
}
}catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Sender sender = new Sender();
sender.sendMessage();
}
}
And the Receiver:
package com.jms.activemq.examples;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.Session;
import javax.jms.TextMessage;
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
public class Receiver {
private ConnectionFactory factory = null;
private Connection connection = null;
private Session session = null;
private Destination destination = null;
private MessageConsumer consumer = null;
public Receiver() {
}
public void receiveMessage() {
try {
factory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_BROKER_URL);
connection = factory.createConnection();
connection = factory.createConnection();
connection.start();
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
destination = session.createQueue("SAMPLEQUEUE");
consumer = session.createConsumer(destination);
Message message = consumer.receive();
if (message instanceof TextMessage) {
process the fetched record ... some business logic
set status to 2 if business logic works as expected,else status as 3 if exception occured
TextMessage text = (TextMessage) message;
System.out.println("Message is : " + text.getText());
}
} catch (JMSException e) {
e.printStackTrace();
}
}
public static void main(String[] args) {
Receiver receiver = new Receiver();
receiver.receiveMessage();
}
}
Any suggestions would be much appreciated.
Upvotes: 3
Views: 1009
Reputation: 154
If I correctly understand your needs, you could use the simple JDBC connector like:
/**
 * Minimal JDBC helper that opens one connection and runs ad-hoc queries on it.
 *
 * <p>NOTE(review): the URL and credentials are hard-coded sample values; load
 * them from configuration before using this outside of a demo.
 */
public class SimpleJDBCConnector {
    static final String JDBC_DRIVER = "org.postgresql.Driver";
    static final String DB_URL = "jdbc:postgresql://localhost:5432/postgres";
    static final String USER = "postgres";
    static final String PASS = "admin";
    Connection conn = null;
    Statement stmt = null;
    ResultSet rs = null;

    /**
     * Runs the given SQL query on the open connection.
     *
     * @param sql the query text to execute
     * @return the result set produced by the query; closing it also releases
     *         the statement that produced it
     * @throws SQLException if {@link #connect()} has not been called, or if the
     *         database reports an error
     */
    public ResultSet executeQuery(String sql) throws SQLException {
        if (conn == null) {
            throw new SQLException("Not connected: call connect() before executeQuery()");
        }
        stmt = conn.createStatement();
        // Each call creates a new Statement; let it close itself once the
        // caller closes the ResultSet instead of leaking one per query.
        stmt.closeOnCompletion();
        return stmt.executeQuery(sql);
    }

    /**
     * Closes the connection. Does nothing when no connection is open.
     *
     * @throws SQLException if the database reports an error while closing
     */
    public void closeConnection() throws SQLException {
        if (conn == null) {
            return;
        }
        try {
            conn.close();
        } finally {
            conn = null;
            stmt = null;
        }
    }

    /**
     * Loads the JDBC driver and opens the connection, closing any connection
     * this instance opened earlier.
     *
     * @return {@code true} once the connection has been opened
     * @throws ClassNotFoundException if the driver class is not on the classpath
     * @throws SQLException if the connection cannot be opened
     */
    public boolean connect() throws ClassNotFoundException, SQLException {
        Class.forName(JDBC_DRIVER);
        // Avoid orphaning a previous connection when connect() is called twice.
        closeConnection();
        conn = DriverManager.getConnection(DB_URL, USER, PASS);
        return true;
    }
}
And the following SQL queries:
// Keys that have already been picked up; rows with these key_id values are skipped.
Set<Integer> set = new HashSet<Integer>();
int status=0;
int id=0;
int key_id=0;
// Turn "[1, 2, 3]" into "1,2,3" for use inside the IN list.
String excludedKeys = set.toString().replaceAll("[\\]\\[\\ ]","");
// "NOT IN ()" is not valid SQL, so the filter is only added when there is
// at least one key to exclude.
String keyFilter = set.isEmpty() ? "" : "key_id NOT IN (" + excludedKeys + ") AND ";
// All interpolated values are ints, so plain concatenation cannot inject SQL here.
String sql1 = "SELECT id,msg,key_id,status FROM xxxx WHERE "
        + keyFilter + "status = " + status + " LIMIT 1;";
String sql2 = "UPDATE xxxx SET status="+status+" WHERE id="+id;
To extract data from the response, you could do the following:
// Close the ResultSet as soon as the row has been read so the cursor is not
// left open for the lifetime of the connection.
try (ResultSet rs = db.executeQuery(sql1)) {
    while (rs.next()) {
        id = rs.getInt("id");
        String msg = rs.getString("msg");
        key_id = rs.getInt("key_id");
        status = rs.getInt("status");
    }
}
Upvotes: 1