]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/dbqueuemanager.php
Misses this file to merge. I like the comments.
[quix0rs-gnu-social.git] / lib / dbqueuemanager.php
index 6e7172de005ee7df94096e459122f936c424c3b4..45c4b694d2e4848237bcae4563461fbb79ab55be 100644 (file)
@@ -1,6 +1,6 @@
 <?php
 /**
- * Laconica, the distributed open-source microblogging tool
+ * StatusNet, the distributed open-source microblogging tool
  *
  * Simple-minded queue manager for storing items in the database
  *
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  *
  * @category  QueueManager
- * @package   Laconica
- * @author    Evan Prodromou <evan@controlyourself.ca>
- * @author    Sarven Capadisli <csarven@controlyourself.ca>
- * @copyright 2009 Control Yourself, Inc.
+ * @package   StatusNet
+ * @author    Evan Prodromou <evan@status.net>
+ * @author    Brion Vibber <brion@status.net>
+ * @copyright 2009-2010 StatusNet, Inc.
  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
- * @link      http://laconi.ca/
+ * @link      http://status.net/
  */
 
 class DBQueueManager extends QueueManager
 {
-    var $qis = array();
-
-    function enqueue($object, $queue)
+    /**
+     * Saves an object reference into the queue item table.
+     * @return boolean true on success
+     * @throws ServerException on failure
+     */
+    public function enqueue($object, $queue)
     {
-        $notice = $object;
-
         $qi = new Queue_item();
 
-        $qi->notice_id = $notice->id;
+        $qi->frame     = $this->encode($object);
         $qi->transport = $queue;
-        $qi->created   = $notice->created;
+        $qi->created   = common_sql_now();
         $result        = $qi->insert();
 
-        if (!$result) {
+        if ($result === false) {
             common_log_db_error($qi, 'INSERT', __FILE__);
             throw new ServerException('DB error inserting queue item');
         }
 
+        $this->stats('enqueued', $queue);
+
         return true;
     }
 
-    function service($queue, $handler)
+    /**
+     * Poll every 10 seconds for new events during idle periods.
+     * We'll look in more often when there's data available.
+     *
+     * @return int seconds
+     */
+    public function pollInterval()
     {
-        while (true) {
-            $this->_log(LOG_DEBUG, 'Checking for notices...');
-            $notice = $this->_nextItem($queue, null);
-            if (empty($notice)) {
-                $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
-                // Nothing in the queue. Do you
-                // have other tasks, like servicing your
-                // XMPP connection, to do?
-                $handler->idle(QUEUE_HANDLER_MISS_IDLE);
-            } else {
-                $this->_log(LOG_INFO, 'Got notice '. $notice->id);
-                // Yay! Got one!
-                if ($handler->handle_notice($notice)) {
-                    $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
-                    $this->_done($notice, $queue);
-                } else {
-                    $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
-                    $this->_fail($notice, $queue);
-                }
-                // Chance to e.g. service your XMPP connection
-                $this->_log(LOG_DEBUG, 'Idling after success.');
-                $handler->idle(QUEUE_HANDLER_HIT_IDLE);
-            }
-            // XXX: when do we give up?
-        }
+        return 10;
     }
 
-    function _nextItem($queue, $timeout=null)
+    /**
+     * Run a polling cycle during idle processing in the input loop.
+     * @return boolean true if we should poll again for more data immediately
+     */
+    public function poll()
     {
-        $start = time();
-        $result = null;
+        //$this->_log(LOG_DEBUG, 'Checking for notices...');
+        $qi = Queue_item::top($this->activeQueues());
+        if (!$qi instanceof Queue_item) {
+            //$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+            return false;
+        }
 
-        do {
-            $qi = Queue_item::top($queue);
-            if (!empty($qi)) {
-                $notice = Notice::staticGet('id', $qi->notice_id);
-                if (!empty($notice)) {
-                    $result = $notice;
-                } else {
-                    $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
-                    $qi->delete();
-                    $qi->free();
-                    $qi = null;
-                }
+        try {
+            $item = $this->decode($qi->frame);
+        } catch (Exception $e) {
+            $this->_log(LOG_INFO, "[{$qi->transport}] Discarding: ".$e->getMessage());
+            $this->_done($qi);
+            return true;
+        }
+
+        $rep = $this->logrep($item);
+        $this->_log(LOG_DEBUG, "Got {$rep} for transport {$qi->transport}");
+        
+        $handler = $this->getHandler($qi->transport);
+        if ($handler) {
+            if ($handler->handle($item)) {
+                $this->_log(LOG_INFO, "[{$qi->transport}:$rep] Successfully handled item");
+                $this->_done($qi);
+            } else {
+                $this->_log(LOG_INFO, "[{$qi->transport}:$rep] Failed to handle item");
+                $this->_fail($qi);
             }
-        } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
+        } else {
+            $this->noHandlerFound($qi, $rep);
+        }
+        return true;
+    }
 
-        return $result;
+    // What to do if no handler was found. For example, the OpportunisticQM
+    // should avoid deleting items just because it can't reach XMPP queues etc.
+    protected function noHandlerFound(Queue_item $qi, $rep=null) {
+        $this->_log(LOG_INFO, "[{$qi->transport}:{$rep}] No handler for queue {$qi->transport}; discarding.");
+        $this->_done($qi);
     }
 
-    function _done($object, $queue)
+    /**
+     * Delete our claimed item from the queue after successful processing.
+     *
+     * @param QueueItem $qi
+     */
+    protected function _done(Queue_item $qi)
     {
-        // XXX: right now, we only handle notices
-
-        $notice = $object;
-
-        $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
-                                        'transport' => $queue));
-
-        if (empty($qi)) {
-            $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
-        } else {
-            if (empty($qi->claimed)) {
-                $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
-                           'for '.$notice->id.', queue '.$queue);
-            }
-            $qi->delete();
-            $qi->free();
-            $qi = null;
+        if (empty($qi->claimed)) {
+            $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item {$qi->id} from {$qi->transport}");
         }
+        $qi->delete();
 
-        $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
-
-        $notice->free();
-        $notice = null;
+        $this->stats('handled', $qi->transport);
     }
 
-    function _fail($object, $queue)
+    /**
+     * Free our claimed queue item for later reprocessing in case of
+     * temporary failure.
+     *
+     * @param QueueItem $qi
+     */
+    protected function _fail(Queue_item $qi, $releaseOnly=false)
     {
-        // XXX: right now, we only handle notices
-
-        $notice = $object;
-
-        $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
-                                        'transport' => $queue));
-
-        if (empty($qi)) {
-            $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+        if (empty($qi->claimed)) {
+            $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Ignoring failure for unclaimed queue item");
         } else {
-            if (empty($qi->claimed)) {
-                $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
-                           'for '.$notice->id.', queue '.$queue);
-            } else {
-                $orig = clone($qi);
-                $qi->claimed = null;
-                $qi->update($orig);
-                $qi = null;
-            }
+            $qi->releaseClaim();
         }
 
-        $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
-
-        $notice->free();
-        $notice = null;
-    }
-
-    function _log($level, $msg)
-    {
-        common_log($level, 'DBQueueManager: '.$msg);
+        if (!$releaseOnly) {
+            $this->stats('error', $qi->transport);
+        }
     }
 }