Overview

Namespaces

  • Net
    • Bazzline
      • Component
        • ProcessForkManager

Classes

  • Net\Bazzline\Component\ProcessForkManager\AbstractTask
  • Net\Bazzline\Component\ProcessForkManager\ForkManager
  • Net\Bazzline\Component\ProcessForkManager\ForkManagerEvent
  • Net\Bazzline\Component\ProcessForkManager\ForkManagerFactory
  • Net\Bazzline\Component\ProcessForkManager\TaskManager

Interfaces

  • Net\Bazzline\Component\ProcessForkManager\ExecutableInterface
  • Net\Bazzline\Component\ProcessForkManager\TaskInterface

Exceptions

  • Net\Bazzline\Component\ProcessForkManager\InvalidArgumentException
  • Net\Bazzline\Component\ProcessForkManager\RuntimeException
  • Overview
  • Namespace
  • Class
  1:   2:   3:   4:   5:   6:   7:   8:   9:  10:  11:  12:  13:  14:  15:  16:  17:  18:  19:  20:  21:  22:  23:  24:  25:  26:  27:  28:  29:  30:  31:  32:  33:  34:  35:  36:  37:  38:  39:  40:  41:  42:  43:  44:  45:  46:  47:  48:  49:  50:  51:  52:  53:  54:  55:  56:  57:  58:  59:  60:  61:  62:  63:  64:  65:  66:  67:  68:  69:  70:  71:  72:  73:  74:  75:  76:  77:  78:  79:  80:  81:  82:  83:  84:  85:  86:  87:  88:  89:  90:  91:  92:  93:  94:  95:  96:  97:  98:  99: 100: 101: 102: 103: 104: 105: 106: 107: 108: 109: 110: 111: 112: 113: 114: 115: 116: 117: 118: 119: 120: 121: 122: 123: 124: 125: 126: 127: 128: 129: 130: 131: 132: 133: 134: 135: 136: 137: 138: 139: 140: 141: 142: 143: 144: 145: 146: 147: 148: 149: 150: 151: 152: 153: 154: 155: 156: 157: 158: 159: 160: 161: 162: 163: 164: 165: 166: 167: 168: 169: 170: 171: 172: 173: 174: 175: 176: 177: 178: 179: 180: 181: 182: 183: 184: 185: 186: 187: 188: 189: 190: 191: 192: 193: 194: 195: 196: 197: 198: 199: 200: 201: 202: 203: 204: 205: 206: 207: 208: 209: 210: 211: 212: 213: 214: 215: 216: 217: 218: 219: 220: 221: 222: 223: 224: 225: 226: 227: 228: 229: 230: 231: 232: 233: 234: 235: 236: 237: 238: 239: 240: 241: 242: 243: 244: 245: 246: 247: 248: 249: 250: 251: 252: 253: 254: 255: 256: 257: 258: 259: 260: 261: 262: 263: 264: 265: 266: 267: 268: 269: 270: 271: 272: 273: 274: 275: 276: 277: 278: 279: 280: 281: 282: 283: 284: 285: 286: 287: 288: 289: 290: 291: 292: 293: 294: 295: 296: 297: 298: 299: 300: 301: 302: 303: 304: 305: 306: 307: 308: 309: 310: 311: 312: 313: 314: 315: 316: 317: 318: 319: 320: 321: 322: 323: 324: 325: 326: 327: 328: 329: 330: 331: 332: 333: 334: 335: 336: 337: 338: 339: 340: 341: 342: 343: 344: 345: 346: 347: 348: 349: 350: 351: 352: 353: 354: 355: 356: 357: 358: 359: 360: 361: 362: 363: 364: 365: 366: 367: 368: 369: 370: 371: 372: 373: 374: 375: 376: 377: 378: 379: 380: 381: 382: 383: 384: 385: 386: 387: 388: 389: 390: 391: 392: 393: 394: 395: 396: 397: 398: 399: 400: 401: 402: 403: 404: 405: 406: 407: 408: 409: 410: 411: 412: 413: 414: 415: 416: 417: 418: 419: 420: 421: 422: 423: 424: 425: 426: 427: 428: 429: 430: 431: 432: 433: 434: 435: 436: 437: 438: 439: 440: 441: 442: 443: 444: 445: 446: 447: 448: 449: 450: 451: 452: 453: 454: 455: 456: 457: 458: 459: 460: 461: 462: 463: 464: 465: 466: 467: 468: 469: 470: 471: 472: 473: 474: 475: 476: 477: 478: 479: 480: 481: 482: 483: 484: 485: 486: 487: 488: 489: 490: 491: 492: 493: 494: 495: 496: 497: 498: 499: 500: 501: 502: 503: 504: 505: 506: 507: 508: 509: 510: 511: 512: 513: 514: 515: 516: 517: 518: 519: 520: 521: 522: 523: 524: 525: 526: 527: 528: 529: 530: 531: 532: 533: 534: 535: 536: 537: 538: 539: 540: 541: 542: 543: 544: 545: 546: 547: 548: 549: 550: 551: 552: 553: 554: 555: 556: 557: 558: 559: 560: 561: 562: 563: 564: 565: 566: 567: 568: 569: 570: 571: 572: 573: 574: 575: 576: 577: 578: 579: 580: 581: 
