Reputation: 1
I recently developed a consumer service integrated with Amazon Cloud Queue Service. It runs as a persistent process and automatically reconnects after disconnection. The service is implemented within the Laravel framework, using AMQP version 2.8 and PHP version 7.4.
After deploying it to the testing environment, I noticed frequent memory exhaustion issues. Could you please help identify any obvious problems with my service? Below is the key portion of the code (with business logic removed for simplicity).
<?php
namespace App\Services;
use PhpAmqpLib\Connection\AMQPSSLConnection;
use PhpAmqpLib\Exception\AMQPIOException;
use PhpAmqpLib\Exception\AMQPRuntimeException;
use PhpAmqpLib\Wire\AMQPTable;
class ConsumerService
{
protected $connection;
protected $channel;
protected $isConnected = false;
protected $retryInterval = 60;
protected $maxRetryAttempts = 10;
protected $retryAttempts = 0;
protected $host;
protected $port;
protected $user;
protected $pass;
protected $vhost;
const WAIT_BEFORE_RECONNECT_uS = 1000000;
const MAX_RETRY_ATTEMPTS = 3;
protected $queues = [
'HCP_INFO_UPDATE_QUEUE',
'DEALER_INFO_UPDATE_QUEUE',
'HCP_INFO_VERIFY_QUEUE',
'EMPLOYEE_UPDATE_QUEUE',
'USER_UNBIND_QUEUE',
];
protected $exchanges = [
'DEALER_INFO_UPDATE_QUEUE' => 'DEALER_INFO_UPDATE_EXCHANGE',
'HCP_INFO_VERIFY_QUEUE' => 'HCP_INFO_VERIFY_EXCHANGE',
'HCP_INFO_UPDATE_QUEUE' => 'HCP_INFO_UPDATE_EXCHANGE',
'EMPLOYEE_UPDATE_QUEUE' => 'EMPLOYEE_UPDATE_EXCHANGE',
'USER_UNBIND_QUEUE' => 'USER_UNBIND_EXCHANGE',
];
public function createConnection()
{
while ($this->retryAttempts < $this->maxRetryAttempts) {
try {
$this->connection = new AMQPSSLConnection($this->host, $this->port, $this->user, $this->pass, $this->vhost, ['verify_peer' => true, 'heartbeat' => 60, 'read_write_timeout' => 60]);
$this->isConnected = true;
$this->retryAttempts = 0;
return;
} catch (\Exception $e) {
$this->retryAttempts++;
sleep($this->retryInterval);
}
}
exit(1);
}
public function listenToQueues()
{
register_shutdown_function([$this, 'shutdown']);
while (true) {
try {
$this->createConnection();
$this->consumeConnections();
} catch (AMQPRuntimeException $e) {
$this->cleanUpConnection($this->connection);
usleep(self::WAIT_BEFORE_RECONNECT_uS);
} catch (AMQPIOException $e) {
$this->cleanUpConnection($this->connection);
usleep(self::WAIT_BEFORE_RECONNECT_uS);
} catch (\RuntimeException $e) {
$this->cleanUpConnection($this->connection);
usleep(self::WAIT_BEFORE_RECONNECT_uS);
} catch (\ErrorException $e) {
$this->cleanUpConnection($this->connection);
usleep(self::WAIT_BEFORE_RECONNECT_uS);
}
}
}
public function consumeConnections() // entry
{
$this->channel = $this->connection->channel();
$maxAttempts = self::MAX_RETRY_ATTEMPTS;
foreach ($this->queues as $queue) {
$consumerTag = $queue . 'consumer';
$exchangeName = $this->exchanges[$queue];
$this->channel->exchange_declare($exchangeName, 'fanout', false, true, false);
$this->channel->queue_declare($queue, false, true, false, false);
$this->channel->queue_bind($queue, $exchangeName);
$this->channel->basic_consume($queue, $consumerTag, false, false, false, false, function ($message) use ($queue, $maxAttempts) {
$result = $this->processMessage($queue, $message);
if ($result) {
$this->channel->basic_ack($message->delivery_info['delivery_tag']);
} else {
$headers = $message->get('application_headers');
$retryCount = $headers && isset($headers->getNativeData()['retry_count']) ? (int)$headers->getNativeData()['retry_count'] : 0;
if ($retryCount < $maxAttempts) {
$retryCount++;
$newHeaders = array_merge($headers ? $headers->getNativeData() : [], ['retry_count' => $retryCount]);
$message->set('application_headers', new AMQPTable($newHeaders));
$this->channel->basic_publish($message, $this->exchanges[$queue]);
$this->channel->basic_nack($message->delivery_info['delivery_tag']);
} else {
$this->channel->basic_nack($message->delivery_info['delivery_tag']);
}
}
});
}
while (count($this->channel->callbacks)) {
$this->channel->wait(null, false, $this->retryInterval);
}
}
public function processMessage($queue, $message)
{
return true;// do something with the message
}
public function cleanUpConnection($connection)
{
try {
if ($this->channel !== null) {
$this->channel->close();
}
if ($connection !== null) {
$connection->close();
}
} catch (\ErrorException $e) {
}
}
public function shutdown()
{
if ($this->connection !== null) {
$this->connection->close();
}
}
}
Upvotes: 0
Views: 25