Push notification daemon example for @PeeHaa
<?php
// PID file written by the parent process
$pidFile = sprintf('%s/push-notify.pid', __DIR__);

// unix socket path used for sending updates
$localSockAddr = sprintf('%s/push-notify.sock', __DIR__);

// bind address for push clients
$publicSockAddr = 'tcp://0.0.0.0:1337';

// push client remote address for testing
$remoteSockAddr = 'tcp://127.0.0.1:1337';
<?php
// controller for parent process
/**
 * Controller for the parent (supervisor) process.
 *
 * Holds an exclusive lock on a PID file, forks a child that runs the given
 * script, waits for it to terminate and then forks a replacement. The same
 * object exists in both processes after a fork; $childPid is only set in the
 * parent while a child is alive, and that is what the signal handlers use to
 * decide which side of the fork they are running on.
 */
class ProcessController
{
    /** @var string path of the PID file */
    private $pidFile;

    /** @var resource|null open (and locked) handle on the PID file */
    private $pidFileHandle;

    /** @var int|null PID of the running child; null in the child itself */
    private $childPid;

    /** @var int byte offset in the PID file where the child PID line starts */
    private $childPidPos;

    /** @var string script that is require()d inside the child process */
    private $childScript;

    /** @var callable|null cleanup routine registered by the child script */
    private $childCleanup;

    /**
     * @param string $pidFile     path of the PID file to create and lock
     * @param string $childScript path of the script the child should run
     */
    public function __construct($pidFile, $childScript)
    {
        $this->pidFile = $pidFile;
        $this->childScript = $childScript;
    }

    public function __destruct()
    {
        $this->closePidFile();
    }

    /**
     * Entry point of the parent: lock the PID file, install the signal
     * handlers and keep respawning the child forever.
     */
    public function run()
    {
        _log("Parent starting");
        $this->openPidFile();
        $this->registerSignalHandlers();
        while (true) {
            $this->fork();
        }
    }

    /**
     * Register a callback that the child runs before exiting on a signal.
     *
     * @param callable $callback
     */
    public function registerChildCleanup(callable $callback)
    {
        $this->childCleanup = $callback; // this is a dirty hack, obviously a proper OOP impl
                                         // would find a better way to do this
    }

    /**
     * Open and exclusively lock the PID file, then write our own PID to it.
     * Exits with an error if the file cannot be opened or is already locked.
     */
    private function openPidFile()
    {
        $handle = fopen($this->pidFile, 'a+');
        if ($handle === false) {
            _error("Unable to open PID file: " . $this->pidFile, 4);
        }
        $this->pidFileHandle = $handle;
        rewind($this->pidFileHandle);
        if (!flock($this->pidFileHandle, LOCK_EX | LOCK_NB)) {
            // _error() requires an exit code; the original call omitted it
            _error("Process is already running: pid=" . trim(stream_get_contents($this->pidFileHandle)), 3);
        }
        ftruncate($this->pidFileHandle, 0);
        // fwrite() returns the number of bytes written, which is exactly the
        // offset at which the child PID line will start
        $this->childPidPos = fwrite($this->pidFileHandle, posix_getpid() . "\n");
    }

    /**
     * Replace the child PID line (second line) of the PID file.
     *
     * @param int $pid
     */
    private function writeChildPid($pid)
    {
        ftruncate($this->pidFileHandle, $this->childPidPos);
        fseek($this->pidFileHandle, $this->childPidPos);
        fwrite($this->pidFileHandle, $pid . "\n");
    }

    /**
     * Unlock, close and delete the PID file.
     *
     * Only acts while $childPid is set, so the child (which shares the
     * handle after the fork) never removes the parent's PID file.
     */
    private function closePidFile()
    {
        if ($this->pidFileHandle !== null && $this->childPid) {
            flock($this->pidFileHandle, LOCK_UN);
            fclose($this->pidFileHandle);
            unlink($this->pidFile);
            $this->pidFileHandle = null;
        }
    }

