]> git.mxchange.org Git - friendica.git/blob - include/poller.php
29a31a96f8b15b6f1139565010790549dd3a4965
[friendica.git] / include / poller.php
1 <?php
2
3 use Friendica\App;
4 use Friendica\Core\Config;
5 use Friendica\Util\Lock;
6
7 if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
8         $directory = dirname($_SERVER["argv"][0]);
9
10         if (substr($directory, 0, 1) != "/") {
11                 $directory = $_SERVER["PWD"]."/".$directory;
12         }
13         $directory = realpath($directory."/..");
14
15         chdir($directory);
16 }
17
18 require_once("boot.php");
19
20 function poller_run($argv, $argc){
21         global $a, $db, $poller_up_start;
22
23         $poller_up_start = microtime(true);
24
25         $a = new App(dirname(__DIR__));
26
27         @include(".htconfig.php");
28         require_once("include/dba.php");
29         $db = new dba($db_host, $db_user, $db_pass, $db_data);
30         unset($db_host, $db_user, $db_pass, $db_data);
31
32         Config::load();
33
34         // Quit when in maintenance
35         if (Config::get('system', 'maintenance', true)) {
36                 return;
37         }
38
39         $a->set_baseurl(Config::get('system', 'url'));
40
41         load_hooks();
42
43         // At first check the maximum load. We shouldn't continue with a high load
44         if ($a->maxload_reached()) {
45                 logger('Pre check: maximum load reached, quitting.', LOGGER_DEBUG);
46                 return;
47         }
48
49         // We now start the process. This is done after the load check since this could increase the load.
50         $a->start_process();
51
52         // Kill stale processes every 5 minutes
53         $last_cleanup = Config::get('system', 'poller_last_cleaned', 0);
54         if (time() > ($last_cleanup + 300)) {
55                 Config::set('system', 'poller_last_cleaned', time());
56                 poller_kill_stale_workers();
57         }
58
59         // Count active workers and compare them with a maximum value that depends on the load
60         if (poller_too_much_workers()) {
61                 logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG);
62                 return;
63         }
64
65         // Do we have too few memory?
66         if ($a->min_memory_reached()) {
67                 logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG);
68                 return;
69         }
70
71         // Possibly there are too much database connections
72         if (poller_max_connections_reached()) {
73                 logger('Pre check: maximum connections reached, quitting.', LOGGER_DEBUG);
74                 return;
75         }
76
77         // Possibly there are too much database processes that block the system
78         if ($a->max_processes_reached()) {
79                 logger('Pre check: maximum processes reached, quitting.', LOGGER_DEBUG);
80                 return;
81         }
82
83         // Now we start additional cron processes if we should do so
84         if (($argc <= 1) || ($argv[1] != "no_cron")) {
85                 poller_run_cron();
86         }
87
88         $starttime = time();
89
90         // We fetch the next queue entry that is about to be executed
91         while ($r = poller_worker_process()) {
92                 foreach ($r AS $entry) {
93                         // Assure that the priority is an integer value
94                         $entry['priority'] = (int)$entry['priority'];
95
96                         // The work will be done
97                         if (!poller_execute($entry)) {
98                                 logger('Process execution failed, quitting.', LOGGER_DEBUG);
99                                 return;
100                         }
101                 }
102
103                 // To avoid the quitting of multiple pollers only one poller at a time will execute the check
104                 if (Lock::set('poller_worker', 0)) {
105                         // Count active workers and compare them with a maximum value that depends on the load
106                         if (poller_too_much_workers()) {
107                                 logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
108                                 return;
109                         }
110
111                         // Check free memory
112                         if ($a->min_memory_reached()) {
113                                 logger('Memory limit reached, quitting.', LOGGER_DEBUG);
114                                 return;
115                         }
116                         Lock::remove('poller_worker');
117                 }
118
119                 // Quit the poller once every 5 minutes
120                 if (time() > ($starttime + 300)) {
121                         logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
122                         return;
123                 }
124         }
125         logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
126 }
127
128 /**
129  * @brief Returns the number of non executed entries in the worker queue
130  *
131  * @return integer Number of non executed entries in the worker queue
132  */
133 function poller_total_entries() {
134         $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done`", dbesc(NULL_DATE));
135         if (dbm::is_result($s)) {
136                 return $s[0]["total"];
137         } else {
138                 return 0;
139         }
140 }
141
142 /**
143  * @brief Returns the highest priority in the worker queue that isn't executed
144  *
145  * @return integer Number of active poller processes
146  */
147 function poller_highest_priority() {
148         $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done` ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
149         if (dbm::is_result($s)) {
150                 return $s[0]["priority"];
151         } else {
152                 return 0;
153         }
154 }
155
156 /**
157  * @brief Returns if a process with the given priority is running
158  *
159  * @param integer $priority The priority that should be checked
160  *
161  * @return integer Is there a process running with that priority?
162  */
163 function poller_process_with_priority_active($priority) {
164         $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' AND NOT `done` LIMIT 1",
165                         intval($priority), dbesc(NULL_DATE));
166         return dbm::is_result($s);
167 }
168
169 /**
170  * @brief Execute a worker entry
171  *
172  * @param array $queue Workerqueue entry
173  *
174  * @return boolean "true" if further processing should be stopped
175  */
176 function poller_execute($queue) {
177         global $poller_db_duration;
178
179         $a = get_app();
180
181         $mypid = getmypid();
182
183         // Quit when in maintenance
184         if (Config::get('system', 'maintenance', true)) {
185                 logger("Maintenance mode - quit process ".$mypid, LOGGER_DEBUG);
186                 return false;
187         }
188
189         // Constantly check the number of parallel database processes
190         if ($a->max_processes_reached()) {
191                 logger("Max processes reached for process ".$mypid, LOGGER_DEBUG);
192                 return false;
193         }
194
195         // Constantly check the number of available database connections to let the frontend be accessible at any time
196         if (poller_max_connections_reached()) {
197                 logger("Max connection reached for process ".$mypid, LOGGER_DEBUG);
198                 return false;
199         }
200
201         $argv = json_decode($queue["parameter"]);
202
203         // Check for existance and validity of the include file
204         $include = $argv[0];
205
206         if (!validate_include($include)) {
207                 logger("Include file ".$argv[0]." is not valid!");
208                 dba::delete('workerqueue', array('id' => $queue["id"]));
209                 return true;
210         }
211
212         require_once($include);
213
214         $funcname = str_replace(".php", "", basename($argv[0]))."_run";
215
216         if (function_exists($funcname)) {
217                 poller_exec_function($queue, $funcname, $argv);
218
219                 $stamp = (float)microtime(true);
220                 dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]));
221                 $poller_db_duration = (microtime(true) - $stamp);
222         } else {
223                 logger("Function ".$funcname." does not exist");
224                 dba::delete('workerqueue', array('id' => $queue["id"]));
225         }
226
227         return true;
228 }
229
230 /**
231  * @brief Execute a function from the queue
232  *
233  * @param array $queue Workerqueue entry
234  * @param string $funcname name of the function
235  * @param array $argv Array of values to be passed to the function
236  */
237 function poller_exec_function($queue, $funcname, $argv) {
238         global $poller_up_start, $poller_db_duration;
239
240         $a = get_app();
241
242         $mypid = getmypid();
243
244         $argc = count($argv);
245
246         logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]);
247
248         $stamp = (float)microtime(true);
249
250         // We use the callstack here to analyze the performance of executed worker entries.
251         // For this reason the variables have to be initialized.
252         if (Config::get("system", "profiler")) {
253                 $a->performance["start"] = microtime(true);
254                 $a->performance["database"] = 0;
255                 $a->performance["database_write"] = 0;
256                 $a->performance["network"] = 0;
257                 $a->performance["file"] = 0;
258                 $a->performance["rendering"] = 0;
259                 $a->performance["parser"] = 0;
260                 $a->performance["marktime"] = 0;
261                 $a->performance["markstart"] = microtime(true);
262                 $a->callstack = array();
263         }
264
265         // For better logging create a new process id for every worker call
266         // But preserve the old one for the worker
267         $old_process_id = $a->process_id;
268         $a->process_id = uniqid("wrk", true);
269         $a->queue = $queue;
270
271         $up_duration = number_format(microtime(true) - $poller_up_start, 3);
272
273         $funcname($argv, $argc);
274
275         $a->process_id = $old_process_id;
276         unset($a->queue);
277
278         $duration = number_format(microtime(true) - $stamp, 3);
279
280         $poller_up_start = microtime(true);
281
282         /* With these values we can analyze how effective the worker is.
283          * The database and rest time should be low since this is the unproductive time.
284          * The execution time is the productive time.
285          * By changing parameters like the maximum number of workers we can check the effectivness.
286         */
287         logger('DB: '.number_format($poller_db_duration, 2).' - Rest: '.number_format($up_duration - $poller_db_duration, 2).' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
288
289         if ($duration > 3600) {
290                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
291         } elseif ($duration > 600) {
292                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 10 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
293         } elseif ($duration > 300) {
294                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 5 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
295         } elseif ($duration > 120) {
296                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 2 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
297         }
298
299         logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done in ".$duration." seconds.");
300
301         // Write down the performance values into the log
302         if (Config::get("system", "profiler")) {
303                 $duration = microtime(true)-$a->performance["start"];
304
305                 if (Config::get("rendertime", "callstack")) {
306                         if (isset($a->callstack["database"])) {
307                                 $o = "\nDatabase Read:\n";
308                                 foreach ($a->callstack["database"] AS $func => $time) {
309                                         $time = round($time, 3);
310                                         if ($time > 0) {
311                                                 $o .= $func.": ".$time."\n";
312                                         }
313                                 }
314                         }
315                         if (isset($a->callstack["database_write"])) {
316                                 $o .= "\nDatabase Write:\n";
317                                 foreach ($a->callstack["database_write"] AS $func => $time) {
318                                         $time = round($time, 3);
319                                         if ($time > 0) {
320                                                 $o .= $func.": ".$time."\n";
321                                         }
322                                 }
323                         }
324                         if (isset($a->callstack["network"])) {
325                                 $o .= "\nNetwork:\n";
326                                 foreach ($a->callstack["network"] AS $func => $time) {
327                                         $time = round($time, 3);
328                                         if ($time > 0) {
329                                                 $o .= $func.": ".$time."\n";
330                                         }
331                                 }
332                         }
333                 } else {
334                         $o = '';
335                 }
336
337                 logger("ID ".$queue["id"].": ".$funcname.": ".sprintf("DB: %s/%s, Net: %s, I/O: %s, Other: %s, Total: %s".$o,
338                         number_format($a->performance["database"] - $a->performance["database_write"], 2),
339                         number_format($a->performance["database_write"], 2),
340                         number_format($a->performance["network"], 2),
341                         number_format($a->performance["file"], 2),
342                         number_format($duration - ($a->performance["database"] + $a->performance["network"] + $a->performance["file"]), 2),
343                         number_format($duration, 2)),
344                         LOGGER_DEBUG);
345         }
346
347         $cooldown = Config::get("system", "worker_cooldown", 0);
348
349         if ($cooldown > 0) {
350                 logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds");
351                 sleep($cooldown);
352         }
353 }
354
355 /**
356  * @brief Checks if the number of database connections has reached a critical limit.
357  *
358  * @return bool Are more than 3/4 of the maximum connections used?
359  */
360 function poller_max_connections_reached() {
361
362         // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
363         $max = Config::get("system", "max_connections");
364
365         // Fetch the percentage level where the poller will get active
366         $maxlevel = Config::get("system", "max_connections_level", 75);
367
368         if ($max == 0) {
369                 // the maximum number of possible user connections can be a system variable
370                 $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
371                 if (dbm::is_result($r)) {
372                         $max = $r[0]["Value"];
373                 }
374                 // Or it can be granted. This overrides the system variable
375                 $r = q("SHOW GRANTS");
376                 if (dbm::is_result($r)) {
377                         foreach ($r AS $grants) {
378                                 $grant = array_pop($grants);
379                                 if (stristr($grant, "GRANT USAGE ON")) {
380                                         if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) {
381                                                 $max = $match[1];
382                                         }
383                                 }
384                         }
385                 }
386         }
387
388         // If $max is set we will use the processlist to determine the current number of connections
389         // The processlist only shows entries of the current user
390         if ($max != 0) {
391                 $r = q("SHOW PROCESSLIST");
392                 if (!dbm::is_result($r)) {
393                         return false;
394                 }
395                 $used = count($r);
396
397                 logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);
398
399                 $level = ($used / $max) * 100;
400
401                 if ($level >= $maxlevel) {
402                         logger("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
403                         return true;
404                 }
405         }
406
407         // We will now check for the system values.
408         // This limit could be reached although the user limits are fine.
409         $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
410         if (!dbm::is_result($r)) {
411                 return false;
412         }
413         $max = intval($r[0]["Value"]);
414         if ($max == 0) {
415                 return false;
416         }
417         $r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
418         if (!dbm::is_result($r)) {
419                 return false;
420         }
421         $used = intval($r[0]["Value"]);
422         if ($used == 0) {
423                 return false;
424         }
425         logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);
426
427         $level = $used / $max * 100;
428
429         if ($level < $maxlevel) {
430                 return false;
431         }
432         logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
433         return true;
434 }
435
436 /**
437  * @brief fix the queue entry if the worker process died
438  *
439  */
440 function poller_kill_stale_workers() {
441         $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE));
442
443         if (!dbm::is_result($r)) {
444                 // No processing here needed
445                 return;
446         }
447
448         foreach ($r AS $pid) {
449                 if (!posix_kill($pid["pid"], 0)) {
450                         dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
451                                         array('pid' => $pid["pid"]));
452                 } else {
453                         // Kill long running processes
454
455                         // Check if the priority is in a valid range
456                         if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
457                                 $pid["priority"] = PRIORITY_MEDIUM;
458                         }
459
460                         // Define the maximum durations
461                         $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);
462                         $max_duration = $max_duration_defaults[$pid["priority"]];
463
464                         $argv = json_decode($pid["parameter"]);
465                         $argv[0] = basename($argv[0]);
466
467                         // How long is the process already running?
468                         $duration = (time() - strtotime($pid["executed"])) / 60;
469                         if ($duration > $max_duration) {
470                                 logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
471                                 posix_kill($pid["pid"], SIGTERM);
472
473                                 // We killed the stale process.
474                                 // To avoid a blocking situation we reschedule the process at the beginning of the queue.
475                                 // Additionally we are lowering the priority.
476                                 dba::update('workerqueue',
477                                                 array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
478                                                 array('pid' => $pid["pid"]));
479                         } else {
480                                 logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
481                         }
482                 }
483         }
484 }
485
486 /**
487  * @brief Checks if the number of active workers exceeds the given limits
488  *
489  * @return bool Are there too much workers running?
490  */
491 function poller_too_much_workers() {
492         $queues = Config::get("system", "worker_queues", 4);
493
494         $maxqueues = $queues;
495
496         $active = poller_active_workers();
497
498         // Decrease the number of workers at higher load
499         $load = current_load();
500         if ($load) {
501                 $maxsysload = intval(Config::get("system", "maxloadavg", 50));
502
503                 $maxworkers = $queues;
504
505                 // Some magical mathemathics to reduce the workers
506                 $exponent = 3;
507                 $slope = $maxworkers / pow($maxsysload, $exponent);
508                 $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
509
510                 if (Config::get('system', 'worker_debug')) {
511                         // Create a list of queue entries grouped by their priority
512                         $listitem = array();
513
514                         // Adding all processes with no workerqueue entry
515                         $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`)");
516                         if ($process = dba::fetch($processes)) {
517                                 $listitem[0] = "0:".$process["running"];
518                         }
519                         dba::close($processes);
520
521                         // Now adding all processes with workerqueue entries
522                         $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority`");
523                         while ($entry = dba::fetch($entries)) {
524                                 $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` WHERE `priority` = ?", $entry["priority"]);
525                                 if ($process = dba::fetch($processes)) {
526                                         $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
527                                 }
528                                 dba::close($processes);
529                         }
530                         dba::close($entries);
531                         $processlist = ' ('.implode(', ', $listitem).')';
532                 }
533
534                 $entries = poller_total_entries();
535
536                 if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
537                         $top_priority = poller_highest_priority();
538                         $high_running = poller_process_with_priority_active($top_priority);
539
540                         if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
541                                 logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
542                                 $queues = $active + 1;
543                         }
544                 }
545
546                 logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
547
548                 // Are there fewer workers running as possible? Then fork a new one.
549                 if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) {
550                         logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
551                         $args = array("include/poller.php", "no_cron");
552                         $a = get_app();
553                         $a->proc_run($args);
554                 }
555         }
556
557         return $active >= $queues;
558 }
559
560 /**
561  * @brief Returns the number of active poller processes
562  *
563  * @return integer Number of active poller processes
564  */
565 function poller_active_workers() {
566         $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");
567
568         return $workers[0]["processes"];
569 }
570
571 /**
572  * @brief Check if we should pass some slow processes
573  *
574  * When the active processes of the highest priority are using more than 2/3
575  * of all processes, we let pass slower processes.
576  *
577  * @param string $highest_priority Returns the currently highest priority
578  * @return bool We let pass a slower process than $highest_priority
579  */
580 function poller_passing_slow(&$highest_priority) {
581
582         $highest_priority = 0;
583
584         $r = q("SELECT `priority`
585                 FROM `process`
586                 INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`");
587
588         // No active processes at all? Fine
589         if (!dbm::is_result($r)) {
590                 return false;
591         }
592         $priorities = array();
593         foreach ($r AS $line) {
594                 $priorities[] = $line["priority"];
595         }
596         // Should not happen
597         if (count($priorities) == 0) {
598                 return false;
599         }
600         $highest_priority = min($priorities);
601
602         // The highest process is already the slowest one?
603         // Then we quit
604         if ($highest_priority == PRIORITY_NEGLIGIBLE) {
605                 return false;
606         }
607         $high = 0;
608         foreach ($priorities AS $priority) {
609                 if ($priority == $highest_priority) {
610                         ++$high;
611                 }
612         }
613         logger("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, LOGGER_DEBUG);
614         $passing_slow = (($high/count($priorities)) > (2/3));
615
616         if ($passing_slow) {
617                 logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG);
618         }
619         return $passing_slow;
620 }
621
622 /**
623  * @brief Find and claim the next worker process for us
624  *
625  * @return boolean Have we found something?
626  */
627 function find_worker_processes() {
628         // Check if we should pass some low priority process
629         $highest_priority = 0;
630         $found = false;
631         $limit = Config::get('system', 'worker_fetch_limit', 5);
632
633         if (poller_passing_slow($highest_priority)) {
634                 // Are there waiting processes with a higher priority than the currently highest?
635                 $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
636                                         WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
637                                         ORDER BY `priority`, `created` LIMIT ".intval($limit),
638                                 datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
639                 if ($result) {
640                         $found = (dba::affected_rows() > 0);
641                 }
642
643                 if (!$found) {
644                         // Give slower processes some processing time
645                         $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
646                                                 WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
647                                                 ORDER BY `priority`, `created` LIMIT 1",
648                                         datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
649                         if ($result) {
650                                 $found = (dba::affected_rows() > 0);
651                         }
652                 }
653         }
654
655         // If there is no result (or we shouldn't pass lower processes) we check without priority limit
656         if (!$found) {
657                 $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit),
658                                 datetime_convert(), getmypid(), NULL_DATE);
659                 if ($result) {
660                         $found = (dba::affected_rows() > 0);
661                 }
662         }
663         return $found;
664 }
665
666 /**
667  * @brief Returns the next worker process
668  *
669  * @return string SQL statement
670  */
671 function poller_worker_process() {
672         global $poller_db_duration;
673
674         $stamp = (float)microtime(true);
675
676         $found = find_worker_processes();
677
678         $poller_db_duration += (microtime(true) - $stamp);
679
680         if ($found) {
681                 $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d", intval(getmypid()));
682         }
683         return $r;
684 }
685
686 /**
687  * @brief Removes a workerqueue entry from the current process
688  */
689 function poller_unclaim_process() {
690         $mypid = getmypid();
691
692         dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid));
693 }
694
695 /**
696  * @brief Call the front end worker
697  */
698 function call_worker() {
699         if (!Config::get("system", "frontend_worker")) {
700                 return;
701         }
702
703         $url = App::get_baseurl()."/worker";
704         fetch_url($url, false, $redirects, 1);
705 }
706
707 /**
708  * @brief Call the front end worker if there aren't any active
709  */
710 function call_worker_if_idle() {
711         if (!Config::get("system", "frontend_worker")) {
712                 return;
713         }
714
715         // Do we have "proc_open"? Then we can fork the poller
716         if (function_exists("proc_open")) {
717                 // When was the last time that we called the worker?
718                 // Less than one minute? Then we quit
719                 if ((time() - Config::get("system", "worker_started")) < 60) {
720                         return;
721                 }
722
723                 set_config("system", "worker_started", time());
724
725                 // Do we have enough running workers? Then we quit here.
726                 if (poller_too_much_workers()) {
727                         // Cleaning dead processes
728                         poller_kill_stale_workers();
729                         get_app()->remove_inactive_processes();
730
731                         return;
732                 }
733
734                 poller_run_cron();
735
736                 logger('Call poller', LOGGER_DEBUG);
737
738                 $args = array("include/poller.php", "no_cron");
739                 $a = get_app();
740                 $a->proc_run($args);
741                 return;
742         }
743
744         // We cannot execute background processes.
745         // We now run the processes from the frontend.
746         // This won't work with long running processes.
747         poller_run_cron();
748
749         clear_worker_processes();
750
751         $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'worker.php'");
752
753         if ($workers[0]["processes"] == 0) {
754                 call_worker();
755         }
756 }
757
758 /**
759  * @brief Removes long running worker processes
760  */
761 function clear_worker_processes() {
762         $timeout = Config::get("system", "frontend_worker_timeout", 10);
763
764         /// @todo We should clean up the corresponding workerqueue entries as well
765         q("DELETE FROM `process` WHERE `created` < '%s' AND `command` = 'worker.php'",
766                 dbesc(datetime_convert('UTC','UTC',"now - ".$timeout." minutes")));
767 }
768
769 /**
770  * @brief Runs the cron processes
771  */
772 function poller_run_cron() {
773         logger('Add cron entries', LOGGER_DEBUG);
774
775         // Check for spooled items
776         proc_run(PRIORITY_HIGH, "include/spool_post.php");
777
778         // Run the cron job that calls all other jobs
779         proc_run(PRIORITY_MEDIUM, "include/cron.php");
780
781         // Run the cronhooks job separately from cron for being able to use a different timing
782         proc_run(PRIORITY_MEDIUM, "include/cronhooks.php");
783
784         // Cleaning dead processes
785         poller_kill_stale_workers();
786 }
787
788 if (array_search(__file__,get_included_files())===0){
789         poller_run($_SERVER["argv"],$_SERVER["argc"]);
790
791         poller_unclaim_process();
792
793         get_app()->end_process();
794
795         Lock::remove('poller_worker');
796
797         killme();
798 }