Anthony

Reputation: 5433

PHP process forking with Pheanstalk

I'm trying to create a PHP script that runs in the background and forks child processes. (I'm aware that this could overwhelm the server; extra safeguards are in place, but they are outside the scope of this question.)

In a nutshell, the code works like this:

$pids = array();
$ok = true;
while ($ok)
{
    $job = $pheanstalk->watch('jobs')->ignore('default')->reserve();
    echo 'Reserved job #' . $job->getId();
    $pids[$job->getId()] = pcntl_fork();
    if(!$pids[$job->getId()]) {
       doStuff();
       $pheanstalk->delete($job);
       exit();
    }
}

The problem is that once I fork the process, I get the error:

Reserved job #0
PHP Fatal error:  Uncaught exception 'Pheanstalk_Exception_ServerException' with message 'Cannot delete job 0: NOT_FOUND'

My question is: how is it that Pheanstalk returned a job with no ID and no payload? It almost feels like $pheanstalk is damaged once I fork. If I remove the forking, everything works fine (though the loop then has to wait for each job to finish before reserving the next one).

Upvotes: 1

Views: 649

Answers (2)

Jonathan

Reputation: 2877

The reason you're having this problem is that the job is reserved by the main process. After you call pcntl_fork(), the child gets a copy of the $worker variable (the Pheanstalk client, $pheanstalk in your code), but the main process still holds the reservation on the job. When the child process then tries to delete the job, the server says it doesn't exist (or, in this case, that it's reserved by another process). The code below deals with this by creating a new worker in the child: the main worker releases the job, and the child's worker attempts to pick it up.

# worker for the main process
$worker = new \Pheanstalk\Pheanstalk($host, $port, $timeout, $persistent);

$pid = -1;

# seek out new jobs
while ($job = $worker->watch('queue_caller')->reserve()) {

    # make sure pcntl is installed & enabled
    if (function_exists('pcntl_fork')) {
        # create a child process
        $pid = pcntl_fork();
    }

    if ($pid === -1) {
        # code will run in single threaded mode
    } elseif ($pid !== 0) {
        # parent process
        # release the job so it can be picked up by the fork
        $worker->release($job);

        # short wait (1/20000th second) to ensure the fork executes first
        # adjust this value to what is appropriate for your environment
        usleep(50);

        # clear out zombie processes after they're completed
        pcntl_waitpid(0, $pidStatus, WNOHANG);

        # go back to looking for jobs
        continue;
    } else {
        # child worker is needed, because it has to own the job to delete it
        /** @var Pheanstalk $worker */
        $worker = new \Pheanstalk\Pheanstalk($host, $port, $timeout, $persistent);

        # only look for jobs for 1 second, in theory it should always find something
        $job = $worker->watch('queue_caller')->reserve(1);

        if (false === $job) {
            # for some reason there is no job
            # terminate the child process with an error code
            exit(1);
        }

    }

    /** Start your code **/

    do_something();

    /** End your code **/

    # delete the job from the queue        
    $worker->delete($job);

    # only terminate if it's the child process 
    if ($pid === 0) {
        # terminate the child process with success code
        exit(0);
    }

}
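
One detail in the loop above is that pcntl_waitpid() is called once per iteration, so at most one finished child is reaped each time around. If children can finish faster than jobs arrive, it may help to drain all of them. The following is a minimal sketch of that idea, assuming the pcntl extension is available (CLI only); the helper name reapFinishedChildren() is made up for illustration and is not part of Pheanstalk or the code above.

```php
<?php
/**
 * Reap every child process that has already exited, without blocking.
 * Hypothetical helper for illustration; requires the pcntl extension.
 *
 * @return array map of child PID => exit code (-1 if the child did not exit normally)
 */
function reapFinishedChildren(): array
{
    $reaped = [];

    // -1 means "any child". With WNOHANG, pcntl_waitpid() returns 0 when
    // children exist but none has exited yet, and -1 when there are no
    // children at all, so the loop stops in both cases.
    while (($pid = pcntl_waitpid(-1, $status, WNOHANG)) > 0) {
        $reaped[$pid] = pcntl_wifexited($status)
            ? pcntl_wexitstatus($status)
            : -1;
    }

    return $reaped;
}
```

In the parent branch, a call to reapFinishedChildren() could take the place of the single pcntl_waitpid(0, $pidStatus, WNOHANG) call; the returned exit codes make it possible to log children that terminated with exit(1).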

Upvotes: 1

Prahlad Yeri

Reputation: 3663

Put this if condition before you delete the pheanstalk job:

if ($job) $pheanstalk->delete($job);

That's because it's quite possible that another PHP instance of your script has already removed the job before the code reaches this point. (The other instance could still retrieve the job using reserve() until it is deleted from the queue.)
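
The guard above only helps when $job itself is falsy. If the job object exists but the server no longer knows about it, delete() still throws, as the NOT_FOUND error in the question shows. A minimal sketch of a more defensive variant follows; deleteQuietly() and the $delete callable are hypothetical names used for illustration, with the callable standing in for a call such as $pheanstalk->delete($job).

```php
<?php
/**
 * Try to delete a job and report whether it worked, instead of letting
 * an exception such as NOT_FOUND kill the worker.
 * Hypothetical helper; $delete stands in for the real client call.
 *
 * @param mixed    $job    the reserved job, or a falsy value if none was reserved
 * @param callable $delete function that receives the job and deletes it
 */
function deleteQuietly($job, callable $delete): bool
{
    if (!$job) {
        // nothing was reserved, so there is nothing to delete
        return false;
    }

    try {
        $delete($job);
        return true;
    } catch (\Exception $e) {
        // the job is already gone or is owned by another connection
        return false;
    }
}
```

Assuming a $pheanstalk client as in the question, a call might look like deleteQuietly($job, function ($j) use ($pheanstalk) { $pheanstalk->delete($j); }). Swallowing the exception is a design choice: it keeps the worker alive, but it also hides the underlying cause, so logging $e is probably worthwhile.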

Upvotes: 1
