ekodedypurnomo
7/26/2018 - 12:50 AM

Push notification daemon example for @PeeHaa

Push notification daemon example for @PeeHaa

<?php

    // PID file location.
    $pidFile = sprintf('%s/push-notify.pid', __DIR__);

    // Local socket path, used for sending updates.
    $localSockAddr = sprintf('%s/push-notify.sock', __DIR__);

    // Bind address for push clients.
    $publicSockAddr = 'tcp://0.0.0.0:1337';

    // Push client remote address for testing.
    $remoteSockAddr = 'tcp://127.0.0.1:1337';
<?php

// controller for parent process
/**
 * Controller for the parent (supervisor) process.
 *
 * The parent takes an exclusive lock on a PID file, forks a child that runs the
 * configured child script, waits for that child to exit and then forks a new one.
 * The PID file holds the parent PID on the first line and the current child PID
 * on the second line.
 *
 * Signal handlers are installed before forking, so they exist in both processes;
 * the $isChild flag tells the handlers which role the current process plays.
 */
class ProcessController
{
    private $pidFile;
    private $pidFileHandle;

    private $childPid;
    private $childPidPos;
    private $childScript;
    private $childCleanup;

    // true only in a forked child; the parent owns the PID file, the child never touches it
    private $isChild = false;

    /**
     * @param string $pidFile     Path of the PID/lock file.
     * @param string $childScript Path of the PHP script that each forked child runs.
     */
    public function __construct($pidFile, $childScript)
    {
        $this->pidFile = $pidFile;
        $this->childScript = $childScript;
    }

    public function __destruct()
    {
        $this->closePidFile();
    }

    /**
     * Acquire the PID file, install signal handlers and respawn the child forever.
     * Does not return; the process ends through exit() in a handler or in _error().
     */
    public function run()
    {
        _log("Parent starting");

        $this->openPidFile();
        $this->registerSignalHandlers();

        while (true) {
            $this->fork();
        }
    }

    /**
     * Register a callback that the child runs before exiting on a shutdown/restart signal.
     *
     * @param callable $callback
     */
    public function registerChildCleanup(callable $callback)
    {
        $this->childCleanup = $callback; // this is a dirty hack, obviously a proper OOP impl
                                         // would find a better way to do this
    }

    /**
     * Open and exclusively lock the PID file, then write the parent PID into it.
     * Exits through _error() when the file cannot be opened or is locked by another process.
     */
    private function openPidFile()
    {
        $handle = fopen($this->pidFile, 'a+');
        if ($handle === false) {
            _error("Unable to open PID file: " . $this->pidFile, 3);
        }
        rewind($handle);

        if (!flock($handle, LOCK_EX | LOCK_NB)) {
            $runningPid = trim(stream_get_contents($handle));
            // Close without storing the handle: we do not own this file, so the
            // destructor must not unlink it on the way out.
            fclose($handle);
            _error("Process is already running: pid=" . $runningPid, 4);
        }

        $this->pidFileHandle = $handle;

        ftruncate($this->pidFileHandle, 0);
        // fwrite() returns the number of bytes written, i.e. the offset where the child PID line starts
        $this->childPidPos = fwrite($this->pidFileHandle, posix_getpid() . "\n");
    }

    /**
     * Replace the child PID line in the PID file, keeping the parent PID line intact.
     *
     * @param int $pid PID of the newly forked child.
     */
    private function writeChildPid($pid)
    {
        ftruncate($this->pidFileHandle, $this->childPidPos);
        fseek($this->pidFileHandle, $this->childPidPos);
        fwrite($this->pidFileHandle, $pid . "\n");
    }

    /**
     * Unlock, close and delete the PID file. Only the parent does this; in a child,
     * or when the file was never acquired, the call is a no-op.
     */
    private function closePidFile()
    {
        if ($this->pidFileHandle === null || $this->isChild) {
            return;
        }

        flock($this->pidFileHandle, LOCK_UN);
        fclose($this->pidFileHandle);
        unlink($this->pidFile);

        $this->pidFileHandle = null;
    }

