]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Merge branch 'queuemanager' into 0.8.x
authorEvan Prodromou <evan@controlyourself.ca>
Tue, 7 Jul 2009 15:35:00 +0000 (11:35 -0400)
committerEvan Prodromou <evan@controlyourself.ca>
Tue, 7 Jul 2009 15:35:00 +0000 (11:35 -0400)
classes/Notice.php
classes/Queue_item.php
lib/dbqueuemanager.php [new file with mode: 0644]
lib/queuehandler.php
lib/queuemanager.php [new file with mode: 0644]
lib/stompqueuemanager.php [new file with mode: 0644]
lib/unqueuemanager.php [new file with mode: 0644]
lib/util.php
lib/xmppqueuehandler.php

index 8a018068aee6289580217df68b627936985442ab..5ec0692d90e5e3eadffc2da8a529be7263b6347f 100644 (file)
@@ -1210,7 +1210,7 @@ class Notice extends Memcached_DataObject
             $window = explode(',', $laststr);
             $last_id = $window[0];
             $new_ids = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW,
-                                                                          $last_id, 0, null, $tag)));
+                                                                          $last_id, 0, null)));
 
             $new_window = array_merge($new_ids, $window);
 
@@ -1225,7 +1225,7 @@ class Notice extends Memcached_DataObject
         }
 
         $window = call_user_func_array($fn, array_merge($args, array(0, NOTICE_CACHE_WINDOW,
-                                                                     0, 0, null, $tag)));
+                                                                     0, 0, null)));
 
         $windowstr = implode(',', $window);
 
index 9b909ec22ba8360ca1f39ee5f990cd47ed92f3e7..295c321b57d4615fa5a139b9e14b9752fd19a4e3 100644 (file)
@@ -4,7 +4,7 @@
  */
 require_once INSTALLDIR.'/classes/Memcached_DataObject.php';
 
-class Queue_item extends Memcached_DataObject 
+class Queue_item extends Memcached_DataObject
 {
     ###START_AUTOCODE
     /* the code below is auto generated do not remove the above tag */
@@ -13,7 +13,7 @@ class Queue_item extends Memcached_DataObject
     public $notice_id;                       // int(4)  primary_key not_null
     public $transport;                       // varchar(8)  primary_key not_null
     public $created;                         // datetime()   not_null
-    public $claimed;                         // datetime()  
+    public $claimed;                         // datetime()
 
     /* Static get */
     function staticGet($k,$v=null)
@@ -24,7 +24,7 @@ class Queue_item extends Memcached_DataObject
 
     function sequenceKey()
     { return array(false, false); }
-    
+
     static function top($transport) {
 
         $qi = new Queue_item();
@@ -54,4 +54,9 @@ class Queue_item extends Memcached_DataObject
         $qi = null;
         return null;
     }
+
+    function &pkeyGet($kv)
+    {
+        return Memcached_DataObject::pkeyGet('Queue_item', $kv);
+    }
 }
