KnowOrDie
KnowOrDie

Reputation: 1

Memory leak issue in MQ resident process consumer service

I recently developed a consumer service integrated with Amazon Cloud Queue Service. It runs as a persistent process and automatically reconnects after disconnection. The service is implemented within the Laravel framework, using AMQP version 2.8 and PHP version 7.4.

After deploying it to the testing environment, I noticed frequent memory exhaustion issues. Could you please help identify any obvious problems with my service? Below is the key portion of the code (with business logic removed for simplicity).


<?php

namespace App\Services;

use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPTable;

class ConsumerService
{
    protected $connection;
    protected $channel;
    protected $isConnected = false;
    protected $retryInterval = 60;
    protected $maxRetryAttempts = 10;
    protected $retryAttempts = 0;
    protected $host;
    protected $port;
    protected $user;
    protected $pass;
    protected $vhost;
    const WAIT_BEFORE_RECONNECT_uS = 1000000;
    const MAX_RETRY_ATTEMPTS = 3;
    
    protected $queues = [
        'HCP_INFO_UPDATE_QUEUE',
        'DEALER_INFO_UPDATE_QUEUE',
        'HCP_INFO_VERIFY_QUEUE',
        'EMPLOYEE_UPDATE_QUEUE',
        'USER_UNBIND_QUEUE',
    ];

    protected $exchanges = [
        'DEALER_INFO_UPDATE_QUEUE' => 'DEALER_INFO_UPDATE_EXCHANGE',
        'HCP_INFO_VERIFY_QUEUE' => 'HCP_INFO_VERIFY_EXCHANGE',
        'HCP_INFO_UPDATE_QUEUE' => 'HCP_INFO_UPDATE_EXCHANGE',
        'EMPLOYEE_UPDATE_QUEUE' => 'EMPLOYEE_UPDATE_EXCHANGE',
        'USER_UNBIND_QUEUE' => 'USER_UNBIND_EXCHANGE',
    ];

    public function createConnection()
    {
        while ($this->retryAttempts < $this->maxRetryAttempts) {
            try {
                $this->connection = new AMQPSSLConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost, ['verify_peer' => true, 'heartbeat' => 60, 'read_write_timeout' => 60]);
                $this->isConnected = true;
                $this->retryAttempts = 0;
                return;
            } catch (\Exception $e) {
                $this->retryAttempts++;
                sleep($this->retryInterval);
            }
        }
        exit(1);
    }

    public function listenToQueues()
    {
        register_shutdown_function([$this, 'shutdown']);
        while (true) {
            try {
                $this->createConnection();
                $this->consumeConnections();
            } catch (AMQPRuntimeException $e) {
                $this->cleanUpConnection($this->connection);
                usleep(self::WAIT_BEFORE_RECONNECT_uS);
            } catch (AMQPIOException $e) {
                $this->cleanUpConnection($this->connection);
                usleep(self::WAIT_BEFORE_RECONNECT_uS);
            } catch (\RuntimeException $e) {
                $this->cleanUpConnection($this->connection);
                usleep(self::WAIT_BEFORE_RECONNECT_uS);
            } catch (\ErrorException $e) {
                $this->cleanUpConnection($this->connection);
                usleep(self::WAIT_BEFORE_RECONNECT_uS);
            }
        }
    }

    public function consumeConnections() // entry
    {
        $this->channel = $this->connection->channel();
        $maxAttempts = self::MAX_RETRY_ATTEMPTS;
        foreach ($this->queues as $queue) {
            $consumerTag = $queue . 'consumer';
            $exchangeName = $this->exchanges[$queue];
            $this->channel->exchange_declare($exchangeName, 'fanout', false, true, false);
            $this->channel->queue_declare($queue, false, true, false, false);
            $this->channel->queue_bind($queue, $exchangeName);
            $this->channel->basic_consume($queue, $consumerTag, false, false, false, false, function ($message) use ($queue, $maxAttempts) {
                $result = $this->processMessage($queue, $message);
                if ($result) {
                    $this->channel->basic_ack($message->delivery_info['delivery_tag']);
                } else {
                    $headers = $message->get('application_headers');
                    $retryCount = $headers && isset($headers->getNativeData()['retry_count']) ? (int)$headers->getNativeData()['retry_count'] : 0;
                    if ($retryCount < $maxAttempts) {
                        $retryCount++;
                        $newHeaders = array_merge($headers ? $headers->getNativeData() : [], ['retry_count' => $retryCount]);
                        $message->set('application_headers', new AMQPTable($newHeaders));
                        $this->channel->basic_publish($message, $this->exchanges[$queue]);
                        $this->channel->basic_nack($message->delivery_info['delivery_tag']);
                    } else {
                        $this->channel->basic_nack($message->delivery_info['delivery_tag']);
                    }
                }
            });
        }
        while (count($this->channel->callbacks)) {
            $this->channel->wait(null, false, $this->retryInterval);
        }
    }

    public function processMessage($queue, $message)
    {
        return true;// do something with the message
    }

    public function cleanUpConnection($connection)
    {
        try {
            if ($this->channel !== null) {
                $this->channel->close();
            }
            if ($connection !== null) {
                $connection->close();
            }
        } catch (\ErrorException $e) {
        }
    }

    public function shutdown()
    {
        if ($this->connection !== null) {
            $this->connection->close();
        }
    }
}

Upvotes: 0

Views: 25

Answers (0)

Related Questions