    /**
     * Install the shutdown, restart and hangup signal handlers.
     */
    private function registerSignalHandlers()
    {
        $shutdown = function($sigNo) {
            $this->handleShutdown();
        };

        pcntl_signal(SIGINT, $shutdown);
        pcntl_signal(SIGTERM, $shutdown);
        // this probably won't work but we might as well try
        pcntl_signal(SIGSEGV, $shutdown);

        pcntl_signal(SIGUSR1, function($sigNo) {
            // posix has no official restart signal
            $this->handleRestart();
        });
        pcntl_signal(SIGHUP, function($sigNo) {
            // ignore hangups, so service keeps running when started via a terminal
            // this may not be desired and you may want to
            // $this->handleShutdown();
        });
    }

    /**
     * Shutdown handler: the parent removes the PID file, the child runs its cleanup
     * callback; both then exit with status 0.
     */
    private function handleShutdown()
    {
        if ($this->isChild) {
            _log("Shutdown signal received in child");
            $this->runChildCleanup();
        } else {
            _log("Shutdown signal received in parent");
            $this->closePidFile();
        }

        exit(0);
    }

    /**
     * Restart handler: the child cleans up and exits so the parent's loop forks a
     * replacement; the parent only logs the request and keeps running.
     */
    private function handleRestart()
    {
        if ($this->isChild) {
            _log("Restart signal received in child");
            $this->runChildCleanup();
            exit(0);
        }

        _log("Restart signal received in parent");
    }

    /**
     * Invoke the cleanup callback registered by the child script, if there is one.
     */
    private function runChildCleanup()
    {
        if ($this->childCleanup) {
            call_user_func($this->childCleanup);
        }
    }

    /**
     * Fork once. The parent branch waits for the child to exit; the child branch
     * runs the child script and never returns to the caller.
     */
    private function fork()
    {
        _log("Forking process");
        $childPid = pcntl_fork();

        if ($childPid === -1) {
            _error("Unable to fork process!", 1);
        } else if ($childPid) {
            $this->runParent($childPid);
        } else {
            $this->isChild = true;
            $this->childPid = null;
            $this->runChild();

            // The child script is expected to loop until it exits. If it ever
            // returns, stop here instead of falling back into run()'s loop and
            // forking as though this process were the parent.
            exit(0);
        }
    }

    /**
     * Parent side of a fork: record the child PID, block until the child exits,
     * then run any pending signal handlers and log how the child ended.
     *
     * @param int $childPid PID returned by pcntl_fork().
     */
    private function runParent($childPid)
    {
        $this->childPid = $childPid;
        $this->writeChildPid($childPid);
        _log("Child process started successfully, PID: $childPid");

        if (pcntl_waitpid($childPid, $status) == -1) {
            _error("Error occured waiting for child process!", 2);
        }

        // run handlers for signals that arrived while we were waiting
        pcntl_signal_dispatch();
        $this->childPid = null;

        // $status is an encoded wait status, not the exit code itself
        if (pcntl_wifexited($status)) {
            $exitCode = pcntl_wexitstatus($status);
            _log("The child process exited with code $exitCode, let's go round again...");
        } else if (pcntl_wifsignaled($status)) {
            $signal = pcntl_wtermsig($status);
            _log("The child process was terminated by signal $signal, let's go round again...");
        } else {
            _log("The child process ended with wait status $status, let's go round again...");
        }
    }

    /**
     * Child side of a fork: execute the child script inside this method's scope,
     * which is what lets the script call $this->registerChildCleanup().
     */
    private function runChild()
    {
        require $this->childScript;
    }
}

// crappy logging is crappy
/**
 * Write a timestamped informational line to STDOUT.
 *
 * @param string $msg Message text (a newline is appended).
 */
function _log($msg) {
    $line = date('H:i:s | ') . $msg . "\n";
    fwrite(STDOUT, $line);
}
/**
 * Write a timestamped warning line to STDERR.
 *
 * @param string $msg Message text (a newline is appended).
 */
function _warn($msg) {
    $stamp = date('H:i:s | ');
    fwrite(STDERR, $stamp . 'WARNING: ' . $msg . "\n");
}
/**
 * Write a timestamped error line to STDERR and terminate the process.
 *
 * @param string $msg  Message text (a newline is appended).
 * @param int    $code Process exit status; defaults to 1 so that a caller which
 *                     passes only a message still exits with a failure status.
 */
function _error($msg, $code = 1) {
    fwrite(STDERR, date('H:i:s | ') . "ERROR: $msg\n");
    exit((int) $code);
}

// helpers for server-child.php
/**
 * Read from a stream in 1024-byte requests until a read yields nothing.
 *
 * @param resource $socket Stream to read from.
 * @return string Everything read, concatenated; '' when the first read is empty.
 */