<?php
/**
 * @author stev leibelt <artodeto@bazzline.net>
 * @since 2014-07-20 
 */

namespace Net\Bazzline\Component\ProcessForkManager;

use Net\Bazzline\Component\MemoryLimitManager\MemoryLimitManager;
use Net\Bazzline\Component\MemoryLimitManager\MemoryLimitManagerDependentInterface;
use Net\Bazzline\Component\TimeLimitManager\TimeLimitManager;
use Net\Bazzline\Component\TimeLimitManager\TimeLimitManagerDependentInterface;
use Symfony\Component\EventDispatcher\EventDispatcher;

/**
 * Class ForkManager
 * @package Net\Bazzline\Component\ProcessForkManager
 */
class ForkManager implements ExecutableInterface, MemoryLimitManagerDependentInterface, TimeLimitManagerDependentInterface
{
    /**
     * @var ForkManagerEvent
     */
    private $event;

    /**
     * @var EventDispatcher
     */
    private $eventDispatcher;

    /**
     * @var int
     */
    private $maximumNumberOfThreads;

    /**
     * @var MemoryLimitManager
     */
    private $memoryLimitManager;

    /**
     * @var int
     */
    private $numberOfMicrosecondsToCheckThreadStatus;

    /**
     * @var int
     */
    private $processId;

    /**
     * @var bool
     */
    private $noShutdownSignalReceived;

    /**
     * @var TaskManager
     */
    private $taskManager;

    /**
     * @var TimeLimitManager
     */
    private $timeLimitManager;

    /**
     * @var array
     */
    private $threads;

    /**
     * @param bool $validateEnvironment
     * @throws RuntimeException
     */
    public function __construct($validateEnvironment = true)
    {
        if ($validateEnvironment) {
            //@todo add all needed
            $mandatoryPHPFunctions = array(
                'getmypid',
                'memory_get_usage',
                'pcntl_fork',
                'pcntl_signal',
                'pcntl_signal_dispatch',
                'posix_getpid',
                'spl_object_hash'
            );

            foreach ($mandatoryPHPFunctions as $mandatoryPHPFunction) {
                if (!function_exists($mandatoryPHPFunction)) {
                    throw new RuntimeException(
                        'mandatory php function "' . $mandatoryPHPFunction . '" is not available'
                    );
                }
            }
        }

        declare(ticks = 10);

        $this->processId = posix_getpid();
        $this->noShutdownSignalReceived = true;
        $this->threads = array();
    }

    /**
     * @param AbstractTask $task
     * @return $this
     */
    public function addTask(AbstractTask $task)
    {
        $task->setParentProcessId($this->processId);
        $this->taskManager->addOpenTask($task);

        return $this;
    }

    /**
     * @return EventDispatcher
     */
    public function getEventDispatcher()
    {
        return $this->eventDispatcher;
    }

    /**
     * @return MemoryLimitManager
     */
    public function getMemoryLimitManager()
    {
        return $this->memoryLimitManager;
    }

    /**
     * @return TaskManager
     */
    public function getTaskManager()
    {
        return $this->taskManager;
    }

    /**
     * @return TimeLimitManager
     */
    public function getTimeLimitManager()
    {
        return $this->timeLimitManager;
    }

    /**
     * @param ForkManagerEvent $event
     */
    public function injectEvent(ForkManagerEvent $event)
    {
        $this->event = $event;
    }