diff --git a/lib/dbqueuemanager.php b/lib/dbqueuemanager.php
new file mode 100644 (file)
index 0000000..6e7172d
--- /dev/null
@@ -0,0 +1,166 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * Simple-minded queue manager for storing items in the database
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  QueueManager
+ * @package   Laconica
+ * @author    Evan Prodromou <evan@controlyourself.ca>
+ * @author    Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link      http://laconi.ca/
+ */
+
+class DBQueueManager extends QueueManager
+{
+    var $qis = array();
+
+    function enqueue($object, $queue)
+    {
+        $notice = $object;
+
+        $qi = new Queue_item();
+
+        $qi->notice_id = $notice->id;
+        $qi->transport = $queue;
+        $qi->created   = $notice->created;
+        $result        = $qi->insert();
+
+        if (!$result) {
+            common_log_db_error($qi, 'INSERT', __FILE__);
+            throw new ServerException('DB error inserting queue item');
+        }
+
+        return true;
+    }
+
+    function service($queue, $handler)
+    {
+        while (true) {
+            $this->_log(LOG_DEBUG, 'Checking for notices...');
+            $notice = $this->_nextItem($queue, null);
+            if (empty($notice)) {
+                $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
+                // Nothing in the queue. Do you
+                // have other tasks, like servicing your
+                // XMPP connection, to do?
+                $handler->idle(QUEUE_HANDLER_MISS_IDLE);
+            } else {
+                $this->_log(LOG_INFO, 'Got notice '. $notice->id);
+                // Yay! Got one!
+                if ($handler->handle_notice($notice)) {
+                    $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id);
+                    $this->_done($notice, $queue);
+                } else {
+                    $this->_log(LOG_INFO, 'Failed to handle notice '. $notice->id);
+                    $this->_fail($notice, $queue);
+                }
+                // Chance to e.g. service your XMPP connection
+                $this->_log(LOG_DEBUG, 'Idling after success.');
+                $handler->idle(QUEUE_HANDLER_HIT_IDLE);
+            }
+            // XXX: when do we give up?
+        }
+    }
+
+    function _nextItem($queue, $timeout=null)
+    {
+        $start = time();
+        $result = null;
+
+        do {
+            $qi = Queue_item::top($queue);
+            if (!empty($qi)) {
+                $notice = Notice::staticGet('id', $qi->notice_id);
+                if (!empty($notice)) {
+                    $result = $notice;
+                } else {
+                    $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
+                    $qi->delete();
+                    $qi->free();
+                    $qi = null;
+                }
+            }
+        } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
+
+        return $result;
+    }
+
+    function _done($object, $queue)
+    {
+        // XXX: right now, we only handle notices
+
+        $notice = $object;
+
+        $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
+                                        'transport' => $queue));
+
+        if (empty($qi)) {
+            $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+        } else {
+            if (empty($qi->claimed)) {
+                $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
+                           'for '.$notice->id.', queue '.$queue);
+            }
+            $qi->delete();
+            $qi->free();
+            $qi = null;
+        }
+
+        $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+
+        $notice->free();
+        $notice = null;
+    }
+
+    function _fail($object, $queue)
+    {
+        // XXX: right now, we only handle notices
+
+        $notice = $object;
+
+        $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
+                                        'transport' => $queue));
+
+        if (empty($qi)) {
+            $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
+        } else {
+            if (empty($qi->claimed)) {
+                $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
+                           'for '.$notice->id.', queue '.$queue);
+            } else {
+                $orig = clone($qi);
+                $qi->claimed = null;
+                $qi->update($orig);
+                $qi = null;
+            }
+        }
+
+        $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
+
+        $notice->free();
+        $notice = null;
+    }
+
+    function _log($level, $msg)
+    {
+        common_log($level, 'DBQueueManager: '.$msg);
+    }
+}
index c1c4f3309a24011b60e72de070a5fcef0267ad56..c0f38f4e35f35fa9f7f132ecf56fcf3f236e9aec 100644 (file)
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
-define('CLAIM_TIMEOUT', 1200);
-
 if (!defined('LACONICA')) { exit(1); }
 
 require_once(INSTALLDIR.'/lib/daemon.php');
 require_once(INSTALLDIR.'/classes/Queue_item.php');
 require_once(INSTALLDIR.'/classes/Notice.php');
 
+define('CLAIM_TIMEOUT', 1200);
+define('QUEUE_HANDLER_MISS_IDLE', 10);
+define('QUEUE_HANDLER_HIT_IDLE', 0);
+
 class QueueHandler extends Daemon
 {
     var $_id = 'generic';
@@ -38,6 +40,11 @@ class QueueHandler extends Daemon
         }
     }
 
+    function timeout()
+    {
+        return 60;
+    }
+
     function class_name()
     {
         return ucfirst($this->transport()) . 'Handler';
@@ -76,110 +83,21 @@ class QueueHandler extends Daemon
         return true;
     }
 
