Mihael Keehl
Mihael Keehl

Reputation: 355

Is it possible to reload a class into PHP?

The objective is to continually collect data on the current temperature. A separate process should analyse that data, because I have to tweak the algorithm a lot and I want to avoid downtime, so stopping the collecting process is a no-go.

The problem is that when I separate these processes, process 2 would have to either continually make calls to the database or read from a local file to do something with the output generated by process 1. I want to act upon that output immediately, and polling like that is expensive in terms of resources.

Would it be possible to reload the class into memory somehow when the file changes, for example by writing a function that keeps calculating the MD5 of the file and, if it changes, then reloads the class? That way this separate class would act as a plugin. Is there any way to make that work?

Upvotes: 1

Views: 1115

Answers (1)

ryantxr
ryantxr

Reputation: 4219

Here is a possible solution. Use Beanstalk (https://github.com/kr/beanstalkd).

PHP class to talk to Beanstalk (https://github.com/pda/pheanstalk)

Run beanstalk.

Create a process that goes into an infinite loop that reads from a Beanstalk queue. (Beanstalk queues are called "Tubes".) PHP processes are not meant to be run for a very long time. The main reason for this is memory. The easiest way to handle this is to restart the process every once in a while, or when memory usage reaches a certain threshold.

NOTE: What I do is to have the process exit after some fixed time or if it uses a certain amount of memory. Then, I use Supervisor to restart it.

You can put data into Beanstalk as JSON and decode it on the receiving end. The sending and receiving processes need to agree on that format. You could store your work payload in a database and just send the primary key in the queue.

Here is some code you can use:

/**
 * Thin wrapper around a Pheanstalk connection that publishes and consumes
 * JSON-encoded payloads on Beanstalk tubes.
 *
 * NOTE(review): loadClasses() and the BEANSTALK_PORT constant are not defined
 * in this block; they are presumably provided by AbstractBaseQueue and the
 * application bootstrap respectively — confirm.
 */
class BeanstalkClient extends AbstractBaseQueue{
    // The underlying \Pheanstalk\Pheanstalk connection (set by connect()).
    public $queue;
    public $host;
    public $port;
    // Seconds waitForMessages() waits for a job; a falsy value means "no timeout".
    public $timeout;

    /**
     * Set up connection parameters and connect immediately.
     *
     * @param int|null $timeout Seconds to wait when reserving a job.
     *                          null keeps the historical default of 30.
     */
    public function __construct($timeout=null) {
        $this->loadClasses();
        $this->host = '127.0.0.1';
        $this->port = BEANSTALK_PORT;
        // Previously the argument was ignored and 30 was always used, so a
        // caller asking for a longer wait never got it.
        $this->timeout = ($timeout === null) ? 30 : $timeout;
        $this->connect();
    }

    /**
     * Create the Pheanstalk connection from the configured host and port.
     */
    public function connect(){
        $this->queue = new \Pheanstalk\Pheanstalk($this->host, $this->port);
    }

    /**
     * JSON-encode $data and put it on the given tube with default priority.
     *
     * @param string $tube  Tube name to publish to.
     * @param mixed  $data  Any value json_encode() accepts.
     * @param int    $delay Delay passed straight through to Pheanstalk's put().
     */
    public function publish($tube, $data, $delay){
        $payload = $this->encodeData($data);
        $this->queue->useTube($tube)->put($payload,
                        \Pheanstalk\PheanstalkInterface::DEFAULT_PRIORITY, $delay);

    }

    /**
     * Watch only the given tube and reserve the next job from it.
     *
     * @param string        $tube     Tube name to consume from.
     * @param callable|null $callback Unused; kept for interface compatibility.
     * @return mixed Whatever Pheanstalk's reserve() returns (presumably a job
     *               object, or a falsy value on timeout — confirm).
     */
    public function waitForMessages($tube, $callback=null){
        $watched = $this->queue->watchOnly($tube);
        if ( $this->timeout ) {
            return $watched->reserve($this->timeout);
        }
        // No timeout configured: reserve() is called without one.
        return $watched->reserve();
    }

    /**
     * Delete a reserved job from the queue.
     *
     * @param mixed $message The job previously returned by waitForMessages().
     */
    public function delete($message){
        $this->queue->delete($message);
    }

    /**
     * Encode a payload as JSON.
     *
     * @param mixed $data
     * @return string|false The JSON string, or false if json_encode() fails.
     */
    public function encodeData($data){
        return json_encode($data);
    }

    /**
     * Decode a JSON payload into PHP values (objects become associative arrays).
     *
     * @param string $encodedData
     * @return mixed null when the input is not valid JSON.
     */
    public function decodeData($encodedData) {
        return json_decode($encodedData, true);
    }

    /**
     * Extract and decode the payload carried by a reserved job.
     *
     * @param mixed $message A job object exposing getData().
     * @return mixed The decoded payload.
     * @throws \Exception When a raw string is passed instead of a job object.
     */
    public function getData($message){
        if ( is_string($message) ) {
            throw new \Exception('message is a string');
        }
        // Reuse decodeData() so both decoding paths stay in agreement.
        return $this->decodeData($message->getData());
    }
}



/**
 * Base class for a long-running worker that consumes messages from one queue
 * channel and exits once it has run too long or used too much memory, so an
 * external supervisor can restart it.
 *
 * Subclasses set $channelName and implement didReceiveMessage(); they may
 * override validateTaskData() to reject malformed payloads.
 */
abstract class BaseQueueProcess {
    // Exit once memory_get_usage() exceeds this many bytes.
    const MAX_MEMORY_BYTES = 20000000;
    // Exit once the process has been running this many seconds (two hours).
    const MAX_RUNTIME_SECONDS = 7200;

    protected $channelName = ''; // child class should set this
    // The queue object
    public $queue = null;
    public $processId = null; // this is the system process id
    public $name = null;
    public $status = null;
    // Declared explicitly: these used to be created dynamically, which is
    // deprecated since PHP 8.2.
    public $processName = null;
    public $endTime = null; // unix timestamp after which run() exits

    /**
     * Handle one decoded task.
     *
     * @param mixed $taskData The decoded message payload.
     * @return mixed Return false to signal failure; any other value counts as success.
     */
    abstract protected function didReceiveMessage($taskData);

    /**
     * Decide whether a decoded payload is worth processing.
     *
     * receiveMessage() always called this method, but nothing defined it, so
     * every message ended in a fatal error. The default rejects only payloads
     * that decoded to null; subclasses can override with stricter checks.
     *
     * @param mixed $taskData The decoded message payload.
     * @return bool
     */
    protected function validateTaskData($taskData) {
        return $taskData !== null;
    }

    /**
     * Record process identity, compute the exit deadline and make sure a queue
     * client exists. An already-injected $queue is left untouched.
     */
    public function initialize() {
        $this->processId = getmypid();
        $this->name = get_called_class();
        $this->endTime = time() + static::MAX_RUNTIME_SECONDS;
        // seconds to timeout when waiting for a message
        // if the process isn't doing anything, timeout so they have a chance to do housekeeping.
        $queueTimeout = 900;
        if ( empty($this->queue) ) {
            $this->queue = new BeanstalkClient($queueTimeout);
        }
    }

    /**
     * Decode a queue message, validate it, dispatch it to didReceiveMessage()
     * and settle the message according to the outcome.
     *
     * @param mixed $queueMessage The message as returned by the queue client.
     */
    public function receiveMessage($queueMessage) {
        $taskData = $this->queue->getData($queueMessage);

        if ( ! $this->validateTaskData($taskData) ) {
            // Bad message: drop it so it is not delivered again.
            $this->queue->delete($queueMessage);
            return;
        }

        // Only an explicit false counts as failure.
        if ( $this->didReceiveMessage($taskData) !== false ) {
            $this->completeTask($queueMessage);
        }
        else {
            $this->failTask($queueMessage);
        }
    }

    /**
     * Main loop: wait for a message, process it, then exit the process if the
     * memory or runtime limit has been exceeded.
     */
    public function run() {
        $this->processName = $this->channelName;
        while (true) {
            $queueMessage = $this->queue->waitForMessages($this->channelName);

            // An empty result means the wait timed out; fall through to the
            // limit checks below.
            if ( ! empty($queueMessage) ) {
                $this->receiveMessage($queueMessage);
            }

            if ( memory_get_usage() > static::MAX_MEMORY_BYTES ) {
                // Supervisor will restart process.
                exit;
            }
            if ( time() > $this->endTime ) {
                // Supervisor will restart process.
                exit;
            }
            usleep(10);
        }
    }

    /**
     * Settle a successfully processed message by deleting it from the queue.
     *
     * @param mixed $queueMessage
     */
    public function completeTask($queueMessage) {
        $this->queue->delete($queueMessage);
    }

    /**
     * Settle a failed message. It is currently deleted as well, so failed
     * tasks are not retried.
     *
     * @param mixed $queueMessage
     */
    public function failTask($queueMessage) {
        $this->queue->delete($queueMessage);
    }
}

/**
 * Example worker bound to the 'Temperature' channel.
 */
class MyProcess extends BaseQueueProcess {
    /**
     * Select the channel this worker consumes, then run the shared set-up.
     */
    public function initialize() {
        $this->channelName = 'Temperature';

        parent::initialize();
    }

    /**
     * Handle one decoded task.
     *
     * This is a placeholder: put the real processing here and return false
     * when something goes wrong.
     *
     * @param mixed $taskData The decoded message payload.
     * @return bool Always true in this stub.
     */
    public function didReceiveMessage($taskData) {
        $succeeded = true;

        return $succeeded;
    }
}


//Sender

/**
 * Producer side: publishes work items onto the 'Temperature' tube.
 */
class WorkSender {

    const TubeName = 'Temperature';
    const TubeDelay = 0; // Set delay to 0, i.e. don't use a delay.

    /**
     * Publish one payload to the tube.
     *
     * A new BeanstalkClient is created on every call.
     *
     * @param mixed $data Payload handed to BeanstalkClient::publish().
     */
    public function send($data) {
        // `new` was missing, which made PHP call an undefined function
        // BeanstalkClient() instead of instantiating the class.
        $client = new BeanstalkClient();
        $client->publish(self::TubeName, $data, self::TubeDelay);
    }

}

Upvotes: 1

Related Questions