+/**
+ * @brief Assigns a workerqueue entry to the current process
+ *
+ * When we are sure that the table locks are working correctly, we can remove the checks from here
+ *
+ * @param array $queue Workerqueue entry
+ *
+ * @return boolean "true" if the claiming was successful
+ */
+function poller_claim_process($queue) {
+ $mypid = getmypid();
+
+ $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
+ array('id' => $queue["id"], 'pid' => 0));
+ dba::unlock();
+
+ if (!$success) {
+ logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
+ return false;
+ }
+
+ // Assure that there are no tasks executed twice
+ $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
+ if (!$id) {
+ logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
+ return false;
+ } elseif ((strtotime($id[0]["executed"]) <= 0) || ($id[0]["pid"] == 0)) {
+ logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
+ return false;
+ } elseif ($id[0]["pid"] != $mypid) {
+ logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
+ return false;
+ }
+ return true;
+}
+
+/**
+ * @brief Removes a workerqueue entry from the current process
+ */
+function poller_unclaim_process() {
+ $mypid = getmypid();
+
+ dba::update('workerqueue', array('executed' => NULL_DATE, 'pid' => 0), array('pid' => $mypid));
+}
+