<?php
use Friendica\App;
+use Friendica\Core\System;
use Friendica\Core\Config;
use Friendica\Util\Lock;
require_once("boot.php");
function poller_run($argv, $argc){
- global $a, $db, $poller_up_start, $poller_db_duration;
+ global $a, $poller_up_start, $poller_db_duration;
$poller_up_start = microtime(true);
- $a = new App(dirname(__DIR__));
+ if (empty($a)) {
+ $a = new App(dirname(__DIR__));
+ }
- @include(".htconfig.php");
- require_once("include/dba.php");
- $db = new dba($db_host, $db_user, $db_pass, $db_data);
+ require_once ".htconfig.php";
+ require_once "include/dba.php";
+ dba::connect($db_host, $db_user, $db_pass, $db_data);
unset($db_host, $db_user, $db_pass, $db_data);
Config::load();
+ // Check the database structure and possibly fixes it
+ check_db(true);
+
// Quit when in maintenance
if (Config::get('system', 'maintenance', true)) {
return;
poller_exec_function($queue, $funcname, $argv);
$stamp = (float)microtime(true);
- dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]));
+ if (dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]))) {
+ Config::set('system', 'last_poller_execution', datetime_convert());
+ }
$poller_db_duration = (microtime(true) - $stamp);
} else {
logger("Function ".$funcname." does not exist");
$argc = count($argv);
- logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]);
+ $new_process_id = uniqid("wrk", true);
+
+ logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]." - Process PID: ".$new_process_id);
$stamp = (float)microtime(true);
// For better logging create a new process id for every worker call
// But preserve the old one for the worker
$old_process_id = $a->process_id;
- $a->process_id = uniqid("wrk", true);
+ $a->process_id = $new_process_id;
$a->queue = $queue;
$up_duration = number_format(microtime(true) - $poller_up_start, 3);
+ // Reset global data to avoid interferences
+ unset($_SESSION);
+
$funcname($argv, $argc);
$a->process_id = $old_process_id;
logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 2 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
}
- logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done in ".$duration." seconds.");
+ logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done in ".$duration." seconds. Process PID: ".$new_process_id);
// Write down the performance values into the log
if (Config::get("system", "profiler")) {
*
*/
function poller_kill_stale_workers() {
- $entries = dba::p("SELECT `id`, `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > ? AND NOT `done` AND `pid` != 0 ORDER BY `priority`, `created`", NULL_DATE);
-
+ $entries = dba::select('workerqueue', array('id', 'pid', 'executed', 'priority', 'parameter'),
+ array('`executed` > ? AND NOT `done` AND `pid` != 0', NULL_DATE),
+ array('order' => array('priority', 'created')));
while ($entry = dba::fetch($entries)) {
if (!posix_kill($entry["pid"], 0)) {
dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
// The higher the number of parallel workers, the more we prefetch to prevent concurring access
// We decrease the limit with the number of entries left in the queue
$worker_queues = Config::get("system", "worker_queues", 4);
- $queue_length = Config::get('system', 'worker_fetch_limit', $worker_queues);
+ $queue_length = Config::get('system', 'worker_fetch_limit', 1);
$lower_job_limit = $worker_queues * $queue_length * 2;
$jobs = poller_total_entries();
if (poller_passing_slow($highest_priority)) {
// Are there waiting processes with a higher priority than the currently highest?
- $result = dba::p("SELECT `id` FROM `workerqueue`
- WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
- ORDER BY `priority`, `created` LIMIT ".intval($limit),
- NULL_DATE, $highest_priority);
+ $result = dba::select('workerqueue', array('id'), array("`executed` <= ? AND `priority` < ? AND NOT `done`", NULL_DATE, $highest_priority),
+ array('limit' => $limit, 'order' => array('priority', 'created'), 'only_query' => true));
while ($id = dba::fetch($result)) {
$ids[] = $id["id"];
if (!$found) {
// Give slower processes some processing time
- $result = dba::p("SELECT `id` FROM `workerqueue`
- WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
- ORDER BY `priority`, `created` LIMIT ".intval($limit),
- NULL_DATE, $highest_priority);
+ $result = dba::select('workerqueue', array('id'), array("`executed` <= ? AND `priority` > ? AND NOT `done`", NULL_DATE, $highest_priority),
+ array('limit' => $limit, 'order' => array('priority', 'created'), 'only_query' => true));
while ($id = dba::fetch($result)) {
$ids[] = $id["id"];
// If there is no result (or we shouldn't pass lower processes) we check without priority limit
if (!$found) {
- $result = dba::p("SELECT `id` FROM `workerqueue` WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit), NULL_DATE);
+ $result = dba::select('workerqueue', array('id'), array("`executed` <= ? AND NOT `done`", NULL_DATE),
+ array('limit' => $limit, 'order' => array('priority', 'created'), 'only_query' => true));
while ($id = dba::fetch($result)) {
$ids[] = $id["id"];
}
if ($found) {
- $sql = "UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`;";
- array_unshift($ids, datetime_convert(), $mypid);
- dba::e($sql, $ids);
+ $condition = "`id` IN (".substr(str_repeat("?, ", count($ids)), 0, -2).") AND `pid` = 0 AND NOT `done`";
+ array_unshift($ids, $condition);
+ dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid), $ids);
}
return $found;
return;
}
- $url = App::get_baseurl()."/worker";
+ $url = System::baseUrl()."/worker";
fetch_url($url, false, $redirects, 1);
}
poller_kill_stale_workers();
}
-if (array_search(__file__,get_included_files())===0){
+if (array_search(__file__,get_included_files())===0) {
poller_run($_SERVER["argv"],$_SERVER["argc"]);
poller_unclaim_process();