]> git.mxchange.org Git - quix0rs-gnu-social.git/blobdiff - lib/queuehandler.php
Added class entry-content to attachment list container
[quix0rs-gnu-social.git] / lib / queuehandler.php
index 747e7b49303dd8bd5267615a9dcf48b334342c08..d5e0150d9c9997ba334b8df6c00a15d141ea8daa 100644 (file)
@@ -1,7 +1,7 @@
 <?php
 /*
  * Laconica - a distributed open-source microblogging tool
- * Copyright (C) 2008, Controlez-Vous, Inc.
+ * Copyright (C) 2008, 2009, Control Yourself, Inc.
  *
  * This program is free software: you can redistribute it and/or modify
  * it under the terms of the GNU Affero General Public License as published by
 
 define('CLAIM_TIMEOUT', 1200);
 
-class QueueHandler {
-
-       var $_id = 'generic';
-
-       function QueueHandler($id=NULL) {
-               if ($id) {
-                       $this->set_id($id);
-               }
-       }
-       
-       function class_name() {
-               return ucfirst($this->transport()) . 'Handler';
-       }
-       
-       function get_id() {
-               return $this->_id;
-       }
-
-       function set_id($id) {
-               $this->_id = $id;
-       }
-       
-       function transport() {
-               return NULL;
-       }
-       
-       function start() {
-       }
-       
-       function finish() {
-       }
-
-       function handle_notice($notice) {
-               return true;
-       }
-       
-       function handle_queue() {
-               $this->log(LOG_INFO, 'checking for queued notices');
-               $cnt = 0;
-               $transport = $this->transport();
-               do {
-                       $qi = Queue_item::top($transport);
-                       if ($qi) {
-                               $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
-                               $notice = Notice::staticGet($qi->notice_id);
-                               if ($notice) {
-                                       $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
-                                       # XXX: what to do if broadcast fails?
-                                       $result = $this->handle_notice($notice);
-                                       if (!$result) {
-                                               $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
-                                               $orig = $qi;
-                                               $qi->claimed = NULL;
-                                               $qi->update($orig);
-                                               $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
-                                               continue;
-                                       }
-                                       $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
-                                       $notice = NULL;
-                               } else {
-                                       $this->log(LOG_WARNING, 'queue item for notice that does not exist');
-                               }
-                               $qi->delete();
-                               $cnt++;
-                       } else {
-                               $this->clear_old_claims();
-                               sleep(10);
-                       }       
-               } while (true);
-       }
-
-       function clear_old_claims() {
-               $qi = new Queue_item();
-               $qi->transport = $this->transport();
-               $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
-               $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
-       }
-       
-       function log($level, $msg) {
-               common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
-       }
+if (!defined('LACONICA')) { exit(1); }
+
+require_once(INSTALLDIR.'/lib/daemon.php');
+require_once(INSTALLDIR.'/classes/Queue_item.php');
+require_once(INSTALLDIR.'/classes/Notice.php');
+
+class QueueHandler extends Daemon
+{
+
+    var $_id = 'generic';
+
+    function QueueHandler($id=null)
+    {
+        if ($id) {
+            $this->set_id($id);
+        }
+    }
+
+    function class_name()
+    {
+        return ucfirst($this->transport()) . 'Handler';
+    }
+
+    function name()
+    {
+        return strtolower($this->class_name().'.'.$this->get_id());
+    }
+
+    function get_id()
+    {
+        return $this->_id;
+    }
+
+    function set_id($id)
+    {
+        $this->_id = $id;
+    }
+
+    function transport()
+    {
+        return null;
+    }
+
+    function start()
+    {
+    }
+
+    function finish()
+    {
+    }
+
+    function handle_notice($notice)
+    {
+        return true;
+    }
+
+    function db_dispatch() {
+        do {
+            $qi = Queue_item::top($this->transport());
+            if ($qi) {
+                $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
+                $notice = Notice::staticGet($qi->notice_id);
+                if ($notice) {
+                    $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
+                    # XXX: what to do if broadcast fails?
+                    $result = $this->handle_notice($notice);
+                    if (!$result) {
+                        $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
+                        $orig = $qi;
+                        $qi->claimed = null;
+                        $qi->update($orig);
+                        $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
+                        continue;
+                    }
+                    $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+                    $notice->free();
+                    unset($notice);
+                    $notice = null;
+                } else {
+                    $this->log(LOG_WARNING, 'queue item for notice that does not exist');
+                }
+                $qi->delete();
+                $qi->free();
+                unset($qi);
+                $this->idle(0);
+            } else {
+                $this->clear_old_claims();
+                $this->idle(5);
+            }
+        } while (true);
+    }
+
+    function stomp_dispatch() {
+        require("Stomp.php");
+        $con = new Stomp(common_config('queue','stomp_server'));
+        if (!$con->connect()) {
+            $this->log(LOG_ERR, 'Failed to connect to queue server');
+            return false;
+        }
+        $queue_basename = common_config('queue','queue_basename');
+        // subscribe to the relevant queue (format: basename-transport)
+        $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
+
+        do {
+            $frame = $con->readFrame();
+            if ($frame) {
+                $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
+
+                // XXX: Now the queue handler receives only the ID of the
+                // notice, and it has to get it from the DB
+                // A massive improvement would be avoid DB query by transmitting
+                // all the notice details via queue server...
+                $notice = Notice::staticGet($frame->body);
+
+                if ($notice) {
+                    $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
+                    $result = $this->handle_notice($notice);
+                    if ($result) {
+                        // if the msg has been handled positively, ack it
+                        // and the queue server will remove it from the queue
+                        $con->ack($frame);
+                        $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+                    }
+                    else {
+                        // no ack
+                        $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
+                    }
+                    $notice->free();
+                    unset($notice);
+                    $notice = null;
+                } else {
+                    $this->log(LOG_WARNING, 'queue item for notice that does not exist');
+                }
+            }
+        } while (true);
+
+        $con->disconnect();
+    }
+
+    function run()
+    {
+        if (!$this->start()) {
+            return false;
+        }
+        $this->log(LOG_INFO, 'checking for queued notices');
+        if (common_config('queue','subsystem') == 'stomp') {
+            $this->stomp_dispatch();
+        }
+        else {
+            $this->db_dispatch();
+        }
+        if (!$this->finish()) {
+            return false;
+        }
+        return true;
+    }
+
+    function idle($timeout=0)
+    {
+        if ($timeout>0) {
+            sleep($timeout);
+        }
+    }
+
+    function clear_old_claims()
+    {
+        $qi = new Queue_item();
+        $qi->transport = $this->transport();
+        $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
+        $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
+        $qi->free();
+        unset($qi);
+    }
+
+    function log($level, $msg)
+    {
+        common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
+    }
 }
-       
\ No newline at end of file
+