php process pool parallel
<?php
// Depends on the symfony/process
// @see http://symfony.com/doc/current/components/process.html
use Symfony\Component\Process\Process;
//Usage:
$pool = new ProcessPool(
'php child.php',
[
'one',
'two',
'three',
],
6
);
$pool->run(function ($arg, $result) {
echo "{$arg}: {$result}";
});
<?php
namespace Plariumed\Utils;
/**
* Depends on the symfony/process
* @see http://symfony.com/doc/current/components/process.html
*/
use Symfony\Component\Process\Process;
/**
* Class ProcessPool
* <code>
* $pool = new ProcessPool(
* 'php child.php', // command
* [ // arguments
* 'one',
* 'two',
* 'three',
* ],
* 6 // number processes (running at same time)
* );
*
* $pool->run(function ($arg, $result) {
* echo "{$arg}: {$result}";
* });
* </code>
*/
class ProcessPool
{
/**
* Number of processes that running at same time
* @var int
*/
protected $numProcesses;
/**
* Command to run (php run_child.php)
* @var string
*/
protected $command;
/**
* Array of tasks to pass to command
*
* $commands = ['one', 'two', 'three']
*
* php run_child.php one
* php run_child.php two
* php run_child.php three
* @var array
*/
protected $arguments;
/**
* Current processes
* (Now running)
*
* [
* ['process' => Process, 'argument' => Argument that process running with],
* ...
* ]
*
* @var array
*/
protected $processes;
/**
* milliseconds
* sleep between ticks
* @var int
*/
protected $tick;
/**
* @param string $command Example: php run_child.php
* @param array $arguments this params will be passed to command
* @param int $numProcesses number of running processes at same time
* @param int $tick sleep between ticks
*/
public function __construct($command, array $arguments, $numProcesses = 3, $tick = 100)
{
$this->tick = $tick;
$this->command = $command;
$this->arguments = $arguments;
$this->numProcesses = $numProcesses;
}
/**
* @param callable $callback called when process is done
*/
public function run(callable $callback = null)
{
for ($i = 0; $i < $this->numProcesses; $i++) {
$argument = array_shift($this->arguments);
if ($argument) {
$this->startProcess($argument);
}
}
while (count($this->processes)) {
/**
* @var Process[] $process
*/
foreach ($this->processes as $key => $process) {
if (!$process['process']->isRunning()) {
if (is_callable($callback)) {
$callback($process['argument'], $process['process']->getOutput());
}
unset($this->processes[$key]);
$argument = array_shift($this->arguments);
if ($argument) {
$this->startProcess($argument);
}
}
}
usleep($this->tick);
}
}
/**
* @param string $argument
*/
protected function startProcess($argument)
{
$process = new Process("{$this->command} {$argument}");
$process->start();
$this->processes[] = [
'argument' => $argument,
'process' => $process
];
}
}
<?php
$child_one = popen('php child.php first', 'r');
$child_two = popen('php child.php second', 'r');
$child_three = popen('php child.php third', 'r');
echo "Childs are running\n";
$response = stream_get_contents($child_one);
$response .= stream_get_contents($child_two);
$response .= stream_get_contents($child_three);
echo $response;
<?php
$arg = isset($argv[1]) ? $argv[1] : 'undefined';
$sleep = [
'one' => 3,
'two' => 2,
'three' => 1,
'six' => 4
];
$toSleep = isset($sleep[$arg]) ? $sleep[$arg] : 0;
sleep($toSleep);
echo "ok {$argv[1]}\n";