// We fetch the next queue entry that is about to be executed
while ($r = self::workerProcess()) {
if (self::IPCJobsExists(getmypid())) {
- self::IPCSetJobState(false, getmypid());
+ self::IPCDeleteJobState(getmypid());
}
+
// Don't refetch when a worker fetches tasks for multiple workers
$refetched = DI::config()->get('system', 'worker_multiple_fetch');
foreach ($r as $entry) {
} elseif ($pid) {
// The parent process continues here
DBA::connect();
- Logger::info('Spawned new worker', ['pid' => $pid]);
+
self::IPCSetJobState(true, $pid);
+ Logger::info('Spawned new worker', ['pid' => $pid]);
$cycles = 0;
while (self::IPCJobsExists($pid) && (++$cycles < 100)) {
// We now are in the new worker
DBA::connect();
+ /// @todo Reinitialize the logger to set a new process_id and uid
+
+ self::IPCSetJobState(true, getmypid());
Logger::info('Worker spawned', ['pid' => getmypid()]);
- $cycles = 0;
- while (!self::IPCJobsExists($pid) && (++$cycles < 100)) {
- usleep(10000);
- }
-
- Logger::info('Parent is ready', ['pid' => getmypid(), 'wait_cycles' => $cycles]);
-
self::processQueue($do_cron);
self::unclaimProcess();
* Set the flag if some job is waiting
*
* @param boolean $jobs Is there a waiting job?
+ * @param int $key Key number
* @throws \Exception
*/
public static function IPCSetJobState(bool $jobs, int $key = 0)
self::$db_duration_write += (microtime(true) - $stamp);
}
+ /**
+ * Delete a key entry
+ *
+ * @param int $key Key number
+ * @throws \Exception
+ */
+ public static function IPCDeleteJobState(int $key)
+ {
+ $stamp = (float)microtime(true);
+ DBA::delete('worker-ipc', ['key' => $key]);
+ self::$db_duration += (microtime(true) - $stamp);
+ self::$db_duration_write += (microtime(true) - $stamp);
+ }
+
/**
* Checks if some worker job waits to be executed
*
+ * @param int $key Key number
* @return bool
* @throws \Exception
*/