]> git.mxchange.org Git - friendica.git/blob - src/Core/Worker.php
Merge remote-tracking branch 'upstream/2019.03-RC' into worker2
[friendica.git] / src / Core / Worker.php
1 <?php
2 /**
3  * @file src/Core/Worker.php
4  */
5 namespace Friendica\Core;
6
7 use Friendica\BaseObject;
8 use Friendica\Database\DBA;
9 use Friendica\Model\Process;
10 use Friendica\Util\DateTimeFormat;
11 use Friendica\Util\Logger\WorkerLogger;
12 use Friendica\Util\Network;
13
14 /**
15  * @file src/Core/Worker.php
16  *
17  * @brief Contains the class for the worker background job processing
18  */
19
20 /**
21  * @brief Worker methods
22  */
23 class Worker
24 {
25         private static $up_start;
26         private static $db_duration = 0;
27         private static $db_duration_count = 0;
28         private static $db_duration_write = 0;
29         private static $db_duration_stat = 0;
30         private static $lock_duration = 0;
31         private static $last_update;
32         private static $mode = 0;
33
34         /**
35          * @brief Processes the tasks that are in the workerqueue table
36          *
37          * @param boolean $run_cron Should the cron processes be executed?
38          * @return void
39          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
40          */
41         public static function processQueue($run_cron = true)
42         {
43                 $a = \get_app();
44
45                 // Ensure that all "strtotime" operations do run timezone independent
46                 date_default_timezone_set('UTC');
47
48                 self::$up_start = microtime(true);
49
50                 // At first check the maximum load. We shouldn't continue with a high load
51                 if ($a->isMaxLoadReached()) {
52                         Logger::log('Pre check: maximum load reached, quitting.', Logger::DEBUG);
53                         return;
54                 }
55
56                 // We now start the process. This is done after the load check since this could increase the load.
57                 self::startProcess();
58
59                 // Kill stale processes every 5 minutes
60                 $last_cleanup = Config::get('system', 'worker_last_cleaned', 0);
61                 if (time() > ($last_cleanup + 300)) {
62                         Config::set('system', 'worker_last_cleaned', time());
63                         self::killStaleWorkers();
64                 }
65
66                 // Count active workers and compare them with a maximum value that depends on the load
67                 if (self::tooMuchWorkers()) {
68                         Logger::log('Pre check: Active worker limit reached, quitting.', Logger::DEBUG);
69                         return;
70                 }
71
72                 // Do we have too few memory?
73                 if ($a->isMinMemoryReached()) {
74                         Logger::log('Pre check: Memory limit reached, quitting.', Logger::DEBUG);
75                         return;
76                 }
77
78                 // Possibly there are too much database connections
79                 if (self::maxConnectionsReached()) {
80                         Logger::log('Pre check: maximum connections reached, quitting.', Logger::DEBUG);
81                         return;
82                 }
83
84                 // Possibly there are too much database processes that block the system
85                 if ($a->isMaxProcessesReached()) {
86                         Logger::log('Pre check: maximum processes reached, quitting.', Logger::DEBUG);
87                         return;
88                 }
89
90                 // Now we start additional cron processes if we should do so
91                 if ($run_cron) {
92                         self::runCron();
93                 }
94
95                 $starttime = time();
96                 self::$mode = 1;
97
98                 // We fetch the next queue entry that is about to be executed
99                 while ($r = self::workerProcess()) {
100                         foreach ($r as $entry) {
101                                 // Assure that the priority is an integer value
102                                 $entry['priority'] = (int)$entry['priority'];
103
104                                 // The work will be done
105                                 if (!self::execute($entry)) {
106                                         Logger::log('Process execution failed, quitting.', Logger::DEBUG);
107                                         return;
108                                 }
109
110                                 // If possible we will fetch new jobs for this worker
111                                 if (!self::getWaitingJobForPID() && Lock::acquire('worker_process', 0)) {
112                                         self::findWorkerProcesses();
113                                         Lock::release('worker_process');
114                                         self::$mode = 3;
115                                 }
116                         }
117
118                         self::$mode = 4;
119
120                         // Quit the worker once every cron interval
121                         if (time() > ($starttime + Config::get('system', 'cron_interval'))) {
122                                 Logger::log('Process lifetime reached, quitting.', Logger::DEBUG);
123                                 return;
124                         }
125
126                         // To avoid the quitting of multiple workers only one worker at a time will execute the check
127                         if (Lock::acquire('worker', 0)) {
128                                 // Count active workers and compare them with a maximum value that depends on the load
129                                 if (self::tooMuchWorkers()) {
130                                         Logger::log('Active worker limit reached, quitting.', Logger::DEBUG);
131                                         Lock::release('worker');
132                                         return;
133                                 }
134
135                                 // Check free memory
136                                 if ($a->isMinMemoryReached()) {
137                                         Logger::log('Memory limit reached, quitting.', Logger::DEBUG);
138                                         Lock::release('worker');
139                                         return;
140                                 }
141                                 Lock::release('worker');
142                         }
143                 }
144
145                 // Cleaning up. Possibly not needed, but it doesn't harm anything.
146                 if (Config::get('system', 'worker_daemon_mode', false)) {
147                         self::IPCSetJobState(false);
148                 }
149                 Logger::log("Couldn't select a workerqueue entry, quitting process " . getmypid() . ".", Logger::DEBUG);
150         }
151
152         /**
153          * @brief Check if non executed tasks do exist in the worker queue
154          *
155          * @return boolean Returns "true" if tasks are existing
156          * @throws \Exception
157          */
158         private static function entriesExists()
159         {
160                 $stamp = (float)microtime(true);
161                 $exists = DBA::exists('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` < ?", DateTimeFormat::utcNow()]);
162                 self::$db_duration += (microtime(true) - $stamp);
163                 return $exists;
164         }
165
166         /**
167          * @brief Returns the number of deferred entries in the worker queue
168          *
169          * @return integer Number of deferred entries in the worker queue
170          * @throws \Exception
171          */
172         private static function deferredEntries()
173         {
174                 $stamp = (float)microtime(true);
175                 $count = DBA::count('workerqueue', ["NOT `done` AND `pid` = 0 AND `next_try` > ?", DateTimeFormat::utcNow()]);
176                 self::$db_duration += (microtime(true) - $stamp);
177                 self::$db_duration_count += (microtime(true) - $stamp);
178                 return $count;
179         }
180
181         /**
182          * @brief Returns the number of non executed entries in the worker queue
183          *
184          * @return integer Number of non executed entries in the worker queue
185          * @throws \Exception
186          */
187         private static function totalEntries()
188         {
189                 $stamp = (float)microtime(true);
190                 $count = DBA::count('workerqueue', ['done' => false, 'pid' => 0]);
191                 self::$db_duration += (microtime(true) - $stamp);
192                 self::$db_duration_count += (microtime(true) - $stamp);
193                 return $count;
194         }
195
196         /**
197          * @brief Returns the highest priority in the worker queue that isn't executed
198          *
199          * @return integer Number of active worker processes
200          * @throws \Exception
201          */
202         private static function highestPriority()
203         {
204                 $stamp = (float)microtime(true);
205                 $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
206                 $workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
207                 self::$db_duration += (microtime(true) - $stamp);
208                 if (DBA::isResult($workerqueue)) {
209                         return $workerqueue["priority"];
210                 } else {
211                         return 0;
212                 }
213         }
214
215         /**
216          * @brief Returns if a process with the given priority is running
217          *
218          * @param integer $priority The priority that should be checked
219          *
220          * @return integer Is there a process running with that priority?
221          * @throws \Exception
222          */
223         private static function processWithPriorityActive($priority)
224         {
225                 $condition = ["`priority` <= ? AND `pid` != 0 AND NOT `done`", $priority];
226                 return DBA::exists('workerqueue', $condition);
227         }
228
229         /**
230          * @brief Execute a worker entry
231          *
232          * @param array $queue Workerqueue entry
233          *
234          * @return boolean "true" if further processing should be stopped
235          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
236          */
237         public static function execute($queue)
238         {
239                 $a = \get_app();
240
241                 $mypid = getmypid();
242
243                 // Quit when in maintenance
244                 if (Config::get('system', 'maintenance', false, true)) {
245                         Logger::log("Maintenance mode - quit process ".$mypid, Logger::DEBUG);
246                         return false;
247                 }
248
249                 // Constantly check the number of parallel database processes
250                 if ($a->isMaxProcessesReached()) {
251                         Logger::log("Max processes reached for process ".$mypid, Logger::DEBUG);
252                         return false;
253                 }
254
255                 // Constantly check the number of available database connections to let the frontend be accessible at any time
256                 if (self::maxConnectionsReached()) {
257                         Logger::log("Max connection reached for process ".$mypid, Logger::DEBUG);
258                         return false;
259                 }
260
261                 $argv = json_decode($queue["parameter"], true);
262
263                 // Check for existance and validity of the include file
264                 $include = $argv[0];
265
266                 if (method_exists(sprintf('Friendica\Worker\%s', $include), 'execute')) {
267                         // We constantly update the "executed" date every minute to avoid being killed too soon
268                         if (!isset(self::$last_update)) {
269                                 self::$last_update = strtotime($queue["executed"]);
270                         }
271
272                         $age = (time() - self::$last_update) / 60;
273                         self::$last_update = time();
274
275                         if ($age > 1) {
276                                 $stamp = (float)microtime(true);
277                                 DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]);
278                                 self::$db_duration += (microtime(true) - $stamp);
279                                 self::$db_duration_write += (microtime(true) - $stamp);
280                         }
281
282                         array_shift($argv);
283
284                         self::execFunction($queue, $include, $argv, true);
285
286                         $stamp = (float)microtime(true);
287                         $condition = ["`id` = ? AND `next_try` < ?", $queue['id'], DateTimeFormat::utcNow()];
288                         if (DBA::update('workerqueue', ['done' => true], $condition)) {
289                                 Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
290                         }
291                         self::$db_duration = (microtime(true) - $stamp);
292                         self::$db_duration_write += (microtime(true) - $stamp);
293
294                         return true;
295                 }
296
297                 // The script could be provided as full path or only with the function name
298                 if ($include == basename($include)) {
299                         $include = "include/".$include.".php";
300                 }
301
302                 if (!validate_include($include)) {
303                         Logger::log("Include file ".$argv[0]." is not valid!");
304                         $stamp = (float)microtime(true);
305                         DBA::delete('workerqueue', ['id' => $queue["id"]]);
306                         self::$db_duration = (microtime(true) - $stamp);
307                         self::$db_duration_write += (microtime(true) - $stamp);
308                         return true;
309                 }
310
311                 require_once $include;
312
313                 $funcname = str_replace(".php", "", basename($argv[0]))."_run";
314
315                 if (function_exists($funcname)) {
316                         // We constantly update the "executed" date every minute to avoid being killed too soon
317                         if (!isset(self::$last_update)) {
318                                 self::$last_update = strtotime($queue["executed"]);
319                         }
320
321                         $age = (time() - self::$last_update) / 60;
322                         self::$last_update = time();
323
324                         if ($age > 1) {
325                                 $stamp = (float)microtime(true);
326                                 DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow()], ['pid' => $mypid, 'done' => false]);
327                                 self::$db_duration += (microtime(true) - $stamp);
328                                 self::$db_duration_write += (microtime(true) - $stamp);
329                         }
330
331                         self::execFunction($queue, $funcname, $argv, false);
332
333                         $stamp = (float)microtime(true);
334                         if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
335                                 Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
336                         }
337                         self::$db_duration = (microtime(true) - $stamp);
338                         self::$db_duration_write += (microtime(true) - $stamp);
339                 } else {
340                         Logger::log("Function ".$funcname." does not exist");
341                         $stamp = (float)microtime(true);
342                         DBA::delete('workerqueue', ['id' => $queue["id"]]);
343                         self::$db_duration = (microtime(true) - $stamp);
344                         self::$db_duration_write += (microtime(true) - $stamp);
345                 }
346
347                 return true;
348         }
349
350         /**
351          * @brief Execute a function from the queue
352          *
353          * @param array   $queue       Workerqueue entry
354          * @param string  $funcname    name of the function
355          * @param array   $argv        Array of values to be passed to the function
356          * @param boolean $method_call boolean
357          * @return void
358          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
359          */
360         private static function execFunction($queue, $funcname, $argv, $method_call)
361         {
362                 $a = \get_app();
363
364                 $argc = count($argv);
365
366                 $logger = $a->getLogger();
367                 $workerLogger = new WorkerLogger($logger, $funcname);
368
369                 $workerLogger ->info("Process start.", ['priority' => $queue["priority"], 'id' => $queue["id"]]);
370
371                 $stamp = (float)microtime(true);
372
373                 // We use the callstack here to analyze the performance of executed worker entries.
374                 // For this reason the variables have to be initialized.
375                 $a->getProfiler()->reset();
376
377                 $a->queue = $queue;
378
379                 $up_duration = microtime(true) - self::$up_start;
380
381                 // Reset global data to avoid interferences
382                 unset($_SESSION);
383
384                 // Set the workerLogger as new default logger
385                 Logger::init($workerLogger);
386                 if ($method_call) {
387                         call_user_func_array(sprintf('Friendica\Worker\%s::execute', $funcname), $argv);
388                 } else {
389                         $funcname($argv, $argc);
390                 }
391                 Logger::init($logger);
392
393                 unset($a->queue);
394
395                 $duration = (microtime(true) - $stamp);
396
397                 /* With these values we can analyze how effective the worker is.
398                  * The database and rest time should be low since this is the unproductive time.
399                  * The execution time is the productive time.
400                  * By changing parameters like the maximum number of workers we can check the effectivness.
401                 */
402                 $dbtotal = round(self::$db_duration, 2);
403                 $dbread  = round(self::$db_duration - (self::$db_duration_count + self::$db_duration_write + self::$db_duration_stat), 2);
404                 $dbcount = round(self::$db_duration_count, 2);
405                 $dbstat  = round(self::$db_duration_stat, 2);
406                 $dbwrite = round(self::$db_duration_write, 2);
407                 $dblock  = round(self::$lock_duration, 2);
408                 $rest    = round(max(0, $up_duration - (self::$db_duration + self::$lock_duration)), 2);
409                 $exec    = round($duration, 2);
410
411                 $logger->info('Performance log.', ['mode' => self::$mode, 'count' => $dbcount, 'stat' => $dbstat, 'write' => $dbwrite, 'lock' => $dblock, 'total' => $dbtotal, 'rest' => $rest, 'exec' => $exec]);
412
413                 self::$up_start = microtime(true);
414                 self::$db_duration = 0;
415                 self::$db_duration_count = 0;
416                 self::$db_duration_stat = 0;
417                 self::$db_duration_write = 0;
418                 self::$lock_duration = 0;
419                 self::$mode = 2;
420
421                 if ($duration > 3600) {
422                         $logger->info('Longer than 1 hour.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
423                 } elseif ($duration > 600) {
424                         $logger->info('Longer than 10 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
425                 } elseif ($duration > 300) {
426                         $logger->info('Longer than 5 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
427                 } elseif ($duration > 120) {
428                         $logger->info('Longer than 2 minutes.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   round($duration/60, 3)]);
429                 }
430
431                 $workerLogger->info('Process done. ', ['priority' => $queue["priority"], 'id' => $queue["id"], 'duration' =>   number_format($duration, 4)]);
432
433                 $a->getProfiler()->saveLog($a->getLogger(), "ID " . $queue["id"] . ": " . $funcname);
434
435                 $cooldown = Config::get("system", "worker_cooldown", 0);
436
437                 if ($cooldown > 0) {
438                         $logger->info('Cooldown.', ['priority' => $queue["priority"], 'id' => $queue["id"], 'cooldown' => $cooldown]);
439                         sleep($cooldown);
440                 }
441         }
442
443         /**
444          * @brief Checks if the number of database connections has reached a critical limit.
445          *
446          * @return bool Are more than 3/4 of the maximum connections used?
447          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
448          */
449         private static function maxConnectionsReached()
450         {
451                 // Fetch the max value from the config. This is needed when the system cannot detect the correct value by itself.
452                 $max = Config::get("system", "max_connections");
453
454                 // Fetch the percentage level where the worker will get active
455                 $maxlevel = Config::get("system", "max_connections_level", 75);
456
457                 if ($max == 0) {
458                         // the maximum number of possible user connections can be a system variable
459                         $r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_user_connections'");
460                         if (DBA::isResult($r)) {
461                                 $max = $r["Value"];
462                         }
463                         // Or it can be granted. This overrides the system variable
464                         $stamp = (float)microtime(true);
465                         $r = DBA::p('SHOW GRANTS');
466                         self::$db_duration += (microtime(true) - $stamp);
467                         while ($grants = DBA::fetch($r)) {
468                                 $grant = array_pop($grants);
469                                 if (stristr($grant, "GRANT USAGE ON")) {
470                                         if (preg_match("/WITH MAX_USER_CONNECTIONS (\d*)/", $grant, $match)) {
471                                                 $max = $match[1];
472                                         }
473                                 }
474                         }
475                         DBA::close($r);
476                 }
477
478                 // If $max is set we will use the processlist to determine the current number of connections
479                 // The processlist only shows entries of the current user
480                 if ($max != 0) {
481                         $stamp = (float)microtime(true);
482                         $r = DBA::p('SHOW PROCESSLIST');
483                         self::$db_duration += (microtime(true) - $stamp);
484                         $used = DBA::numRows($r);
485                         DBA::close($r);
486
487                         Logger::log("Connection usage (user values): ".$used."/".$max, Logger::DEBUG);
488
489                         $level = ($used / $max) * 100;
490
491                         if ($level >= $maxlevel) {
492                                 Logger::log("Maximum level (".$maxlevel."%) of user connections reached: ".$used."/".$max);
493                                 return true;
494                         }
495                 }
496
497                 // We will now check for the system values.
498                 // This limit could be reached although the user limits are fine.
499                 $r = DBA::fetchFirst("SHOW VARIABLES WHERE `variable_name` = 'max_connections'");
500                 if (!DBA::isResult($r)) {
501                         return false;
502                 }
503                 $max = intval($r["Value"]);
504                 if ($max == 0) {
505                         return false;
506                 }
507                 $r = DBA::fetchFirst("SHOW STATUS WHERE `variable_name` = 'Threads_connected'");
508                 if (!DBA::isResult($r)) {
509                         return false;
510                 }
511                 $used = intval($r["Value"]);
512                 if ($used == 0) {
513                         return false;
514                 }
515                 Logger::log("Connection usage (system values): ".$used."/".$max, Logger::DEBUG);
516
517                 $level = $used / $max * 100;
518
519                 if ($level < $maxlevel) {
520                         return false;
521                 }
522                 Logger::log("Maximum level (".$level."%) of system connections reached: ".$used."/".$max);
523                 return true;
524         }
525
526         /**
527          * @brief fix the queue entry if the worker process died
528          * @return void
529          * @throws \Exception
530          */
531         private static function killStaleWorkers()
532         {
533                 $stamp = (float)microtime(true);
534                 $entries = DBA::select(
535                         'workerqueue',
536                         ['id', 'pid', 'executed', 'priority', 'parameter'],
537                         ['NOT `done` AND `pid` != 0'],
538                         ['order' => ['priority', 'created']]
539                 );
540                 self::$db_duration += (microtime(true) - $stamp);
541
542                 while ($entry = DBA::fetch($entries)) {
543                         if (!posix_kill($entry["pid"], 0)) {
544                                 $stamp = (float)microtime(true);
545                                 DBA::update(
546                                         'workerqueue',
547                                         ['executed' => DBA::NULL_DATETIME, 'pid' => 0],
548                                         ['id' => $entry["id"]]
549                                 );
550                                 self::$db_duration += (microtime(true) - $stamp);
551                                 self::$db_duration_write += (microtime(true) - $stamp);
552                         } else {
553                                 // Kill long running processes
554                                 // Check if the priority is in a valid range
555                                 if (!in_array($entry["priority"], [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE])) {
556                                         $entry["priority"] = PRIORITY_MEDIUM;
557                                 }
558
559                                 // Define the maximum durations
560                                 $max_duration_defaults = [PRIORITY_CRITICAL => 720, PRIORITY_HIGH => 10, PRIORITY_MEDIUM => 60, PRIORITY_LOW => 180, PRIORITY_NEGLIGIBLE => 720];
561                                 $max_duration = $max_duration_defaults[$entry["priority"]];
562
563                                 $argv = json_decode($entry["parameter"], true);
564                                 $argv[0] = basename($argv[0]);
565
566                                 // How long is the process already running?
567                                 $duration = (time() - strtotime($entry["executed"])) / 60;
568                                 if ($duration > $max_duration) {
569                                         Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") took more than ".$max_duration." minutes. It will be killed now.");
570                                         posix_kill($entry["pid"], SIGTERM);
571
572                                         // We killed the stale process.
573                                         // To avoid a blocking situation we reschedule the process at the beginning of the queue.
574                                         // Additionally we are lowering the priority. (But not PRIORITY_CRITICAL)
575                                         $new_priority = $entry["priority"];
576                                         if ($entry["priority"] == PRIORITY_HIGH) {
577                                                 $new_priority = PRIORITY_MEDIUM;
578                                         } elseif ($entry["priority"] == PRIORITY_MEDIUM) {
579                                                 $new_priority = PRIORITY_LOW;
580                                         } elseif ($entry["priority"] != PRIORITY_CRITICAL) {
581                                                 $new_priority = PRIORITY_NEGLIGIBLE;
582                                         }
583                                         $stamp = (float)microtime(true);
584                                         DBA::update(
585                                                 'workerqueue',
586                                                 ['executed' => DBA::NULL_DATETIME, 'created' => DateTimeFormat::utcNow(), 'priority' => $new_priority, 'pid' => 0],
587                                                 ['id' => $entry["id"]]
588                                         );
589                                         self::$db_duration += (microtime(true) - $stamp);
590                                         self::$db_duration_write += (microtime(true) - $stamp);
591                                 } else {
592                                         Logger::log("Worker process ".$entry["pid"]." (".substr(json_encode($argv), 0, 50).") now runs for ".round($duration)." of ".$max_duration." allowed minutes. That's okay.", Logger::DEBUG);
593                                 }
594                         }
595                 }
596         }
597
598         /**
599          * @brief Checks if the number of active workers exceeds the given limits
600          *
601          * @return bool Are there too much workers running?
602          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
603          */
604         private static function tooMuchWorkers()
605         {
606                 $queues = Config::get("system", "worker_queues", 4);
607
608                 $maxqueues = $queues;
609
610                 $active = self::activeWorkers();
611
612                 // Decrease the number of workers at higher load
613                 $load = System::currentLoad();
614                 if ($load) {
615                         $maxsysload = intval(Config::get("system", "maxloadavg", 50));
616
617                         /* Default exponent 3 causes queues to rapidly decrease as load increases.
618                          * If you have 20 max queues at idle, then you get only 5 queues at 37.1% of $maxsysload.
619                          * For some environments, this rapid decrease is not needed.
620                          * With exponent 1, you could have 20 max queues at idle and 13 at 37% of $maxsysload.
621                          */
622                         $exponent = intval(Config::get('system', 'worker_load_exponent', 3));
623                         $slope = pow(max(0, $maxsysload - $load) / $maxsysload, $exponent);
624                         $queues = intval(ceil($slope * $maxqueues));
625
626                         $processlist = '';
627
628                         if (Config::get('system', 'worker_jpm')) {
629                                 $intervals = explode(',', Config::get('system', 'worker_jpm_range'));
630                                 $jobs_per_minute = [];
631                                 foreach ($intervals as $interval) {
632                                         if ($interval == 0) {
633                                                 continue;
634                                         } else {
635                                                 $interval = (int)$interval;
636                                         }
637
638                                         $stamp = (float)microtime(true);
639                                         $jobs = DBA::p("SELECT COUNT(*) AS `jobs` FROM `workerqueue` WHERE `done` AND `executed` > UTC_TIMESTAMP() - INTERVAL ? MINUTE", $interval);
640                                         self::$db_duration += (microtime(true) - $stamp);
641                                         self::$db_duration_stat += (microtime(true) - $stamp);
642                                         if ($job = DBA::fetch($jobs)) {
643                                                 $jobs_per_minute[$interval] = number_format($job['jobs'] / $interval, 0);
644                                         }
645                                         DBA::close($jobs);
646                                 }
647                                 $processlist = ' - jpm: '.implode('/', $jobs_per_minute);
648                         }
649
650                         // Create a list of queue entries grouped by their priority
651                         $listitem = [0 => ''];
652
653                         $idle_workers = $active;
654
655                         $deferred = self::deferredEntries();
656
657                         if (Config::get('system', 'worker_debug')) {
658                                 $waiting_processes = 0;
659                                 // Now adding all processes with workerqueue entries
660                                 $stamp = (float)microtime(true);
661                                 $jobs = DBA::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` AND `next_try` < ? GROUP BY `priority`", DateTimeFormat::utcNow());
662                                 self::$db_duration += (microtime(true) - $stamp);
663                                 self::$db_duration_stat += (microtime(true) - $stamp);
664                                 while ($entry = DBA::fetch($jobs)) {
665                                         $stamp = (float)microtime(true);
666                                         $processes = DBA::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE NOT `done` AND `priority` = ?", $entry["priority"]);
667                                         self::$db_duration += (microtime(true) - $stamp);
668                                         self::$db_duration_stat += (microtime(true) - $stamp);
669                                         if ($process = DBA::fetch($processes)) {
670                                                 $idle_workers -= $process["running"];
671                                                 $waiting_processes += $entry["entries"];
672                                                 $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
673                                         }
674                                         DBA::close($processes);
675                                 }
676                                 DBA::close($jobs);
677                                 $entries = $deferred + $waiting_processes;
678                         } else {
679                                 $entries = self::totalEntries();
680                                 $waiting_processes = max(0, $entries - $deferred);
681                                 $stamp = (float)microtime(true);
682                                 $jobs = DBA::p("SELECT COUNT(*) AS `running`, `priority` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` GROUP BY `priority` ORDER BY `priority`");
683                                 self::$db_duration += (microtime(true) - $stamp);
684                                 self::$db_duration_stat += (microtime(true) - $stamp);
685
686                                 while ($entry = DBA::fetch($jobs)) {
687                                         $idle_workers -= $entry["running"];
688                                         $listitem[$entry["priority"]] = $entry["priority"].":".$entry["running"];
689                                 }
690                                 DBA::close($jobs);
691                         }
692
693                         $listitem[0] = "0:" . max(0, $idle_workers);
694
695                         $processlist .= ' ('.implode(', ', $listitem).')';
696
697                         if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && self::entriesExists() && ($active >= $queues)) {
698                                 $top_priority = self::highestPriority();
699                                 $high_running = self::processWithPriorityActive($top_priority);
700
701                                 if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
702                                         Logger::log("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", Logger::DEBUG);
703                                         $queues = $active + 1;
704                                 }
705                         }
706
707                         Logger::log("Load: " . $load ."/" . $maxsysload . " - processes: " . $deferred . "/" . $active . "/" . $waiting_processes . $processlist . " - maximum: " . $queues . "/" . $maxqueues, Logger::DEBUG);
708
709                         // Are there fewer workers running as possible? Then fork a new one.
710                         if (!Config::get("system", "worker_dont_fork", false) && ($queues > ($active + 1)) && ($entries > 1)) {
711                                 Logger::log("Active workers: ".$active."/".$queues." Fork a new worker.", Logger::DEBUG);
712                                 if (Config::get('system', 'worker_daemon_mode', false)) {
713                                         self::IPCSetJobState(true);
714                                 } else {
715                                         self::spawnWorker();
716                                 }
717                         }
718                 }
719
720                 // if there are too much worker, we don't spawn a new one.
721                 if (Config::get('system', 'worker_daemon_mode', false) && ($active > $queues)) {
722                         self::IPCSetJobState(false);
723                 }
724
725                 return $active > $queues;
726         }
727
728         /**
729          * @brief Returns the number of active worker processes
730          *
731          * @return integer Number of active worker processes
732          * @throws \Exception
733          */
734         private static function activeWorkers()
735         {
736                 $stamp = (float)microtime(true);
737                 $count = DBA::count('process', ['command' => 'Worker.php']);
738                 self::$db_duration += (microtime(true) - $stamp);
739                 return $count;
740         }
741
742         /**
743          * @brief Returns waiting jobs for the current process id
744          *
745          * @return array waiting workerqueue jobs
746          * @throws \Exception
747          */
748         private static function getWaitingJobForPID()
749         {
750                 $stamp = (float)microtime(true);
751                 $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
752                 self::$db_duration += (microtime(true) - $stamp);
753                 if (DBA::isResult($r)) {
754                         return DBA::toArray($r);
755                 }
756                 DBA::close($r);
757
758                 return false;
759         }
760
761         /**
762          * @brief Returns the next jobs that should be executed
763          *
764          * @return array array with next jobs
765          * @throws \Exception
766          */
767         private static function nextProcess()
768         {
769                 $priority = self::nextPriority();
770                 if (empty($priority)) {
771                         Logger::info('No tasks found');
772                         return [];
773                 }
774
775                 if ($priority <= PRIORITY_MEDIUM) {
776                         $limit = Config::get('system', 'worker_fetch_limit', 1);
777                 } else {
778                         $limit = 1;
779                 }
780
781                 $ids = [];
782                 $stamp = (float)microtime(true);
783                 $condition = ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()];
784                 $tasks = DBA::select('workerqueue', ['id'], $condition, ['limit' => $limit, 'order' => ['created']]);
785                 self::$db_duration += (microtime(true) - $stamp);
786                 while ($task = DBA::fetch($tasks)) {
787                         $ids[] = $task['id'];
788                 }
789                 DBA::close($tasks);
790
791                 Logger::info('Found:', ['id' => $ids, 'priority' => $priority]);
792                 return $ids;
793         }
794
795         /**
796          * @brief Returns the priority of the next workerqueue job
797          *
798          * @return string priority
799          * @throws \Exception
800          */
801         private static function nextPriority()
802         {
803                 $waiting = [];
804                 $priorities = [PRIORITY_CRITICAL, PRIORITY_HIGH, PRIORITY_MEDIUM, PRIORITY_LOW, PRIORITY_NEGLIGIBLE];
805                 foreach ($priorities as $priority) {
806                         $stamp = (float)microtime(true);
807                         if (DBA::exists('workerqueue', ["`priority` = ? AND `pid` = 0 AND NOT `done` AND `next_try` < ?", $priority, DateTimeFormat::utcNow()])) {
808                                 $waiting[$priority] = true;
809                         }
810                         self::$db_duration += (microtime(true) - $stamp);
811                 }
812
813                 if (!empty($waiting[PRIORITY_CRITICAL])) {
814                         return PRIORITY_CRITICAL;
815                 }
816
817                 $running = [];
818                 $running_total = 0;
819                 $stamp = (float)microtime(true);
820                 $processes = DBA::p("SELECT COUNT(DISTINCT(`process`.`pid`)) AS `running`, `priority` FROM `process`
821                         INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`
822                         WHERE NOT `done` GROUP BY `priority`");
823                 self::$db_duration += (microtime(true) - $stamp);
824                 while ($process = DBA::fetch($processes)) {
825                         $running[$process['priority']] = $process['running'];
826                         $running_total += $process['running'];
827                 }
828                 DBA::close($processes);
829
830                 foreach ($priorities as $priority) {
831                         if (!empty($waiting[$priority]) && empty($running[$priority])) {
832                                 Logger::info('No running worker found with priority {priority} - assigning it.', ['priority' => $priority]);
833                                 return $priority;
834                         }
835                 }
836
837                 $active = max(self::activeWorkers(), $running_total);
838                 $priorities = max(count($waiting), count($running));
839                 $exponent = 2;
840
841                 $total = 0;
842                 for ($i = 1; $i <= $priorities; ++$i) {
843                         $total += pow($i, $exponent);
844                 }
845
846                 $limit = [];
847                 for ($i = 1; $i <= $priorities; ++$i) {
848                         $limit[$priorities - $i] = max(1, round($active * (pow($i, $exponent) / $total)));
849                 }
850
851                 $i = 0;
852                 foreach ($running as $priority => $workers) {
853                         if ($workers < $limit[$i++]) {
854                                 Logger::info('Priority {priority} has got {workers} workers out of a limit of {limit}', ['priority' => $priority, 'workers' => $workers, 'limit' => $limit[$i - 1]]);
855                                 return $priority;
856                         }
857                 }
858
859                 if (!empty($waiting)) {
860                         $priority = array_keys($waiting)[0];
861                         Logger::info('No underassigned priority found, now taking the highest priority.', ['priority' => $priority]);
862                         return $priority;
863                 }
864
865                 return false;
866         }
867
868         /**
869          * @brief Find and claim the next worker process for us
870          *
871          * @return boolean Have we found something?
872          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
873          */
874         private static function findWorkerProcesses()
875         {
876                 $mypid = getmypid();
877
878                 $ids = self::nextProcess();
879
880                 // If there is no result we check without priority limit
881                 if (empty($ids)) {
882                         $stamp = (float)microtime(true);
883                         $condition = ["`pid` = 0 AND NOT `done` AND `next_try` < ?", DateTimeFormat::utcNow()];
884                         $result = DBA::select('workerqueue', ['id'], $condition, ['limit' => 1, 'order' => ['priority', 'created']]);
885                         self::$db_duration += (microtime(true) - $stamp);
886
887                         while ($id = DBA::fetch($result)) {
888                                 $ids[] = $id["id"];
889                         }
890                         DBA::close($result);
891                 }
892
893                 if (!empty($ids)) {
894                         $stamp = (float)microtime(true);
895                         $condition = ['id' => $ids, 'done' => false, 'pid' => 0];
896                         DBA::update('workerqueue', ['executed' => DateTimeFormat::utcNow(), 'pid' => $mypid], $condition);
897                         self::$db_duration += (microtime(true) - $stamp);
898                         self::$db_duration_write += (microtime(true) - $stamp);
899                 }
900
901                 return !empty($ids);
902         }
903
904         /**
905          * @brief Returns the next worker process
906          *
907          * @return string SQL statement
908          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
909          */
910         public static function workerProcess()
911         {
912                 // There can already be jobs for us in the queue.
913                 $waiting = self::getWaitingJobForPID();
914                 if (!empty($waiting)) {
915                         return $waiting;
916                 }
917
918                 $stamp = (float)microtime(true);
919                 if (!Lock::acquire('worker_process')) {
920                         return false;
921                 }
922                 self::$lock_duration += (microtime(true) - $stamp);
923
924                 $found = self::findWorkerProcesses();
925
926                 Lock::release('worker_process');
927
928                 if ($found) {
929                         $stamp = (float)microtime(true);
930                         $r = DBA::select('workerqueue', [], ['pid' => getmypid(), 'done' => false]);
931                         self::$db_duration += (microtime(true) - $stamp);
932                         return DBA::toArray($r);
933                 }
934                 return false;
935         }
936
937         /**
938          * @brief Removes a workerqueue entry from the current process
939          * @return void
940          * @throws \Exception
941          */
942         public static function unclaimProcess()
943         {
944                 $mypid = getmypid();
945
946                 $stamp = (float)microtime(true);
947                 DBA::update('workerqueue', ['executed' => DBA::NULL_DATETIME, 'pid' => 0], ['pid' => $mypid, 'done' => false]);
948                 self::$db_duration += (microtime(true) - $stamp);
949                 self::$db_duration_write += (microtime(true) - $stamp);
950         }
951
952         /**
953          * @brief Call the front end worker
954          * @return void
955          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
956          */
957         public static function callWorker()
958         {
959                 if (!Config::get("system", "frontend_worker")) {
960                         return;
961                 }
962
963                 $url = System::baseUrl()."/worker";
964                 Network::fetchUrl($url, false, $redirects, 1);
965         }
966
967         /**
968          * @brief Call the front end worker if there aren't any active
969          * @return void
970          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
971          */
972         public static function executeIfIdle()
973         {
974                 if (!Config::get("system", "frontend_worker")) {
975                         return;
976                 }
977
978                 // Do we have "proc_open"? Then we can fork the worker
979                 if (function_exists("proc_open")) {
980                         // When was the last time that we called the worker?
981                         // Less than one minute? Then we quit
982                         if ((time() - Config::get("system", "worker_started")) < 60) {
983                                 return;
984                         }
985
986                         Config::set("system", "worker_started", time());
987
988                         // Do we have enough running workers? Then we quit here.
989                         if (self::tooMuchWorkers()) {
990                                 // Cleaning dead processes
991                                 self::killStaleWorkers();
992                                 Process::deleteInactive();
993
994                                 return;
995                         }
996
997                         self::runCron();
998
999                         Logger::log('Call worker', Logger::DEBUG);
1000                         self::spawnWorker();
1001                         return;
1002                 }
1003
1004                 // We cannot execute background processes.
1005                 // We now run the processes from the frontend.
1006                 // This won't work with long running processes.
1007                 self::runCron();
1008
1009                 self::clearProcesses();
1010
1011                 $workers = self::activeWorkers();
1012
1013                 if ($workers == 0) {
1014                         self::callWorker();
1015                 }
1016         }
1017
1018         /**
1019          * @brief Removes long running worker processes
1020          * @return void
1021          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
1022          */
1023         public static function clearProcesses()
1024         {
1025                 $timeout = Config::get("system", "frontend_worker_timeout", 10);
1026
1027                 /// @todo We should clean up the corresponding workerqueue entries as well
1028                 $stamp = (float)microtime(true);
1029                 $condition = ["`created` < ? AND `command` = 'worker.php'",
1030                                 DateTimeFormat::utc("now - ".$timeout." minutes")];
1031                 DBA::delete('process', $condition);
1032                 self::$db_duration = (microtime(true) - $stamp);
1033                 self::$db_duration_write += (microtime(true) - $stamp);
1034         }
1035
1036         /**
1037          * @brief Runs the cron processes
1038          * @return void
1039          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
1040          */
1041         private static function runCron()
1042         {
1043                 Logger::log('Add cron entries', Logger::DEBUG);
1044
1045                 // Check for spooled items
1046                 self::add(['priority' => PRIORITY_HIGH, 'force_priority' => true], 'SpoolPost');
1047
1048                 // Run the cron job that calls all other jobs
1049                 self::add(['priority' => PRIORITY_MEDIUM, 'force_priority' => true], 'Cron');
1050
1051                 // Cleaning dead processes
1052                 self::killStaleWorkers();
1053         }
1054
1055         /**
1056          * @brief Spawns a new worker
1057          * @param bool $do_cron
1058          * @return void
1059          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
1060          */
1061         public static function spawnWorker($do_cron = false)
1062         {
1063                 $command = 'bin/worker.php';
1064
1065                 $args = ['no_cron' => !$do_cron];
1066
1067                 get_app()->proc_run($command, $args);
1068
1069                 // after spawning we have to remove the flag.
1070                 if (Config::get('system', 'worker_daemon_mode', false)) {
1071                         self::IPCSetJobState(false);
1072                 }
1073         }
1074
1075         /**
1076          * @brief Adds tasks to the worker queue
1077          *
1078          * @param (integer|array) priority or parameter array, strings are deprecated and are ignored
1079          *
1080          * next args are passed as $cmd command line
1081          * or: Worker::add(PRIORITY_HIGH, "Notifier", "drop", $drop_id);
1082          * or: Worker::add(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "CreateShadowEntry", $post_id);
1083          *
1084          * @return boolean "false" if proc_run couldn't be executed
1085          * @throws \Friendica\Network\HTTPException\InternalServerErrorException
1086          * @note $cmd and string args are surrounded with ""
1087          *
1088          * @hooks 'proc_run'
1089          *    array $arr
1090          *
1091          */
1092         public static function add($cmd)
1093         {
1094                 $args = func_get_args();
1095
1096                 if (!count($args)) {
1097                         return false;
1098                 }
1099
1100                 $arr = ['args' => $args, 'run_cmd' => true];
1101
1102                 Hook::callAll("proc_run", $arr);
1103                 if (!$arr['run_cmd'] || !count($args)) {
1104                         return true;
1105                 }
1106
1107                 $priority = PRIORITY_MEDIUM;
1108                 $dont_fork = Config::get("system", "worker_dont_fork", false);
1109                 $created = DateTimeFormat::utcNow();
1110                 $force_priority = false;
1111
1112                 $run_parameter = array_shift($args);
1113
1114                 if (is_int($run_parameter)) {
1115                         $priority = $run_parameter;
1116                 } elseif (is_array($run_parameter)) {
1117                         if (isset($run_parameter['priority'])) {
1118                                 $priority = $run_parameter['priority'];
1119                         }
1120                         if (isset($run_parameter['created'])) {
1121                                 $created = $run_parameter['created'];
1122                         }
1123                         if (isset($run_parameter['dont_fork'])) {
1124                                 $dont_fork = $run_parameter['dont_fork'];
1125                         }
1126                         if (isset($run_parameter['force_priority'])) {
1127                                 $force_priority = $run_parameter['force_priority'];
1128                         }
1129                 }
1130
1131                 $parameters = json_encode($args);
1132                 $found = DBA::exists('workerqueue', ['parameter' => $parameters, 'done' => false]);
1133
1134                 // Quit if there was a database error - a precaution for the update process to 3.5.3
1135                 if (DBA::errorNo() != 0) {
1136                         return false;
1137                 }
1138
1139                 if (!$found) {
1140                         DBA::insert('workerqueue', ['parameter' => $parameters, 'created' => $created, 'priority' => $priority]);
1141                 } elseif ($force_priority) {
1142                         DBA::update('workerqueue', ['priority' => $priority], ['parameter' => $parameters, 'done' => false, 'pid' => 0]);
1143                 }
1144
1145                 // Should we quit and wait for the worker to be called as a cronjob?
1146                 if ($dont_fork) {
1147                         return true;
1148                 }
1149
1150                 // If there is a lock then we don't have to check for too much worker
1151                 if (!Lock::acquire('worker', 0)) {
1152                         return true;
1153                 }
1154
1155                 // If there are already enough workers running, don't fork another one
1156                 $quit = self::tooMuchWorkers();
1157                 Lock::release('worker');
1158
1159                 if ($quit) {
1160                         return true;
1161                 }
1162
1163                 // We tell the daemon that a new job entry exists
1164                 if (Config::get('system', 'worker_daemon_mode', false)) {
1165                         // We don't have to set the IPC flag - this is done in "tooMuchWorkers"
1166                         return true;
1167                 }
1168
1169                 // Now call the worker to execute the jobs that we just added to the queue
1170                 self::spawnWorker();
1171
1172                 return true;
1173         }
1174
1175         /**
1176          * Defers the current worker entry
1177          */
1178         public static function defer()
1179         {
1180                 if (empty(BaseObject::getApp()->queue)) {
1181                         return;
1182                 }
1183
1184                 $queue = BaseObject::getApp()->queue;
1185
1186                 $retrial = $queue['retrial'];
1187                 $id = $queue['id'];
1188                 $priority = $queue['priority'];
1189
1190                 if ($retrial > 14) {
1191                         Logger::log('Id ' . $id . ' had been tried 14 times. We stop now.', Logger::DEBUG);
1192                         return;
1193                 }
1194
1195                 // Calculate the delay until the next trial
1196                 $delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
1197                 $next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
1198
1199                 if (($priority < PRIORITY_MEDIUM) && ($retrial > 2)) {
1200                         $priority = PRIORITY_MEDIUM;
1201                 } elseif (($priority < PRIORITY_LOW) && ($retrial > 5)) {
1202                         $priority = PRIORITY_LOW;
1203                 } elseif (($priority < PRIORITY_NEGLIGIBLE) && ($retrial > 7)) {
1204                         $priority = PRIORITY_NEGLIGIBLE;
1205                 }
1206
1207                 Logger::log('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next . ' - priority old/new: ' . $queue['priority'] . '/' . $priority, Logger::DEBUG);
1208
1209                 $stamp = (float)microtime(true);
1210                 $fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => DBA::NULL_DATETIME, 'pid' => 0, 'priority' => $priority];
1211                 DBA::update('workerqueue', $fields, ['id' => $id]);
1212                 self::$db_duration += (microtime(true) - $stamp);
1213                 self::$db_duration_write += (microtime(true) - $stamp);
1214         }
1215
1216         /**
1217          * Log active processes into the "process" table
1218          *
1219          * @brief Log active processes into the "process" table
1220          */
1221         public static function startProcess()
1222         {
1223                 $trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1);
1224
1225                 $command = basename($trace[0]['file']);
1226
1227                 Process::deleteInactive();
1228
1229                 Process::insert($command);
1230         }
1231
1232         /**
1233          * Remove the active process from the "process" table
1234          *
1235          * @brief Remove the active process from the "process" table
1236          * @return bool
1237          * @throws \Exception
1238          */
1239         public static function endProcess()
1240         {
1241                 return Process::deleteByPid();
1242         }
1243
1244         /**
1245          * Set the flag if some job is waiting
1246          *
1247          * @brief Set the flag if some job is waiting
1248          * @param boolean $jobs Is there a waiting job?
1249          * @throws \Exception
1250          */
1251         public static function IPCSetJobState($jobs)
1252         {
1253                 $stamp = (float)microtime(true);
1254                 DBA::update('worker-ipc', ['jobs' => $jobs], ['key' => 1], true);
1255                 self::$db_duration += (microtime(true) - $stamp);
1256                 self::$db_duration_write += (microtime(true) - $stamp);
1257         }
1258
1259         /**
1260          * Checks if some worker job waits to be executed
1261          *
1262          * @brief Checks if some worker job waits to be executed
1263          * @return bool
1264          * @throws \Exception
1265          */
1266         public static function IPCJobsExists()
1267         {
1268                 $stamp = (float)microtime(true);
1269                 $row = DBA::selectFirst('worker-ipc', ['jobs'], ['key' => 1]);
1270                 self::$db_duration += (microtime(true) - $stamp);
1271
1272                 // When we don't have a row, no job is running
1273                 if (!DBA::isResult($row)) {
1274                         return false;
1275                 }
1276
1277                 return (bool)$row['jobs'];
1278         }
1279 }