-    function db_dispatch() {
-        do {
-            $qi = Queue_item::top($this->transport());
-            if ($qi) {
-                $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
-                $notice = Notice::staticGet($qi->notice_id);
-                if ($notice) {
-                    $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
-                    # XXX: what to do if broadcast fails?
-                    $result = $this->handle_notice($notice);
-                    if (!$result) {
-                        $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
-                        $orig = $qi;
-                        $qi->claimed = null;
-                        $qi->update($orig);
-                        $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
-                        continue;
-                    }
-                    $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
-                    $notice->free();
-                    unset($notice);
-                    $notice = null;
-                } else {
-                    $this->log(LOG_WARNING, 'queue item for notice that does not exist');
-                }
-                $qi->delete();
-                $qi->free();
-                unset($qi);
-                $this->idle(0);
-            } else {
-                $this->clear_old_claims();
-                $this->idle(5);
-            }
-        } while (true);
-    }
-
-    function stomp_dispatch() {
-
-        // use an external message queue system via STOMP
-        require_once("Stomp.php");
-
-        $server = common_config('queue','stomp_server');
-        $username = common_config('queue', 'stomp_username');
-        $password = common_config('queue', 'stomp_password');
-
-        $con = new Stomp($server);
-
-        if (!$con->connect($username, $password)) {
-            $this->log(LOG_ERR, 'Failed to connect to queue server');
-            return false;
-        }
-
-        $queue_basename = common_config('queue','queue_basename');
-        // subscribe to the relevant queue (format: basename-transport)
-        $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
-
-        do {
-            $frame = $con->readFrame();
-            if ($frame) {
-                $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
-
-                // XXX: Now the queue handler receives only the ID of the
-                // notice, and it has to get it from the DB
-                // A massive improvement would be avoid DB query by transmitting
-                // all the notice details via queue server...
-                $notice = Notice::staticGet($frame->body);
-
-                if ($notice) {
-                    $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
-                    $result = $this->handle_notice($notice);
-                    if ($result) {
-                        // if the msg has been handled positively, ack it
-                        // and the queue server will remove it from the queue
-                        $con->ack($frame);
-                        $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
-                    }
-                    else {
-                        // no ack
-                        $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
-                    }
-                    $notice->free();
-                    unset($notice);
-                    $notice = null;
-                } else {
-                    $this->log(LOG_WARNING, 'queue item for notice that does not exist');
-                }
-            }
-        } while (true);
-
-        $con->disconnect();
-    }
-
     function run()
     {
         if (!$this->start()) {
             return false;
         }
+
         $this->log(LOG_INFO, 'checking for queued notices');
-        if (common_config('queue','subsystem') == 'stomp') {
-            $this->stomp_dispatch();
-        }
-        else {
-            $this->db_dispatch();
-        }
+
+        $queue   = $this->transport();
+        $timeout = $this->timeout();
+
+        $qm = QueueManager::get();
+
+        $qm->service($queue, $this);
+
         if (!$this->finish()) {
             return false;
         }
@@ -188,21 +106,11 @@ class QueueHandler extends Daemon
 
     function idle($timeout=0)
     {
-        if ($timeout>0) {
+        if ($timeout > 0) {
             sleep($timeout);
         }
     }
 
-    function clear_old_claims()
-    {
-        $qi = new Queue_item();
-        $qi->transport = $this->transport();
-        $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
-        $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
-        $qi->free();
-        unset($qi);
-    }
-
     function log($level, $msg)
     {
         common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
diff --git a/lib/queuemanager.php b/lib/queuemanager.php
new file mode 100644 (file)
index 0000000..582c247
--- /dev/null
@@ -0,0 +1,74 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * Abstract class for queue managers
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  QueueManager
+ * @package   Laconica
+ * @author    Evan Prodromou <evan@controlyourself.ca>
+ * @author    Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link      http://laconi.ca/
+ */
+
+class QueueManager
+{
+    static $qm = null;
+
+    static function get()
+    {
+        if (empty(self::$qm)) {
+
+            if (Event::handle('StartNewQueueManager', array(&self::$qm))) {
+
+                $enabled = common_config('queue', 'enabled');
+                $type = common_config('queue', 'subsystem');
+
+                if (!$enabled) {
+                    // does everything immediately
+                    self::$qm = new UnQueueManager();
+                } else {
+                    switch ($type) {
+                     case 'db':
+                        self::$qm = new DBQueueManager();
+                        break;
+                     case 'stomp':
+                        self::$qm = new StompQueueManager();
+                        break;
+                     default:
+                        throw new ServerException("No queue manager class for type '$type'");
+                    }
+                }
+            }
+        }
+
+        return self::$qm;
+    }
+
+    function enqueue($object, $queue)
+    {
+        throw ServerException("Unimplemented function 'enqueue' called");
+    }
+
+    function service($queue, $handler)
+    {
+        throw ServerException("Unimplemented function 'service' called");
+    }
+}
diff --git a/lib/stompqueuemanager.php b/lib/stompqueuemanager.php
new file mode 100644 (file)
index 0000000..e7e1e00
--- /dev/null
@@ -0,0 +1,129 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * Abstract class for queue managers
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  QueueManager
+ * @package   Laconica
+ * @author    Evan Prodromou <evan@controlyourself.ca>
+ * @author    Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link      http://laconi.ca/
+ */
+
+require_once 'Stomp.php';
+
+class StompQueueManager
+{
+    var $server = null;
+    var $username = null;
+    var $password = null;
+    var $base = null;
+    var $con = null;
+
+    function __construct()
+    {
+        $this->server   = common_config('queue', 'stomp_server');
+        $this->username = common_config('queue', 'stomp_username');
+        $this->password = common_config('queue', 'stomp_password');
+        $this->base     = common_config('queue', 'queue_basename');
+    }
+
+    function _connect()
+    {
+        if (empty($this->con)) {
+            $this->_log(LOG_INFO, "Connecting to '$this->server' as '$this->username'...");
+            $this->con = new Stomp($this->server);
+
+            if ($this->con->connect($this->username, $this->password)) {
+                $this->_log(LOG_INFO, "Connected.");
+            } else {
+                $this->_log(LOG_ERR, 'Failed to connect to queue server');
+                throw new ServerException('Failed to connect to queue server');
+            }
+        }
+    }
+
+    function enqueue($object, $queue)
+    {
+        $notice = $object;
+
+        $this->_connect();
+
+        // XXX: serialize and send entire notice
+
+        $result = $this->con->send($this->_queueName($queue),
+                                   $notice->id,                // BODY of the message
+                                   array ('created' => $notice->created));
+
+        if (!$result) {
+            common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
+            return false;
+        }
+
+        common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
+                   . $notice->id . ' for ' . $queue);
+    }
+
+    function service($queue, $handler)
+    {
+        $result = null;
+
+        $this->_connect();
+
+        $this->con->setReadTimeout($handler->timeout());
+
+        $this->con->subscribe($this->_queueName($queue));
+
+        while (true) {
+
+            $frame = $this->con->readFrame();
+
+            if (!empty($frame)) {
+                $notice = Notice::staticGet('id', $frame->body);
+
+                if (empty($notice)) {
+                    $this->_log(LOG_WARNING, 'Got ID '. $frame->body .' for non-existent notice');
+                    $this->con->ack($frame);
+                } else if ($handler->handle_notice($notice)) {
+                    $this->_log(LOG_INFO, 'Successfully handled notice '. $notice->id .' posted at ' . $frame->headers['created']);
+                    $this->con->ack($frame);
+                    unset($notice);
+                }
+
+                unset($frame);
+            }
+
+            $handler->idle(0);
+        }
+
+        $this->con->unsubscribe($this->_queueName($queue));
+    }
+
+    function _queueName($queue)
+    {
+        return common_config('queue', 'queue_basename') . $queue;
+    }
+
+    function _log($level, $msg)
+    {
+        common_log($level, 'StompQueueManager: '.$msg);
+    }
+}
diff --git a/lib/unqueuemanager.php b/lib/unqueuemanager.php
new file mode 100644 (file)
index 0000000..5154610
--- /dev/null
@@ -0,0 +1,85 @@
+<?php
+/**
+ * Laconica, the distributed open-source microblogging tool
+ *
+ * A queue manager interface for just doing things immediately
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  QueueManager
+ * @package   Laconica
+ * @author    Evan Prodromou <evan@controlyourself.ca>
+ * @author    Sarven Capadisli <csarven@controlyourself.ca>
+ * @copyright 2009 Control Yourself, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link      http://laconi.ca/
+ */
+
+class UnQueueManager
+{
+    function enqueue($object, $queue)
+    {
+        $notice = $object;
+
+        switch ($queue)
+        {
+         case 'omb':
+            if ($this->_isLocal($notice)) {
+                require_once(INSTALLDIR.'/lib/omb.php');
+                omb_broadcast_remote_subscribers($notice);
+            }
+            break;
+         case 'public':
+            if ($this->_isLocal($notice)) {
+                require_once(INSTALLDIR.'/lib/jabber.php');
+                jabber_public_notice($notice);
+            }
+            break;
+         case 'twitter':
+            if ($this->_isLocal($notice)) {
+                broadcast_twitter($notice);
+            }
+            break;
+         case 'facebook':
+            if ($this->_isLocal($notice)) {
+                require_once INSTALLDIR . '/lib/facebookutil.php';
+                return facebookBroadcastNotice($notice);
+            }
+            break;
+         case 'ping':
+            if ($this->_isLocal($notice)) {
+                require_once INSTALLDIR . '/lib/ping.php';
+                return ping_broadcast_notice($notice);
+            }
+         case 'sms':
+            require_once(INSTALLDIR.'/lib/mail.php');
+            mail_broadcast_notice_sms($notice);
+            break;
+         case 'jabber':
+            require_once(INSTALLDIR.'/lib/jabber.php');
+            jabber_broadcast_notice($notice);
+            break;
+         default:
+            throw ServerException("UnQueueManager: Unknown queue: $type");
+        }
+    }
+
+    function _isLocal($notice)
+    {
+        return ($notice->is_local == NOTICE_LOCAL_PUBLIC ||
+                $notice->is_local == NOTICE_LOCAL_NONPUBLIC);
+    }
+}
\ No newline at end of file
index d4d79afb30c25fc76d405887a5f2c2e06ffe80ec..9e8ec41d2592a29f46f0aa62195d24ef5ea13fcb 100644 (file)
@@ -862,165 +862,45 @@ function common_redirect($url, $code=307)
 
 function common_broadcast_notice($notice, $remote=false)
 {
-    if (common_config('queue', 'enabled')) {
-        // Do it later!
-        return common_enqueue_notice($notice);
-    } else {
-        return common_real_broadcast($notice, $remote);
-    }
+    return common_enqueue_notice($notice);
 }
 
 // Stick the notice on the queue
 
 function common_enqueue_notice($notice)
 {
-    $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping');
-
-    if (common_config('xmpp', 'enabled'))
-    {
-        $transports[] = 'jabber';
-    }
-
-    if (common_config('queue','subsystem') == 'stomp') {
-        common_enqueue_notice_stomp($notice, $transports);
-    }
-    else {
-        common_enqueue_notice_db($notice, $transports);
-    }
-    return $result;
-}
-
-function common_enqueue_notice_stomp($notice, $transports)
-{
-    // use an external message queue system via STOMP
-    require_once("Stomp.php");
+    static $localTransports = array('omb',
+                                    'twitter',
+                                    'facebook',
+                                    'ping');
+    static $allTransports = array('sms');
 
-    $server = common_config('queue','stomp_server');
-    $username = common_config('queue', 'stomp_username');
-    $password = common_config('queue', 'stomp_password');
+    $transports = $allTransports;
 
-    $con = new Stomp($server);
+    $xmpp = common_config('xmpp', 'enabled');
 
-    if (!$con->connect($username, $password)) {
-        common_log(LOG_ERR, 'Failed to connect to queue server');
-        return false;
+    if ($xmpp) {
+        $transports[] = 'jabber';
     }
 
-    $queue_basename = common_config('queue','queue_basename');
-
-    foreach ($transports as $transport) {
-        $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE
-                             $notice->id,              // BODY of the message
-                             array ('created' => $notice->created));
-        if (!$result) {
-            common_log(LOG_ERR, 'Error sending to '.$transport.' queue');
-            return false;
-        }
-        common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport);
-    }
-
-    //send tags as headers, so they can be used as JMS selectors
-    common_log(LOG_DEBUG, 'searching for tags ' . $notice->id);
-    $tags = array();
-    $tag = new Notice_tag();
-    $tag->notice_id = $notice->id;
-    if ($tag->find()) {
-        while ($tag->fetch()) {
-            common_log(LOG_DEBUG, 'tag found = ' . $tag->tag);
-            array_push($tags,$tag->tag);
+    if ($notice->is_local == NOTICE_LOCAL_PUBLIC ||
+        $notice->is_local == NOTICE_LOCAL_NONPUBLIC) {
+        $transports = array_merge($transports, $localTransports);
+        if ($xmpp) {
+            $transports[] = 'public';
         }
     }
-    $tag->free();
 
-    $con->send('/topic/laconica.'.$notice->profile_id,
-               $notice->content,
-               array(
-                     'profile_id' => $notice->profile_id,
-                     'created' => $notice->created,
-                     'tags' => implode($tags,' - ')
-                     )
-               );
-    common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id);
-    $con->send('/topic/laconica.allusers',
-               $notice->content,
-               array(
-                     'profile_id' => $notice->profile_id,
-                     'created' => $notice->created,
-                     'tags' => implode($tags,' - ')
-                     )
-               );
-    common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id);
-    $result = true;
-}
+    $qm = QueueManager::get();
 
-function common_enqueue_notice_db($notice, $transports)
-{
-    // in any other case, 'internal'
-    foreach ($transports as $transport) {
-        common_enqueue_notice_transport($notice, $transport);
+    foreach ($transports as $transport)
+    {
+        $qm->enqueue($notice, $transport);
     }
-}
 
-function common_enqueue_notice_transport($notice, $transport)
-{
-    $qi = new Queue_item();
-    $qi->notice_id = $notice->id;
-    $qi->transport = $transport;
-    $qi->created = $notice->created;
-    $result = $qi->insert();
-    if (!$result) {
-        $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
-        common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
-        throw new ServerException('DB error inserting queue item: ' . $last_error->message);
-    }
-    common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
     return true;
 }
 
-function common_real_broadcast($notice, $remote=false)
-{
-    $success = true;
-    if (!$remote) {
-        // Make sure we have the OMB stuff
-        require_once(INSTALLDIR.'/lib/omb.php');
-        $success = omb_broadcast_remote_subscribers($notice);
-        if (!$success) {
-            common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id);
-        }
-    }
-    if ($success) {
-        require_once(INSTALLDIR.'/lib/jabber.php');
-        $success = jabber_broadcast_notice($notice);
-        if (!$success) {
-            common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id);
-        }
-    }
-    if ($success) {
-        require_once(INSTALLDIR.'/lib/mail.php');
-        $success = mail_broadcast_notice_sms($notice);
-        if (!$success) {
-            common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id);
-        }
-    }
-    if ($success) {
-        $success = jabber_public_notice($notice);
-        if (!$success) {
-            common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id);
-        }
-    }
-    if ($success) {
-        $success = broadcast_twitter($notice);
-        if (!$success) {
-            common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id);
-        }
-    }
-
-    // XXX: Do a real-time FB broadcast here?
-
-    // XXX: broadcast notices to other IM
-    return $success;
-}
-
 function common_broadcast_profile($profile)
 {
     // XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ
@@ -1148,6 +1028,9 @@ function common_log_objstring(&$object)
     if (is_null($object)) {
         return "null";
     }
+    if (!($object instanceof DB_DataObject)) {
+        return "(unknown)";
+    }
     $arr = $object->toArray();
     $fields = array();
     foreach ($arr as $k => $v) {
index 986e09c25e4c353cb0d849b5a4736edfd6638cac..c8b5ad1fb4e7efb4d968ed7b6c71d1a21cd9d828 100644 (file)
@@ -91,7 +91,7 @@ class XmppQueueHandler extends QueueHandler
         if (common_config('xmpp', 'listener')) {
             return common_config('xmpp', 'listener');
         } else {
-            return jabber_daemon_address() . '/' . common_config('xmpp','resource') . '-listener';
+            return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
         }
     }
 }