define ( 'GRAVITY_COMMENT', 6);
/* @}*/
+/**
+ * @name Priority
+ *
+ * Process priority for the worker
+ * @{
+ */
+define('PRIORITY_HIGH', 1);
+define('PRIORITY_MEDIUM', 2);
+define('PRIORITY_LOW', 3);
+/* @}*/
+
// Normally this constant is defined - but not if "pcntl" isn't installed
if (!defined("SIGTERM"))
logger("killed stale process");
// Calling a new instance
if ($task != "")
- proc_run('php', $task);
+ proc_run(PRIORITY_MEDIUM, $task);
}
return true;
}
}
return false;
}
+
+ function proc_run($args) {
+
+ // Add the php path if it is a php call
+ if (count($args) && $args[0] === 'php')
+ $args[0] = ((x($this->config,'php_path')) && (strlen($this->config['php_path'])) ? $this->config['php_path'] : 'php');
+
+ // add baseurl to args. cli scripts can't construct it
+ $args[] = $this->get_baseurl();
+
+ for($x = 0; $x < count($args); $x ++)
+ $args[$x] = escapeshellarg($args[$x]);
+
+ $cmdline = implode($args," ");
+
+ if(get_config('system','proc_windows'))
+ proc_close(proc_open('cmd /c start /b ' . $cmdline,array(),$foo,dirname(__FILE__)));
+ else
+ proc_close(proc_open($cmdline." &",array(),$foo,dirname(__FILE__)));
+
+ }
}
/**
$build = DB_UPDATE_VERSION;
}
if($build != DB_UPDATE_VERSION)
- proc_run('php', 'include/dbupdate.php');
+ proc_run(PRIORITY_HIGH, 'include/dbupdate.php');
}
* @brief Wrap calls to proc_close(proc_open()) and call hook
* so plugins can take part in process :)
*
- * @param string $cmd program to run
+ * @param (string|integer) $cmd program to run or priority
*
* next args are passed as $cmd command line
* e.g.: proc_run("ls","-la","/tmp");
+ * or: proc_run(PRIORITY_HIGH, "include/notifier.php", "drop", $drop_id);
*
* @note $cmd and string args are surrounded with ""
*
$args = func_get_args();
$newargs = array();
- if(! count($args))
+ if (!count($args))
return;
// expand any arrays
foreach($arg as $n) {
$newargs[] = $n;
}
- }
- else
+ } else
$newargs[] = $arg;
}
$arr = array('args' => $args, 'run_cmd' => true);
call_hooks("proc_run", $arr);
- if(! $arr['run_cmd'])
+ if (!$arr['run_cmd'] OR !count($args))
return;
- if(count($args) && $args[0] === 'php') {
-
- if (get_config("system", "worker")) {
- $argv = $args;
- array_shift($argv);
-
- $parameters = json_encode($argv);
- $found = q("SELECT `id` FROM `workerqueue` WHERE `parameter` = '%s'",
- dbesc($parameters));
-
- $funcname = str_replace(".php", "", basename($argv[0]))."_run";
-
- // Define the processes that have priority over any other process
- /// @todo Better check for priority processes
- $high_priority = array("delivery_run", "notifier_run", "pubsubpublish_run");
- $low_priority = array("queue_run", "gprobe_run", "discover_poco_run");
-
- if (in_array($funcname, $high_priority))
- $priority = 1;
- elseif (in_array($funcname, $low_priority))
- $priority = 3;
- else
- $priority = 2;
-
- if (!$found)
- // quickfix for the delivery problems, 2106-07-28
- /// @todo find better solution
- //q("INSERT INTO `workerqueue` (`function`, `parameter`, `created`, `priority`)
- // VALUES ('%s', '%s', '%s', %d)",
- // dbesc($funcname),
- q("INSERT INTO `workerqueue` (`parameter`, `created`, `priority`)
- VALUES ('%s', '%s', %d)",
- dbesc($parameters),
- dbesc(datetime_convert()),
- intval($priority));
-
- // Should we quit and wait for the poller to be called as a cronjob?
- if (get_config("system", "worker_dont_fork"))
- return;
-
- // Checking number of workers
- $workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");
-
- // Get number of allowed number of worker threads
- $queues = intval(get_config("system", "worker_queues"));
-
- if ($queues == 0)
- $queues = 4;
-
- // If there are already enough workers running, don't fork another one
- if ($workers[0]["workers"] >= $queues)
- return;
-
- // Now call the poller to execute the jobs that we just added to the queue
- $args = array("php", "include/poller.php", "no_cron");
- }
-
- $args[0] = ((x($a->config,'php_path')) && (strlen($a->config['php_path'])) ? $a->config['php_path'] : 'php');
+ if (!get_config("system", "worker") OR
+ (($args[0] != 'php') AND !is_int($args[0]))) {
+ $a->proc_run($args);
+ return;
}
- // add baseurl to args. cli scripts can't construct it
- $args[] = $a->get_baseurl();
+ if (is_int($args[0]))
+ $priority = $args[0];
+ else
+ $priority = PRIORITY_MEDIUM;
- for($x = 0; $x < count($args); $x ++)
- $args[$x] = escapeshellarg($args[$x]);
+ $argv = $args;
+ array_shift($argv);
- $cmdline = implode($args," ");
+ $parameters = json_encode($argv);
+ $found = q("SELECT `id` FROM `workerqueue` WHERE `parameter` = '%s'",
+ dbesc($parameters));
- if(get_config('system','proc_windows'))
- proc_close(proc_open('cmd /c start /b ' . $cmdline,array(),$foo,dirname(__FILE__)));
- else
- proc_close(proc_open($cmdline." &",array(),$foo,dirname(__FILE__)));
+ if (!$found)
+ q("INSERT INTO `workerqueue` (`parameter`, `created`, `priority`)
+ VALUES ('%s', '%s', %d)",
+ dbesc($parameters),
+ dbesc(datetime_convert()),
+ intval($priority));
+
+ // Should we quit and wait for the poller to be called as a cronjob?
+ if (get_config("system", "worker_dont_fork"))
+ return;
+
+ // Checking number of workers
+ $workers = q("SELECT COUNT(*) AS `workers` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");
+
+ // Get number of allowed number of worker threads
+ $queues = intval(get_config("system", "worker_queues"));
+
+ if ($queues == 0)
+ $queues = 4;
+
+ // If there are already enough workers running, don't fork another one
+ if ($workers[0]["workers"] >= $queues)
+ return;
+
+ // Now call the poller to execute the jobs that we just added to the queue
+ $args = array("php", "include/poller.php", "no_cron");
+
+ $a->proc_run($args);
}
function current_theme(){
// don't delete yet, will be done later when contacts have deleted my stuff
// q("DELETE FROM `user` WHERE `uid` = %d", intval($uid));
q("UPDATE `user` SET `account_removed` = 1, `account_expires_on` = UTC_TIMESTAMP() WHERE `uid` = %d", intval($uid));
- proc_run('php', "include/notifier.php", "removeme", $uid);
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "removeme", $uid);
// Send an update to the directory
- proc_run('php', "include/directory.php", $r[0]['url']);
+ proc_run(PRIORITY_LOW, "include/directory.php", $r[0]['url']);
if($uid == local_user()) {
unset($_SESSION['authenticated']);
if ((($profile["addr"] == "") OR ($profile["name"] == "")) AND ($profile["gid"] != 0) AND
in_array($profile["network"], array(NETWORK_DFRN, NETWORK_DIASPORA, NETWORK_OSTATUS)))
- proc_run('php',"include/update_gcontact.php", $profile["gid"]);
+ proc_run(PRIORITY_LOW, "include/update_gcontact.php", $profile["gid"]);
// Show contact details of Diaspora contacts only if connected
if (($profile["cid"] == 0) AND ($profile["network"] == NETWORK_DIASPORA)) {
// run queue delivery process in the background
- proc_run('php',"include/queue.php");
+ proc_run(PRIORITY_LOW,"include/queue.php");
// run the process to discover global contacts in the background
- proc_run('php',"include/discover_poco.php");
+ proc_run(PRIORITY_LOW,"include/discover_poco.php");
// run the process to update locally stored global contacts in the background
- proc_run('php',"include/discover_poco.php", "checkcontact");
+ proc_run(PRIORITY_LOW,"include/discover_poco.php", "checkcontact");
// expire any expired accounts
update_contact_birthdays();
- proc_run('php',"include/discover_poco.php", "suggestions");
+ proc_run(PRIORITY_LOW,"include/discover_poco.php", "suggestions");
set_config('system','last_expire_day',$d2);
- proc_run('php','include/expire.php');
+ proc_run(PRIORITY_LOW,'include/expire.php');
}
// Clear cache entries
logger("Polling ".$contact["network"]." ".$contact["id"]." ".$contact["nick"]." ".$contact["name"]);
- proc_run('php','include/onepoll.php',$contact['id']);
+ proc_run(PRIORITY_MEDIUM,'include/onepoll.php',$contact['id']);
if($interval)
@time_sleep_until(microtime(true) + (float) $interval);
// pull feed and consume it, which should subscribe to the hub.
- proc_run('php',"include/onepoll.php","$contact_id", "force");
+ proc_run(PRIORITY_MEDIUM, "include/onepoll.php", $contact_id, "force");
// create a follow slap
}
}
- proc_run('php','include/gprobe.php',bin2hex($tmp_str));
+ proc_run(PRIORITY_LOW, 'include/gprobe.php',bin2hex($tmp_str));
$arr = array('zrl' => $tmp_str, 'url' => $a->cmd);
call_hooks('zrl_init',$arr);
}
check_item_notification($current_post, $uid);
if ($notify)
- proc_run('php', "include/notifier.php", $notify_type, $current_post);
+ proc_run(PRIORITY_HIGH, "include/notifier.php", $notify_type, $current_post);
return $current_post;
}
);
update_thread($item_id);
- proc_run('php','include/notifier.php','tgroup',$item_id);
+ proc_run(PRIORITY_HIGH,'include/notifier.php', 'tgroup', $item_id);
}
drop_item($item['id'],false);
}
- proc_run('php',"include/notifier.php","expire","$uid");
+ proc_run(PRIORITY_HIGH,"include/notifier.php", "expire", $uid);
}
// multiple threads may have been deleted, send an expire notification
if($uid)
- proc_run('php',"include/notifier.php","expire","$uid");
+ proc_run(PRIORITY_HIGH,"include/notifier.php", "expire", $uid);
}
// send the notification upstream/downstream as the case may be
- proc_run('php',"include/notifier.php","drop","$drop_id");
+ proc_run(PRIORITY_HIGH,"include/notifier.php", "drop", $drop_id);
if(! $interactive)
return $owner;
);
$like_item_id = $like_item['id'];
- proc_run('php',"include/notifier.php","like","$like_item_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "like", $like_item_id);
return true;
}
call_hooks('post_local_end', $arr);
- proc_run('php',"include/notifier.php","like","$post_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "like", $post_id);
return true;
}
}
if($post_id) {
- proc_run('php',"include/notifier.php","mail","$post_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "mail", $post_id);
return intval($post_id);
} else {
return -3;
/*
* The notifier is typically called with:
*
- * proc_run('php', "include/notifier.php", COMMAND, ITEM_ID);
+ * proc_run(PRIORITY_HIGH, "include/notifier.php", COMMAND, ITEM_ID);
*
* where COMMAND is one of the following:
*
// a delivery fork. private groups (forum_mode == 2) do not uplink
if((intval($parent['forum_mode']) == 1) && (! $top_level) && ($cmd !== 'uplink')) {
- proc_run('php','include/notifier.php','uplink',$item_id);
+ proc_run(PRIORITY_HIGH,'include/notifier.php','uplink',$item_id);
}
$conversants = array();
$this_batch[] = $contact['id'];
if(count($this_batch) >= $deliveries_per_process) {
- proc_run('php','include/delivery.php',$cmd,$item_id,$this_batch);
+ proc_run(PRIORITY_HIGH,'include/delivery.php',$cmd,$item_id,$this_batch);
$this_batch = array();
if($interval)
@time_sleep_until(microtime(true) + (float) $interval);
// be sure to pick up any stragglers
if(count($this_batch))
- proc_run('php','include/delivery.php',$cmd,$item_id,$this_batch);
+ proc_run(PRIORITY_HIGH,'include/delivery.php',$cmd,$item_id,$this_batch);
}
// send salmon slaps to mentioned remote tags (@foo@example.com) in OStatus posts
if((! $mail) && (! $fsuggest) && (! $followup)) {
logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
- proc_run('php','include/delivery.php',$cmd,$item_id,$rr['id']);
+ proc_run(PRIORITY_HIGH,'include/delivery.php',$cmd,$item_id,$rr['id']);
if($interval)
@time_sleep_until(microtime(true) + (float) $interval);
}
}
// Handling the pubsubhubbub requests
- proc_run('php','include/pubsubpublish.php');
+ proc_run(PRIORITY_HIGH,'include/pubsubpublish.php');
}
logger('notifier: calling hooks', LOGGER_DEBUG);
logger("Current load: ".$load." - maximum: ".$maxsysload." - current queues: ".$active."/".$entries." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
+ // Are there fewer workers running as possible? Then fork a new one.
+ if (!get_config("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
+ logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
+ $args = array("php", "include/poller.php", "no_cron");
+ $a = get_app();
+ $a->proc_run($args);
+ }
}
return($active >= $queues);
logger('queue: start');
// Handling the pubsubhubbub requests
- proc_run('php','include/pubsubpublish.php');
+ proc_run(PRIORITY_HIGH,'include/pubsubpublish.php');
$interval = ((get_config('system','delivery_interval') === false) ? 2 : intval(get_config('system','delivery_interval')));
if($r) {
foreach($r as $rr) {
logger('queue: deliverq');
- proc_run('php','include/delivery.php',$rr['cmd'],$rr['item'],$rr['contact']);
+ proc_run(PRIORITY_HIGH,'include/delivery.php',$rr['cmd'],$rr['item'],$rr['contact']);
if($interval)
@time_sleep_until(microtime(true) + (float) $interval);
}
if ($doprobing) {
logger("Last Contact: ". $last_contact_str." - Last Failure: ".$last_failure_str." - Checking: ".$contact["url"], LOGGER_DEBUG);
- proc_run('php', 'include/gprobe.php', bin2hex($contact["url"]));
+ proc_run(PRIORITY_LOW, 'include/gprobe.php', bin2hex($contact["url"]));
}
if ((count($r) > 1) AND ($gcontact_id > 0) AND ($contact["url"] != ""))
}\r
\r
// send relocate messages\r
- proc_run('php', 'include/notifier.php', 'relocate', $newuid);\r
+ proc_run(PRIORITY_HIGH, 'include/notifier.php', 'relocate', $newuid);\r
\r
info(t("Done. You can now login with your username and password"));\r
goaway($a->get_baseurl() . "/login");\r
$users = q("SELECT `uid` FROM `user` WHERE `account_removed` = 0 AND `account_expired` = 0");
foreach ($users as $user) {
- proc_run('php', 'include/notifier.php', 'relocate', $user['uid']);
+ proc_run(PRIORITY_HIGH, 'include/notifier.php', 'relocate', $user['uid']);
}
info("Relocation started. Could take a while to complete.");
intval($contact_id));
} else
// pull feed and consume it, which should subscribe to the hub.
- proc_run('php',"include/onepoll.php","$contact_id", "force");
+ proc_run(PRIORITY_MEDIUM, "include/onepoll.php", $contact_id, "force");
}
function _contact_update_profile($contact_id) {
$i = item_store($arr);
if($i)
- proc_run('php',"include/notifier.php","activity","$i");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "activity", $i);
}
}
}
$i = item_store($arr);
if($i)
- proc_run('php',"include/notifier.php","activity","$i");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "activity", $i);
}
}
}
// Add found profiles from the global directory to the local directory
- proc_run('php','include/discover_poco.php', "dirsearch", urlencode($search));
+ proc_run(PRIORITY_LOW, 'include/discover_poco.php', "dirsearch", urlencode($search));
} else {
$p = (($a->pager['page'] != 1) ? '&p=' . $a->pager['page'] : '');
$item_id = event_store($datarray);
if(! $cid)
- proc_run('php',"include/notifier.php","event","$item_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "event", $item_id);
goaway($_SESSION['return_url']);
}
intval($fsuggest_id),
intval(local_user())
);
- proc_run('php', 'include/notifier.php', 'suggest' , $fsuggest_id);
+ proc_run(PRIORITY_HIGH, 'include/notifier.php', 'suggest', $fsuggest_id);
}
info( t('Friend suggestion sent.') . EOL);
// update filetags in pconfig
file_tag_update_pconfig($uid,$categories_old,$categories_new,'category');
- proc_run('php', "include/notifier.php", 'edit_post', "$post_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", 'edit_post', $post_id);
if((x($_REQUEST,'return')) && strlen($return_path)) {
logger('return: ' . $return_path);
goaway($a->get_baseurl() . "/" . $return_path );
// Currently the only realistic fixes are to use a reliable server - which precludes shared hosting,
// or cut back on plugins which do remote deliveries.
- proc_run('php', "include/notifier.php", $notify_type, "$post_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", $notify_type, $post_id);
logger('post_complete');
intval($uid),
intval($item_id)
);
- proc_run('php',"include/notifier.php","tag","$item_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "tag", $item_id);
}
call_hooks('post_local_end', $arr);
- proc_run('php',"include/notifier.php","like","$post_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "like", $post_id);
return;
}
// send the notification upstream/downstream as the case may be
if($rr['visible'])
- proc_run('php',"include/notifier.php","drop","$drop_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "drop", $drop_id);
}
}
}
$drop_id = intval($i[0]['id']);
if($i[0]['visible'])
- proc_run('php',"include/notifier.php","drop","$drop_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "drop", $drop_id);
}
}
$item_id = item_store($arr);
if($item_id) {
- proc_run('php',"include/notifier.php","tag","$item_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "tag", $item_id);
}
}
$item_id = item_store($arr);
if($visible)
- proc_run('php', "include/notifier.php", 'wall-new', $item_id);
+ proc_run(PRIORITY_HIGH, "include/notifier.php", 'wall-new', $item_id);
call_hooks('photo_post_end',intval($item_id));
// intval($uid),
// intval($item_id)
//);
- proc_run('php',"include/notifier.php","tag","$item_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "tag", $item_id);
}
call_hooks('post_local_end', $arr);
- proc_run('php',"include/notifier.php","like","$post_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "like", $post_id);
return;
}
// Update global directory in background
$url = $a->get_baseurl() . '/profile/' . $a->user['nickname'];
if($url && strlen(get_config('system','directory')))
- proc_run('php',"include/directory.php","$url");
+ proc_run(PRIORITY_LOW, "include/directory.php", $url);
require_once('include/profile_update.php');
profile_change();
// Update global directory in background
$url = $_SESSION['my_url'];
if($url && strlen(get_config('system','directory')))
- proc_run('php',"include/directory.php","$url");
+ proc_run(PRIORITY_LOW, "include/directory.php", $url);
goaway($a->get_baseurl() . '/profiles');
return; // NOTREACHED
// Update global directory in background
$url = $_SESSION['my_url'];
if($url && strlen(get_config('system','directory')))
- proc_run('php',"include/directory.php","$url");
+ proc_run(PRIORITY_LOW, "include/directory.php", $url);
require_once('include/profile_update.php');
profile_change();
$arr['deny_gid'] = $a->user['deny_gid'];
$i = item_store($arr);
- if($i) {
- proc_run('php',"include/notifier.php","activity","$i");
- }
+ if($i)
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "activity", $i);
}
if($netpublish && $a->config['register_policy'] != REGISTER_APPROVE) {
$url = $a->get_baseurl() . '/profile/' . $user['nickname'];
- proc_run('php',"include/directory.php","$url");
+ proc_run(PRIORITY_LOW, "include/directory.php", $url);
}
$using_invites = get_config('system','invitation_only');
if(count($r) && $r[0]['net-publish']) {
$url = $a->get_baseurl() . '/profile/' . $user[0]['nickname'];
if($url && strlen(get_config('system','directory')))
- proc_run('php',"include/directory.php","$url");
+ proc_run(PRIORITY_LOW, "include/directory.php", $url);
}
push_lang($register[0]['language']);
check_form_security_token_redirectOnErr('/settings', 'settings');
if (x($_POST,'resend_relocate')) {
- proc_run('php', 'include/notifier.php', 'relocate', local_user());
+ proc_run(PRIORITY_HIGH, 'include/notifier.php', 'relocate', local_user());
info(t("Relocate message has been send to your contacts"));
goaway('settings');
}
// Update global directory in background
$url = $_SESSION['my_url'];
if($url && strlen(get_config('system','directory')))
- proc_run('php',"include/directory.php","$url");
+ proc_run(PRIORITY_LOW, "include/directory.php", $url);
}
require_once('include/profile_update.php');
call_hooks('post_local_end', $arr);
- proc_run('php',"include/notifier.php","tag","$post_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "tag", $post_id);
killme();
$drop_id = intval($i[0]['id']);
if($i[0]['visible'])
- proc_run('php',"include/notifier.php","drop","$drop_id");
+ proc_run(PRIORITY_HIGH, "include/notifier.php", "drop", $drop_id);
}
}
if (!$r)
return UPDATE_FAILED;
- proc_run('php',"include/threadupdate.php");
+ proc_run(PRIORITY_LOW, "include/threadupdate.php");
return UPDATE_SUCCESS;
}
set_config('system','community_page_style', CP_NO_COMMUNITY_PAGE);
// Update the central item storage with uid=0
- proc_run('php',"include/threadupdate.php");
+ proc_run(PRIORITY_LOW, "include/threadupdate.php");
return UPDATE_SUCCESS;
}
function update_1180() {
// Fill the new fields in the term table.
- proc_run('php',"include/tagupdate.php");
+ proc_run(PRIORITY_LOW, "include/tagupdate.php");
return UPDATE_SUCCESS;
}