Tom

Reputation: 2873

run big loop with parallel threads in PHP CLI

I have a computation-expensive backend process in Symfony2 / PHP that I would like to run multi-threaded.

Since I iterate over thousands of objects, I think I shouldn't start one thread per object. I would like to have a $cores variable that defines how many threads I want in parallel, then iterate through the loop and keep that many threads running. So every time a thread finishes, a new one with the next object should be started, until all objects are done.

Looking at the pthreads documentation and doing some Google searches, I can't find a usable example for this situation. All the examples I found run a fixed number of threads once; none of them iterate over thousands of objects.

Can someone point me in the right direction to get started? I understand the basics of setting up a thread and joining it, etc., but not how to do it in a loop with a wait condition.

Upvotes: 6

Views: 6215

Answers (2)

Tom

Reputation: 2873

Joe has a good approach, but I found a different solution elsewhere that I am now using. Basically, I have two commands, one control and one worker command. The control command starts background processes and checks their results:

protected function process($worker, $entity, $timeout=60) {
    $min = $this->em->createQuery('SELECT MIN(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult();
    $max = $this->em->createQuery('SELECT MAX(e.id) FROM BM2SiteBundle:'.$entity.' e')->getSingleScalarResult();

    $batch_size = ceil((($max-$min)+1)/$this->parallel);
    $pool = array();
    for ($i=$min; $i<=$max; $i+=$batch_size) {
        $builder = new ProcessBuilder();
        $builder->setPrefix($this->getApplication()->getKernel()->getRootDir().'/console');
        $builder->setArguments(array(
            '--env='.$this->getApplication()->getKernel()->getEnvironment(),
            'maf:worker:'.$worker,
            $i, $i+$batch_size-1
            ));
        $builder->setTimeout($timeout);

        $process = $builder->getProcess();
        $process->start();
        $pool[] = $process;
    }
    $this->output->writeln($worker.": started ".count($pool)." jobs");
    // Poll until every process in the pool has finished.
    do {
        $running = 0;
        foreach ($pool as $p) {
            if ($p->isRunning()) {
                $running++;
            }
        }
        usleep(250000); // 250 ms between polls; usleep() takes microseconds
    } while ($running > 0);

    foreach ($pool as $p) {
        if (!$p->isSuccessful()) {
            $this->output->writeln('fail: '.$p->getExitCode().' / '.$p->getCommandLine());
            $this->output->writeln($p->getOutput());
        }
    }

}

Here $this->parallel is a variable I set to 6 on my 8-core machine; it is the number of processes to start. Note that this method requires that I iterate over a specific entity (it splits by that entity's ID range), which is always true in my use cases.
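To make the splitting arithmetic concrete, here is a minimal standalone sketch of the same range calculation. The function name batchRanges() is hypothetical (it does not appear in my command), and the sketch only computes the ranges; it starts no processes.

```php
<?php
// Hypothetical helper mirroring the batch arithmetic of process() above.
// Returns a list of [from, to] ID ranges, one per worker process.
function batchRanges(int $min, int $max, int $parallel): array
{
    // Same formula as in process(): size of the ID span divided by the
    // number of processes, rounded up.
    $batchSize = (int) ceil((($max - $min) + 1) / $parallel);

    $ranges = [];
    for ($i = $min; $i <= $max; $i += $batchSize) {
        // As in the original loop, the last range may reach past $max;
        // that is harmless because no rows exist beyond $max.
        $ranges[] = [$i, $i + $batchSize - 1];
    }
    return $ranges;
}

// IDs 1..2000 split across 6 processes gives batches of 334 IDs each.
$ranges = batchRanges(1, 2000, 6);
foreach ($ranges as [$from, $to]) {
    echo $from, '-', $to, "\n";
}
```

Each pair would become the two trailing arguments of one worker command.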

It's not perfect, but it starts completely new processes instead of threads, which I consider the better solution.

The worker command takes min and max ID numbers and does the actual work for the set between those two.

This approach works as long as the data set is reasonably well distributed. If, for example, the IDs start at 1 but almost none exist below 1000, while every ID between 1000 and 2000 is used, the first three processes would have next to nothing to do.
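One possible way around the uneven distribution, which is not what I am running and assumes the full list of IDs can be loaded into an array first, is to split the actual IDs into equally sized chunks and pass each chunk's lowest and highest ID to the worker. The helper name balancedRanges() is hypothetical.

```php
<?php
// Hypothetical alternative to splitting the numeric ID span: split the
// list of IDs that really exist, so every worker gets the same row count.
function balancedRanges(array $ids, int $parallel): array
{
    if ($ids === []) {
        return [];
    }
    sort($ids); // chunks must cover contiguous, ascending ID ranges
    $chunkSize = (int) ceil(count($ids) / $parallel);

    $ranges = [];
    foreach (array_chunk($ids, $chunkSize) as $chunk) {
        // The worker still only needs a min and a max ID.
        $ranges[] = [$chunk[0], $chunk[count($chunk) - 1]];
    }
    return $ranges;
}

// Sparse example: ID 1, then nothing until 1000, then every ID up to 2000.
$ids = array_merge([1], range(1000, 2000));
$ranges = balancedRanges($ids, 6);
foreach ($ranges as [$from, $to]) {
    echo $from, '-', $to, "\n";
}
```

The trade-off is an extra query for the ID list up front, in exchange for workers that each handle the same number of rows.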

Upvotes: 1

Joe Watkins

Reputation: 17158

The answer to the question is to use the Pool and Worker abstraction.

The basic idea is that you ::submit Threaded objects to the Pool, which it stacks onto the next available Worker, distributing your Threaded objects (round robin) across all Workers.
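To picture the round-robin distribution without needing the pthreads extension, here is a plain PHP sketch. It is an illustration only, not pthreads' internal code: assignRoundRobin() is a hypothetical name, and nothing here runs in parallel.

```php
<?php
// Illustration only: models which worker each job would be queued on if
// jobs are handed out round robin. No threads are involved.
function assignRoundRobin(array $jobs, int $workers): array
{
    // One empty queue per worker.
    $queues = array_fill(0, $workers, []);
    foreach (array_values($jobs) as $i => $job) {
        // Job 0 goes to worker 0, job 1 to worker 1, ... then wrap around.
        $queues[$i % $workers][] = $job;
    }
    return $queues;
}

// 2000 jobs across 8 workers: every queue ends up with 250 jobs.
$queues = assignRoundRobin(range(0, 1999), 8);
foreach ($queues as $worker => $queue) {
    echo 'worker ', $worker, ': ', count($queue), " jobs\n";
}
```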

What follows is some super simple code for PHP 7 (pthreads v3):

<?php
$jobs = [];
while (count($jobs) < 2000) {
    $jobs[] = mt_rand(0, 1999);
}

$pool = new Pool(8);

foreach ($jobs as $job) {
    $pool->submit(new class($job) extends Threaded {
        public function __construct(int $job) {
            $this->job = $job;
        }
        public function run() {
            var_dump($this->job);
        }
    });
}

$pool->shutdown();
?>

The jobs are pointless, obviously. In the real world, I guess your $jobs array keeps growing, so you can just swap foreach for some do {} while, and keep calling ::submit for new jobs.

In the real world, you will want to collect garbage in the same loop (just call Pool::collect with no parameters for default behaviour).

Notably, none of this would be possible if it really were the case that PHP wasn't intended to work in multi-threaded environments ... it definitely is.

That is the answer to the question, but it doesn't make it the best solution to your problem.

You have mentioned in comments that you assume 8 threads executing Symfony code will take up less memory than 8 processes. This is not the case: PHP is shared-nothing, all the time. You can expect 8 Symfony threads to take up as much memory as 8 Symfony processes, in fact a little bit more. The benefit of using threads over processes is that they can communicate, synchronize and (appear to) share with each other.

Just because you can, doesn't mean you should. The best solution for the task at hand is probably to use some ready made package or software intended to do what is required.

Studying this stuff well enough to implement a robust solution is something that will take a long time, and you wouldn't want to deploy that first solution ...

If you decide to ignore my advice and give it a go, you can find many examples in the GitHub repository for pthreads.

Upvotes: 4