    /**
     * @param EventDispatcher $dispatcher
     */
    public function injectEventDispatcher(EventDispatcher $dispatcher)
    {
        $this->eventDispatcher = $dispatcher;
    }

    /**
     * @param MemoryLimitManager $manager
     */
    public function injectMemoryLimitManager(MemoryLimitManager $manager)
    {
        $this->memoryLimitManager = $manager;
    }

    /**
     * @param TimeLimitManager $manager
     */
    public function injectTimeLimitManager(TimeLimitManager $manager)
    {
        $this->timeLimitManager = $manager;
    }

    /**
     * @param TaskManager $manager
     */
    public function injectTaskManager(TaskManager $manager)
    {
        $this->taskManager = $manager;
    }

    /**
     * @param int $maximumNumberOfThreads
     */
    public function setMaximumNumberOfThreads($maximumNumberOfThreads)
    {
        $this->maximumNumberOfThreads = (int) $maximumNumberOfThreads;
    }

    /**
     * @param int $numberOfMicrosecondsToCheckThreadStatus
     */
    public function setNumberOfMicrosecondsToCheckThreadStatus($numberOfMicrosecondsToCheckThreadStatus)
    {
        $this->numberOfMicrosecondsToCheckThreadStatus = (int) $numberOfMicrosecondsToCheckThreadStatus;
    }

    /**
     * @throws RuntimeException
     */
    public function execute()
    {
        $this->assertMandatoryPropertiesAreSet();
        $this->setUpSignalHandling('signalHandler');

        $this->eventDispatcher->dispatch(
            ForkManagerEvent::STARTING_EXECUTION,
            $this->createNewEvent(__METHOD__)
        );

        while ($this->taskManager->areThereOpenTasksLeft()
            && $this->noShutdownSignalReceived) {
            if ($this->timeLimitManager->isLimitReached()) {
                $this->eventDispatcher->dispatch(
                    ForkManagerEvent::REACHING_TIME_LIMIT,
                    $this->createNewEvent(__METHOD__, $this)
                );
                $this->stopAllThreads();
            } else if ($this->isMaximumMemoryLimitOfWholeThreadsReached()) {
                $this->eventDispatcher->dispatch(
                    ForkManagerEvent::REACHING_TIME_LIMIT,
                    $this->createNewEvent(__METHOD__, $this)
                );
                $this->stopNewestThread();
                $this->sleep();
            } else {
                if ($this->isMaximumNumberOfThreadsReached()) {
                    $this->updateNumberOfRunningThreads();
                    $this->sleep();
                } else {
                    $task = $this->taskManager->getOpenTask();
                    if ($task instanceof AbstractTask) {
                        $this->startThread($task);
                    }
                }
            }
        }

        $this->eventDispatcher->dispatch(
            ForkManagerEvent::FINISHED_EXECUTION_OF_OPEN_TASK,
            $this->createNewEvent(__METHOD__)
        );
        $this->eventDispatcher->dispatch(
            ForkManagerEvent::STARTING_WAITING_FOR_RUNNING_TASKS,
            $this->createNewEvent(__METHOD__)
        );

        while ($this->notAllThreadsAreFinished()
            && $this->noShutdownSignalReceived) {
            if ($this->timeLimitManager->isLimitReached()) {
                $this->eventDispatcher->dispatch(
                    ForkManagerEvent::REACHING_TIME_LIMIT,
                    $this->createNewEvent(__METHOD__, $this)
                );
                $this->stopAllThreads();
            } else if ($this->isMaximumMemoryLimitOfWholeThreadsReached()) {
                $this->eventDispatcher->dispatch(
                    ForkManagerEvent::REACHING_TIME_LIMIT,
                    $this->createNewEvent(__METHOD__, $this)
                );
                $this->stopNewestThread();
                $this->sleep();
            } else {
                $this->updateNumberOfRunningThreads();
                $this->sleep();
            }
        }

        $this->eventDispatcher->dispatch(
            ForkManagerEvent::FINISHED_WAITING_FOR_RUNNING_TASKS,
            $this->createNewEvent(__METHOD__)
        );
        $this->eventDispatcher->dispatch(
            ForkManagerEvent::FINISHED_EXECUTION,
            $this->createNewEvent(__METHOD__)
        );
    }

    /**
     * @return bool
     */
    private function notAllThreadsAreFinished()
    {
        return ($this->countNumberOfThreads() !== 0);
    }

