Reputation: 1648
I want to route multiple messages between several RabbitMQ exchanges. This is the routing table that I want to use:
// | exchange   | type  | routing key                  | queue                        |
// |------------|-------|------------------------------|------------------------------|
// | processing | topic | processing.event.transaction | processing.transaction.queue |
// | database   | topic | database.event.transaction   | database.transaction.queue   |
// | database   | topic | database.event.api_attempts  | database.api_attempts.queue  |
// | database   | topic | database.event.event_logs    | database.event_logs.queue    |
I have 3 modules which I want to configure to send messages this way:
REST API Module -> Gateway module
REST API Module -> Database Module
REST API Module configuration
String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";
String EXCHANGE_PROCESSING = "processing";
String EXCHANGE_DATABASE = "database";
String ROUTING_KEY_PROCESSING = "processing.event.transaction";
String ROUTING_KEY_DATABASE = "database.event.transaction";
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_PROCESSING, BuiltinExchangeType.TOPIC);
channel.exchangeDeclare(EXCHANGE_DATABASE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_PROCESSING_TRANSACTION, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_TRANSACTION, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_API_ATTEMPT, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_EVENT_LOGS, false, false, false, null);
channel.queueBind(QUEUE_PROCESSING_TRANSACTION, EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING);
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);
Sending Java objects to other modules:
TransactionsBean obj = new TransactionsBean();
obj.setId(Long.valueOf(111222333));
channel.basicPublish(EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING, null, SerializationUtils.serialize(obj));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_DATABASE, null, SerializationUtils.serialize(obj));
ApiAttemptsBean obj = new ApiAttemptsBean();
obj.setId(Long.valueOf(2332));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS, null, SerializationUtils.serialize(obj));
EventLogsBean obj = new EventLogsBean();
obj.setId(Long.valueOf(111222));
channel.basicPublish(EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS, null, SerializationUtils.serialize(obj));
Module Gateway configuration:
String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";
String EXCHANGE_DATABASE = "database";
String ROUTING_KEY_DATABASE = "database.event.transaction";
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_PROCESSING, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_PROCESSING_TRANSACTION, false, false, false, null);
channel.queueBind(QUEUE_PROCESSING_TRANSACTION, EXCHANGE_PROCESSING, ROUTING_KEY_PROCESSING);
Map<String, Consumer<byte[]>> queueToConsumer = new HashMap<>();
queueToConsumer.put(QUEUE_DATABASE_TRANSACTION, this::process_transaction);
queueToConsumer.put(QUEUE_DATABASE_API_ATTEMPT, this::process_api_attempt);
queueToConsumer.put(QUEUE_DATABASE_EVENT_LOGS, this::process_event_logs);
queueToConsumer.forEach((queueName, consumer) -> {
try {
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
consumer.accept(body);
}
});
} catch (IOException e) {
e.printStackTrace();
}
});
}
private void process_transaction(byte[] object) {
TransactionsBean obj = (TransactionsBean) SerializationUtils.deserialize(object);
System.out.println("!!!! Received id " + obj.getId() + " in gateway");
}
private void process_api_attempt(byte[] object) {
ApiAttemptsBean obj = (ApiAttemptsBean) SerializationUtils.deserialize(object);
System.out.println("!!!! Received id " + obj.getId() + " in gateway");
}
private void process_event_logs(byte[] object) {
EventLogsBean obj = (EventLogsBean) SerializationUtils.deserialize(object);
System.out.println("!!!! Received id " + obj.getId() + " in gateway");
}
Module Database:
String QUEUE_PROCESSING_TRANSACTION = "processing.transaction.queue";
String QUEUE_DATABASE_TRANSACTION = "database.transaction.queue";
String QUEUE_DATABASE_API_ATTEMPT = "database.api_attempts.queue";
String QUEUE_DATABASE_EVENT_LOGS = "database.event_logs.queue";
String EXCHANGE_DATABASE = "database";
String ROUTING_KEY_PROCESSING = "processing.event.transaction";
String ROUTING_KEY_DATABASE = "database.event.transaction";
String ROUTING_KEY_API_ATTEMPTS = "database.event.api_attempts";
String ROUTING_KEY_EVENT_LOGS = "database.event.event_logs";
channel = connection.createChannel();
channel.exchangeDeclare(EXCHANGE_DATABASE, BuiltinExchangeType.TOPIC);
channel.queueDeclare(QUEUE_DATABASE_TRANSACTION, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_API_ATTEMPT, false, false, false, null);
channel.queueDeclare(QUEUE_DATABASE_EVENT_LOGS, false, false, false, null);
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);
Map<String, Consumer<byte[]>> queueToConsumer = new HashMap<>();
queueToConsumer.put(QUEUE_DATABASE_TRANSACTION, this::process_transaction);
queueToConsumer.put(QUEUE_DATABASE_API_ATTEMPT, this::process_api_attempt);
queueToConsumer.put(QUEUE_DATABASE_EVENT_LOGS, this::process_event_logs);
queueToConsumer.forEach((queueName, consumer) -> {
try {
channel.basicConsume(queueName, true, new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
consumer.accept(body);
}
});
} catch (IOException e) {
e.printStackTrace();
}
});
private void process_transaction(byte[] object) {
TransactionsBean obj = (TransactionsBean) SerializationUtils.deserialize(object);
System.out.println("!!!! Received id " + obj.getId() + " in database");
}
private void process_api_attempt(byte[] object) {
ApiAttemptsBean obj = (ApiAttemptsBean) SerializationUtils.deserialize(object);
System.out.println("!!!! Received id " + obj.getId() + " in database");
}
private void process_event_logs(byte[] object) {
EventLogsBean obj = (EventLogsBean) SerializationUtils.deserialize(object);
System.out.println("!!!! Received id " + obj.getId() + " in database");
}
But the messages are not delivered properly:
11:33:00,783 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-17-thread-6) Consumer org.database.context.ContextServer$1@6fee6ab4 (amq.ctag-arvcrYNc61cslclCTAnpDQ) method handleDelivery for channel AMQChannel(amqp://[email protected]:5672/,1) threw an exception for channel AMQChannel(amqp://[email protected]:5672/,1): java.lang.ClassCastException: deployment.db.war//plugin.factories.TransactionsBean cannot be cast to deployment.database.war//org.plugin.factories.EventLogsBean
It looks like the messages are not routed properly, probably because my routing table is incorrect.
How can I fix this issue?
EDIT: Error stack:
22:19:26,584 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-19-thread-6) Consumer org.database.context.ContextServer$1@49ee659f (amq.ctag-vjArBDtmtruIgeCMLipHGQ) method handleDelivery for channel AMQChannel(amqp://[email protected]:5672/,1) threw an exception for channel AMQChannel(amqp://[email protected]:5672/,1): java.lang.ClassCastException: deployment.db.war//org.plugin.database.bean.TransactionsBean cannot be cast to deployment.db.war//org.plugin.database.bean.ApiAttemptsBean
at deployment.db.war//org.database.context.ContextServer.process_api_attempt(ContextServer.java:79)
at deployment.db.war//org.database.context.ContextServer$1.handleDelivery(ContextServer.java:64)
at deployment.db.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at deployment.db.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
22:19:26,586 INFO [javax.enterprise.resource.webcontainer.jsf.config] (ServerService Thread Pool -- 111) Initializing Mojarra 2.2.13.SP5 for context '/rest_api'
22:19:26,619 INFO [stdout] (pool-21-thread-6) !!!! Received id 2332 in gateway
22:19:26,667 ERROR [com.rabbitmq.client.impl.ForgivingExceptionHandler] (pool-19-thread-6) Consumer org.database.context.ContextServer$1@29ba98e4 (amq.ctag-RMVncG2xQn3KBJ561F9HNQ) method handleDelivery for channel AMQChannel(amqp://[email protected]:5672/,1) threw an exception for channel AMQChannel(amqp://[email protected]:5672/,1): java.lang.ClassCastException: deployment.db.war//org.plugin.database.bean.TransactionsBean cannot be cast to deployment.db.war//org.plugin.database.bean.EventLogsBean
at deployment.db.war//org.database.context.ContextServer.process_event_logs(ContextServer.java:84)
at deployment.db.war//org.database.context.ContextServer$1.handleDelivery(ContextServer.java:64)
at deployment.db.war//com.rabbitmq.client.impl.ConsumerDispatcher$5.run(ConsumerDispatcher.java:149)
at deployment.db.war//com.rabbitmq.client.impl.ConsumerWorkService$WorkPoolRunnable.run(ConsumerWorkService.java:104)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1135)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:844)
22:19:26,669 INFO [stdout] (pool-21-thread-6) !!!! Received id 111222 in gateway
Upvotes: 4
Views: 198
Reputation: 19445
In the Database module you bind all three queues with the same routing key, ROUTING_KEY_DATABASE:
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
It should probably be:
channel.queueBind(QUEUE_DATABASE_TRANSACTION, EXCHANGE_DATABASE, ROUTING_KEY_DATABASE);
channel.queueBind(QUEUE_DATABASE_API_ATTEMPT, EXCHANGE_DATABASE, ROUTING_KEY_API_ATTEMPTS);
channel.queueBind(QUEUE_DATABASE_EVENT_LOGS, EXCHANGE_DATABASE, ROUTING_KEY_EVENT_LOGS);
It's just a copy-paste error (-:
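The effect of the shared binding key can be reproduced without a broker. What follows is a minimal sketch, not RabbitMQ client code: `TopicRoutingSketch`, `bind`, and `route` are hypothetical names, and the matcher only models the documented topic-exchange rules (`*` matches exactly one word, `#` matches zero or more words).

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical in-memory model of a single topic exchange (assumption, not the RabbitMQ API).
class TopicRoutingSketch {
    // queue name -> binding keys, kept in insertion order
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    public void bind(String queue, String bindingKey) {
        bindings.computeIfAbsent(queue, q -> new LinkedHashSet<>()).add(bindingKey);
    }

    // Returns every queue that has at least one binding key matching the routing key.
    public List<String> route(String routingKey) {
        String[] words = routingKey.split("\\.");
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : bindings.entrySet()) {
            for (String bindingKey : entry.getValue()) {
                if (matches(bindingKey.split("\\."), 0, words, 0)) {
                    matched.add(entry.getKey());
                    break;
                }
            }
        }
        return matched;
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            return k == key.length;
        }
        if (pattern[p].equals("#")) {
            // '#' may absorb zero or more words of the routing key
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false;
        }
        boolean wordMatches = pattern[p].equals("*") || pattern[p].equals(key[k]);
        return wordMatches && matches(pattern, p + 1, key, k + 1);
    }

    public static void main(String[] args) {
        String transactionKey = "database.event.transaction";

        // The copy-paste version: every queue bound with the same key.
        TopicRoutingSketch sameKey = new TopicRoutingSketch();
        sameKey.bind("database.transaction.queue", transactionKey);
        sameKey.bind("database.api_attempts.queue", transactionKey);
        sameKey.bind("database.event_logs.queue", transactionKey);
        // prints [database.transaction.queue, database.api_attempts.queue, database.event_logs.queue]
        System.out.println(sameKey.route(transactionKey));

        // The corrected version: one key per queue.
        TopicRoutingSketch distinctKeys = new TopicRoutingSketch();
        distinctKeys.bind("database.transaction.queue", transactionKey);
        distinctKeys.bind("database.api_attempts.queue", "database.event.api_attempts");
        distinctKeys.bind("database.event_logs.queue", "database.event.event_logs");
        // prints [database.transaction.queue]
        System.out.println(distinctKeys.route(transactionKey));
    }
}
```

With the shared key, a serialized TransactionsBean lands in all three queues, so the handlers for the other two queues cast it to the wrong bean type.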
Edit
The code now seems good, so a few things to check:
-1-
Did you clear the queues before running the new code, so that no old messages are left in them?
-2-
Try adding breakpoints or at least println in:
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
consumer.accept(body);
}
Try to read some values from the envelope, such as the message's routing key and the exchange it was published to.
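Besides old messages, old bindings are worth ruling out: `queueBind` adds a binding and does not replace earlier ones, and the queues in the question are declared non-exclusive and non-auto-delete, so they can outlive a redeploy. Whether that happened here is an assumption; the logs alone do not confirm it. The sketch below models the idea in memory only; `StaleBindingSketch` and its methods are hypothetical stand-ins, not RabbitMQ client calls.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical model of broker-side bindings that survive an application redeploy.
class StaleBindingSketch {
    private final Map<String, Set<String>> bindings = new LinkedHashMap<>();

    // Modeled on queueBind: adds a binding and leaves existing ones in place.
    public void bind(String queue, String key) {
        bindings.computeIfAbsent(queue, q -> new LinkedHashSet<>()).add(key);
    }

    // Modeled on queueUnbind: removes exactly one binding.
    public void unbind(String queue, String key) {
        Set<String> keys = bindings.get(queue);
        if (keys != null) {
            keys.remove(key);
        }
    }

    // Exact matching is enough here because the keys in the question contain no wildcards.
    public List<String> route(String routingKey) {
        List<String> matched = new ArrayList<>();
        for (Map.Entry<String, Set<String>> entry : bindings.entrySet()) {
            if (entry.getValue().contains(routingKey)) {
                matched.add(entry.getKey());
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        StaleBindingSketch broker = new StaleBindingSketch();

        // First deployment: the copy-paste bug binds the queue with the transaction key.
        broker.bind("database.api_attempts.queue", "database.event.transaction");
        // Second deployment: the corrected key is added, but the old binding is still there.
        broker.bind("database.api_attempts.queue", "database.event.api_attempts");
        // prints [database.api_attempts.queue]
        System.out.println(broker.route("database.event.transaction"));

        // Removing the stale binding stops the misrouting.
        broker.unbind("database.api_attempts.queue", "database.event.transaction");
        // prints []
        System.out.println(broker.route("database.event.transaction"));
    }
}
```

Against a real broker, the corresponding `Channel` calls would be `queueUnbind(queue, exchange, routingKey)` for a single binding, `queueDelete(queue)` to drop the queue together with its bindings, and `queuePurge(queue)` to discard old messages only.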
EDIT 2
I checked it again, and by eye everything looks good. The only advice I can give is to debug your code more thoroughly to understand what the issue is.
-1-
Did you try logging in to the RabbitMQ management UI to monitor the messages?
-2-
Try adding better logs. For instance, add the function name to the println:
System.out.println("!!!! Received id " + obj.getId() + " in database");
replaced with
System.out.println("process_transaction: Received id " + obj.getId() + " in database");
Do that in every handler function, so that each log line tells you exactly where you are.
-3-
From the envelope, you can call envelope.getExchange() and also envelope.getRoutingKey() to see exactly how the message was routed.
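The logging tips above can be combined into a single type-checked decode step, so that a misrouted message reports the handler, exchange, and routing key instead of a bare ClassCastException. This is a sketch under assumptions: `TypedDecodeSketch`, `decode`, and the nested bean classes are hypothetical, plain `java.io` serialization stands in for `SerializationUtils`, and in a real handler the last two arguments would come from `envelope.getExchange()` and `envelope.getRoutingKey()`.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class TypedDecodeSketch {
    // Hypothetical stand-ins for the beans in the question.
    static class TransactionsBean implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        TransactionsBean(long id) { this.id = id; }
    }

    static class EventLogsBean implements Serializable {
        private static final long serialVersionUID = 1L;
        final long id;
        EventLogsBean(long id) { this.id = id; }
    }

    public static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new IllegalStateException("serialization failed", e);
        }
        return bytes.toByteArray();
    }

    // Deserializes the body and verifies its type before casting.
    public static <T> T decode(byte[] body, Class<T> expected, String handler,
                               String exchange, String routingKey) {
        Object value;
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            value = in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(handler + ": cannot deserialize message", e);
        }
        if (!expected.isInstance(value)) {
            String actual = (value == null) ? "null" : value.getClass().getSimpleName();
            throw new IllegalStateException(handler + ": expected " + expected.getSimpleName()
                    + " but got " + actual
                    + " (exchange=" + exchange + ", routingKey=" + routingKey + ")");
        }
        return expected.cast(value);
    }

    public static void main(String[] args) {
        byte[] body = serialize(new TransactionsBean(111222333L));

        TransactionsBean ok = decode(body, TransactionsBean.class,
                "process_transaction", "database", "database.event.transaction");
        System.out.println("process_transaction: Received id " + ok.id + " in database");

        try {
            // Simulates the transaction message arriving at the event-logs handler.
            decode(body, EventLogsBean.class,
                    "process_event_logs", "database", "database.event.transaction");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

The error message then shows which routing key delivered the unexpected bean to which handler, which points directly at the binding to inspect.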
Upvotes: 1