NeonHead

Reputation: 51

What is the meaning of $channel->wait() in RabbitMQ?

I am quite new to RabbitMQ. I'm working with the php-amqplib library in CodeIgniter, and there are still some things I don't understand.

For example, when one user of my project broadcasts a new campaign to 100k leads, a second user who has only 100 mails to send is affected: the second user has to wait for all 100k mails to be delivered before getting a turn.

I need a solution with concurrent consumers that work smoothly without affecting each other.

Here is my code snippet:

public function campaign2()
{
    $this->load->library('mylibrary');
    for ($i = 1; $i <= 5; $i++) {
        $url = "http://localhost/myproject/rabbit/waiting";
        $param = array('index' => $i);
        $this->waiting($i);
    }
}

public function waiting($i)
{
    ini_set('memory_limit', '400M');
    ini_set('max_execution_time', 0);
    ini_set('display_errors', 1);

    ${'conn_'.$i} = connectRabbit();
    ${'channel_'.$i} = ${'conn_'.$i}->channel();
    ${'channel_'.$i}->exchange_declare('ha-local-campaign-'.$i.'-exchange', 'fanout', false, true, false);
    $q = populateQueueName('campaign-'.$i);
    ${'channel_'.$i}->queue_declare($q, false, true, false, false);
    ${'channel_'.$i}->queue_bind($q, 'ha-local-campaign-'.$i.'-exchange', 'priority.'.$i);
    $consumer_tag = 'campaign_consumer';
    function process_message($msg) {
        echo 'Mail Sent';
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    }
    function shutdown($channel, $conn) {
        echo '['.date('H:i:s').'] Campaign consumer - Shutdown!!';
    }

    ${'channel_'.$i}->basic_consume($q, $consumer_tag, false, false, true, false, 'process_message');
    while (1) {
        ${'channel_'.$i}->wait();
    }
    register_shutdown_function('shutdown', ${'channel_'.$i}, ${'conn_'.$i});
}

If anyone could kindly guide me through the process, I would be grateful.

Upvotes: 5

Views: 5722

Answers (1)

yivi

Reputation: 47370

When you call $channel->wait() you are:

  • Waiting for pending messages on the queues that the channel is consuming from.

  • Calling, for each message received, the callback registered for the corresponding consumer.

From the "hello world" example, step by step:

// First, you define `$callback` as a function receiving
// one parameter (the _message_).
$callback = function($msg) {
  echo " [x] Received ", $msg->body, "\n";
};

// Then, you assign `$callback` to the "hello" queue.
$channel->basic_consume('hello', '', false, true, false, false, $callback);

// Finally: while there are any callbacks defined for the channel,
while(count($channel->callbacks)) {
    // wait for incoming messages and call the corresponding callbacks,
    // passing each message as a parameter.
    $channel->wait();
}
// This is an infinite loop: as long as there are callbacks,
// it'll run forever unless you interrupt the script's execution.

Have your second user use a different queue. You can have as many queues as you want.
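As a rough sketch of the one-queue-per-user idea, something like the following could work. The function names and the 'campaign-user-<id>' naming scheme are made up for illustration, and runWorker() assumes php-amqplib installed through Composer plus a broker on localhost with the default guest credentials. The acknowledgement style mirrors the one in the question.

```php
<?php
// Sketch only: function names and the queue naming scheme are hypothetical.
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

// Hypothetical naming scheme: one durable queue per user.
function campaignQueueName(int $userId): string
{
    return 'campaign-user-' . $userId;
}

// Publish one mail job to the user's own queue through the default exchange
// (routing key = queue name).
function publishMail($channel, int $userId, string $body): void
{
    $queue = campaignQueueName($userId);
    $channel->queue_declare($queue, false, true, false, false);
    $channel->basic_publish(new AMQPMessage($body), '', $queue);
}

// Consume only the given user's queue. This blocks inside the wait() loop.
function consumeUserQueue($channel, int $userId, callable $sendMail): void
{
    $queue = campaignQueueName($userId);
    $channel->queue_declare($queue, false, true, false, false);

    $callback = function ($msg) use ($sendMail) {
        $sendMail($msg->body);
        // Manual acknowledgement, same style as in the question.
        $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
    };

    // Fourth argument (no_ack) is false, so each message must be acked.
    $channel->basic_consume($queue, '', false, false, false, false, $callback);

    while (count($channel->callbacks)) {
        $channel->wait();
    }
}

// Worker entry point. Assumes Composer's autoloader next to this file and a
// local broker with guest/guest credentials.
function runWorker(int $userId): void
{
    require_once __DIR__ . '/vendor/autoload.php';
    $connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest');
    $channel = $connection->channel();
    consumeUserQueue($channel, $userId, function (string $body): void {
        echo "Mail sent: {$body}\n"; // placeholder for the real mail call
    });
    $channel->close();
    $connection->close();
}
```

Because the loop around wait() blocks, each queue needs its own worker process: for example, a small CLI script that calls runWorker((int) $argv[1]) and is started once per user. That way a 100k campaign on one queue does not hold up the worker handling another user's 100 mails.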

Upvotes: 4