    private function updateNumberOfRunningThreads()
    {
        foreach ($this->threads as $processId => $data) {
            if ($this->hasThreadFinished($processId)) {
                $this->taskManager->markRunningTaskAsFinished($data['task']);
                unset($this->threads[$processId]);
                $this->eventDispatcher->dispatch(
                    ForkManagerEvent::FINISHED_TASK,
                    $this->createNewEvent(__METHOD__, null, $data['task'])
                );
            }
        }
    }

    /**
     * @param AbstractTask $task
     * @throws RuntimeException
     */
    private function startThread(AbstractTask $task)
    {
        $time = time();
        $processId = pcntl_fork();

        if ($processId < 0) {
            throw new RuntimeException(
                'can not fork process'
            );
        } else if ($processId === 0) {
            //child
            $task->execute();
            exit(0);
        } else {
            //parent
            //$processId > 0
            $this->eventDispatcher->dispatch(
                ForkManagerEvent::STARTING_TASK,
                $this->createNewEvent(__METHOD__, null, $task)
            );
            $this->threads[$processId] = array(
                'startTime' => $time,
                'task' => $task
            );
            $this->taskManager->markOpenTaskAsRunning($task);
        }
    }

    /**
     * @param $processId
     * @throws RuntimeException
     */
    private function stopThread($processId)
    {
        if ($processId > 0) {
            if (isset($this->threads[$processId])) {
                $isStopped = posix_kill($processId, SIGTERM);
                if ($isStopped) {
                    $task = $this->threads[$processId]['task'];
                    unset($this->threads[$processId]);
                    $this->taskManager->markRunningTaskAsAborted($task);
                    $this->eventDispatcher->dispatch(
                        ForkManagerEvent::STOPPING_TASK,
                        $this->createNewEvent(__METHOD__, null, $task)
                    );
                } else {
                    $this->sleep(10);
                    if (!$this->hasThreadFinished($processId)) {
                        throw new RuntimeException(
                            'thread with process id "' . $processId . '" could not be stopped'
                        );
                    }
                }
            }
        }
    }

    /**
     * @throws RuntimeException
     * @todo think about the idea to put this in a "HandleMaximumMemoryLimitReachedStrategy"
     */
    private function stopNewestThread()
    {
        $newestProcessId = null;
        $newestStartTime = 0;

        foreach ($this->threads as $processId => $data) {
            if ($data['startTime'] > $newestStartTime) {
                $newestProcessId = $processId;
            }
        }

        if (!is_null($newestProcessId)) {
            $this->stopThread($newestProcessId);
        }
    }

    /**
     * @throws RuntimeException
     */
    private function stopAllThreads()
    {
        foreach ($this->threads as $processId => $data) {
            $this->stopThread($processId);
        }
    }

    /**
     * @param int $processId
     * @return int
     * @throws RuntimeException
     */
    private function hasThreadFinished($processId)
    {
        if ($processId > 0) {
            $statusCode = 0;
            $result = pcntl_waitpid($processId, $statusCode, WUNTRACED OR WNOHANG);

            if ($statusCode > 0) {
                throw new RuntimeException(
                    'thread with process id "' . $processId .
                    '" returned statusCode code "' . $statusCode . '"'
                );
            }

            $threadHasFinished = ($result === $processId);
        } else {
            $threadHasFinished = true;
        }

        return $threadHasFinished;
    }

    /**
     * @return int
     */
    private function countNumberOfThreads()
    {
        return count($this->threads);
    }

    /**
     * @return bool
     */
    private function isMaximumNumberOfThreadsReached()
    {
        return ($this->countNumberOfThreads() >= $this->maximumNumberOfThreads);
    }

    /**
     * @return bool
     */
    private function isMaximumMemoryLimitOfWholeThreadsReached()
    {
        $processIds = array_keys($this->threads);

        $isReached = $this->memoryLimitManager->isLimitReached($processIds);

        return $isReached;
    }

    /**
     * @param int $steps
     */
    private function sleep($steps = 1)
    {
        $this->dispatchSignal();

        for ($iterator = 0; $iterator < $steps; ++$iterator) {
            usleep($this->numberOfMicrosecondsToCheckThreadStatus);
        }
    }

