]> git.mxchange.org Git - friendica.git/blob - include/poller.php
Replace AND and OR in PHP conditions by && and ||
[friendica.git] / include / poller.php
1 <?php
2
3 use Friendica\App;
4 use Friendica\Core\Config;
5 use Friendica\Util\Lock;
6
7 if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
8         $directory = dirname($_SERVER["argv"][0]);
9
10         if (substr($directory, 0, 1) != "/") {
11                 $directory = $_SERVER["PWD"]."/".$directory;
12         }
13         $directory = realpath($directory."/..");
14
15         chdir($directory);
16 }
17
18 require_once("boot.php");
19
20 function poller_run($argv, $argc){
21         global $a, $db;
22
23         $a = new App(dirname(__DIR__));
24
25         @include(".htconfig.php");
26         require_once("include/dba.php");
27         $db = new dba($db_host, $db_user, $db_pass, $db_data);
28         unset($db_host, $db_user, $db_pass, $db_data);
29
30         Config::load();
31
32         // Quit when in maintenance
33         if (Config::get('system', 'maintenance', true)) {
34                 return;
35         }
36
37         $a->set_baseurl(Config::get('system', 'url'));
38
39         load_hooks();
40
41         // At first check the maximum load. We shouldn't continue with a high load
42         if ($a->maxload_reached()) {
43                 logger('Pre check: maximum load reached, quitting.', LOGGER_DEBUG);
44                 return;
45         }
46
47         // We now start the process. This is done after the load check since this could increase the load.
48         $a->start_process();
49
50         // At first we check the number of workers and quit if there are too much of them
51         // This is done at the top to avoid that too much code is executed without a need to do so,
52         // since the poller mostly quits here.
53         if (poller_too_much_workers()) {
54                 poller_kill_stale_workers();
55                 logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG);
56                 return;
57         }
58
59         // Do we have too few memory?
60         if ($a->min_memory_reached()) {
61                 logger('Pre check: Memory limit reached, quitting.', LOGGER_DEBUG);
62                 return;
63         }
64
65         // Possibly there are too much database connections
66         if (poller_max_connections_reached()) {
67                 logger('Pre check: maximum connections reached, quitting.', LOGGER_DEBUG);
68                 return;
69         }
70
71         // Possibly there are too much database processes that block the system
72         if ($a->max_processes_reached()) {
73                 logger('Pre check: maximum processes reached, quitting.', LOGGER_DEBUG);
74                 return;
75         }
76
77         // Now we start additional cron processes if we should do so
78         if (($argc <= 1) || ($argv[1] != "no_cron")) {
79                 poller_run_cron();
80         }
81
82         $starttime = time();
83
84         // We fetch the next queue entry that is about to be executed
85         while ($r = poller_worker_process()) {
86
87                 // If we got that queue entry we claim it for us
88                 if (!poller_claim_process($r[0])) {
89                         continue;
90                 }
91
92                 // To avoid the quitting of multiple pollers only one poller at a time will execute the check
93                 if (Lock::set('poller_worker', 0)) {
94                         // Count active workers and compare them with a maximum value that depends on the load
95                         if (poller_too_much_workers()) {
96                                 logger('Active worker limit reached, quitting.', LOGGER_DEBUG);
97                                 return;
98                         }
99
100                         // Check free memory
101                         if ($a->min_memory_reached()) {
102                                 logger('Memory limit reached, quitting.', LOGGER_DEBUG);
103                                 return;
104                         }
105                         Lock::remove('poller_worker');
106                 }
107
108                 // finally the work will be done
109                 if (!poller_execute($r[0])) {
110                         logger('Process execution failed, quitting.', LOGGER_DEBUG);
111                         return;
112                 }
113
114                 // Quit the poller once every hour
115                 if (time() > ($starttime + 3600)) {
116                         logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
117                         return;
118                 }
119         }
120         logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
121 }
122
123 /**
124  * @brief Execute a worker entry
125  *
126  * @param array $queue Workerqueue entry
127  *
128  * @return boolean "true" if further processing should be stopped
129  */
130 function poller_execute($queue) {
131
132         $a = get_app();
133
134         $mypid = getmypid();
135
136         // Quit when in maintenance
137         if (Config::get('system', 'maintenance', true)) {
138                 logger("Maintenance mode - quit process ".$mypid, LOGGER_DEBUG);
139                 return false;
140         }
141
142         // Constantly check the number of parallel database processes
143         if ($a->max_processes_reached()) {
144                 logger("Max processes reached for process ".$mypid, LOGGER_DEBUG);
145                 return false;
146         }
147
148         // Constantly check the number of available database connections to let the frontend be accessible at any time
149         if (poller_max_connections_reached()) {
150                 logger("Max connection reached for process ".$mypid, LOGGER_DEBUG);
151                 return false;
152         }
153
154         $argv = json_decode($queue["parameter"]);
155
156         // Check for existance and validity of the include file
157         $include = $argv[0];
158
159         if (!validate_include($include)) {
160                 logger("Include file ".$argv[0]." is not valid!");
161                 dba::delete('workerqueue', array('id' => $queue["id"]));
162                 return true;
163         }
164
165         require_once($include);
166
167         $funcname = str_replace(".php", "", basename($argv[0]))."_run";
168
169         if (function_exists($funcname)) {
170                 poller_exec_function($queue, $funcname, $argv);
171                 dba::delete('workerqueue', array('id' => $queue["id"]));
172         } else {
173                 logger("Function ".$funcname." does not exist");
174         }
175
176         return true;
177 }
178
179 /**
180  * @brief Execute a function from the queue
181  *
182  * @param array $queue Workerqueue entry
183  * @param string $funcname name of the function
184  * @param array $argv Array of values to be passed to the function
185  */
186 function poller_exec_function($queue, $funcname, $argv) {
187
188         $a = get_app();
189
190         $mypid = getmypid();
191
192         $argc = count($argv);
193
194         logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]);
195
196         $stamp = (float)microtime(true);
197
198         // We use the callstack here to analyze the performance of executed worker entries.
199         // For this reason the variables have to be initialized.
200         if (Config::get("system", "profiler")) {
201                 $a->performance["start"] = microtime(true);
202                 $a->performance["database"] = 0;
203                 $a->performance["database_write"] = 0;
204                 $a->performance["network"] = 0;
205                 $a->performance["file"] = 0;
206                 $a->performance["rendering"] = 0;
207                 $a->performance["parser"] = 0;
208                 $a->performance["marktime"] = 0;
209                 $a->performance["markstart"] = microtime(true);
210                 $a->callstack = array();
211         }
212
213         // For better logging create a new process id for every worker call
214         // But preserve the old one for the worker
215         $old_process_id = $a->process_id;
216         $a->process_id = uniqid("wrk", true);
217
218         $funcname($argv, $argc);
219
220         $a->process_id = $old_process_id;
221
222         $duration = number_format(microtime(true) - $stamp, 3);
223
224         if ($duration > 3600) {
225                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
226         } elseif ($duration > 600) {
227                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 10 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
228         } elseif ($duration > 300) {
229                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 5 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
230         } elseif ($duration > 120) {
231                 logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 2 minutes (".round($duration/60, 3).")", LOGGER_DEBUG);
232         }
233
234         logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done in ".$duration." seconds.");
235
236         // Write down the performance values into the log
237         if (Config::get("system", "profiler")) {
238                 $duration = microtime(true)-$a->performance["start"];
239
240                 if (Config::get("rendertime", "callstack")) {
241                         if (isset($a->callstack["database"])) {
242                                 $o = "\nDatabase Read:\n";
243                                 foreach ($a->callstack["database"] AS $func => $time) {
244                                         $time = round($time, 3);
245                                         if ($time > 0) {
246                                                 $o .= $func.": ".$time."\n";
247                                         }
248                                 }
249                         }
250                         if (isset($a->callstack["database_write"])) {
251                                 $o .= "\nDatabase Write:\n";
252                                 foreach ($a->callstack["database_write"] AS $func => $time) {
253                                         $time = round($time, 3);
254                                         if ($time > 0) {
255                                                 $o .= $func.": ".$time."\n";
256                                         }
257                                 }
258                         }
259                         if (isset($a->callstack["network"])) {
260                                 $o .= "\nNetwork:\n";
261                                 foreach ($a->callstack["network"] AS $func => $time) {
262                                         $time = round($time, 3);
263                                         if ($time > 0) {
264                                                 $o .= $func.": ".$time."\n";
265                                         }
266                                 }
267                         }
268                 } else {
269                         $o = '';
270                 }
271
272                 logger("ID ".$queue["id"].": ".$funcname.": ".sprintf("DB: %s/%s, Net: %s, I/O: %s, Other: %s, Total: %s".$o,
273                         number_format($a->performance["database"] - $a->performance["database_write"], 2),
274                         number_format($a->performance["database_write"], 2),
275                         number_format($a->performance["network"], 2),
276                         number_format($a->performance["file"], 2),
277                         number_format($duration - ($a->performance["database"] + $a->performance["network"] + $a->performance["file"]), 2),
278                         number_format($duration, 2)),
279                         LOGGER_DEBUG);
280         }
281
282         $cooldown = Config::get("system", "worker_cooldown", 0);
283
284         if ($cooldown > 0) {
285                 logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds");
286                 sleep($cooldown);
287         }
288 }
289
290 /**
291  * @brief Checks if the number of database connections has reached a critical limit.
292  *
293  * @return bool Are more than 3/4 of the maximum connections used?
294  */
295 function poller_max_connections_reached() {
296
297         // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
298         $max = Config::get("system", "max_connections");
299
300         // Fetch the percentage level where the poller will get active
301         $maxlevel = Config::get("system", "max_connections_level", 75);
302
303         if ($max == 0) {
304                 // the maximum number of possible user connections can be a system variable
305                 $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
306                 if (dbm::is_result($r)) {
307                         $max = $r[0]["Value"];
308                 }
309                 // Or it can be granted. This overrides the system variable
310                 $r = q("SHOW GRANTS");
311                 if (dbm::is_result($r)) {
312                         foreach ($r AS $grants) {
313                                 $grant = array_pop($grants);
314                                 if (stristr($grant, "GRANT USAGE ON")) {
315                                         if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) {
316                                                 $max = $match[1];
317                                         }
318                                 }
319                         }
320                 }
321         }
322
323         // If $max is set we will use the processlist to determine the current number of connections
324         // The processlist only shows entries of the current user
325         if ($max != 0) {
326                 $r = q("SHOW PROCESSLIST");
327                 if (!dbm::is_result($r)) {
328                         return false;
329                 }
330                 $used = count($r);
331
332                 logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);
333
334                 $level = ($used / $max) * 100;
335
336                 if ($level >= $maxlevel) {
337                         logger("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
338                         return true;
339                 }
340         }
341
342         // We will now check for the system values.
343         // This limit could be reached although the user limits are fine.
344         $r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
345         if (!dbm::is_result($r)) {
346                 return false;
347         }
348         $max = intval($r[0]["Value"]);
349         if ($max == 0) {
350                 return false;
351         }
352         $r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
353         if (!dbm::is_result($r)) {
354                 return false;
355         }
356         $used = intval($r[0]["Value"]);
357         if ($used == 0) {
358                 return false;
359         }
360         logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);
361
362         $level = $used / $max * 100;
363
364         if ($level < $maxlevel) {
365                 return false;
366         }
367         logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
368         return true;
369 }
370
371 /**
372  * @brief fix the queue entry if the worker process died
373  *
374  */
375 function poller_kill_stale_workers() {
376         $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE));
377
378         if (!dbm::is_result($r)) {
379                 // No processing here needed
380                 return;
381         }
382
383         foreach ($r AS $pid) {
384                 if (!posix_kill($pid["pid"], 0)) {
385                         dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0),
386                                         array('pid' => $pid["pid"]));
387                 } else {
388                         // Kill long running processes
389
390                         // Check if the priority is in a valid range
391                         if (!in_array($pid["priority"], array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE))) {
392                                 $pid["priority"] = PRIORITY_MEDIUM;
393                         }
394
395                         // Define the maximum durations
396                         $max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);
397                         $max_duration = $max_duration_defaults[$pid["priority"]];
398
399                         $argv = json_decode($pid["parameter"]);
400                         $argv[0] = basename($argv[0]);
401
402                         // How long is the process already running?
403                         $duration = (time() - strtotime($pid["executed"])) / 60;
404                         if ($duration > $max_duration) {
405                                 logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
406                                 posix_kill($pid["pid"], SIGTERM);
407
408                                 // We killed the stale process.
409                                 // To avoid a blocking situation we reschedule the process at the beginning of the queue.
410                                 // Additionally we are lowering the priority.
411                                 dba::update('workerqueue',
412                                                 array('executed' => NULL_DATE, 'created' => datetime_convert(), 'priority' => PRIORITY_NEGLIGIBLE, 'pid' => 0),
413                                                 array('pid' => $pid["pid"]));
414                         } else {
415                                 logger("Worker process ".$pid["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
416                         }
417                 }
418         }
419 }
420
421 /**
422  * @brief Checks if the number of active workers exceeds the given limits
423  *
424  * @return bool Are there too much workers running?
425  */
426 function poller_too_much_workers() {
427         $queues = Config::get("system", "worker_queues", 4);
428
429         $maxqueues = $queues;
430
431         $active = poller_active_workers();
432
433         // Decrease the number of workers at higher load
434         $load = current_load();
435         if ($load) {
436                 $maxsysload = intval(Config::get("system", "maxloadavg", 50));
437
438                 $maxworkers = $queues;
439
440                 // Some magical mathemathics to reduce the workers
441                 $exponent = 3;
442                 $slope = $maxworkers / pow($maxsysload, $exponent);
443                 $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
444
445                 // Create a list of queue entries grouped by their priority
446                 $listitem = array();
447
448                 // Adding all processes with no workerqueue entry
449                 $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
450                 if ($process = dba::fetch($processes)) {
451                         $listitem[0] = "0:".$process["running"];
452                 }
453                 dba::close($processes);
454
455                 // Now adding all processes with workerqueue entries
456                 $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
457                 while ($entry = dba::fetch($entries)) {
458                         $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
459                         if ($process = dba::fetch($processes)) {
460                                 $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
461                         }
462                         dba::close($processes);
463                 }
464                 dba::close($entries);
465                 $processlist = ' ('.implode(', ', $listitem).')';
466
467                 $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
468                 $entries = $s[0]["total"];
469
470                 if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
471                         $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
472                         $top_priority = $s[0]["priority"];
473
474                         $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
475                                 intval($top_priority), dbesc(NULL_DATE));
476                         $high_running = dbm::is_result($s);
477
478                         if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
479                                 logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
480                                 $queues = $active + 1;
481                         }
482                 }
483
484                 logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries.$processlist." - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);
485
486                 // Are there fewer workers running as possible? Then fork a new one.
487                 if (!Config::get("system", "worker_dont_fork") && ($queues > ($active + 1)) && ($entries > 1)) {
488                         logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
489                         $args = array("include/poller.php", "no_cron");
490                         $a = get_app();
491                         $a->proc_run($args);
492                 }
493         }
494
495         return $active >= $queues;
496 }
497
498 /**
499  * @brief Returns the number of active poller processes
500  *
501  * @return integer Number of active poller processes
502  */
503 function poller_active_workers() {
504         $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");
505
506         return $workers[0]["processes"];
507 }
508
509 /**
510  * @brief Check if we should pass some slow processes
511  *
512  * When the active processes of the highest priority are using more than 2/3
513  * of all processes, we let pass slower processes.
514  *
515  * @param string $highest_priority Returns the currently highest priority
516  * @return bool We let pass a slower process than $highest_priority
517  */
518 function poller_passing_slow(&$highest_priority) {
519
520         $highest_priority = 0;
521
522         $r = q("SELECT `priority`
523                 FROM `process`
524                 INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");
525
526         // No active processes at all? Fine
527         if (!dbm::is_result($r)) {
528                 return false;
529         }
530         $priorities = array();
531         foreach ($r AS $line) {
532                 $priorities[] = $line["priority"];
533         }
534         // Should not happen
535         if (count($priorities) == 0) {
536                 return false;
537         }
538         $highest_priority = min($priorities);
539
540         // The highest process is already the slowest one?
541         // Then we quit
542         if ($highest_priority == PRIORITY_NEGLIGIBLE) {
543                 return false;
544         }
545         $high = 0;
546         foreach ($priorities AS $priority) {
547                 if ($priority == $highest_priority) {
548                         ++$high;
549                 }
550         }
551         logger("Highest priority: ".$highest_priority." Total processes: ".count($priorities)." Count high priority processes: ".$high, LOGGER_DEBUG);
552         $passing_slow = (($high/count($priorities)) > (2/3));
553
554         if ($passing_slow) {
555                 logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG);
556         }
557         return $passing_slow;
558 }
559
560 /**
561  * @brief Returns the next worker process
562  *
563  * @return string SQL statement
564  */
565 function poller_worker_process() {
566
567         // Check if we should pass some low priority process
568         $highest_priority = 0;
569
570         if (poller_passing_slow($highest_priority)) {
571                 dba::lock('workerqueue');
572
573                 // Are there waiting processes with a higher priority than the currently highest?
574                 $r = q("SELECT * FROM `workerqueue`
575                                 WHERE `executed` <= '%s' AND `priority` < %d
576                                 ORDER BY `priority`, `created` LIMIT 1",
577                                 dbesc(NULL_DATE),
578                                 intval($highest_priority));
579                 if (dbm::is_result($r)) {
580                         return $r;
581                 }
582                 // Give slower processes some processing time
583                 $r = q("SELECT * FROM `workerqueue`
584                                 WHERE `executed` <= '%s' AND `priority` > %d
585                                 ORDER BY `priority`, `created` LIMIT 1",
586                                 dbesc(NULL_DATE),
587                                 intval($highest_priority));
588
589                 if (dbm::is_result($r)) {
590                         return $r;
591                 }
592         } else {
593                 dba::lock('workerqueue');
594         }
595
596         // If there is no result (or we shouldn't pass lower processes) we check without priority limit
597         if (!dbm::is_result($r)) {
598                 $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1", dbesc(NULL_DATE));
599         }
600
601         // We only unlock the tables here, when we got no data
602         if (!dbm::is_result($r)) {
603                 dba::unlock();
604         }
605
606         return $r;
607 }
608
609 /**
610  * @brief Assigns a workerqueue entry to the current process
611  *
612  * When we are sure that the table locks are working correctly, we can remove the checks from here
613  *
614  * @param array $queue Workerqueue entry
615  *
616  * @return boolean "true" if the claiming was successful
617  */
618 function poller_claim_process($queue) {
619         $mypid = getmypid();
620
621         $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
622                         array('id' => $queue["id"], 'pid' => 0));
623         dba::unlock();
624
625         if (!$success) {
626                 logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
627                 return false;
628         }
629
630         // Assure that there are no tasks executed twice
631         $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
632         if (!$id) {
633                 logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
634                 return false;
635         } elseif ((strtotime($id[0]["executed"]) <= 0) || ($id[0]["pid"] == 0)) {
636                 logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
637                 return false;
638         } elseif ($id[0]["pid"] != $mypid) {
639                 logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
640                 return false;
641         }
642         return true;
643 }
644
645 /**
646  * @brief Removes a workerqueue entry from the current process
647  */
648 function poller_unclaim_process() {
649         $mypid = getmypid();
650
651         dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid));
652 }
653
654 /**
655  * @brief Call the front end worker
656  */
657 function call_worker() {
658         if (!Config::get("system", "frontend_worker")) {
659                 return;
660         }
661
662         $url = App::get_baseurl()."/worker";
663         fetch_url($url, false, $redirects, 1);
664 }
665
666 /**
667  * @brief Call the front end worker if there aren't any active
668  */
669 function call_worker_if_idle() {
670         if (!Config::get("system", "frontend_worker")) {
671                 return;
672         }
673
674         // Do we have "proc_open"? Then we can fork the poller
675         if (function_exists("proc_open")) {
676                 // When was the last time that we called the worker?
677                 // Less than one minute? Then we quit
678                 if ((time() - Config::get("system", "worker_started")) < 60) {
679                         return;
680                 }
681
682                 set_config("system", "worker_started", time());
683
684                 // Do we have enough running workers? Then we quit here.
685                 if (poller_too_much_workers()) {
686                         // Cleaning dead processes
687                         poller_kill_stale_workers();
688                         get_app()->remove_inactive_processes();
689
690                         return;
691                 }
692
693                 poller_run_cron();
694
695                 logger('Call poller', LOGGER_DEBUG);
696
697                 $args = array("include/poller.php", "no_cron");
698                 $a = get_app();
699                 $a->proc_run($args);
700                 return;
701         }
702
703         // We cannot execute background processes.
704         // We now run the processes from the frontend.
705         // This won't work with long running processes.
706         poller_run_cron();
707
708         clear_worker_processes();
709
710         $workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'worker.php'");
711
712         if ($workers[0]["processes"] == 0) {
713                 call_worker();
714         }
715 }
716
717 /**
718  * @brief Removes long running worker processes
719  */
720 function clear_worker_processes() {
721         $timeout = Config::get("system", "frontend_worker_timeout", 10);
722
723         /// @todo We should clean up the corresponding workerqueue entries as well
724         q("DELETE FROM `process` WHERE `created` < '%s' AND `command` = 'worker.php'",
725                 dbesc(datetime_convert('UTC','UTC',"now - ".$timeout." minutes")));
726 }
727
728 /**
729  * @brief Runs the cron processes
730  */
731 function poller_run_cron() {
732         logger('Add cron entries', LOGGER_DEBUG);
733
734         // Check for spooled items
735         proc_run(PRIORITY_HIGH, "include/spool_post.php");
736
737         // Run the cron job that calls all other jobs
738         proc_run(PRIORITY_MEDIUM, "include/cron.php");
739
740         // Run the cronhooks job separately from cron for being able to use a different timing
741         proc_run(PRIORITY_MEDIUM, "include/cronhooks.php");
742
743         // Cleaning dead processes
744         poller_kill_stale_workers();
745 }
746
747 if (array_search(__file__,get_included_files())===0){
748         poller_run($_SERVER["argv"],$_SERVER["argc"]);
749
750         poller_unclaim_process();
751
752         get_app()->end_process();
753
754         Lock::remove('poller_worker');
755
756         killme();
757 }