- parent::__construct($daemonize);
-
- if ($id) {
- $this->set_id($id);
- }
- $this->all = $allsites;
- $this->threads = $threads;
- }
-
- /**
- * How many seconds a polling-based queue manager should wait between
- * checks for new items to handle.
- *
- * Defaults to 60 seconds; override to speed up or slow down.
- *
- * @return int timeout in seconds
- */
- function timeout()
- {
- return 60;
- }
-
- function name()
- {
- return strtolower(get_class($this).'.'.$this->get_id());
- }
-
- function run()
- {
- if ($this->threads > 1) {
- return $this->runThreads();
- } else {
- return $this->runLoop();
- }
- }
-
- function runThreads()
- {
- $children = array();
- for ($i = 1; $i <= $this->threads; $i++) {
- $pid = pcntl_fork();
- if ($pid < 0) {
- print "Couldn't fork for thread $i; aborting\n";
- exit(1);
- } else if ($pid == 0) {
- $this->runChild($i);
- exit(0);
- } else {
- $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
- $children[$i] = $pid;
- }
- }
-
- $this->log(LOG_INFO, "Waiting for children to complete.");
- while (count($children) > 0) {
- $status = null;
- $pid = pcntl_wait($status);
- if ($pid > 0) {
- $i = array_search($pid, $children);
- if ($i === false) {
- $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
- continue;
- }
- unset($children[$i]);
- $this->log(LOG_INFO, "Thread $i pid $pid exited.");
-
- $pid = pcntl_fork();
- if ($pid < 0) {
- print "Couldn't fork to respawn thread $i; aborting thread.\n";
- } else if ($pid == 0) {
- $this->runChild($i);
- exit(0);
- } else {
- $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
- $children[$i] = $pid;
- }
- }
- }
- $this->log(LOG_INFO, "All child processes complete.");
- return true;
- }
-
- function runChild($thread)
- {
- $this->set_id($this->get_id() . "." . $thread);
- $this->resetDb();
- $this->runLoop();
- }
-
- /**
- * Reconnect to the database for each child process,
- * or they'll get very confused trying to use the
- * same socket.
- */
- function resetDb()
- {
- // @fixme do we need to explicitly open the db too
- // or is this implied?
- global $_DB_DATAOBJECT;
- unset($_DB_DATAOBJECT['CONNECTIONS']);
-
- // Reconnect main memcached, or threads will stomp on
- // each other and corrupt their requests.
- $cache = common_memcache();
- if ($cache) {
- $cache->reconnect();
- }
-
- // Also reconnect memcached for status_network table.
- if (!empty(Status_network::$cache)) {
- Status_network::$cache->close();
- Status_network::$cache = null;
- }