    //begin of posix signal handling
    /**
     * @param int $signal
     * @codeCoverageIgnore
     */
    private function signalHandler($signal)
    {
        //dispatch event caught signal
        switch ($signal) {
            case SIGCHLD:
                $this->updateNumberOfRunningThreads();
                break;
            case SIGABRT:
            case SIGALRM:
            case SIGHUP:
            case SIGINT:
            default:
                echo $signal . PHP_EOL;
                $this->shutdown();
        }
    }

    private function shutdown()
    {
        $this->eventDispatcher->dispatch(
            ForkManagerEvent::STOPPING_EXECUTION,
            $this->createNewEvent(__METHOD__, $this)
        );
        $this->stopAllThreads();
        $this->noShutdownSignalReceived = false;
    }

    private function dispatchSignal()
    {
        pcntl_signal_dispatch();
    }

    /**
     * @param $nameOfSignalHandlerMethod
     * @throws InvalidArgumentException
     * @codeCoverageIgnore
     */
    private function setUpSignalHandling($nameOfSignalHandlerMethod)
    {
        if (!is_callable(array($this, $nameOfSignalHandlerMethod))) {
            throw new InvalidArgumentException(
                'provided method name "' . $nameOfSignalHandlerMethod . '" is not available'
            );
        }

        pcntl_signal(SIGHUP,    array($this, $nameOfSignalHandlerMethod));    //controlling terminal is closed
        pcntl_signal(SIGINT,    array($this, $nameOfSignalHandlerMethod));  //interrupt this processing | ctrl+c
        //pcntl_signal(SIGUSR1,   array($this, $nameOfSignalHandlerMethod));    //user defined conditions
        //pcntl_signal(SIGUSR2,   array($this, $nameOfSignalHandlerMethod));    //user defined conditions
        //pcntl_signal(SIGQUIT,   array($this, $nameOfSignalHandlerMethod));    //quit your processing
        //pcntl_signal(SIGILL,    array($this, $nameOfSignalHandlerMethod));    //illegal instruction performed
        pcntl_signal(SIGABRT,   array($this, $nameOfSignalHandlerMethod));    //abort process
        //pcntl_signal(SIGFPE,    array($this, $nameOfSignalHandlerMethod));    //error on arithmetic
        //pcntl_signal(SIGSEGV,   array($this, $nameOfSignalHandlerMethod));    //invalid virtual memory reference
        //pcntl_signal(SIGPIPE,   array($this, $nameOfSignalHandlerMethod));    //write to a pipe without other process is connected to it
        pcntl_signal(SIGALRM,   array($this, $nameOfSignalHandlerMethod));    //some kind of limit is reached
        //pcntl_signal(SIGTERM,   array($this, $nameOfSignalHandlerMethod));  //termination signal | kill <pid>
        pcntl_signal(SIGCHLD,   array($this, $nameOfSignalHandlerMethod));    //child is terminated
        //pcntl_signal(SIGCONT,   array($this, $nameOfSignalHandlerMethod));    //continue your work
        //pcntl_signal(SIGTSTP,   array($this, $nameOfSignalHandlerMethod));    //terminal stop signal
        //pcntl_signal(SIGTTIN,   array($this, $nameOfSignalHandlerMethod));    //background process attempting read
        //pcntl_signal(SIGTTOU,   array($this, $nameOfSignalHandlerMethod));    //background process attempting write
    }
    //end of posix signal handling

    /**
     * @throws RuntimeException
     */
    private function assertMandatoryPropertiesAreSet()
    {
        $properties = array(
            'event',
            'eventDispatcher',
            'memoryLimitManager',
            'timeLimitManager',
            'taskManager'
        );

        foreach ($properties as $property) {
            if (is_null($this->$property)) {
                throw new RuntimeException(
                    'mandatory property "' . $property . '" not set'
                );
            }
        }
    }

    /**
     * @param string $source
     * @param ForkManager $forkManager
     * @param TaskInterface $task
     * @return ForkManagerEvent
     */
    private function createNewEvent($source = null, ForkManager $forkManager = null, TaskInterface $task = null)
    {
        $event = clone $this->event;

        if ($forkManager instanceof ForkManager) {
            $event->setForkManager($this);
        }

        if ($task instanceof TaskInterface) {
            $event->setTask($task);
        }

        if (!is_null($source)) {
            $event->setSource($source);
        }

        return $event;
    }
}
PHP Process Fork Manager Component by bazzline.net API documentation generated by ApiGen