Reputation: 3183
I'm trying to add some functionality for rabbitmq with delay messages. Actually I need to get this message after 2 weeks. As I know we do not need any plugin. Also when this message invokes, how should I reschedule new x delay exchanger to invoke again over 2 weeks. Where shoul I added this x delay message.
config
"messageQueue": {
"connectionString": "amqp://guest:guest@localhost:5672?heartbeat=5",
"queueName": "history",
"exchange": {
"type": "headers",
"prefix": "history."
},
"reconnectTimeout": 5000
},
service:
import amqplib from 'amqplib'
import config from 'config'
import logger from './logger'
const {reconnectTimeout, connectionString, exchange: {prefix, type: exchangeType}, queueName} = config.messageQueue
const onConsume = (expectedMessages, channel, onMessage) => async message => {
const {fields: {exchange}, properties: {correlationId, replyTo}, content} = message
logger.silly(`consumed message from ${exchange}`)
const messageTypeName = exchange.substring(exchange.startsWith(prefix) ? prefix.length : 0)
const messageType = expectedMessages[messageTypeName]
if (!messageType) {
logger.warn(`Unexpected message of type ${messageTypeName} received. The service only accepts messages of types `, Object.keys(expectedMessages))
return
}
const deserializedMessage = messageType.decode(content)
const object = deserializedMessage.toJSON()
const result = await onMessage(messageTypeName, object)
if (correlationId && replyTo) {
const {type, response} = result
const encoded = type.encode(response).finish()
channel.publish('', replyTo, encoded, {correlationId})
}
}
const startService = async (expectedMessages, onMessage) => {
const restoreOnFailure = e => {
logger.warn('connection with message bus lost due to error', e)
logger.info(`reconnecting in ${reconnectTimeout} milliseconds`)
setTimeout(() => startService(expectedMessages, onMessage), reconnectTimeout)
}
const exchanges = Object.keys(expectedMessages).map(m => `${prefix}${m}`)
try {
const connection = await amqplib.connect(connectionString)
connection.on('error', restoreOnFailure)
const channel = await connection.createChannel()
const handleConsume = onConsume(expectedMessages, channel, onMessage)
const queue = await channel.assertQueue(queueName)
exchanges.forEach(exchange => {
channel.assertExchange(exchange, exchangeType, {durable: true})
channel.bindQueue(queue.queue, exchange, '')
})
logger.debug(`start listening messages from ${exchanges.join(', ')}`)
channel.consume(queue.queue, handleConsume, {noAck: true})
}
catch (e) {
logger.warn('error while subscribing for messages message', e)
restoreOnFailure(e)
}
}
export default startService
Upvotes: 0
Views: 4274
Reputation: 16177
RabbitMQ has a plug-in for scheduling messages. You can use it, subject to an important design caveat which I explain below.
Use Steps
You must first install it:
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
Then, you have to set up a delayed exchange:
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-delayed-type", "direct");
channel.exchangeDeclare("my-exchange", "x-delayed-message", true, false, args);
Finally, you can set the x-delay
parameter (where delay is in milliseconds).
byte[] messageBodyBytes = "delayed payload".getBytes();
AMQP.BasicProperties.Builder props = new AMQP.BasicProperties.Builder();
headers = new HashMap<String, Object>();
headers.put("x-delay", 5000);
props.headers(headers);
channel.basicPublish("my-exchange", "", props.build(), messageBodyBytes);
Two weeks is equal to (7*24*60*60*1000 = 604,800,000)
milliseconds.
Important Caveat As I explained in this answer, this is a really bad thing to ask the message broker to do.
It's important to keep in mind, when dealing with message queues, they perform a very specific function in a system: to hold messages while the processor(s) are busy processing earlier messages. It is expected that a properly-functioning message queue will deliver messages as soon as reasonable. Basically, the fundamental expectation is that as soon as a message reaches the head of the queue, the next pull on the queue will yield the message -- no delay.
Delay becomes a result of how a system with a queue processes messages. In fact, Little's Law offers some interesting insights into this. If you're going to stick an arbitrary delay in there, you really have no need of a message queue to begin with - all your work is scheduled up front.
So, in a system where a delay is necessary (for example, to join/wait for a parallel operation to complete), you should be looking at other methods. Typically a queryable database would make sense in this particular instance. If you find yourself keeping messages in a queue for a pre-set period of time, you're actually using the message queue as a database - a function it was not designed to provide. Not only is this risky, but it also has a high likelihood of hurting the performance of your message broker.
Upvotes: 4