    /**
     * Install the signal handlers. They are installed before forking, so
     * the child inherits them as well.
     */
    private function registerSignalHandlers()
    {
        pcntl_signal(SIGINT, function($sigNo) {
            $this->handleShutdown();
        });
        pcntl_signal(SIGTERM, function($sigNo) {
            $this->handleShutdown();
        });
        pcntl_signal(SIGSEGV, function($sigNo) {
            // this probably won't work but we might as well try
            $this->handleShutdown();
        });
        pcntl_signal(SIGUSR1, function($sigNo) {
            // posix has no official restart signal
            $this->handleRestart();
        });
        pcntl_signal(SIGHUP, function($sigNo) {
            // ignore hangups, so service keeps running when started via a terminal
            // this may not be desired and you may want to
            // $this->handleShutdown();
        });
    }

    /**
     * Shutdown handler: the parent removes the PID file, the child runs its
     * registered cleanup; both then exit with code 0.
     */
    private function handleShutdown()
    {
        if ($this->childPid) {
            _log("Shutdown signal received in parent");
            $this->closePidFile();
        } else {
            _log("Shutdown signal received in child");
            if ($this->childCleanup) {
                call_user_func($this->childCleanup);
            }
        }
        exit(0);
    }

    /**
     * Restart handler: the parent only logs (its loop forks a new child once
     * the current one is gone), the child cleans up and exits.
     */
    private function handleRestart()
    {
        if ($this->childPid) {
            _log("Restart signal received in parent");
        } else {
            _log("Restart signal received in child");
            if ($this->childCleanup) {
                call_user_func($this->childCleanup);
            }
            exit(0);
        }
    }

    /**
     * Fork once and dispatch to the parent or child side.
     */
    private function fork()
    {
        _log("Forking process");
        $childPid = pcntl_fork();
        if ($childPid === -1) {
            _error("Unable to fork process!", 1);
        } else if ($childPid) {
            $this->runParent($childPid);
        } else {
            $this->runChild();
        }
    }

    /**
     * Parent side of the fork: record the child PID, block until the child
     * terminates, then run any signal handlers that became pending.
     *
     * @param int $childPid
     */
    private function runParent($childPid)
    {
        $this->childPid = $childPid;
        $this->writeChildPid($childPid);
        _log("Child process started successfully, PID: $childPid");
        if (pcntl_waitpid($childPid, $status) == -1) {
            _error("Error occured waiting for child process!", 2);
        }
        // dispatch while $childPid is still set so the handlers take the
        // parent branch
        pcntl_signal_dispatch();
        $this->childPid = null;

        // $status is the raw wait status, not the exit code - decode it
        if (pcntl_wifexited($status)) {
            $outcome = "exited with code " . pcntl_wexitstatus($status);
        } else if (pcntl_wifsignaled($status)) {
            $outcome = "was terminated by signal " . pcntl_wtermsig($status);
        } else {
            $outcome = "ended with raw status $status";
        }
        _log("The child process $outcome, let's go round again...");
    }

    /**
     * Child side of the fork: run the child script in the scope of this
     * object (so the script can use $this).
     */
    private function runChild()
    {
        require $this->childScript;
        // If the script ever returns, the child must not fall back into the
        // fork loop in run() and start spawning children of its own.
        exit(0);
    }
}
// crappy logging is crappy
/**
 * Write a timestamped informational line to standard output.
 */
function _log($msg)
{
    $line = sprintf("%s%s\n", date('H:i:s | '), $msg);
    fwrite(STDOUT, $line);
}
/**
 * Write a timestamped warning line to standard error.
 */
function _warn($msg)
{
    $line = date('H:i:s | ') . 'WARNING: ' . $msg . "\n";
    fwrite(STDERR, $line);
}
/**
 * Write a timestamped error line to standard error and terminate the script.
 *
 * $code defaults to 1 so that callers which only pass a message (as
 * ProcessController::openPidFile() does) still exit with a failure status
 * instead of triggering a missing-argument error.
 *
 * @param string $msg  error message
 * @param int    $code process exit code
 */
function _error($msg, $code = 1)
{
    fwrite(STDERR, date('H:i:s | ') . "ERROR: $msg\n");
    exit((int) $code);
}
// helpers for server-child.php
/**
 * Read from a stream in 1024-byte chunks until a read yields nothing
 * (an empty string or false) and return everything collected.
 */
function read_socket_data($socket)
{
    $data = '';
    for (;;) {
        $piece = fread($socket, 1024);
        if ((string) $piece === '') {
            break;
        }
        $data .= $piece;
    }
    return $data;
}
/**
 * Placeholder for persisting a notification.
 *
 * A real implementation would write the data to persistent storage, i.e.
 * the database you currently poll for updates. Here it is only logged.
 */
