
Reputation: 1

Memory leak issue in MQ resident process consumer service

I recently developed a consumer service integrated with Amazon Cloud Queue Service. It runs as a persistent process and automatically reconnects after disconnection. The service is implemented within the Laravel framework, using AMQP version 2.8 and PHP version 7.4.

After deploying it to the testing environment, I noticed frequent memory exhaustion issues. Could you please help identify any obvious problems with my service? Below is the key portion of the code (with business logic removed for simplicity).


namespace App\Services;

use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPTable;

class ConsumerService
    protected $connection;
    protected $channel;
    protected $isConnected = false;
    protected $retryInterval = 60;
    protected $maxRetryAttempts = 10;
    protected $retryAttempts = 0;
    protected $host;
    protected $port;
    protected $user;
    protected $pass;
    protected $vhost;
    const WAIT_BEFORE_RECONNECT_uS = 1000000;
    const MAX_RETRY_ATTEMPTS = 3;
    protected $queues = [

    protected $exchanges = [

    public function createConnection()
        while ($this->retryAttempts < $this->maxRetryAttempts) {
            try {
                $this->connection = new AMQPSSLConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost, ['verify_peer' => true, 'heartbeat' => 60, 'read_write_timeout' => 60]);
                $this->isConnected = true;
                $this->retryAttempts = 0;
            } catch (\Exception $e) {

    public function listenToQueues()
        register_shutdown_function([$this, 'shutdown']);
        while (true) {
            try {
            } catch (AMQPRuntimeException $e) {
            } catch (AMQPIOException $e) {
            } catch (\RuntimeException $e) {
            } catch (\ErrorException $e) {

    public function consumeConnections() // entry
        $this->channel = $this->connection->channel();
        $maxAttempts = self::MAX_RETRY_ATTEMPTS;
        foreach ($this->queues as $queue) {
            $consumerTag = $queue . 'consumer';
            $exchangeName = $this->exchanges[$queue];
            $this->channel->exchange_declare($exchangeName, 'fanout', false, true, false);
            $this->channel->queue_declare($queue, false, true, false, false);
            $this->channel->queue_bind($queue, $exchangeName);
            $this->channel->basic_consume($queue, $consumerTag, false, false, false, false, function ($message) use ($queue, $maxAttempts) {
                $result = $this->processMessage($queue, $message);
                if ($result) {
                } else {
                    $headers = $message->get('application_headers');
                    $retryCount = $headers && isset($headers->getNativeData()['retry_count']) ? (int)$headers->getNativeData()['retry_count'] : 0;
                    if ($retryCount < $maxAttempts) {
                        $newHeaders = array_merge($headers ? $headers->getNativeData() : [], ['retry_count' => $retryCount]);
                        $message->set('application_headers', new AMQPTable($newHeaders));
                        $this->channel->basic_publish($message, $this->exchanges[$queue]);
                    } else {
        while (count($this->channel->callbacks)) {
            $this->channel->wait(null, false, $this->retryInterval);

    public function processMessage($queue, $message)
        return true;// do something with the message

    public function cleanUpConnection($connection)
        try {
            if ($this->channel !== null) {
            if ($connection !== null) {
        } catch (\ErrorException $e) {

    public function shutdown()
        if ($this->connection !== null) {

Upvotes: 0

Views: 25

Answers (0)

Related Questions