]> git.mxchange.org Git - friendica.git/blob - include/poller.php
Some added logging
[friendica.git] / include / poller.php
1 <?php
// When the poller is started from a directory that doesn't contain "boot.php"
// (e.g. from a cron job), change into the parent directory of the directory
// that holds this script so that the relative includes below can be resolved.
if (!file_exists("boot.php") AND (sizeof($_SERVER["argv"]) != 0)) {
	$directory = dirname($_SERVER["argv"][0]);

	// A relative script path is resolved against the current working directory
	if (strncmp($directory, "/", 1) != 0) {
		$directory = $_SERVER["PWD"]."/".$directory;
	}

	$directory = realpath($directory."/..");

	chdir($directory);
}
12
13 use \Friendica\Core\Config;
14 use \Friendica\Core\PConfig;
15
16 require_once("boot.php");
17
/**
 * @brief Entry point of the poller: adds the cron entries and works on the queue
 *
 * Creates the global App and database objects when they are missing, quits
 * early when the system is in maintenance or when one of the resource checks
 * (connections, load, processes, workers) fails, and then executes worker
 * queue entries until none is left, a limit is reached or one hour has passed.
 *
 * @param array $argv Command line arguments. When the second element is "no_cron" no cron entries are added.
 * @param integer $argc Number of command line arguments
 */
function poller_run($argv, $argc){
	global $a, $db;

	if (is_null($a)) {
		$a = new App;
	}

	if (is_null($db)) {
		// The included config file presumably defines the $db_* variables used below
		@include(".htconfig.php");
		require_once("include/dba.php");
		$db = new dba($db_host, $db_user, $db_pass, $db_data);
		unset($db_host, $db_user, $db_pass, $db_data);
	}

	// Quit when in maintenance.
	// The third parameter is the default value: it has to be "false", otherwise
	// a system without a stored "maintenance" value would never run the poller.
	// NOTE(review): the fourth parameter is meant to request a fresh read of
	// the value - confirm against the signature of Config::get().
	if (Config::get('system', 'maintenance', false, true)) {
		return;
	}

	$a->start_process();

	if (poller_max_connections_reached()) {
		return;
	}

	if ($a->maxload_reached()) {
		return;
	}

	if (($argc <= 1) OR ($argv[1] != "no_cron")) {
		poller_run_cron();
	}

	if ($a->max_processes_reached()) {
		return;
	}

	// Checking the number of workers
	if (poller_too_much_workers()) {
		poller_kill_stale_workers();
		return;
	}

	$starttime = time();

	while ($r = poller_worker_process()) {

		// Count active workers and compare them with a maximum value that depends on the load
		if (poller_too_much_workers()) {
			return;
		}

		// "false" signals that no further entries should be processed
		if (!poller_execute($r[0])) {
			return;
		}

		// Quit the poller once every hour
		if (time() > ($starttime + 3600)) {
			return;
		}
	}

}
80
/**
 * @brief Execute a worker entry
 *
 * The entry is claimed for the current process id, verified, and then the
 * "<basename>_run" function of the include file named in the first parameter
 * is called. Entries that were executed or that are invalid are deleted.
 *
 * @param array $queue Workerqueue entry
 *
 * @return boolean "false" if the poller should stop processing further entries
 *                 (maintenance or a resource limit was reached), "true" otherwise
 */