function store_notification($data)
{
    _log('Storing persistent data: ' . $data);
}
/**
 * Send a push notification to the clients: logs the payload, then writes
 * it to every stream in $sockets.
 */
function send_push_notification($sockets, $data)
{
    _log('Sending push notification: ' . $data);

    foreach ($sockets as $client) {
        fwrite($client, $data);
    }
}
/**
 * Read the parent and child PIDs from the PID file (first and second line).
 *
 * A PID that is missing from the file - or an unreadable file - yields 0
 * for that entry.
 *
 * @return int[] [parent PID, child PID]
 */
function get_pids($pidFile)
{
    $contents = @file_get_contents($pidFile);
    $lines = explode("\n", trim($contents), 2);

    $parent = (int) $lines[0];
    $child = isset($lines[1]) ? (int) $lines[1] : 0;

    return [$parent, $child];
}
#!/usr/bin/php
<?php
/*
This script represents a push notification client - a persistent connection that just waits
for data on the socket
*/
require __DIR__ . '/config.php';
require __DIR__ . '/helpers.php';

// connect to the push server; bail out if that is not possible
$sock = stream_socket_client($remoteSockAddr, $errNo, $errStr);
if (!$sock) {
    _error("Creating remote socket failed: $errNo: $errStr", 1);
}
stream_set_blocking($sock, 0);

// block until the socket becomes readable, then either report the data
// or stop when the server has gone away
for (;;) {
    $r = [$sock];
    $w = $e = $t = null;
    stream_select($r, $w, $e, $t);

    if (!feof($sock)) {
        _log('Push data recieved: ' . read_socket_data($sock));
        continue;
    }

    fclose($sock);
    _warn('Server closed connection');
    exit;
}
<?php
/*
This is the script that actually does server-y things and is therefore the more likely
candidate to die unexpectedly because of a problem with your code
*/
// init routine
// NOTE: this script is require()d from ProcessController::runChild(), so
// $this below refers to the controller object.
_log("Initialising...");
require __DIR__ . '/config.php';

// A previous child that died without running its cleanup leaves the socket
// file behind, which would make every respawn fail to bind. The parent's PID
// file lock already guarantees that no other instance owns it.
if (file_exists($localSockAddr) && !unlink($localSockAddr)) {
    _warn("Unable to remove stale socket file $localSockAddr");
}

if (!$localSock = stream_socket_server("unix://$localSockAddr", $errNo, $errStr)) {
    _error("Creating local socket failed: $errNo: $errStr", 1);
}
$localSockId = (int) $localSock;
_log("Created local socket #$localSockId");

if (!$publicSock = stream_socket_server($publicSockAddr, $errNo, $errStr)) {
    _error("Creating public socket failed: $errNo: $errStr", 2);
}
$publicSockId = (int) $publicSock;
_log("Created public socket #$publicSockId");

$localClients = $publicClients = [];

// The client lists are captured by reference: they are still empty here and
// a by-value capture would never see the clients that connect later.
$this->registerChildCleanup(function() use(&$localSock, $localSockAddr, &$localClients, &$publicSock, &$publicClients) {
    foreach ($localClients as $sock) {
        if (is_resource($sock)) {
            fclose($sock);
        }
    }
    if (is_resource($localSock)) {
        fclose($localSock);
    }
    $localSock = null;
    $localClients = [];
    @unlink($localSockAddr);

    foreach ($publicClients as $sock) {
        if (is_resource($sock)) {
            fclose($sock);
        }
    }
    if (is_resource($publicSock)) {
        fclose($publicSock);
    }
    $publicSock = null;
    $publicClients = [];
});

