Reputation: 420
I am using pthreads for multi-threading in PHP. I have installed it and run it successfully on my XAMPP server on Windows. I have 100K records in my database and I want to run 20 threads in parallel. Each thread will fetch 5K records from the database and process them. Here is my code:
require('mailscript.php');
class My extends Thread {
    function __construct() {
        $this->mailscript = new mailscript();
    }
    function run() {
        $this->mailscript->runMailScript(5000);
    }
}
for ($i = 0; $i < 20; $i++) {
    $pool[] = new My();
}
foreach ($pool as $worker) {
    $worker->start();
}
foreach ($pool as $worker) {
    $worker->join();
}
When I run this code, each thread processes only about 600 records at most. Is there a limit on the number of threads in pthreads? What is the issue? Please help.
Upvotes: 2
Views: 237
Reputation: 31
Hi, here is how I would handle pthreads with your example, including a collection method to use if necessary. I hope this helps.
/*pthreads batches */
$batches = array();
$nbpool = 20; // cpu 10 cores
/* job 1 */
$list = [/* data1 */];
$batches[] = array_chunk($list, 5000);
/* job 2 */
$list2 = [/* data2 */];
$batches[] = array_chunk($list2, 10000);
/*final collected results*/
$resultFinal = [];
/* loop across batches */
foreach ($batches as $key => $chunks) {
    /* for intermediate collection */
    $data[$key] = [];
    /* how many workers */
    $workCount = count($chunks);
    /* size the pool up to max cpu capabilities */
    $pool = new Pool($nbpool, Worker::class);
    /* submit one task per chunk */
    foreach ($chunks as $chunk) {
        $pool->submit(new processWork($chunk));
    }
    /* collector: called for each submitted task on every collect() pass */
    $collector = function (\Collectable $work) use (&$data, $key) {
        /* is the task complete? */
        $isGarbage = $work->isGarbage();
        /* task complete: append its result to this batch's results */
        if ($isGarbage) {
            $data[$key][] = (array) $work->result;
        }
        return $isGarbage;
    };
    do {
        /* collect finished tasks from the pool */
        $pool->collect($collector);
        /* done once every chunk of this batch has reported a result */
        $isComplete = count($data[$key]) === $workCount;
    } while (!$isComplete);
    /* store this batch's results */
    $resultFinal[$key] = $data[$key];
    /* close pool */
    $pool->shutdown();
}
class processWork extends \Threaded implements \Collectable {
    /* start as false so isGarbage() always returns a bool */
    private $isGarbage = false;
    private $process;
    public $result;
    public function __construct($process) {
        $this->process = $process;
    }
    public function run() {
        $workerDone = array();
        foreach ($this->process as $k => $el) {
            /* whatever stuff with $this->process */
        }
        $this->result = $workerDone;
        $this->isGarbage = true; // yeah, it's done
    }
    public function isGarbage(): bool {
        return (bool) $this->isGarbage;
    }
}
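To make the chunking step concrete, here is a minimal sketch in plain PHP (no pthreads needed) of how 100K records split into 20 batches of 5000. The `$recordIds` array is a hypothetical stand-in for the rows you would load from the database; only `range()` and `array_chunk()` are real calls here.

```php
<?php
// Hypothetical stand-in for the 100K record IDs from the question;
// a real script would load these from the database instead.
$recordIds = range(1, 100000);

// Split into batches of 5000 records, one batch per submitted task.
$chunks = array_chunk($recordIds, 5000);

// Each task receives its own non-overlapping slice of the records.
foreach ($chunks as $index => $chunk) {
    printf(
        "task %2d: ids %6d to %6d (%d records)\n",
        $index,
        $chunk[0],
        $chunk[count($chunk) - 1],
        count($chunk)
    );
}
```

Because every chunk is a distinct slice, no two tasks end up processing the same records, which is something the original `runMailScript(5000)` call does not guarantee on its own if each thread runs the same query.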
Upvotes: 1