function poller_execute($queue) {

	$a = get_app();

	$mypid = getmypid();

	// Quit when in maintenance.
	// The third parameter is the default value: it has to be "false", otherwise
	// a system without a stored "maintenance" value would never execute anything.
	// NOTE(review): the fourth parameter is meant to request a fresh read of
	// the value - confirm against the signature of Config::get().
	if (Config::get('system', 'maintenance', false, true)) {
		return false;
	}

	// Constantly check the number of parallel database processes
	if ($a->max_processes_reached()) {
		return false;
	}

	// Constantly check the number of available database connections to let the frontend be accessible at any time
	if (poller_max_connections_reached()) {
		return false;
	}

	// Claim the entry. The condition on `pid` makes sure that only unclaimed entries are taken.
	$upd = q("UPDATE `workerqueue` SET `executed` = '%s', `pid` = %d WHERE `id` = %d AND `pid` = 0",
		dbesc(datetime_convert()),
		intval($mypid),
		intval($queue["id"]));

	if (!$upd) {
		logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
		q("COMMIT");
		return true;
	}

	// Assure that there are no tasks executed twice
	$id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
	if (!$id) {
		logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
		q("COMMIT");
		return true;
	} elseif ((strtotime($id[0]["executed"]) <= 0) OR ($id[0]["pid"] == 0)) {
		logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
		q("COMMIT");
		return true;
	} elseif ($id[0]["pid"] != $mypid) {
		logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
		q("COMMIT");
		return true;
	}
	q("COMMIT");

	$argv = json_decode($queue["parameter"]);

	// The parameters have to be a list that starts with the include file.
	// Anything else (undecodable JSON, a JSON object, an empty list) can't be
	// executed and would only fail further below, so the entry is removed.
	if (!is_array($argv) OR !isset($argv[0]) OR !is_string($argv[0])) {
		logger("Queue item ".$queue["id"]." has no valid parameter list - removing it");
		q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
		return true;
	}

	// Check for existence and validity of the include file.
	// A copy is handed over so that $argv[0] stays untouched for the function name below.
	$include = $argv[0];

	if (!validate_include($include)) {
		logger("Include file ".$argv[0]." is not valid!");
		q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
		return true;
	}

	require_once($include);

	// "include/foo.php" is expected to provide the function "foo_run"
	$funcname = str_replace(".php", "", basename($argv[0]))."_run";

	if (function_exists($funcname)) {

		poller_exec_function($queue, $funcname, $argv);

		q("DELETE FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
	} else {
		logger("Function ".$funcname." does not exist");
	}

	return true;
}
163
/**
 * @brief Execute a function from the queue
 *
 * Calls the function with the arguments and their count, logs the duration
 * and - when the profiler is enabled - the collected performance values.
 * Afterwards the process optionally sleeps for the configured cooldown.
 *
 * @param array $queue Workerqueue entry
 * @param string $funcname name of the function
 * @param array $argv Arguments of the worker call
 */
function poller_exec_function($queue, $funcname, $argv) {

	$a = get_app();

	$mypid = getmypid();

	$argc = count($argv);

	logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." ".$queue["parameter"]);

	$stamp = (float)microtime(true);

	// We use the callstack here to analyze the performance of executed worker entries.
	// For this reason the variables have to be initialized.
	if (Config::get("system", "profiler")) {
		$a->performance["start"] = microtime(true);
		$a->performance["database"] = 0;
		$a->performance["database_write"] = 0;
		$a->performance["network"] = 0;
		$a->performance["file"] = 0;
		$a->performance["rendering"] = 0;
		$a->performance["parser"] = 0;
		$a->performance["marktime"] = 0;
		$a->performance["markstart"] = microtime(true);
		$a->callstack = array();
	}

	// For better logging create a new process id for every worker call
	// But preserve the old one for the worker
	$old_process_id = $a->process_id;
	$a->process_id = uniqid("wrk", true);

	$funcname($argv, $argc);

	$a->process_id = $old_process_id;

	$duration = number_format(microtime(true) - $stamp, 3);

	logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - done in ".$duration." seconds.");

	// Write down the performance values into the log
	if (Config::get("system", "profiler")) {
		$duration = microtime(true)-$a->performance["start"];

		// Always start with an empty string: not every callstack section has to exist
		$o = '';

		if (Config::get("rendertime", "callstack")) {
			$sections = array("database" => "Database Read",
					"database_write" => "Database Write",
					"network" => "Network");

			foreach ($sections AS $section => $title) {
				if (!isset($a->callstack[$section])) {
					continue;
				}

				$o .= "\n".$title.":\n";
				foreach ($a->callstack[$section] AS $func => $time) {
					$time = round($time, 3);
					if ($time > 0) {
						$o .= $func.": ".$time."\n";
					}
				}
			}
		}

		// The callstack is appended after the formatting so that a "%" in a
		// function name can never be interpreted as a format specifier.
		$summary = sprintf("DB: %s/%s, Net: %s, I/O: %s, Other: %s, Total: %s",
			number_format($a->performance["database"] - $a->performance["database_write"], 2),
			number_format($a->performance["database_write"], 2),
			number_format($a->performance["network"], 2),
			number_format($a->performance["file"], 2),
			number_format($duration - ($a->performance["database"] + $a->performance["network"] + $a->performance["file"]), 2),
			number_format($duration, 2));

		logger("ID ".$queue["id"].": ".$funcname.": ".$summary.$o, LOGGER_DEBUG);
	}

	$cooldown = Config::get("system", "worker_cooldown", 0);

	if ($cooldown > 0) {
		logger("Process ".$mypid." - Prio ".$queue["priority"]." - ID ".$queue["id"].": ".$funcname." - in cooldown for ".$cooldown." seconds");
		sleep($cooldown);
	}
}
260
/**
 * @brief Checks if the number of database connections has reached a critical limit.
 *
 * First the connections of the current database user are compared with the
 * user limit (taken from the config, the system variable or the grants).
 * Afterwards the connections of the whole server are compared with the
 * system wide limit.
 *
 * @return bool Is the configured level (by default 75%) of the maximum connections reached?
 */