// main loop
_log("Init complete, entering main service loop");
while (true) {
    $r = array_merge([$localSock, $publicSock], $localClients, $publicClients);
    $w = $e = $t = null;

    _log("Waiting for something to do");
    $count = @stream_select($r, $w, $e, $t);

    // Run pending signal handlers here: declare(ticks) in the parent script
    // does not apply to this file on PHP 7+, so without an explicit dispatch
    // the shutdown/restart handlers would never fire in the child.
    pcntl_signal_dispatch();

    if ($count === false) {
        // select failed (typically interrupted by a signal); $r is not a
        // list of ready sockets in that case, so do not process it
        continue;
    }
    _log("$count sockets have pending data");

    foreach ($r as $socket) {
        $sockId = (int) $socket;
        _log("Processing data on socket $sockId");

        switch (true) {
            case $sockId == $localSockId: // new client on local socket
                $client = stream_socket_accept($localSock);
                if (!$client) {
                    _warn("Accepting local client failed");
                    break;
                }
                $clientId = (int) $client;
                $localClients[$clientId] = $client;
                stream_set_blocking($client, 0);
                _log("Local client $clientId connected");
                break;

            case $sockId == $publicSockId: // new client on public socket
                $client = stream_socket_accept($publicSock);
                if (!$client) {
                    _warn("Accepting public client failed");
                    break;
                }
                $clientId = (int) $client;
                $publicClients[$clientId] = $client;
                stream_set_blocking($client, 0);
                _log("Public client $clientId connected");
                break;

            case isset($localClients[$sockId]): // new data on local client
                if (feof($socket)) {
                    _log("Local client $sockId disconnected");
                    fclose($socket);
                    unset($localClients[$sockId]);
                } else {
                    _log("Data recieved from local client $sockId");
                    $data = read_socket_data($socket);
                    _log("Data: " . trim($data));
                    store_notification($data);
                    send_push_notification($publicClients, $data);
                }
                break;

            case isset($publicClients[$sockId]): // new data on public client
                if (feof($socket)) {
                    _log("Public client $sockId disconnected");
                    fclose($socket);
                    unset($publicClients[$sockId]);
                } else {
                    $data = read_socket_data($socket);
                    _log("Data recieved from public client: " . trim($data));
                }
                break;

            default:
                _warn("Unknown socket ID '$sockId' encountered :-S");
                break;
        }
    }
    _log("All pending data handled");
}
#!/usr/bin/php
<?php
/*
This script just respawns the child if it dies, and handles shutdown/restart signals
gracefully.
*/
// let pending signal handlers run between statements of this script
declare(ticks = 1);

require __DIR__ . '/config.php';
require __DIR__ . '/helpers.php';

// supervise server-child.php: the controller forks it and respawns it
// whenever it exits
$childScript = __DIR__ . '/server-child.php';
$controller = new ProcessController($pidFile, $childScript);
$controller->run();
#!/usr/bin/php
<?php
/*
Service controller script
*/
require __DIR__ . '/config.php';
require __DIR__ . '/helpers.php';

// action requested on the command line: start | stop | restart
$action = isset($argv[1]) ? $argv[1] : null;

switch ($action) {
    case 'start':
        // launch the parent detached from this shell, discarding its output
        $cmd = 'php ' . escapeshellarg(__DIR__ . '/server-parent.php') . ' > /dev/null 2>&1 &';
        exec($cmd);
        exit(0);

    case 'stop':
        list($parent, $child) = get_pids($pidFile);
        // get_pids() yields 0 for a missing PID, and posix_kill(0, ...)
        // would signal every process in our own process group
        if ($parent <= 0) {
            _error("Service does not appear to be running (no PID in $pidFile)", 2);
        }
        posix_kill($parent, SIGTERM);
        if ($child > 0) {
            posix_kill($child, SIGTERM);
        }
        exit(0);

    case 'restart':
        list($parent, $child) = get_pids($pidFile);
        if ($parent <= 0) {
            _error("Service does not appear to be running (no PID in $pidFile)", 2);
        }
        // tell the parent a restart is intended, then terminate the child so
        // that the parent spawns a fresh one
        posix_kill($parent, SIGUSR1);
        if ($child > 0) {
            posix_kill($child, SIGTERM);
        }
        exit(0);

    default:
        echo "Y U NO MAEK SENSE?????";
        exit(1);
}
#!/usr/bin/php
<?php
/*
This script represents an updater - something that triggers a push notification to be sent
*/
require __DIR__ . '/config.php';
require __DIR__ . '/helpers.php';

// connect to the server's local (unix) socket
$sock = stream_socket_client('unix://' . $localSockAddr, $errNo, $errStr);
if (!$sock) {
    _error("Creating local socket failed: $errNo: $errStr", 1);
}

// the optional first command line argument is appended to the message
$text = isset($argv[1]) ? $argv[1] : '';
$payload = 'Push notification sent at ' . date('H:i:s') . ': ' . $text;

fwrite($sock, $payload);
fclose($sock);