"created": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Creation date"},
"pid": {"type": "int unsigned", "not null": "1", "default": "0", "comment": "Process id of the worker"},
"executed": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Execution date"},
+ "next_try": {"type": "datetime", "not null": "1", "default": "0001-01-01 00:00:00", "comment": "Next retrial date"},
+ "retrial": {"type": "tinyint", "not null": "1", "default": "0", "comment": "Retrial counter"},
"done": {"type": "boolean", "not null": "1", "default": "0", "comment": "Marked 1 when the task was done - will be deleted later"}
},
"indexes": {
"PRIMARY": ["id"],
"pid": ["pid"],
"parameter": ["parameter(64)"],
- "priority_created": ["priority", "created"],
- "done_executed": ["done", "executed"]
+ "priority_created_next_try": ["priority", "created", "next_try"],
+ "done_executed_next_try": ["done", "executed", "next_try"]
}
}
}
use Friendica\Model\Process;
use Friendica\Util\DateTimeFormat;
use Friendica\Util\Network;
+use Friendica\BaseObject;
require_once 'include/dba.php';
*/
private static function totalEntries()
{
- return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done`", NULL_DATE]);
+ return DBA::count('workerqueue', ["`executed` <= ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, DateTimeFormat::utcNow()]);
}
/**
*/
private static function highestPriority()
{
- $condition = ["`executed` <= ? AND NOT `done`", NULL_DATE];
+ $condition = ["`executed` <= ? AND NOT `done` AND `next_try` < ?", NULL_DATE, DateTimeFormat::utcNow()];
$workerqueue = DBA::selectFirst('workerqueue', ['priority'], $condition, ['order' => ['priority']]);
if (DBA::isResult($workerqueue)) {
return $workerqueue["priority"];
*/
private static function processWithPriorityActive($priority)
{
- $condition = ["`priority` <= ? AND `executed` > ? AND NOT `done`", $priority, NULL_DATE];
+ $condition = ["`priority` <= ? AND `executed` > ? AND NOT `done` AND `next_try` < ?",
+ $priority, NULL_DATE, DateTimeFormat::utcNow()];
return DBA::exists('workerqueue', $condition);
}
self::execFunction($queue, $include, $argv, true);
$stamp = (float)microtime(true);
- if (DBA::update('workerqueue', ['done' => true], ['id' => $queue["id"]])) {
+ if (DBA::update('workerqueue', ['done' => true], ['id' => $queue['id']])) {
Config::set('system', 'last_worker_execution', DateTimeFormat::utcNow());
}
self::$db_duration = (microtime(true) - $stamp);
$result = DBA::select(
'workerqueue',
['id'],
- ["`executed` <= ? AND `priority` < ? AND NOT `done`", NULL_DATE, $highest_priority],
+ ["`executed` <= ? AND `priority` < ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, $highest_priority, DateTimeFormat::utcNow()],
['limit' => $limit, 'order' => ['priority', 'created']]
);
$result = DBA::select(
'workerqueue',
['id'],
- ["`executed` <= ? AND `priority` > ? AND NOT `done`", NULL_DATE, $highest_priority],
+ ["`executed` <= ? AND `priority` > ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, $highest_priority, DateTimeFormat::utcNow()],
['limit' => $limit, 'order' => ['priority', 'created']]
);
$result = DBA::select(
'workerqueue',
['id'],
- ["`executed` <= ? AND NOT `done`", NULL_DATE],
+ ["`executed` <= ? AND NOT `done` AND `next_try` < ?",
+ NULL_DATE, DateTimeFormat::utcNow()],
['limit' => $limit, 'order' => ['priority', 'created']]
);
return true;
}
+ /**
+ * Defers the current worker entry
+ */
+ public static function defer()
+ {
+ if (empty(BaseObject::getApp()->queue)) {
+ return;
+ }
+
+ $queue = BaseObject::getApp()->queue;
+
+ $retrial = $queue['retrial'];
+ $id = $queue['id'];
+
+ if ($retrial > 14) {
+ logger('Id ' . $id . ' had been tried 14 times, it will be deleted now.', LOGGER_DEBUG);
+ DBA::delete('workerqueue', ['id' => $id]);
+ }
+
+ // Calculate the delay until the next trial
+ $delay = (($retrial + 3) ** 4) + (rand(1, 30) * ($retrial + 1));
+ $next = DateTimeFormat::utc('now + ' . $delay . ' seconds');
+
+ logger('Defer execution ' . $retrial . ' of id ' . $id . ' to ' . $next, LOGGER_DEBUG);
+
+ $fields = ['retrial' => $retrial + 1, 'next_try' => $next, 'executed' => NULL_DATE, 'pid' => 0];
+ DBA::update('workerqueue', $fields, ['id' => $id]);
+ }
+
/**
* Log active processes into the "process" table
*