function poller_max_connections_reached() {

	// Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
	$max = intval(Config::get("system", "max_connections"));

	// Fetch the percentage level where the poller will get active
	$maxlevel = Config::get("system", "max_connections_level", 75);

	if ($max == 0) {
		// the maximum number of possible user connections can be a system variable
		$r = q("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
		if (dbm::is_result($r)) {
			$max = intval($r[0]["Value"]);
		}

		// Or it can be granted. This overrides the system variable
		$r = q("SHOW GRANTS");
		if (dbm::is_result($r)) {
			foreach ($r AS $grants) {
				$grant = array_pop($grants);

				// At least one digit is required: an empty match must never become the limit
				if (stristr($grant, "GRANT USAGE ON") AND preg_match("/WITH MAX_USER_CONNECTIONS (\d+)/", $grant, $match)) {
					$max = intval($match[1]);
				}
			}
		}
	}

	// If $max is set we will use the processlist to determine the current number of connections
	// The processlist only shows entries of the current user
	if ($max != 0) {
		$r = q("SHOW PROCESSLIST");
		if (!dbm::is_result($r)) {
			return false;
		}

		$used = count($r);

		logger("Connection usage (user values): ".$used."/".$max, LOGGER_DEBUG);

		$level = ($used / $max) * 100;

		if ($level >= $maxlevel) {
			logger("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
			return true;
		}
	}

	// We will now check for the system values.
	// This limit could be reached although the user limits are fine.
	$r = q("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
	if (!dbm::is_result($r)) {
		return false;
	}

	$max = intval($r[0]["Value"]);
	if ($max == 0) {
		return false;
	}

	$r = q("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
	if (!dbm::is_result($r)) {
		return false;
	}

	$used = intval($r[0]["Value"]);
	if ($used == 0) {
		return false;
	}

	logger("Connection usage (system values): ".$used."/".$max, LOGGER_DEBUG);

	$level = $used / $max * 100;

	if ($level < $maxlevel) {
		return false;
	}

	logger("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
	return true;
}
338
/**
 * @brief fix the queue entry if the worker process died
 *
 * Queue entries whose process doesn't exist anymore are reset so that they
 * can be executed again. Processes that run longer than the maximum duration
 * of their priority are terminated and their entries are rescheduled with the
 * lowest priority.
 */
function poller_kill_stale_workers() {
	$r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` != '0000-00-00 00:00:00'");

	if (!dbm::is_result($r)) {
		// No processing here needed
		return;
	}

	$valid_priorities = array(PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE);

	// Define the maximum durations (in minutes)
	$max_duration_defaults = array(PRIORITY_CRITICAL => 360, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 360);

	// The SIGTERM constant is provided by the "pcntl" extension, which is
	// independent of the "posix" extension that provides posix_kill().
	// 15 is the number of SIGTERM on POSIX systems.
	$sigterm = defined("SIGTERM") ? constant("SIGTERM") : 15;

	foreach ($r AS $entry) {
		$worker_pid = intval($entry["pid"]);

		// Signal 0 only checks if the process can be signalled.
		// A pid of zero or below is never signalled: it would address a whole
		// process group (including this poller) instead of a single worker.
		if (($worker_pid <= 0) OR !posix_kill($worker_pid, 0)) {
			q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `pid` = 0 WHERE `pid` = %d",
				intval($worker_pid));
			continue;
		}

		// Kill long running processes

		// Check if the priority is in a valid range
		// (not a strict comparison: the database values are presumably strings)
		$priority = $entry["priority"];
		if (!in_array($priority, $valid_priorities)) {
			$priority = PRIORITY_MEDIUM;
		}

		$max_duration = $max_duration_defaults[$priority];

		// The parameters are only used for the log message
		$argv = json_decode($entry["parameter"]);
		if (!is_array($argv)) {
			$argv = array();
		}
		if (isset($argv[0])) {
			$argv[0] = basename($argv[0]);
		}

		// How long is the process already running?
		$duration = (time() - strtotime($entry["executed"])) / 60;
		if ($duration > $max_duration) {
			logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") took more than ".$max_duration." minutes. It will be killed now.");
			posix_kill($worker_pid, $sigterm);

			// We killed the stale process.
			// To avoid a blocking situation we reschedule the process at the beginning of the queue.
			// Additionally we are lowering the priority.
			q("UPDATE `workerqueue` SET `executed` = '0000-00-00 00:00:00', `created` = '%s',
						`priority` = %d, `pid` = 0 WHERE `pid` = %d",
				dbesc(datetime_convert()),
				intval(PRIORITY_NEGLIGIBLE),
				intval($worker_pid));
		} else {
			logger("Worker process ".$entry["pid"]." (".implode(" ", $argv).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", LOGGER_DEBUG);
		}
	}
}
387
/**
 * @brief Checks if the number of active workers exceeds the given limits
 *
 * When the system load can be determined, the allowed number of workers is
 * reduced with rising load. As a side effect a further worker is forked when
 * fewer workers are running than allowed and there is enough work waiting.
 *
 * @return bool Are there too many workers running?
 */
function poller_too_much_workers() {
	$queues = Config::get("system", "worker_queues", 4);

	$maxqueues = $queues;

	$active = poller_active_workers();

	// Decrease the number of workers at higher load
	$load = current_load();
	if ($load) {
		$maxsysload = intval(Config::get("system", "maxloadavg", 50));

		// The value is used as divisor below, so it must never be zero or negative
		if ($maxsysload < 1) {
			$maxsysload = 50;
		}

		$maxworkers = $queues;

		// Some magical mathematics to reduce the workers:
		// the result is $maxworkers at no load and zero when the maximum load is reached
		$exponent = 3;
		$slope = $maxworkers / pow($maxsysload, $exponent);
		$queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));

		// Number of entries that are waiting for their execution
		$s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00'");
		$entries = (dbm::is_result($s) ? intval($s[0]["total"]) : 0);

		if (Config::get("system", "worker_fastlane", false) AND ($queues > 0) AND ($entries > 0) AND ($active >= $queues)) {
			$s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority` LIMIT 1");

			// The waiting entry could have been taken by another worker in the meantime
			if (dbm::is_result($s)) {
				$top_priority = $s[0]["priority"];

				$s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` != '0000-00-00 00:00:00' LIMIT 1",
					intval($top_priority));
				$high_running = dbm::is_result($s);

				if (!$high_running AND ($top_priority > PRIORITY_UNDEFINED) AND ($top_priority < PRIORITY_NEGLIGIBLE)) {
					logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
					$queues = $active + 1;
				}
			}
		}

		// Create a list of queue entries grouped by their priority
		$running = array(PRIORITY_CRITICAL => 0,
				PRIORITY_HIGH => 0,
				PRIORITY_MEDIUM => 0,
				PRIORITY_LOW => 0,
				PRIORITY_NEGLIGIBLE => 0);

		$r = q("SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` GROUP BY `priority`");
		if (dbm::is_result($r)) {
			foreach ($r AS $process) {
				$running[$process["priority"]] = $process["running"];
			}
		}

		// The list is only used for the log message: "priority:running/entries"
		$processlist = "";
		$r = q("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
		if (dbm::is_result($r)) {
			foreach ($r as $entry) {
				if ($processlist != "") {
					$processlist .= ", ";
				}

				// Entries can have a priority that isn't part of the predefined list
				$running_count = (isset($running[$entry["priority"]]) ? $running[$entry["priority"]] : 0);

				$processlist .= $entry["priority"].":".$running_count."/".$entry["entries"];
			}
		}

		logger("Load: ".$load."/".$maxsysload." - processes: ".$active."/".$entries." (".$processlist.") - maximum: ".$queues."/".$maxqueues, LOGGER_DEBUG);

		// Are there fewer workers running as possible? Then fork a new one.
		if (!Config::get("system", "worker_dont_fork") AND ($queues > ($active + 1)) AND ($entries > 1)) {
			logger("Active workers: ".$active."/".$queues." Fork a new worker.", LOGGER_DEBUG);
			$args = array("php", "include/poller.php", "no_cron");
			$a = get_app();
			$a->proc_run($args);
		}
	}

	return($active >= $queues);
}
463
/**
 * @brief Returns the number of active poller processes
 *
 * Counts the entries of the `process` table that belong to "poller.php".
 *
 * @return integer Number of active poller processes (0 when the query returned no result)
 */
function poller_active_workers() {
	$workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'poller.php'");

	// Don't access the result of a failed query
	if (!dbm::is_result($workers)) {
		return(0);
	}

	return(intval($workers[0]["processes"]));
}
474
/**
 * @brief Check if we should pass some slow processes
 *
 * Slower processes are let pass as soon as more than 2/3 of all active
 * processes are working on entries of the currently highest priority.
 *
 * @param integer $highest_priority Is set to the highest priority (lowest value) of the active processes, 0 if there are none
 * @return bool We let pass a slower process than $highest_priority
 */
function poller_passing_slow(&$highest_priority) {

	$highest_priority = 0;

	$rows = q("SELECT `priority`
		FROM `process`
		INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");

	// Without any active process there is nothing to compare
	if (!dbm::is_result($rows)) {
		return(false);
	}

	// Collect the priorities of all active processes
	$priorities = array();
	foreach ($rows AS $row) {
		$priorities[] = $row["priority"];
	}

	$total = count($priorities);

	// Should not happen
	if ($total == 0) {
		return(false);
	}

	$highest_priority = min($priorities);

	// Nothing can be slower than the slowest priority, so there is nothing to pass
	if ($highest_priority == PRIORITY_NEGLIGIBLE) {
		return(false);
	}

	// Count the processes that are working on the highest priority
	// (array_keys() with a search value compares loosely, like "==")
	$high = count(array_keys($priorities, $highest_priority));

	logger("Highest priority: ".$highest_priority." Total processes: ".$total." Count high priority processes: ".$high, LOGGER_DEBUG);

	$passing_slow = (($high / $total) > (2/3));

	if ($passing_slow) {
		logger("Passing slower processes than priority ".$highest_priority, LOGGER_DEBUG);
	}

	return($passing_slow);
}
524
/**
 * @brief Returns the next worker process
 *
 * Starts a database transaction and selects the next waiting queue entry.
 * NOTE(review): the transaction is not finished here - within this file the
 * matching "COMMIT" is issued by poller_execute().
 *
 * @return array|bool Result of the last query: a list with at most one `workerqueue` row.
 *                    The caller treats an empty value as "nothing to do".
 */
function poller_worker_process() {

	q("START TRANSACTION;");

	// Defined in any case, so that the check below never reads an undefined variable
	$r = false;

	// Check if we should pass some low priority process
	$highest_priority = 0;

	if (poller_passing_slow($highest_priority)) {
		// Are there waiting processes with a higher priority than the currently highest?
		$r = q("SELECT * FROM `workerqueue`
				WHERE `executed` = '0000-00-00 00:00:00' AND `priority` < %d
				ORDER BY `priority`, `created` LIMIT 1", intval($highest_priority));
		if (dbm::is_result($r)) {
			return $r;
		}

		// Give slower processes some processing time
		$r = q("SELECT * FROM `workerqueue`
				WHERE `executed` = '0000-00-00 00:00:00' AND `priority` > %d
				ORDER BY `priority`, `created` LIMIT 1", intval($highest_priority));
	}

	// If there is no result (or we shouldn't pass lower processes) we check without priority limit
	if (($highest_priority == 0) OR !dbm::is_result($r)) {
		$r = q("SELECT * FROM `workerqueue` WHERE `executed` = '0000-00-00 00:00:00' ORDER BY `priority`, `created` LIMIT 1");
	}

	return $r;
}
557
/**
 * @brief Call the front end worker
 *
 * Fetches the "/worker" page of this system, but only when both the
 * "frontend_worker" and the "worker" option are enabled.
 */
function call_worker() {
	// Both options have to be set, otherwise there is nothing to do
	if (!(Config::get("system", "frontend_worker") AND Config::get("system", "worker"))) {
		return;
	}

	$worker_url = App::get_baseurl()."/worker";

	fetch_url($worker_url, false, $redirects, 1);
}
569
/**
 * @brief Call the front end worker if there aren't any active
 *
 * With "proc_open" available a poller is started in the background (at most
 * once per minute). Otherwise the worker is called via the frontend when no
 * "worker.php" process is registered.
 */
function call_worker_if_idle() {
	// Both options have to be set, otherwise there is nothing to do
	if (!(Config::get("system", "frontend_worker") AND Config::get("system", "worker"))) {
		return;
	}

	if (!function_exists("proc_open")) {
		// We cannot execute background processes.
		// We now run the processes from the frontend.
		// This won't work with long running processes.
		poller_run_cron();

		clear_worker_processes();

		$frontend_workers = q("SELECT COUNT(*) AS `processes` FROM `process` WHERE `command` = 'worker.php'");

		if ($frontend_workers[0]["processes"] == 0) {
			call_worker();
		}

		return;
	}

	// We have "proc_open", so we can fork the poller.

	// When was the last time that we called the worker?
	// Less than one minute? Then we quit
	$seconds_since_start = time() - Config::get("system", "worker_started");
	if ($seconds_since_start < 60) {
		return;
	}

	set_config("system", "worker_started", time());

	// Do we have enough running workers? Then we quit here.
	if (poller_too_much_workers()) {
		// Cleaning dead processes
		poller_kill_stale_workers();
		get_app()->remove_inactive_processes();

		return;
	}

	poller_run_cron();

	logger('Call poller', LOGGER_DEBUG);

	// The cron entries were added above, so the new poller is told to skip them
	get_app()->proc_run(array("php", "include/poller.php", "no_cron"));
}
620
/**
 * @brief Removes long running worker processes
 *
 * Deletes the `process` entries of "worker.php" that were created before the
 * configured timeout ("frontend_worker_timeout" in minutes, 10 by default).
 */
function clear_worker_processes() {
	$minutes = Config::get("system", "frontend_worker_timeout", 10);

	// Everything that was created before this point in time is removed
	$oldest_allowed = datetime_convert('UTC','UTC',"now - ".$minutes." minutes");

	/// @todo We should clean up the corresponding workerqueue entries as well
	q("DELETE FROM `process` WHERE `created` < '%s' AND `command` = 'worker.php'",
		dbesc($oldest_allowed));
}
631
/**
 * @brief Runs the cron processes
 *
 * Hands the regular jobs over to proc_run() and cleans up dead worker processes.
 */
function poller_run_cron() {
	logger('Add cron entries', LOGGER_DEBUG);

	// The jobs in the order in which they are handed over:
	// - Check for spooled items
	// - Run the cron job that calls all other jobs
	// - Run the cronhooks job separately from cron for being able to use a different timing
	$jobs = array(
		array(PRIORITY_HIGH, "include/spool_post.php"),
		array(PRIORITY_MEDIUM, "include/cron.php"),
		array(PRIORITY_MEDIUM, "include/cronhooks.php")
	);

	foreach ($jobs AS $job) {
		proc_run($job[0], $job[1]);
	}

	// Cleaning dead processes
	poller_kill_stale_workers();
}
650
// Only start the poller when this file is the first entry of the included
// files, i.e. when it was called directly and not included by another script.
if (array_search(__FILE__, get_included_files()) === 0) {
	poller_run($_SERVER["argv"], $_SERVER["argc"]);

	// Finish the process of the App object before the script is terminated
	get_app()->end_process();

	killme();
}
658 ?>