function read_socket_data($socket) {
    $pieces = [];

    for (;;) {
        $piece = fread($socket, 1024);

        // fread() gives false on failure and '' when nothing is available
        if ($piece === false || $piece === '') {
            break;
        }

        $pieces[] = $piece;
    }

    return implode('', $pieces);
}
/**
 * Placeholder for persisting a notification.
 *
 * A real implementation would write the data to persistent storage, i.e. the
 * database that is currently polled for updates. Here it only logs the payload.
 *
 * @param string $data Notification payload.
 */
function store_notification($data) {
    _log('Storing persistent data: ' . $data);
}
/**
 * Send a push notification to every given client socket.
 *
 * Logs the payload, then writes it to each socket in turn.
 *
 * @param resource[] $sockets Client streams to write to.
 * @param string     $data    Notification payload.
 */
function send_push_notification($sockets, $data) {
    _log('Sending push notification: ' . $data);

    foreach ($sockets as $clientSocket) {
        fwrite($clientSocket, $data);
    }
}

/**
 * Read the parent and child PIDs from the PID file.
 *
 * The first line of the file is taken as the parent PID and the second as the
 * child PID. A missing or unreadable file, a missing line, or a value that is
 * not a positive integer yields 0 for that entry. Callers pass these values to
 * posix_kill(), where 0 and negative numbers address process groups rather
 * than a single process, so only positive PIDs are ever returned.
 *
 * @param string $pidFile Path of the PID file.
 * @return int[] Two-element list: [parent PID, child PID].
 */
function get_pids($pidFile) {
    $contents = is_readable($pidFile) ? file_get_contents($pidFile) : false;
    if ($contents === false) {
        return [0, 0];
    }

    // always end up with exactly two entries, even if the child line is absent
    $lines = array_pad(explode("\n", trim($contents), 2), 2, '');

    $pids = [];
    foreach ($lines as $line) {
        $pid = (int) $line;
        $pids[] = $pid > 0 ? $pid : 0;
    }

    return $pids;
}
#!/usr/bin/php
<?php

    /*
      This script represents a push notification client - a persistent connection that just waits
      for data on the socket
    */

    require __DIR__ . '/config.php';
    require __DIR__ . '/helpers.php';

    // connect to the push server; bail out if that is not possible
    $sock = stream_socket_client($remoteSockAddr, $errNo, $errStr);
    if (!$sock) {
        _error("Creating remote socket failed: $errNo: $errStr", 1);
    }
    stream_set_blocking($sock, 0);

    for (;;) {
        // block (no timeout) until the socket becomes readable
        $r = [$sock];
        $w = $e = $t = null;
        stream_select($r, $w, $e, $t);

        if (!feof($sock)) {
            _log('Push data recieved: ' . read_socket_data($sock));
            continue;
        }

        // end of stream: the server went away, so stop
        fclose($sock);
        _warn('Server closed connection');
        exit;
    }
