Reputation: 41
My RabbitMQ consumer has to process messages in fixed size batches with a prefetch count of 15. This is limited to 15 in order to match AWS SES email send rate - the consumer process has to issue SES API requests in parallel. What would be the best way of dealing with a partial final batch, i.e. one that's left with fewer than 15 messages.
Any new messages are added to the consumer's batch array. When the batch size is achieved, the batch is processed and an acknowledgement is sent back for all messages in the batch.
The 'leftover messages' in the final batch must be processed in my scenario before the 10 second connection timeout comes into effect. Is there a way to implement a timeout on the callback function so that, if for a certain period the number of messages received is less than the prefetch count, the remaining messages in consumer's temporary batch array are processed and acknowledged. It's worth mentioning that no new messages are published to the queue while the consumer process is executing. Thanks in advance.
$connection = new AMQPStreamConnection(HOST, PORT, USER, PASS);
$channel = $connection->channel();
$channel->queue_declare('test_queue', true, false, false, false);
$channel->basic_qos(null, 15, null);
$callback = function($message){
Ack_Globals::$msg_batch[] = $message;
Ack_Globals::$msg_count++;
$time_diff = Ack_Globals::$cur_time-Ack_Globals::$lbatch_recieved;
if(sizeof(Ack_Globals::$msg_batch) >= 15){
$time_end = microtime(true);
Ack_Globals::$lbatch_recieved = $time_end;
Ack_Globals::$batch_count = Ack_Globals::$batch_count + 1;
//Calculate the average time to create this batch
Ack_Globals::$bgen_time_avg = ($time_end - Ack_Globals::$time_start)/Ack_Globals::$batch_count;
//Process this batch
/* Process */
//Acknowledge this batch
$message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag'], true);
echo "\nMessage Count: ".Ack_Globals::$msg_count;
echo "\nSize of Array: ".sizeof(Ack_Globals::$msg_batch);
echo "\nLast batch received: ".Ack_Globals::$lbatch_recieved;
echo "\nBatch ".Ack_Globals::$batch_count." processed.";
echo "\nAverage batch generation time: ". Ack_Globals::$bgen_time_avg;
//Clear the batch array
Ack_Globals::$msg_batch = array();
}else{}
};
if ((Ack_Globals::$batch_count === 0) && (Ack_Globals::$msg_count === 0)){
//initialise the timer
Ack_Globals::$time_start = microtime(true);
}
$channel->basic_consume('int_surveys', '', false, false, false, false, $callback);
while ($channel->is_consuming()){
Ack_Globals::$cur_time = AMQPChannel::$current_time;
$channel->wait(null, false, 10);
}
$channel->close();
$connection->close();
Upvotes: 2
Views: 1211