*/
function run()
{
+ $this->initPipes();
+
$children = array();
for ($i = 1; $i <= $this->threads; $i++) {
$pid = pcntl_fork();
$this->log(LOG_INFO, "Spawned thread $i as pid $pid");
$children[$i] = $pid;
}
+ sleep(common_config('queue', 'spawndelay'));
}
$this->log(LOG_INFO, "Waiting for children to complete.");
while (count($children) > 0) {
$status = null;
$pid = pcntl_wait($status);
- if ($pid > 0 && pcntl_wifexited($status)) {
- $exitCode = pcntl_wexitstatus($status);
-
+ if ($pid > 0) {
$i = array_search($pid, $children);
if ($i === false) {
- $this->log(LOG_ERR, "Unrecognized child pid $pid exited with status $exitCode");
+ $this->log(LOG_ERR, "Ignoring exit of unrecognized child pid $pid");
continue;
}
+ if (pcntl_wifexited($status)) {
+ $exitCode = pcntl_wexitstatus($status);
+ $info = "status $exitCode";
+ } else if (pcntl_wifsignaled($status)) {
+ $exitCode = self::EXIT_ERR;
+ $signal = pcntl_wtermsig($status);
+ $info = "signal $signal";
+ }
unset($children[$i]);
if ($this->shouldRespawn($exitCode)) {
- $this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; respawing.");
+ $this->log(LOG_INFO, "Thread $i pid $pid exited with $info; respawing.");
$pid = pcntl_fork();
if ($pid < 0) {
$this->log(LOG_INFO, "Respawned thread $i as pid $pid");
$children[$i] = $pid;
}
+ sleep(common_config('queue', 'spawndelay'));
} else {
$this->log(LOG_INFO, "Thread $i pid $pid exited with status $exitCode; closing out thread.");
}
return true;
}
+ /**
+ * Create an IPC socket pair which child processes can use to detect
+ * if the parent process has been killed.
+ */
+ function initPipes()
+ {
+ $sockets = stream_socket_pair(STREAM_PF_UNIX, STREAM_SOCK_STREAM, 0);
+ if ($sockets) {
+ $this->parentWriter = $sockets[0];
+ $this->parentReader = $sockets[1];
+ } else {
+ $this->log(LOG_ERROR, "Couldn't create inter-process sockets");
+ exit(1);
+ }
+ }
+
+ /**
+ * Build an IOManager that simply ensures that we have a connection
+ * to the parent process open. If it breaks, the child process will
+ * die.
+ *
+ * @return ProcessManager
+ */
+ public function processManager()
+ {
+ return new ProcessManager($this->parentReader);
+ }
+
/**
* Determine whether to respawn an exited subprocess based on its exit code.
* Otherwise we'll respawn all exits by default.
*/
protected function initAndRunChild($thread)
{
+ // Close the writer end of our parent<->children pipe.
+ fclose($this->parentWriter);
$this->set_id($this->get_id() . "." . $thread);
$this->resetDb();
$exitCode = $this->runThread();
// Reconnect main memcached, or threads will stomp on
// each other and corrupt their requests.
- $cache = common_memcache();
+ $cache = Cache::instance();
if ($cache) {
$cache->reconnect();
}