<?php

    /*
      This is the script that actually does server-y things and is therefore the more likely
      candidate to die unexpectedly because of a problem with your code
    */

    // init routine
    _log("Initialising...");

    require __DIR__ . '/config.php';

    if (!$localSock = stream_socket_server("unix://$localSockAddr", $errNo, $errStr)) {
        _error("Creating local socket failed: $errNo: $errStr", 1);
    }
    $localSockId = (int) $localSock;
    _log("Created local socket #$localSockId");

    if (!$publicSock = stream_socket_server($publicSockAddr, $errNo, $errStr)) {
        _error("Creating public socket failed: $errNo: $errStr", 2);
    }
    $publicSockId = (int) $publicSock;
    _log("Created public socket #$publicSockId");

    $localClients = $publicClients = [];

    $this->registerChildCleanup(function() use($localSock, $localSockAddr, $localClients, $publicSock, $publicClients) {
        foreach ($localClients as $sock) {
            fclose($sock);
        }
        fclose($localSock);
        $localSock = $localClients = null;
        @unlink($localSockAddr);

        foreach ($publicClients as $sock) {
            fclose($sock);
        }
        fclose($publicSock);
        $localSock = $publicClients = null;
    });

    // main loop
    _log("Init complete, entering main service loop");

    while (true) {
        $r = array_merge([$localSock, $publicSock], $localClients, $publicClients);
        $w = $e = $t = null;

        _log("Waiting for something to do");
        $count = @stream_select($r, $w, $e, $t);
        _log("$count sockets have pending data");
        
        foreach ($r as $socket) {
            $sockId = (int) $socket;
            _log("Processing data on socket $sockId");

            switch (true) {
                case $sockId == $localSockId: // new client on local socket
                    $client = stream_socket_accept($localSock);
                    $clientId = (int) $client;
                    $localClients[$clientId] = $client;
                    stream_set_blocking($client, 0);
                    _log("Local client $clientId connected");
                    break;

                case $sockId == $publicSockId: // new client on public socket
                    $client = stream_socket_accept($publicSock);
                    $clientId = (int) $client;
                    $publicClients[$clientId] = $client;
                    stream_set_blocking($client, 0);
                    _log("Public client $clientId connected");
                    break;

                case isset($localClients[$sockId]): // new data on local client
                    if (feof($socket)) {
                        _log("Local client $sockId disconnected");
                        fclose($socket);
                        unset($localClients[$sockId]);
                    } else {
                        _log("Data recieved from local client $sockId");
                        $data = read_socket_data($socket);
                        _log("Data: " . trim($data));

                        store_notification($data);
                        send_push_notification($publicClients, $data);
                    }
                    break;

                case isset($publicClients[$sockId]): // new data on public client
                    if (feof($socket)) {
                        _log("Public client $sockId disconnected");
                        fclose($socket);
                        unset($publicClients[$sockId]);
                    } else {
                        $data = read_socket_data($socket);
                        _log("Data recieved from public client: " . trim($data));
                    }
                    break;

                default:
                    _warn("Unknown socket ID '$sockId' encountered :-S");
                    break;
            }
        }

        _log("All pending data handled");
    }
#!/usr/bin/php
<?php

    /*
      This script just respawns the child if it dies, and handles shutdown/restart signals
      gracefully.
    */

    declare(ticks = 1);

    require __DIR__ . '/config.php';
    require __DIR__ . '/helpers.php';

    // On PHP 7+ the ticks declaration above applies to this file only, not to the
    // files it includes, so it cannot be relied on to run the signal handlers
    // there. Where the runtime offers it (PHP 7.1+), switch on asynchronous signal
    // delivery instead; the setting is in place before the controller forks.
    if (function_exists('pcntl_async_signals')) {
        pcntl_async_signals(true);
    }

    $controller = new ProcessController($pidFile, __DIR__ . '/server-child.php');
    $controller->run();
#!/usr/bin/php
<?php

    /*
      Service controller script
    */

    require __DIR__ . '/config.php';
    require __DIR__ . '/helpers.php';

    // requested action: start | stop | restart
    $action = isset($argv[1]) ? $argv[1] : null;

    switch ($action) {
        case 'start':
            // launch the parent in the background, detached from this script's output
            $cmd = 'php ' . escapeshellarg(__DIR__ . '/server-parent.php') . ' > /dev/null 2>&1 &';
            exec($cmd);
            exit(0);

        case 'stop':
        case 'restart':
            list($parent, $child) = get_pids($pidFile);

            // get_pids() yields 0 when the PID file is missing or incomplete, and
            // posix_kill() with a PID of 0 signals the caller's whole process group,
            // so never signal anything that is not a positive PID.
            if ($parent <= 0) {
                _error("Service does not appear to be running (no usable PID in $pidFile)", 1);
            }

            // stop terminates the parent; restart only asks it to carry on and respawn
            posix_kill($parent, $action === 'stop' ? SIGTERM : SIGUSR1);

            // the child is terminated in both cases
            if ($child > 0) {
                posix_kill($child, SIGTERM);
            }
            exit(0);

        default:
            echo "Y U NO MAEK SENSE?????";
            exit(1);
    }
#!/usr/bin/php
<?php

    /*
      This script represents an updater - something that triggers a push notification to be sent
    */

    require __DIR__ . '/config.php';
    require __DIR__ . '/helpers.php';

    // connect to the daemon's local socket; bail out if that is not possible
    $sock = stream_socket_client("unix://$localSockAddr", $errNo, $errStr);
    if (!$sock) {
        _error("Creating local socket failed: $errNo: $errStr", 1);
    }

    // send one timestamped message carrying the first command line argument (if any)
    fwrite($sock, sprintf('Push notification sent at %s: %s', date('H:i:s'), @$argv[1]));
    fclose($sock);