]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
XMPP queued output & initial retooling of DB queue manager to support non-Notice...
authorBrion Vibber <brion@pobox.com>
Fri, 22 Jan 2010 00:42:50 +0000 (16:42 -0800)
committerBrion Vibber <brion@pobox.com>
Fri, 22 Jan 2010 00:42:50 +0000 (16:42 -0800)
Queue handlers for XMPP individual & firehose output now send their XML stanzas
to another output queue instead of connecting directly to the chat server. This
lets us have as many general processing threads as we need, while all actual
XMPP input and output go through a single daemon with a single connection open.

This avoids problems with multiple connected resources:
* multiple windows shown in some chat clients (psi, gajim, kopete)
* extra load on server
* incoming message delivery forwarding issues

Database changes:
* queue_item drops 'notice_id' in favor of a 'frame' blob.
  This is based on Craig Andrews' work branch to generalize queues to take any
  object, but conservatively leaving out the serialization for now.
  Table updater (preserves any existing queued items) in db/rc3to09.sql

Code changes to watch out for:
* Queue handlers should now define a handle() method instead of handle_notice()
* QueueDaemon and XmppDaemon now share common i/o (IoMaster) and respawning
  thread management (RespawningDaemon) infrastructure.
* The polling XmppConfirmManager has been dropped, as the message is queued
  directly when saving IM settings.
* Enable $config['queue']['debug_memory'] to output current memory usage at
  each run through the event loop to watch for memory leaks

To do:
* Adapt XMPP i/o to component connection mode for multi-site support.
* XMPP input can also be broken out to a queue, which would allow the actual
  notice save etc to be handled by general queue threads.
* Make sure there are no problems with simply pushing serialized Notice objects
  to queues.
* Find a way to improve interactive performance of the database-backed queue
  handler; polling is pretty painful to XMPP.
* Possibly redo the way QueueHandlers are injected into a QueueManager. The
  grouping used to split out the XMPP output queue is a bit awkward.

33 files changed:
actions/imsettings.php
classes/Queue_item.php
classes/statusnet.ini
db/08to09.sql
db/rc3to09.sql [new file with mode: 0644]
db/statusnet.sql
lib/dbqueuemanager.php
lib/default.php
lib/iomaster.php
lib/jabber.php
lib/jabberqueuehandler.php
lib/ombqueuehandler.php
lib/pingqueuehandler.php
lib/pluginqueuehandler.php
lib/publicqueuehandler.php
lib/queued_xmpp.php [new file with mode: 0644]
lib/queuehandler.php
lib/queuemanager.php
lib/smsqueuehandler.php
lib/spawningdaemon.php [new file with mode: 0644]
lib/stompqueuemanager.php
lib/util.php
lib/xmppconfirmmanager.php [deleted file]
lib/xmppmanager.php
lib/xmppoutqueuehandler.php [new file with mode: 0644]
plugins/Enjit/enjitqueuehandler.php
plugins/Facebook/facebookqueuehandler.php
plugins/RSSCloud/RSSCloudPlugin.php
plugins/RSSCloud/RSSCloudQueueHandler.php [changed mode: 0755->0644]
plugins/TwitterBridge/twitterqueuehandler.php
scripts/handlequeued.php
scripts/queuedaemon.php
scripts/xmppdaemon.php

index 751c6117cd1c3d4ca96f19e35a2d01a08a64666d..af4915843d5f799fab73d4075795d65aa23dc648 100644 (file)
@@ -309,6 +309,8 @@ class ImsettingsAction extends ConnectSettingsAction
         $confirm->address_type = 'jabber';
         $confirm->user_id      = $user->id;
         $confirm->code         = common_confirmation_code(64);
+        $confirm->sent         = common_sql_now();
+        $confirm->claimed      = common_sql_now();
 
         $result = $confirm->insert();
 
@@ -318,11 +320,9 @@ class ImsettingsAction extends ConnectSettingsAction
             return;
         }
 
-        if (!common_config('queue', 'enabled')) {
-            jabber_confirm_address($confirm->code,
-                                   $user->nickname,
-                                   $jabber);
-        }
+        jabber_confirm_address($confirm->code,
+                               $user->nickname,
+                               $jabber);
 
         $msg = sprintf(_('A confirmation code was sent '.
                          'to the IM address you added. '.
index cf805a6060a6213cb003efb63f05496885591adc..f83c2cef184d49f868d712a82a016fefa34d4c68 100644 (file)
@@ -10,8 +10,8 @@ class Queue_item extends Memcached_DataObject
     /* the code below is auto generated do not remove the above tag */
 
     public $__table = 'queue_item';                      // table name
-    public $notice_id;                       // int(4)  primary_key not_null
-    public $transport;                       // varchar(8)  primary_key not_null
+    public $id;                              // int(4)  primary_key not_null
+    public $frame;                           // blob not_null
     public $created;                         // datetime()   not_null
     public $claimed;                         // datetime()
 
@@ -22,14 +22,21 @@ class Queue_item extends Memcached_DataObject
     /* the code above is auto generated do not remove the tag below */
     ###END_AUTOCODE
 
-    function sequenceKey()
-    { return array(false, false); }
-
-    static function top($transport=null) {
+    /**
+     * @param mixed $transports name of a single queue or array of queues to pull from
+     *                          If not specified, checks all queues in the system.
+     */
+    static function top($transports=null) {
 
         $qi = new Queue_item();
-        if ($transport) {
-            $qi->transport = $transport;
+        if ($transports) {
+            if (is_array($transports)) {
+                // @fixme use safer escaping
+                $list = implode("','", array_map('addslashes', $transports));
+                $qi->whereAdd("transport in ('$list')");
+            } else {
+                $qi->transport = $transports;
+            }
         }
         $qi->orderBy('created');
         $qi->whereAdd('claimed is null');
@@ -42,7 +49,7 @@ class Queue_item extends Memcached_DataObject
             # XXX: potential race condition
             # can we force it to only update if claimed is still null
             # (or old)?
-            common_log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id .
+            common_log(LOG_INFO, 'claiming queue item id = ' . $qi->id .
                 ' for transport ' . $qi->transport);
             $orig = clone($qi);
             $qi->claimed = common_sql_now();
@@ -57,9 +64,4 @@ class Queue_item extends Memcached_DataObject
         $qi = null;
         return null;
     }
-
-    function pkeyGet($kv)
-    {
-        return Memcached_DataObject::pkeyGet('Queue_item', $kv);
-    }
 }
index 44088cf6b09fa3ad2a6bfe68f095c47cd756ac98..6203650a693a626d6004908c3a5fe2d43221030a 100644 (file)
@@ -428,14 +428,14 @@ tagged = K
 tag = K
 
 [queue_item]
-notice_id = 129
+id = 129
+frame = 66
 transport = 130
 created = 142
 claimed = 14
 
 [queue_item__keys]
-notice_id = K
-transport = K
+id = K
 
 [related_group]
 group_id = 129
index d9c25bc723ade4610b69951a7dc9e6466e7e4391..b10e47dbcb67f769f1577c046c9222e0e772c600 100644 (file)
@@ -94,3 +94,19 @@ create table user_location_prefs (
     constraint primary key (user_id)
 ) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
 
+create table queue_item_new (
+    id integer auto_increment primary key comment 'unique identifier',
+    frame blob not null comment 'data: object reference or opaque string',
+    transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
+    created datetime not null comment 'date this record was created',
+    claimed datetime comment 'date this item was claimed',
+
+    index queue_item_created_idx (created)
+
+) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
+
+insert into queue_item_new (frame,transport,created,claimed)
+    select notice_id,transport,created,claimed from queue_item;
+alter table queue_item rename to queue_item_old;
+alter table queue_item_new rename to queue_item;
+
diff --git a/db/rc3to09.sql b/db/rc3to09.sql
new file mode 100644 (file)
index 0000000..02dc7a6
--- /dev/null
@@ -0,0 +1,16 @@
+create table queue_item_new (
+    id integer auto_increment primary key comment 'unique identifier',
+    frame blob not null comment 'data: object reference or opaque string',
+    transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
+    created datetime not null comment 'date this record was created',
+    claimed datetime comment 'date this item was claimed',
+
+    index queue_item_created_idx (created)
+
+) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
+
+insert into queue_item_new (frame,transport,created,claimed)
+    select notice_id,transport,created,claimed from queue_item;
+alter table queue_item rename to queue_item_old;
+alter table queue_item_new rename to queue_item;
+
index 2a9ab74c776a402b541b81a303355f278d3b1359..17de4fd0d48b0852b197bf1c18f3ded8524a4ac7 100644 (file)
@@ -274,13 +274,12 @@ create table remember_me (
 ) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
 
 create table queue_item (
-
-    notice_id integer not null comment 'notice queued' references notice (id),
+    id integer auto_increment primary key comment 'unique identifier',
+    frame blob not null comment 'data: object reference or opaque string',
     transport varchar(8) not null comment 'queue for what? "email", "jabber", "sms", "irc", ...',
     created datetime not null comment 'date this record was created',
     claimed datetime comment 'date this item was claimed',
 
-    constraint primary key (notice_id, transport),
     index queue_item_created_idx (created)
 
 ) ENGINE=InnoDB CHARACTER SET utf8 COLLATE utf8_bin;
index 889365b6495e0d454183fa3347839ffd40395c88..c6350fc669227f69403f9b9bb167604a6dbdb4a8 100644 (file)
 class DBQueueManager extends QueueManager
 {
     /**
-     * Saves a notice object reference into the queue item table.
+     * Saves an object reference into the queue item table.
      * @return boolean true on success
      * @throws ServerException on failure
      */
     public function enqueue($object, $queue)
     {
-        $notice = $object;
-
         $qi = new Queue_item();
 
-        $qi->notice_id = $notice->id;
+        $qi->frame     = $this->encode($object);
         $qi->transport = $queue;
-        $qi->created   = $notice->created;
+        $qi->created   = common_sql_now();
         $result        = $qi->insert();
 
         if (!$result) {
@@ -57,146 +55,92 @@ class DBQueueManager extends QueueManager
     }
 
     /**
-     * Poll every minute for new events during idle periods.
+     * Poll every 10 seconds for new events during idle periods.
      * We'll look in more often when there's data available.
      *
      * @return int seconds
      */
     public function pollInterval()
     {
-        return 60;
+        return 10;
     }
 
     /**
      * Run a polling cycle during idle processing in the input loop.
-     * @return boolean true if we had a hit
+     * @return boolean true if we should poll again for more data immediately
      */
     public function poll()
     {
         $this->_log(LOG_DEBUG, 'Checking for notices...');
-        $item = $this->_nextItem();
-        if ($item === false) {
+        $qi = Queue_item::top($this->getQueues());
+        if (empty($qi)) {
             $this->_log(LOG_DEBUG, 'No notices waiting; idling.');
             return false;
         }
-        if ($item === true) {
-            // We dequeued an entry for a deleted or invalid notice.
-            // Consider it a hit for poll rate purposes.
-            return true;
-        }
 
-        list($queue, $notice) = $item;
-        $this->_log(LOG_INFO, 'Got notice '. $notice->id . ' for transport ' . $queue);
-
-        // Yay! Got one!
-        $handler = $this->getHandler($queue);
-        if ($handler) {
-            if ($handler->handle_notice($notice)) {
-                $this->_log(LOG_INFO, "[$queue:notice $notice->id] Successfully handled notice");
-                $this->_done($notice, $queue);
+        $queue = $qi->transport;
+        $item = $this->decode($qi->frame);
+
+        if ($item) {
+            $rep = $this->logrep($item);
+            $this->_log(LOG_INFO, "Got $rep for transport $queue");
+            
+            $handler = $this->getHandler($queue);
+            if ($handler) {
+                if ($handler->handle($item)) {
+                    $this->_log(LOG_INFO, "[$queue:$rep] Successfully handled item");
+                    $this->_done($qi);
+                } else {
+                    $this->_log(LOG_INFO, "[$queue:$rep] Failed to handle item");
+                    $this->_fail($qi);
+                }
             } else {
-                $this->_log(LOG_INFO, "[$queue:notice $notice->id] Failed to handle notice");
-                $this->_fail($notice, $queue);
+                $this->_log(LOG_INFO, "[$queue:$rep] No handler for queue $queue; discarding.");
+                $this->_done($qi);
             }
         } else {
-            $this->_log(LOG_INFO, "[$queue:notice $notice->id] No handler for queue $queue; discarding.");
-            $this->_done($notice, $queue);
+            $this->_log(LOG_INFO, "[$queue] Got empty/deleted item, discarding");
+            $this->_fail($qi);
         }
         return true;
     }
 
-    /**
-     * Pop the oldest unclaimed item off the queue set and claim it.
-     *
-     * @return mixed false if no items; true if bogus hit; otherwise array(string, Notice)
-     *               giving the queue transport name.
-     */
-    protected function _nextItem()
-    {
-        $start = time();
-        $result = null;
-
-        $qi = Queue_item::top();
-        if (empty($qi)) {
-            return false;
-        }
-
-        $queue = $qi->transport;
-        $notice = Notice::staticGet('id', $qi->notice_id);
-        if (empty($notice)) {
-            $this->_log(LOG_INFO, "[$queue:notice $notice->id] dequeued non-existent notice");
-            $qi->delete();
-            return true;
-        }
-
-        $result = $notice;
-        return array($queue, $notice);
-    }
-
     /**
      * Delete our claimed item from the queue after successful processing.
      *
-     * @param Notice $object
-     * @param string $queue
+     * @param QueueItem $qi
      */
-    protected function _done($object, $queue)
+    protected function _done($qi)
     {
-        // XXX: right now, we only handle notices
-
-        $notice = $object;
-
-        $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
-                                        'transport' => $queue));
+        $queue = $qi->transport;
 
-        if (empty($qi)) {
-            $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
-        } else {
-            if (empty($qi->claimed)) {
-                $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Reluctantly releasing unclaimed queue item");
-            }
-            $qi->delete();
-            $qi->free();
+        if (empty($qi->claimed)) {
+            $this->_log(LOG_WARNING, "Reluctantly releasing unclaimed queue item $qi->id from $qi->queue");
         }
+        $qi->delete();
 
-        $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with item");
         $this->stats('handled', $queue);
-
-        $notice->free();
     }
 
     /**
      * Free our claimed queue item for later reprocessing in case of
      * temporary failure.
      *
-     * @param Notice $object
-     * @param string $queue
+     * @param QueueItem $qi
      */
-    protected function _fail($object, $queue)
+    protected function _fail($qi)
     {
-        // XXX: right now, we only handle notices
-
-        $notice = $object;
-
-        $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
-                                        'transport' => $queue));
+        $queue = $qi->transport;
 
-        if (empty($qi)) {
-            $this->_log(LOG_INFO, "[$queue:notice $notice->id] Cannot find queue item");
+        if (empty($qi->claimed)) {
+            $this->_log(LOG_WARNING, "[$queue:item $qi->id] Ignoring failure for unclaimed queue item");
         } else {
-            if (empty($qi->claimed)) {
-                $this->_log(LOG_WARNING, "[$queue:notice $notice->id] Ignoring failure for unclaimed queue item");
-            } else {
-                $orig = clone($qi);
-                $qi->claimed = null;
-                $qi->update($orig);
-                $qi = null;
-            }
+            $orig = clone($qi);
+            $qi->claimed = null;
+            $qi->update($orig);
         }
 
-        $this->_log(LOG_INFO, "[$queue:notice $notice->id] done with queue item");
         $this->stats('error', $queue);
-
-        $notice->free();
     }
 
     protected function _log($level, $msg)
index 764d309dfd5d02a71d787a68b27b780d1f603497..d258bbaf48ab402306f74ca9d49081b8491c1f7e 100644 (file)
@@ -83,6 +83,7 @@ $default =
               'stomp_password' => null,
               'monitor' => null, // URL to monitor ping endpoint (work in progress)
               'softlimit' => '90%', // total size or % of memory_limit at which to restart queue threads gracefully
+              'debug_memory' => false, // true to spit memory usage to log
               ),
         'license' =>
         array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private'
index ce77b53b2e6c3f35059bf62ead9ebb007581d099..004e92b3eea067f8819887a94980d0ad4eff9879 100644 (file)
@@ -27,7 +27,7 @@
  * @link      http://status.net/
  */
 
-class IoMaster
+abstract class IoMaster
 {
     public $id;
 
@@ -66,23 +66,18 @@ class IoMaster
             if ($site != common_config('site', 'server')) {
                 StatusNet::init($site);
             }
-
-            $classes = array();
-            if (Event::handle('StartIoManagerClasses', array(&$classes))) {
-                $classes[] = 'QueueManager';
-                if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
-                    $classes[] = 'XmppManager'; // handles pings/reconnects
-                    $classes[] = 'XmppConfirmManager'; // polls for outgoing confirmations
-                }
-            }
-            Event::handle('EndIoManagerClasses', array(&$classes));
-
-            foreach ($classes as $class) {
-                $this->instantiate($class);
-            }
+            $this->initManagers();
         }
     }
 
+    /**
+     * Initialize IoManagers for the currently configured site
+     * which are appropriate to this instance.
+     *
+     * Pass class names into $this->instantiate()
+     */
+    abstract function initManagers();
+
     /**
      * Pull all local sites from status_network table.
      * @return array of hostnames
@@ -170,7 +165,7 @@ class IoMaster
                 $write = array();
                 $except = array();
                 $this->logState('listening');
-                common_log(LOG_INFO, "Waiting up to $timeout seconds for socket data...");
+                common_log(LOG_DEBUG, "Waiting up to $timeout seconds for socket data...");
                 $ready = stream_select($read, $write, $except, $timeout, 0);
 
                 if ($ready === false) {
@@ -190,7 +185,7 @@ class IoMaster
 
             if ($timeout > 0 && empty($sockets)) {
                 // If we had no listeners, sleep until the pollers' next requested wakeup.
-                common_log(LOG_INFO, "Sleeping $timeout seconds until next poll cycle...");
+                common_log(LOG_DEBUG, "Sleeping $timeout seconds until next poll cycle...");
                 $this->logState('sleep');
                 sleep($timeout);
             }
@@ -207,6 +202,8 @@ class IoMaster
                 if ($usage > $memoryLimit) {
                     common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
                     break;
+                } else if (common_config('queue', 'debug_memory')) {
+                    common_log(LOG_DEBUG, "Memory usage $usage");
                 }
             }
         }
@@ -223,8 +220,7 @@ class IoMaster
     {
         $softLimit = trim(common_config('queue', 'softlimit'));
         if (substr($softLimit, -1) == '%') {
-            $limit = trim(ini_get('memory_limit'));
-            $limit = $this->parseMemoryLimit($limit);
+            $limit = $this->parseMemoryLimit(ini_get('memory_limit'));
             if ($limit > 0) {
                 return intval(substr($softLimit, 0, -1) * $limit / 100);
             } else {
@@ -242,9 +238,10 @@ class IoMaster
      * @param string $mem
      * @return int
      */
-    protected function parseMemoryLimit($mem)
+    public function parseMemoryLimit($mem)
     {
         // http://www.php.net/manual/en/faq.using.php#faq.using.shorthandbytes
+        $mem = strtolower(trim($mem));
         $size = array('k' => 1024,
                       'm' => 1024*1024,
                       'g' => 1024*1024*1024);
@@ -253,7 +250,7 @@ class IoMaster
         } else if (is_numeric($mem)) {
             return intval($mem);
         } else {
-            $mult = strtolower(substr($mem, -1));
+            $mult = substr($mem, -1);
             if (isset($size[$mult])) {
                 return substr($mem, 0, -1) * $size[$mult];
             } else {
index 4cdfa674655e55db28d4e08b741d641685b45216..b6b23521bdf4e8a11088c94d4dfdc983e2849872 100644 (file)
@@ -85,6 +85,27 @@ class Sharing_XMPP extends XMPPHP_XMPP
     }
 }
 
+/**
+ * Build an XMPP proxy connection that'll save outgoing messages
+ * to the 'xmppout' queue to be picked up by xmppdaemon later.
+ */
+function jabber_proxy()
+{
+       $proxy = new Queued_XMPP(common_config('xmpp', 'host') ?
+                             common_config('xmpp', 'host') :
+                             common_config('xmpp', 'server'),
+                             common_config('xmpp', 'port'),
+                             common_config('xmpp', 'user'),
+                             common_config('xmpp', 'password'),
+                             common_config('xmpp', 'resource') . 'daemon',
+                             common_config('xmpp', 'server'),
+                             common_config('xmpp', 'debug') ?
+                             true : false,
+                             common_config('xmpp', 'debug') ?
+                             XMPPHP_Log::LEVEL_VERBOSE :  null);
+    return $proxy;
+}
+
 /**
  * Lazy-connect the configured Jabber account to the configured server;
  * if already opened, the same connection will be returned.
@@ -143,7 +164,7 @@ function jabber_connect($resource=null)
 }
 
 /**
- * send a single notice to a given Jabber address
+ * Queue send for a single notice to a given Jabber address
  *
  * @param string $to     JID to send the notice to
  * @param Notice $notice notice to send
@@ -153,10 +174,7 @@ function jabber_connect($resource=null)
 
 function jabber_send_notice($to, $notice)
 {
-    $conn = jabber_connect();
-    if (!$conn) {
-        return false;
-    }
+    $conn = jabber_proxy();
     $profile = Profile::staticGet($notice->profile_id);
     if (!$profile) {
         common_log(LOG_WARNING, 'Refusing to send notice with ' .
@@ -221,10 +239,7 @@ function jabber_format_entry($profile, $notice)
 
 function jabber_send_message($to, $body, $type='chat', $subject=null)
 {
-    $conn = jabber_connect();
-    if (!$conn) {
-        return false;
-    }
+    $conn = jabber_proxy();
     $conn->message($to, $body, $type, $subject);
     return true;
 }
@@ -319,7 +334,7 @@ function jabber_special_presence($type, $to=null, $show=null, $status=null)
 }
 
 /**
- * broadcast a notice to all subscribers and reply recipients
+ * Queue broadcast of a notice to all subscribers and reply recipients
  *
  * This function will send a notice to all subscribers on the local server
  * who have Jabber addresses, and have Jabber notification enabled, and
@@ -354,7 +369,7 @@ function jabber_broadcast_notice($notice)
 
     $sent_to = array();
 
-    $conn = jabber_connect();
+    $conn = jabber_proxy();
 
     $ni = $notice->whoGets();
 
@@ -389,14 +404,13 @@ function jabber_broadcast_notice($notice)
                    'Sending notice ' . $notice->id . ' to ' . $user->jabber,
                    __FILE__);
         $conn->message($user->jabber, $msg, 'chat', null, $entry);
-        $conn->processTime(0);
     }
 
     return true;
 }
 
 /**
- * send a notice to all public listeners
+ * Queue send of a notice to all public listeners
  *
  * For notices that are generated on the local system (by users), we can optionally
  * forward them to remote listeners by XMPP.
@@ -429,7 +443,7 @@ function jabber_public_notice($notice)
         $msg   = jabber_format_notice($profile, $notice);
         $entry = jabber_format_entry($profile, $notice);
 
-        $conn = jabber_connect();
+        $conn = jabber_proxy();
 
         foreach ($public as $address) {
             common_log(LOG_INFO,
@@ -437,7 +451,6 @@ function jabber_public_notice($notice)
                        ' to public listener ' . $address,
                        __FILE__);
             $conn->message($address, $msg, 'chat', null, $entry);
-            $conn->processTime(0);
         }
         $profile->free();
     }
index b1518866d7549416fcdc04c77c3bdf7512a83086..83471f2df7d1789a9fc5c766e73e6cf092b730f9 100644 (file)
@@ -34,14 +34,14 @@ class JabberQueueHandler extends QueueHandler
         return 'jabber';
     }
 
-    function handle_notice($notice)
+    function handle($notice)
     {
         require_once(INSTALLDIR.'/lib/jabber.php');
         try {
             return jabber_broadcast_notice($notice);
         } catch (XMPPHP_Exception $e) {
             $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
-            exit(1);
+            return false;
         }
     }
 }
index 3ffc1313bcaa1ea6377c77cccb9d9a71b8f54728..24896c784c78ed1105bd006afd418fd9e9e99718 100644 (file)
@@ -36,7 +36,7 @@ class OmbQueueHandler extends QueueHandler
      * @fixme doesn't currently report failure back to the queue manager
      * because omb_broadcast_notice() doesn't report it to us
      */
-    function handle_notice($notice)
+    function handle($notice)
     {
         if ($this->is_remote($notice)) {
             $this->log(LOG_DEBUG, 'Ignoring remote notice ' . $notice->id);
index 8bb2180786e81cfecf185c783d03ba4fbeab4971..4e4d74cb1a6ef35f1910905aee18a287cabb79b4 100644 (file)
@@ -30,7 +30,7 @@ class PingQueueHandler extends QueueHandler {
         return 'ping';
     }
 
-    function handle_notice($notice) {
+    function handle($notice) {
         require_once INSTALLDIR . '/lib/ping.php';
         return ping_broadcast_notice($notice);
     }
index 24d504699706c7fe310f341ff796bc4f44c52f28..9653ccad42454f3896a34c26e5d49ae53e3adda9 100644 (file)
@@ -42,7 +42,7 @@ class PluginQueueHandler extends QueueHandler
         return 'plugin';
     }
 
-    function handle_notice($notice)
+    function handle($notice)
     {
         Event::handle('HandleQueuedNotice', array(&$notice));
         return true;
index 9ea9ee73a3290af300640b7a17a6547ecc2c7a44..c9edb8d5d79b184bbcf1d2fa045f8c82219e4f5c 100644 (file)
@@ -23,7 +23,6 @@ if (!defined('STATUSNET') && !defined('LACONICA')) {
 
 /**
  * Queue handler for pushing new notices to public XMPP subscribers.
- * @fixme correct this exception handling
  */
 class PublicQueueHandler extends QueueHandler
 {
@@ -33,15 +32,14 @@ class PublicQueueHandler extends QueueHandler
         return 'public';
     }
 
-    function handle_notice($notice)
+    function handle($notice)
     {
         require_once(INSTALLDIR.'/lib/jabber.php');
         try {
             return jabber_public_notice($notice);
         } catch (XMPPHP_Exception $e) {
             $this->log(LOG_ERR, "Got an XMPPHP_Exception: " . $e->getMessage());
-            die($e->getMessage());
+            return false;
         }
-        return true;
     }
 }
diff --git a/lib/queued_xmpp.php b/lib/queued_xmpp.php
new file mode 100644 (file)
index 0000000..4b890c4
--- /dev/null
@@ -0,0 +1,117 @@
+<?php
+/**
+ * StatusNet, the distributed open-source microblogging tool
+ *
+ * Queue-mediated proxy class for outgoing XMPP messages.
+ *
+ * PHP version 5
+ *
+ * LICENCE: This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ *
+ * @category  Network
+ * @package   StatusNet
+ * @author    Brion Vibber <brion@status.net>
+ * @copyright 2010 StatusNet, Inc.
+ * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
+ * @link      http://status.net/
+ */
+
+if (!defined('STATUSNET') && !defined('LACONICA')) {
+    exit(1);
+}
+
+require_once INSTALLDIR . '/lib/jabber.php';
+
+class Queued_XMPP extends XMPPHP_XMPP
+{
+       /**
+        * Constructor
+        *
+        * @param string  $host
+        * @param integer $port
+        * @param string  $user
+        * @param string  $password
+        * @param string  $resource
+        * @param string  $server
+        * @param boolean $printlog
+        * @param string  $loglevel
+        */
+       public function __construct($host, $port, $user, $password, $resource, $server = null, $printlog = false, $loglevel = null)
+       {
+               parent::__construct($host, $port, $user, $password, $resource, $server, $printlog, $loglevel);
+               // Normally the fulljid isn't filled out until resource binding time;
+               // we need to save it here since we're not talking to a real server.
+               $this->fulljid = "{$this->basejid}/{$this->resource}";
+    }
+
+    /**
+     * Send a formatted message to the outgoing queue for later forwarding
+     * to a real XMPP connection.
+     *
+     * @param string $msg
+     */
+    public function send($msg, $timeout=NULL)
+    {
+        $qm = QueueManager::get();
+        $qm->enqueue(strval($msg), 'xmppout');
+    }
+
+    /**
+     * Since we'll be getting input through a queue system's run loop,
+     * we'll process one standalone message at a time rather than our
+     * own XMPP message pump.
+     *
+     * @param string $message
+     */
+    public function processMessage($message) {
+       $frame = array_shift($this->frames);
+       xml_parse($this->parser, $frame->body, false);
+    }
+
+    //@{
+    /**
+     * Stream i/o functions disabled; push input through processMessage()
+     */
+    public function connect($timeout = 30, $persistent = false, $sendinit = true)
+    {
+        throw new Exception("Can't connect to server from XMPP queue proxy.");
+    }
+
+    public function disconnect()
+    {
+        throw new Exception("Can't connect to server from XMPP queue proxy.");
+    }
+
+    public function process()
+    {
+        throw new Exception("Can't read stream from XMPP queue proxy.");
+    }
+
+    public function processUntil($event, $timeout=-1)
+    {
+        throw new Exception("Can't read stream from XMPP queue proxy.");
+    }
+
+    public function read()
+    {
+        throw new Exception("Can't read stream from XMPP queue proxy.");
+    }
+
+    public function readyToProcess()
+    {
+        throw new Exception("Can't read stream from XMPP queue proxy.");
+    }
+    //@}
+}
+
index 613be6e33085ae0b01dda9f95626ed849bb3c7af..2909cd83b100656efd354063323610590191d152 100644 (file)
@@ -22,51 +22,20 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
 /**
  * Base class for queue handlers.
  *
- * As extensions of the Daemon class, each queue handler has the ability
- * to launch itself in the background, at which point it'll pass control
- * to the configured QueueManager class to poll for updates.
+ * As of 0.9, queue handlers are short-lived for items as they are
+ * dequeued by a QueueManager running in an IoMaster in a daemon
+ * such as queuedaemon.php.
+ *
+ * Extensions requiring long-running maintenance or polling should
+ * register an IoManager.
  *
  * Subclasses must override at least the following methods:
  * - transport
- * - handle_notice
+ * - handle
  */
-#class QueueHandler extends Daemon
 class QueueHandler
 {
 
-#    function __construct($id=null, $daemonize=true)
-#    {
-#        parent::__construct($daemonize);
-#
-#        if ($id) {
-#            $this->set_id($id);
-#        }
-#    }
-
-    /**
-     * How many seconds a polling-based queue manager should wait between
-     * checks for new items to handle.
-     *
-     * Defaults to 60 seconds; override to speed up or slow down.
-     *
-     * @fixme not really compatible with global queue manager
-     * @return int timeout in seconds
-     */
-#    function timeout()
-#    {
-#        return 60;
-#    }
-
-#    function class_name()
-#    {
-#        return ucfirst($this->transport()) . 'Handler';
-#    }
-
-#    function name()
-#    {
-#        return strtolower($this->class_name().'.'.$this->get_id());
-#    }
-
     /**
      * Return transport keyword which identifies items this queue handler
      * services; must be defined for all subclasses.
@@ -83,61 +52,17 @@ class QueueHandler
 
     /**
      * Here's the meat of your queue handler -- you're handed a Notice
-     * object, which you may do as you will with.
+     * or other object, which you may do as you will with.
      *
      * If this function indicates failure, a warning will be logged
      * and the item is placed back in the queue to be re-run.
      *
-     * @param Notice $notice
-     * @return boolean true on success, false on failure
-     */
-    function handle_notice($notice)
-    {
-        return true;
-    }
-
-    /**
-     * Setup and start of run loop for this queue handler as a daemon.
-     * Most of the heavy lifting is passed on to the QueueManager's service()
-     * method, which passes control back to our handle_notice() method for
-     * each notice that comes in on the queue.
-     *
-     * Most of the time this won't need to be overridden in a subclass.
-     *
+     * @param mixed $object
      * @return boolean true on success, false on failure
      */
-    function run()
+    function handle($object)
     {
-        if (!$this->start()) {
-            $this->log(LOG_WARNING, 'failed to start');
-            return false;
-        }
-
-        $this->log(LOG_INFO, 'checking for queued notices');
-
-        $queue   = $this->transport();
-        $timeout = $this->timeout();
-
-        $qm = QueueManager::get();
-
-        $qm->service($queue, $this);
-
-        $this->log(LOG_INFO, 'finished servicing the queue');
-
-        if (!$this->finish()) {
-            $this->log(LOG_WARNING, 'failed to clean up');
-            return false;
-        }
-
-        $this->log(LOG_INFO, 'terminating normally');
-
         return true;
     }
-
-
-    function log($level, $msg)
-    {
-        common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
-    }
 }
 
index 291174d3c4729a7ac11e5f561fc497ff0ce3b17a..4eb39bfa8c72ecf75ef76630244b3a513fe42a97 100644 (file)
@@ -39,6 +39,10 @@ abstract class QueueManager extends IoManager
 {
     static $qm = null;
 
+    public $master = null;
+    public $handlers = array();
+    public $groups = array();
+
     /**
      * Factory function to pull the appropriate QueueManager object
      * for this site's configuration. It can then be used to queue
@@ -109,6 +113,64 @@ abstract class QueueManager extends IoManager
      */
     abstract function enqueue($object, $queue);
 
+    /**
+     * Build a representation for an object for logging
+     * @param mixed
+     * @return string
+     */
+    function logrep($object) {
+        if (is_object($object)) {
+            $class = get_class($object);
+            if (isset($object->id)) {
+                return "$class $object->id";
+            }
+            return $class;
+        }
+        if (is_string($object)) {
+            $len = strlen($object);
+            $fragment = mb_substr($object, 0, 32);
+            if (mb_strlen($object) > 32) {
+                $fragment .= '...';
+            }
+            return "string '$fragment' ($len bytes)";
+        }
+        return strval($object);
+    }
+
+    /**
+     * Encode an object for queued storage.
+     * Next gen may use serialization.
+     *
+     * @param mixed $object
+     * @return string
+     */
+    protected function encode($object)
+    {
+        if ($object instanceof Notice) {
+            return $object->id;
+        } else if (is_string($object)) {
+            return $object;
+        } else {
+            throw new ServerException("Can't queue this type", 500);
+        }
+    }
+
+    /**
+     * Decode an object from queued storage.
+     * Accepts back-compat notice reference entries and strings for now.
+     *
+     * @param string
+     * @return mixed
+     */
+    protected function decode($frame)
+    {
+        if (is_numeric($frame)) {
+            return Notice::staticGet(intval($frame));
+        } else {
+            return $frame;
+        }
+    }
+
     /**
      * Instantiate the appropriate QueueHandler class for the given queue.
      *
@@ -131,13 +193,15 @@ abstract class QueueManager extends IoManager
     }
 
     /**
-     * Get a list of all registered queue transport names.
+     * Get a list of registered queue transport names to be used
+     * for this daemon.
      *
      * @return array of strings
      */
     function getQueues()
     {
-        return array_keys($this->handlers);
+        $group = $this->activeGroup();
+        return array_keys($this->groups[$group]);
     }
 
     /**
@@ -148,33 +212,29 @@ abstract class QueueManager extends IoManager
      */
     function initialize()
     {
+        // @fixme we'll want to be able to listen to particular queues...
         if (Event::handle('StartInitializeQueueManager', array($this))) {
-            if (!defined('XMPP_ONLY_FLAG')) { // hack!
-                $this->connect('plugin', 'PluginQueueHandler');
-                $this->connect('omb', 'OmbQueueHandler');
-                $this->connect('ping', 'PingQueueHandler');
-                if (common_config('sms', 'enabled')) {
-                    $this->connect('sms', 'SmsQueueHandler');
-                }
+            $this->connect('plugin', 'PluginQueueHandler');
+            $this->connect('omb', 'OmbQueueHandler');
+            $this->connect('ping', 'PingQueueHandler');
+            if (common_config('sms', 'enabled')) {
+                $this->connect('sms', 'SmsQueueHandler');
             }
 
             // XMPP output handlers...
-            if (common_config('xmpp', 'enabled') && !defined('XMPP_EMERGENCY_FLAG')) {
-                $this->connect('jabber', 'JabberQueueHandler');
-                $this->connect('public', 'PublicQueueHandler');
-                
-                // @fixme this should move up a level or should get an actual queue
-                $this->connect('confirm', 'XmppConfirmHandler');
-            }
+            $this->connect('jabber', 'JabberQueueHandler');
+            $this->connect('public', 'PublicQueueHandler');
+            
+            // @fixme this should get an actual queue
+            //$this->connect('confirm', 'XmppConfirmHandler');
+
+            // For compat with old plugins not registering their own handlers.
+            $this->connect('plugin', 'PluginQueueHandler');
+
+            $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon');
 
-            if (!defined('XMPP_ONLY_FLAG')) { // hack!
-                // For compat with old plugins not registering their own handlers.
-                $this->connect('plugin', 'PluginQueueHandler');
-            }
-        }
-        if (!defined('XMPP_ONLY_FLAG')) { // hack!
-            Event::handle('EndInitializeQueueManager', array($this));
         }
+        Event::handle('EndInitializeQueueManager', array($this));
     }
 
     /**
@@ -183,10 +243,27 @@ abstract class QueueManager extends IoManager
      *
      * @param string $transport
      * @param string $class
+     * @param string $group
      */
-    public function connect($transport, $class)
+    public function connect($transport, $class, $group='queuedaemon')
     {
         $this->handlers[$transport] = $class;
+        $this->groups[$group][$transport] = $class;
+    }
+
+    /**
+     * @return string queue group to use for this request
+     */
+    function activeGroup()
+    {
+        $group = 'queuedaemon';
+        if ($this->master) {
+            // hack hack
+            if ($this->master instanceof XmppMaster) {
+                return 'xmppdaemon';
+            }
+        }
+        return $group;
     }
 
     /**
index 48a96409d0d926e4d0b4b3a3fe04fa53d97e4dd1..6085d2b4ac545b63643979c043a23fe107eba774 100644 (file)
@@ -31,7 +31,7 @@ class SmsQueueHandler extends QueueHandler
         return 'sms';
     }
 
-    function handle_notice($notice)
+    function handle($notice)
     {
        require_once(INSTALLDIR.'/lib/mail.php');
         return mail_broadcast_notice_sms($notice);
diff --git a/lib/spawningdaemon.php b/lib/spawningdaemon.php
new file mode 100644 (file)
index 0000000..8baefe8
--- /dev/null
@@ -0,0 +1,159 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Base class for daemon that can launch one or more processing threads,
+ * respawning them if they exit.
+ *
+ * This is mainly intended for indefinite workloads such as monitoring
+ * a queue or maintaining an IM channel.
+ *
+ * Child classes should implement the 
+ *
+ * We can then pass individual items through the QueueHandler subclasses
+ * they belong to. We additionally can handle queues for multiple sites.
+ *
+ * @package QueueHandler
+ * @author Brion Vibber <brion@status.net>
+ */
+abstract class SpawningDaemon extends Daemon
+{
+    protected $threads=1;
+
+    function __construct($id=null, $daemonize=true, $threads=1)
+    {
+        parent::__construct($daemonize);
+
+        if ($id) {
+            $this->set_id($id);
+        }
+        $this->threads = $threads;
+    }
+
+    /**
+     * Perform some actual work!
+     *
+     * @return boolean true on success, false on failure
+     */
+    public abstract function runThread();
+
+    /**
+     * Spawn one or more background processes and let them start running.
+     * Each individual process will execute whatever's in the runThread()
+     * method, which should be overridden.
+     *
+     * Child processes will be automatically respawned when they exit.
+     *
+     * @todo possibly allow for not respawning on "normal" exits...
+     *       though ParallelizingDaemon is probably better for workloads
+     *       that have forseeable endpoints.
+     */
+    function run()
+    {
+        $children = array();
+        for ($i = 1; $i <= $this->threads; $i++) {
+            $pid = pcntl_fork();
+            if ($pid < 0) {
+                $this->log(LOG_ERROR, "Couldn't fork for thread $i; aborting\n");
+                exit(1);
+            } else if ($pid == 0) {
+                $this->initAndRunChild($i);
+            } else {
+                $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
+                $children[$i] = $pid;
+            }
+        }
+        
+        $this->log(LOG_INFO, "Waiting for children to complete.");
+        while (count($children) > 0) {
+            $status = null;
+            $pid = pcntl_wait($status);
+            if ($pid > 0) {
+                $i = array_search($pid, $children);
+                if ($i === false) {
+                    $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
+                    continue;
+                }
+                unset($children[$i]);
+                $this->log(LOG_INFO, "Thread $i pid $pid exited.");
+                
+                $pid = pcntl_fork();
+                if ($pid < 0) {
+                    $this->log(LOG_ERROR, "Couldn't fork to respawn thread $i; aborting thread.\n");
+                } else if ($pid == 0) {
+                    $this->initAndRunChild($i);
+                } else {
+                    $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
+                    $children[$i] = $pid;
+                }
+            }
+        }
+        $this->log(LOG_INFO, "All child processes complete.");
+        return true;
+    }
+
+    /**
+     * Initialize things for a fresh thread, call runThread(), and
+     * exit at completion with appropriate return value.
+     */
+    protected function initAndRunChild($thread)
+    {
+        $this->set_id($this->get_id() . "." . $thread);
+        $this->resetDb();
+        $ok = $this->runThread();
+        exit($ok ? 0 : 1);
+    }
+
+    /**
+     * Reconnect to the database for each child process,
+     * or they'll get very confused trying to use the
+     * same socket.
+     */
+    protected function resetDb()
+    {
+        // @fixme do we need to explicitly open the db too
+        // or is this implied?
+        global $_DB_DATAOBJECT;
+        unset($_DB_DATAOBJECT['CONNECTIONS']);
+
+        // Reconnect main memcached, or threads will stomp on
+        // each other and corrupt their requests.
+        $cache = common_memcache();
+        if ($cache) {
+            $cache->reconnect();
+        }
+
+        // Also reconnect memcached for status_network table.
+        if (!empty(Status_network::$cache)) {
+            Status_network::$cache->close();
+            Status_network::$cache = null;
+        }
+    }
+
+    function log($level, $msg)
+    {
+        common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
+    }
+
+    function name()
+    {
+        return strtolower(get_class($this).'.'.$this->get_id());
+    }
+}
+
index 00590fdb69f847bfb515767dee653c53425b996b..f057bd9e41718bda776bbf9ba48caf1eac4f2af7 100644 (file)
@@ -39,7 +39,6 @@ class StompQueueManager extends QueueManager
     var $base = null;
     var $con = null;
     
-    protected $master = null;
     protected $sites = array();
 
     function __construct()
@@ -104,11 +103,12 @@ class StompQueueManager extends QueueManager
      */
     function getQueues()
     {
+        $group = $this->activeGroup();
         $site = common_config('site', 'server');
-        if (empty($this->handlers[$site])) {
+        if (empty($this->groups[$site][$group])) {
             return array();
         } else {
-            return array_keys($this->handlers[$site]);
+            return array_keys($this->groups[$site][$group]);
         }
     }
 
@@ -118,10 +118,12 @@ class StompQueueManager extends QueueManager
      *
      * @param string $transport
      * @param string $class
+     * @param string $group
      */
-    public function connect($transport, $class)
+    public function connect($transport, $class, $group='queuedaemon')
     {
         $this->handlers[common_config('site', 'server')][$transport] = $class;
+        $this->groups[common_config('site', 'server')][$group][$transport] = $class;
     }
 
     /**
@@ -130,23 +132,23 @@ class StompQueueManager extends QueueManager
      */
     public function enqueue($object, $queue)
     {
-        $notice = $object;
+        $msg = $this->encode($object);
+        $rep = $this->logrep($object);
 
         $this->_connect();
 
         // XXX: serialize and send entire notice
 
         $result = $this->con->send($this->queueName($queue),
-                                   $notice->id,                // BODY of the message
-                                   array ('created' => $notice->created));
+                                   $msg,               // BODY of the message
+                                   array ('created' => common_sql_now()));
 
         if (!$result) {
-            common_log(LOG_ERR, 'Error sending to '.$queue.' queue');
+            common_log(LOG_ERR, "Error sending $rep to $queue queue");
             return false;
         }
 
-        common_log(LOG_DEBUG, 'complete remote queueing notice ID = '
-                   . $notice->id . ' for ' . $queue);
+        common_log(LOG_DEBUG, "complete remote queueing $rep for $queue");
         $this->stats('enqueued', $queue);
     }
 
@@ -174,7 +176,7 @@ class StompQueueManager extends QueueManager
         $ok = true;
         $frames = $this->con->readFrames();
         foreach ($frames as $frame) {
-            $ok = $ok && $this->_handleNotice($frame);
+            $ok = $ok && $this->_handleItem($frame);
         }
         return $ok;
     }
@@ -265,7 +267,7 @@ class StompQueueManager extends QueueManager
     }
 
     /**
-     * Handle and acknowledge a notice event that's come in through a queue.
+     * Handle and acknowledge an event that's come in through a queue.
      *
      * If the queue handler reports failure, the message is requeued for later.
      * Missing notices or handler classes will drop the message.
@@ -276,7 +278,7 @@ class StompQueueManager extends QueueManager
      * @param StompFrame $frame
      * @return bool
      */
-    protected function _handleNotice($frame)
+    protected function _handleItem($frame)
     {
         list($site, $queue) = $this->parseDestination($frame->headers['destination']);
         if ($site != common_config('site', 'server')) {
@@ -284,15 +286,23 @@ class StompQueueManager extends QueueManager
             StatusNet::init($site);
         }
 
-        $id = intval($frame->body);
-        $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
+        if (is_numeric($frame->body)) {
+            $id = intval($frame->body);
+            $info = "notice $id posted at {$frame->headers['created']} in queue $queue";
 
-        $notice = Notice::staticGet('id', $id);
-        if (empty($notice)) {
-            $this->_log(LOG_WARNING, "Skipping missing $info");
-            $this->con->ack($frame);
-            $this->stats('badnotice', $queue);
-            return false;
+            $notice = Notice::staticGet('id', $id);
+            if (empty($notice)) {
+                $this->_log(LOG_WARNING, "Skipping missing $info");
+                $this->con->ack($frame);
+                $this->stats('badnotice', $queue);
+                return false;
+            }
+
+            $item = $notice;
+        } else {
+            // @fixme should we serialize, or json, or what here?
+            $info = "string posted at {$frame->headers['created']} in queue $queue";
+            $item = $frame->body;
         }
 
         $handler = $this->getHandler($queue);
@@ -303,7 +313,7 @@ class StompQueueManager extends QueueManager
             return false;
         }
 
-        $ok = $handler->handle_notice($notice);
+        $ok = $handler->handle($item);
 
         if (!$ok) {
             $this->_log(LOG_WARNING, "Failed handling $info");
@@ -311,7 +321,7 @@ class StompQueueManager extends QueueManager
             // this kind of queue management ourselves;
             // if we don't ack, it should resend...
             $this->con->ack($frame);
-            $this->enqueue($notice, $queue);
+            $this->enqueue($item, $queue);
             $this->stats('requeued', $queue);
             return false;
         }
index ef8a5d1f02fdfbd50d14906b99cfce4ff9ed23ce..fb3b8be8765b3ab7c5a79924170b11ebe049d636 100644 (file)
@@ -1130,7 +1130,8 @@ function common_request_id()
     $pid = getmypid();
     $server = common_config('site', 'server');
     if (php_sapi_name() == 'cli') {
-        return "$server:$pid";
+        $script = basename($_SERVER['PHP_SELF']);
+        return "$server:$script:$pid";
     } else {
         static $req_id = null;
         if (!isset($req_id)) {
diff --git a/lib/xmppconfirmmanager.php b/lib/xmppconfirmmanager.php
deleted file mode 100644 (file)
index ee4e294..0000000
+++ /dev/null
@@ -1,168 +0,0 @@
-<?php
-/*
- * StatusNet - the distributed open-source microblogging tool
- * Copyright (C) 2008-2010 StatusNet, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License as published by
- * the Free Software Foundation, either version 3 of the License, or
- * (at your option) any later version.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- * GNU Affero General Public License for more details.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program.  If not, see <http://www.gnu.org/licenses/>.
- */
-
-if (!defined('STATUSNET') && !defined('LACONICA')) {
-    exit(1);
-}
-
-/**
- * Event handler for pushing new confirmations to Jabber users.
- * @fixme recommend redoing this on a queue-trigger model
- * @fixme expiration of old items got dropped in the past, put it back?
- */
-class XmppConfirmManager extends IoManager
-{
-
-    /**
-     * @return mixed XmppConfirmManager, or false if unneeded
-     */
-    public static function get()
-    {
-        if (common_config('xmpp', 'enabled')) {
-            $site = common_config('site', 'server');
-            return new XmppConfirmManager();
-        } else {
-            return false;
-        }
-    }
-
-    /**
-     * Tell the i/o master we need one instance for each supporting site
-     * being handled in this process.
-     */
-    public static function multiSite()
-    {
-        return IoManager::INSTANCE_PER_SITE;
-    }
-
-    function __construct()
-    {
-        $this->site = common_config('site', 'server');
-    }
-
-    /**
-     * 10 seconds? Really? That seems a bit frequent.
-     */
-    function pollInterval()
-    {
-        return 10;
-    }
-
-    /**
-     * Ping!
-     * @return boolean true if we found something
-     */
-    function poll()
-    {
-        $this->switchSite();
-        $confirm = $this->next_confirm();
-        if ($confirm) {
-            $this->handle_confirm($confirm);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    protected function handle_confirm($confirm)
-    {
-        require_once INSTALLDIR . '/lib/jabber.php';
-
-        common_log(LOG_INFO, 'Sending confirmation for ' . $confirm->address);
-        $user = User::staticGet($confirm->user_id);
-        if (!$user) {
-            common_log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id);
-            return;
-        }
-        $success = jabber_confirm_address($confirm->code,
-                                          $user->nickname,
-                                          $confirm->address);
-        if (!$success) {
-            common_log(LOG_ERR, 'Confirmation failed for ' . $confirm->address);
-            # Just let the claim age out; hopefully things work then
-            return;
-        } else {
-            common_log(LOG_INFO, 'Confirmation sent for ' . $confirm->address);
-            # Mark confirmation sent; need a dupe so we don't have the WHERE clause
-            $dupe = Confirm_address::staticGet('code', $confirm->code);
-            if (!$dupe) {
-                common_log(LOG_WARNING, 'Could not refetch confirm', __FILE__);
-                return;
-            }
-            $orig = clone($dupe);
-            $dupe->sent = $dupe->claimed;
-            $result = $dupe->update($orig);
-            if (!$result) {
-                common_log_db_error($dupe, 'UPDATE', __FILE__);
-                # Just let the claim age out; hopefully things work then
-                return;
-            }
-        }
-        return true;
-    }
-
-    protected function next_confirm()
-    {
-        $confirm = new Confirm_address();
-        $confirm->whereAdd('claimed IS null');
-        $confirm->whereAdd('sent IS null');
-        # XXX: eventually we could do other confirmations in the queue, too
-        $confirm->address_type = 'jabber';
-        $confirm->orderBy('modified DESC');
-        $confirm->limit(1);
-        if ($confirm->find(true)) {
-            common_log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address);
-                # working around some weird DB_DataObject behaviour
-            $confirm->whereAdd(''); # clears where stuff
-            $original = clone($confirm);
-            $confirm->claimed = common_sql_now();
-            $result = $confirm->update($original);
-            if ($result) {
-                common_log(LOG_INFO, 'Succeeded in claim! '. $result);
-                return $confirm;
-            } else {
-                common_log(LOG_INFO, 'Failed in claim!');
-                return false;
-            }
-        }
-        return null;
-    }
-
-    protected function clear_old_confirm_claims()
-    {
-        $confirm = new Confirm();
-        $confirm->claimed = null;
-        $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
-        $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY);
-        $confirm->free();
-        unset($confirm);
-    }
-
-    /**
-     * Make sure we're on the right site configuration
-     */
-    protected function switchSite()
-    {
-        if ($this->site != common_config('site', 'server')) {
-            common_log(LOG_DEBUG, __METHOD__ . ": switching to site $this->site");
-            $this->stats('switch');
-            StatusNet::init($this->site);
-        }
-    }
-}
index dfff63a30c31aa88ea6a9df7b1759baa2d145ab3..299175dd7d79927f8819d24c9ca4e6ae9db6014c 100644 (file)
@@ -70,6 +70,7 @@ class XmppManager extends IoManager
     function __construct()
     {
         $this->site = common_config('site', 'server');
+        $this->resource = common_config('xmpp', 'resource') . 'daemon';
     }
 
     /**
@@ -86,15 +87,19 @@ class XmppManager extends IoManager
         # Low priority; we don't want to receive messages
 
         common_log(LOG_INFO, "INITIALIZE");
-        $this->conn = jabber_connect($this->resource());
+        $this->conn = jabber_connect($this->resource);
 
         if (empty($this->conn)) {
             common_log(LOG_ERR, "Couldn't connect to server.");
             return false;
         }
 
-        $this->conn->addEventHandler('message', 'forward_message', $this);
+        $this->log(LOG_DEBUG, "Initializing stanza handlers.");
+
+        $this->conn->addEventHandler('message', 'handle_message', $this);
+        $this->conn->addEventHandler('presence', 'handle_presence', $this);
         $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this);
+
         $this->conn->setReconnectTimeout(600);
         jabber_send_presence("Send me a message to post a notice", 'available', null, 'available', -1);
 
@@ -175,12 +180,37 @@ class XmppManager extends IoManager
         }
     }
 
+    /**
+     * For queue handlers to pass us a message to push out,
+     * if we're active.
+     *
+     * @fixme should this be blocking etc?
+     *
+     * @param string $msg XML stanza to send
+     * @return boolean success
+     */
+    public function send($msg)
+    {
+        if ($this->conn && !$this->conn->isDisconnected()) {
+            $bytes = $this->conn->send($msg);
+            if ($bytes > 0) {
+                $this->conn->processTime(0);
+                return true;
+            } else {
+                return false;
+            }
+        } else {
+            // Can't send right now...
+            return false;
+        }
+    }
+
     /**
      * Send a keepalive ping to the XMPP server.
      */
     protected function sendPing()
     {
-        $jid = jabber_daemon_address().'/'.$this->resource();
+        $jid = jabber_daemon_address().'/'.$this->resource;
         $server = common_config('xmpp', 'server');
 
         if (!isset($this->pingid)) {
@@ -206,61 +236,239 @@ class XmppManager extends IoManager
         $this->conn->presence(null, 'available', null, 'available', -1);
     }
 
+
+    function get_user($from)
+    {
+        $user = User::staticGet('jabber', jabber_normalize_jid($from));
+        return $user;
+    }
+
     /**
-     * Callback for Jabber message event.
-     *
-     * This connection handles output; if we get a message straight to us,
-     * forward it on to our XmppDaemon listener for processing.
-     *
-     * @param $pl
+     * XMPP callback for handling message input...
+     * @param array $pl XMPP payload
      */
-    function forward_message(&$pl)
+    function handle_message(&$pl)
     {
+        $from = jabber_normalize_jid($pl['from']);
+
         if ($pl['type'] != 'chat') {
-            common_log(LOG_DEBUG, 'Ignoring message of type ' . $pl['type'] . ' from ' . $pl['from']);
+            $this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from.");
             return;
         }
-        $listener = $this->listener();
-        if (strtolower($listener) == strtolower($pl['from'])) {
-            common_log(LOG_WARNING, 'Ignoring loop message.');
+
+        if (mb_strlen($pl['body']) == 0) {
+            $this->log(LOG_WARNING, "Ignoring message with empty body from $from.");
             return;
         }
-        common_log(LOG_INFO, 'Forwarding message from ' . $pl['from'] . ' to ' . $listener);
-        $this->conn->message($this->listener(), $pl['body'], 'chat', null, $this->ofrom($pl['from']));
+
+        // Forwarded from another daemon for us to handle; this shouldn't
+        // happen any more but we might get some legacy items.
+        if ($this->is_self($from)) {
+            $this->log(LOG_INFO, "Got forwarded notice from self ($from).");
+            $from = $this->get_ofrom($pl);
+            $this->log(LOG_INFO, "Originally sent by $from.");
+            if (is_null($from) || $this->is_self($from)) {
+                $this->log(LOG_INFO, "Ignoring notice originally sent by $from.");
+                return;
+            }
+        }
+
+        $user = $this->get_user($from);
+
+        // For common_current_user to work
+        global $_cur;
+        $_cur = $user;
+
+        if (!$user) {
+            $this->from_site($from, 'Unknown user; go to ' .
+                             common_local_url('imsettings') .
+                             ' to add your address to your account');
+            $this->log(LOG_WARNING, 'Message from unknown user ' . $from);
+            return;
+        }
+        if ($this->handle_command($user, $pl['body'])) {
+            $this->log(LOG_INFO, "Command message by $from handled.");
+            return;
+        } else if ($this->is_autoreply($pl['body'])) {
+            $this->log(LOG_INFO, 'Ignoring auto reply from ' . $from);
+            return;
+        } else if ($this->is_otr($pl['body'])) {
+            $this->log(LOG_INFO, 'Ignoring OTR from ' . $from);
+            return;
+        } else {
+
+            $this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname);
+
+            $this->add_notice($user, $pl);
+        }
+
+        $user->free();
+        unset($user);
+        unset($_cur);
+
+        unset($pl['xml']);
+        $pl['xml'] = null;
+
+        $pl = null;
+        unset($pl);
     }
 
-    /**
-     * Build an <addresses> block with an ofrom entry for forwarded messages
-     *
-     * @param string $from Jabber ID of original sender
-     * @return string XML fragment
-     */
-    protected function ofrom($from)
+
+    function is_self($from)
     {
-        $address = "<addresses xmlns='http://jabber.org/protocol/address'>\n";
-        $address .= "<address type='ofrom' jid='$from' />\n";
-        $address .= "</addresses>\n";
-        return $address;
+        return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from));
     }
 
-    /**
-     * Build the complete JID of the XmppDaemon process which
-     * handles primary XMPP input for this site.
-     *
-     * @return string Jabber ID
-     */
-    protected function listener()
+    function get_ofrom($pl)
+    {
+        $xml = $pl['xml'];
+        $addresses = $xml->sub('addresses');
+        if (!$addresses) {
+            $this->log(LOG_WARNING, 'Forwarded message without addresses');
+            return null;
+        }
+        $address = $addresses->sub('address');
+        if (!$address) {
+            $this->log(LOG_WARNING, 'Forwarded message without address');
+            return null;
+        }
+        if (!array_key_exists('type', $address->attrs)) {
+            $this->log(LOG_WARNING, 'No type for forwarded message');
+            return null;
+        }
+        $type = $address->attrs['type'];
+        if ($type != 'ofrom') {
+            $this->log(LOG_WARNING, 'Type of forwarded message is not ofrom');
+            return null;
+        }
+        if (!array_key_exists('jid', $address->attrs)) {
+            $this->log(LOG_WARNING, 'No jid for forwarded message');
+            return null;
+        }
+        $jid = $address->attrs['jid'];
+        if (!$jid) {
+            $this->log(LOG_WARNING, 'Could not get jid from address');
+            return null;
+        }
+        $this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid);
+        return $jid;
+    }
+
+    function is_autoreply($txt)
+    {
+        if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) {
+            return true;
+        } else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    function is_otr($txt)
+    {
+        if (preg_match('/^\?OTR/', $txt)) {
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    function from_site($address, $msg)
+    {
+        $text = '['.common_config('site', 'name') . '] ' . $msg;
+        jabber_send_message($address, $text);
+    }
+
+    function handle_command($user, $body)
     {
-        if (common_config('xmpp', 'listener')) {
-            return common_config('xmpp', 'listener');
+        $inter = new CommandInterpreter();
+        $cmd = $inter->handle_command($user, $body);
+        if ($cmd) {
+            $chan = new XMPPChannel($this->conn);
+            $cmd->execute($chan);
+            return true;
         } else {
-            return jabber_daemon_address() . '/' . common_config('xmpp','resource') . 'daemon';
+            return false;
+        }
+    }
+
+    function add_notice(&$user, &$pl)
+    {
+        $body = trim($pl['body']);
+        $content_shortened = common_shorten_links($body);
+        if (Notice::contentTooLong($content_shortened)) {
+          $from = jabber_normalize_jid($pl['from']);
+          $this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'),
+                                          Notice::maxContent(),
+                                          mb_strlen($content_shortened)));
+          return;
+        }
+
+        try {
+            $notice = Notice::saveNew($user->id, $content_shortened, 'xmpp');
+        } catch (Exception $e) {
+            $this->log(LOG_ERR, $e->getMessage());
+            $this->from_site($user->jabber, $e->getMessage());
+            return;
         }
+
+        common_broadcast_notice($notice);
+        $this->log(LOG_INFO,
+                   'Added notice ' . $notice->id . ' from user ' . $user->nickname);
+        $notice->free();
+        unset($notice);
+    }
+
+    function handle_presence(&$pl)
+    {
+        $from = jabber_normalize_jid($pl['from']);
+        switch ($pl['type']) {
+         case 'subscribe':
+            # We let anyone subscribe
+            $this->subscribed($from);
+            $this->log(LOG_INFO,
+                       'Accepted subscription from ' . $from);
+            break;
+         case 'subscribed':
+         case 'unsubscribed':
+         case 'unsubscribe':
+            $this->log(LOG_INFO,
+                       'Ignoring  "' . $pl['type'] . '" from ' . $from);
+            break;
+         default:
+            if (!$pl['type']) {
+                $user = User::staticGet('jabber', $from);
+                if (!$user) {
+                    $this->log(LOG_WARNING, 'Presence from unknown user ' . $from);
+                    return;
+                }
+                if ($user->updatefrompresence) {
+                    $this->log(LOG_INFO, 'Updating ' . $user->nickname .
+                               ' status from presence.');
+                    $this->add_notice($user, $pl);
+                }
+                $user->free();
+                unset($user);
+            }
+            break;
+        }
+        unset($pl['xml']);
+        $pl['xml'] = null;
+
+        $pl = null;
+        unset($pl);
+    }
+
+    function log($level, $msg)
+    {
+        $text = 'XMPPDaemon('.$this->resource.'): '.$msg;
+        common_log($level, $text);
     }
 
-    protected function resource()
+    function subscribed($to)
     {
-        return 'queue' . posix_getpid(); // @fixme PIDs won't be host-unique
+        jabber_special_presence('subscribed', $to);
     }
 
     /**
diff --git a/lib/xmppoutqueuehandler.php b/lib/xmppoutqueuehandler.php
new file mode 100644 (file)
index 0000000..2afa260
--- /dev/null
@@ -0,0 +1,55 @@
+<?php
+/*
+ * StatusNet - the distributed open-source microblogging tool
+ * Copyright (C) 2010, StatusNet, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+/**
+ * Queue handler for pre-processed outgoing XMPP messages.
+ * Formatted XML stanzas will have been pushed into the queue
+ * via the Queued_XMPP connection proxy, probably from some
+ * other queue processor.
+ *
+ * Here, the XML stanzas are simply pulled out of the queue and
+ * pushed out over the wire; an XmppManager is needed to set up
+ * and maintain the actual server connection.
+ *
+ * This queue will be run via XmppDaemon rather than QueueDaemon.
+ *
+ * @author Brion Vibber <brion@status.net>
+ */
+class XmppOutQueueHandler extends QueueHandler
+{
+    function transport() {
+        return 'xmppout';
+    }
+
+    /**
+     * Take a previously-queued XMPP stanza and send it out ot the server.
+     * @param string $msg
+     * @return boolean true on success
+     */
+    function handle($msg)
+    {
+        assert(is_string($msg));
+
+        $xmpp = XmppManager::get();
+        $ok = $xmpp->send($msg);
+
+        return $ok;
+    }
+}
+
index f0e706b929fd2182a7735eae21ea82ad0067702b..14085cc5e39d3a5aecf1ef67c6f490df4353967d 100644 (file)
@@ -32,14 +32,7 @@ class EnjitQueueHandler extends QueueHandler
         return 'enjit';
     }
 
-    function start()
-    {
-        $this->log(LOG_INFO, "Starting EnjitQueueHandler");
-        $this->log(LOG_INFO, "Broadcasting to ".common_config('enjit', 'apiurl'));
-        return true;
-    }
-
-    function handle_notice($notice)
+    function handle($notice)
     {
 
         $profile = Profile::staticGet($notice->profile_id);
index 1778690e5bcddac296c1af3078a6cf19d5fbb14d..524af7bc45fc39e99b8f8f29c20e4db43f9eafeb 100644 (file)
@@ -28,7 +28,7 @@ class FacebookQueueHandler extends QueueHandler
         return 'facebook';
     }
 
-    function handle_notice($notice)
+    function handle($notice)
     {
         if ($this->_isLocal($notice)) {
             return facebookBroadcastNotice($notice);
index 2de162628ffbbb05fe7fc0be56f4a76f64190c45..9f444c8bba0ca5482363ca612b242ec6eabbe43a 100644 (file)
@@ -138,6 +138,9 @@ class RSSCloudPlugin extends Plugin
         case 'RSSCloudNotifier':
             include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
             return false;
+        case 'RSSCloudQueueHandler':
+            include_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudQueueHandler.php';
+            return false;
         case 'RSSCloudRequestNotifyAction':
         case 'LoggingAggregatorAction':
             include_once INSTALLDIR . '/plugins/RSSCloud/' .
@@ -193,32 +196,6 @@ class RSSCloudPlugin extends Plugin
         return true;
     }
 
-    /**
-     * broadcast the message when not using queuehandler
-     *
-     * @param Notice &$notice the notice
-     * @param array  $queue   destination queue
-     *
-     * @return boolean hook return
-     */
-
-    function onUnqueueHandleNotice(&$notice, $queue)
-    {
-        if (($queue == 'rsscloud') && ($this->_isLocal($notice))) {
-
-            common_debug('broadcasting rssCloud bound notice ' . $notice->id);
-
-            $profile = $notice->getProfile();
-
-            $notifier = new RSSCloudNotifier();
-            $notifier->notify($profile);
-
-            return false;
-        }
-
-        return true;
-    }
-
     /**
      * Determine whether the notice was locally created
      *
@@ -261,19 +238,15 @@ class RSSCloudPlugin extends Plugin
     }
 
     /**
-     * Add RSSCloudQueueHandler to the list of valid daemons to
-     * start
+     * Register RSSCloud notice queue handler
      *
-     * @param array $daemons the list of daemons to run
+     * @param QueueManager $manager
      *
      * @return boolean hook return
-     *
      */
-
-    function onGetValidDaemons($daemons)
+    function onEndInitializeQueueManager($manager)
     {
-        array_push($daemons, INSTALLDIR .
-                   '/plugins/RSSCloud/RSSCloudQueueHandler.php');
+        $manager->connect('rsscloud', 'RSSCloudQueueHandler');
         return true;
     }
 
old mode 100755 (executable)
new mode 100644 (file)
index 693dd27..295c261
@@ -1,4 +1,3 @@
-#!/usr/bin/env php
 <?php
 /*
  * StatusNet - the distributed open-source microblogging tool
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
-define('INSTALLDIR', realpath(dirname(__FILE__) . '/../..'));
-
-$shortoptions = 'i::';
-$longoptions = array('id::');
-
-$helptext = <<<END_OF_ENJIT_HELP
-Daemon script for pushing new notices to RSSCloud subscribers.
-
-    -i --id           Identity (default none)
-
-END_OF_ENJIT_HELP;
-
-require_once INSTALLDIR . '/scripts/commandline.inc';
-require_once INSTALLDIR . '/lib/queuehandler.php';
-require_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudNotifier.php';
-require_once INSTALLDIR . '/plugins/RSSCloud/RSSCloudSubscription.php';
+if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
 
 class RSSCloudQueueHandler extends QueueHandler
 {
-    var $notifier = null;
-
     function transport()
     {
         return 'rsscloud';
     }
 
-    function start()
-    {
-        $this->log(LOG_INFO, "INITIALIZE");
-        $this->notifier = new RSSCloudNotifier();
-        return true;
-    }
-
-    function handle_notice($notice)
+    function handle($notice)
     {
         $profile = $notice->getProfile();
-        return $this->notifier->notify($profile);
-    }
-
-    function finish()
-    {
+        $notifier = new RSSCloudNotifier();
+        return $notifier->notify($profile);
     }
-
-}
-
-if (have_option('i')) {
-    $id = get_option_value('i');
-} else if (have_option('--id')) {
-    $id = get_option_value('--id');
-} else if (count($args) > 0) {
-    $id = $args[0];
-} else {
-    $id = null;
 }
 
-$handler = new RSSCloudQueueHandler($id);
-
-$handler->runOnce();
index 5089ca7b74203811ae0c2bea3a68e8d355c5014a..b5a624e83d928815ad3c5cee1396ba16f2dae637 100644 (file)
@@ -28,7 +28,7 @@ class TwitterQueueHandler extends QueueHandler
         return 'twitter';
     }
 
-    function handle_notice($notice)
+    function handle($notice)
     {
         return broadcast_twitter($notice);
     }
index 9031437aac9439a1139ff3deaa30857c468ff5ae..8158849695e92cad7664051cd6edabdd7414ea83 100755 (executable)
@@ -50,7 +50,7 @@ if (empty($notice)) {
     exit(1);
 }
 
-if (!$handler->handle_notice($notice)) {
+if (!$handler->handle($notice)) {
     print "Failed to handle notice id $noticeId on queue '$queue'.\n";
     exit(1);
 }
index 162f617e0d6da3a0838469f3fb98148acedd3b11..a9cfda6d72e8f927ee513fe8572a26ccde00c96c 100755 (executable)
@@ -29,6 +29,8 @@ $longoptions = array('id=', 'foreground', 'all', 'threads=', 'skip-xmpp', 'xmpp-
  *
  * Recognizes Linux and Mac OS X; others will return default of 1.
  *
+ * @fixme move this to SpawningDaemon, but to get the default val for help
+ *        text we seem to need it before loading infrastructure
  * @return intval
  */
 function getProcessorCount()
@@ -83,143 +85,29 @@ define('CLAIM_TIMEOUT', 1200);
  * We can then pass individual items through the QueueHandler subclasses
  * they belong to.
  */
-class QueueDaemon extends Daemon
+class QueueDaemon extends SpawningDaemon
 {
-    protected $allsites;
-    protected $threads=1;
+    protected $allsites = false;
 
     function __construct($id=null, $daemonize=true, $threads=1, $allsites=false)
     {
-        parent::__construct($daemonize);
-
-        if ($id) {
-            $this->set_id($id);
-        }
+        parent::__construct($id, $daemonize, $threads);
         $this->all = $allsites;
-        $this->threads = $threads;
-    }
-
-    /**
-     * How many seconds a polling-based queue manager should wait between
-     * checks for new items to handle.
-     *
-     * Defaults to 60 seconds; override to speed up or slow down.
-     *
-     * @return int timeout in seconds
-     */
-    function timeout()
-    {
-        return 60;
-    }
-
-    function name()
-    {
-        return strtolower(get_class($this).'.'.$this->get_id());
-    }
-
-    function run()
-    {
-        if ($this->threads > 1) {
-            return $this->runThreads();
-        } else {
-            return $this->runLoop();
-        }
-    }
-    
-    function runThreads()
-    {
-        $children = array();
-        for ($i = 1; $i <= $this->threads; $i++) {
-            $pid = pcntl_fork();
-            if ($pid < 0) {
-                print "Couldn't fork for thread $i; aborting\n";
-                exit(1);
-            } else if ($pid == 0) {
-                $this->runChild($i);
-                exit(0);
-            } else {
-                $this->log(LOG_INFO, "Spawned thread $i as pid $pid");
-                $children[$i] = $pid;
-            }
-        }
-        
-        $this->log(LOG_INFO, "Waiting for children to complete.");
-        while (count($children) > 0) {
-            $status = null;
-            $pid = pcntl_wait($status);
-            if ($pid > 0) {
-                $i = array_search($pid, $children);
-                if ($i === false) {
-                    $this->log(LOG_ERR, "Unrecognized child pid $pid exited!");
-                    continue;
-                }
-                unset($children[$i]);
-                $this->log(LOG_INFO, "Thread $i pid $pid exited.");
-                
-                $pid = pcntl_fork();
-                if ($pid < 0) {
-                    print "Couldn't fork to respawn thread $i; aborting thread.\n";
-                } else if ($pid == 0) {
-                    $this->runChild($i);
-                    exit(0);
-                } else {
-                    $this->log(LOG_INFO, "Respawned thread $i as pid $pid");
-                    $children[$i] = $pid;
-                }
-            }
-        }
-        $this->log(LOG_INFO, "All child processes complete.");
-        return true;
-    }
-
-    function runChild($thread)
-    {
-        $this->set_id($this->get_id() . "." . $thread);
-        $this->resetDb();
-        $this->runLoop();
-    }
-
-    /**
-     * Reconnect to the database for each child process,
-     * or they'll get very confused trying to use the
-     * same socket.
-     */
-    function resetDb()
-    {
-        // @fixme do we need to explicitly open the db too
-        // or is this implied?
-        global $_DB_DATAOBJECT;
-        unset($_DB_DATAOBJECT['CONNECTIONS']);
-
-        // Reconnect main memcached, or threads will stomp on
-        // each other and corrupt their requests.
-        $cache = common_memcache();
-        if ($cache) {
-            $cache->reconnect();
-        }
-
-        // Also reconnect memcached for status_network table.
-        if (!empty(Status_network::$cache)) {
-            Status_network::$cache->close();
-            Status_network::$cache = null;
-        }
     }
 
     /**
      * Setup and start of run loop for this queue handler as a daemon.
      * Most of the heavy lifting is passed on to the QueueManager's service()
-     * method, which passes control on to the QueueHandler's handle_notice()
-     * method for each notice that comes in on the queue.
-     *
-     * Most of the time this won't need to be overridden in a subclass.
+     * method, which passes control on to the QueueHandler's handle()
+     * method for each item that comes in on the queue.
      *
      * @return boolean true on success, false on failure
      */
-    function runLoop()
+    function runThread()
     {
         $this->log(LOG_INFO, 'checking for queued notices');
 
-        $master = new IoMaster($this->get_id());
+        $master = new QueueMaster($this->get_id());
         $master->init($this->all);
         $master->service();
 
@@ -229,10 +117,25 @@ class QueueDaemon extends Daemon
 
         return true;
     }
+}
 
-    function log($level, $msg)
+class QueueMaster extends IoMaster
+{
+    /**
+     * Initialize IoManagers for the currently configured site
+     * which are appropriate to this instance.
+     */
+    function initManagers()
     {
-        common_log($level, get_class($this) . ' ('. $this->get_id() .'): '.$msg);
+        $classes = array();
+        if (Event::handle('StartQueueDaemonIoManagers', array(&$classes))) {
+            $classes[] = 'QueueManager';
+        }
+        Event::handle('EndQueueDaemonIoManagers', array(&$classes));
+
+        foreach ($classes as $class) {
+            $this->instantiate($class);
+        }
     }
 }
 
index cef9c4bd074e7d049e25ddf2d8281726cd36b78d..fd7cf055b485cedaae3f327952c16baf4f634456 100755 (executable)
@@ -33,347 +33,46 @@ END_OF_XMPP_HELP;
 
 require_once INSTALLDIR.'/scripts/commandline.inc';
 
-require_once INSTALLDIR . '/lib/common.php';
 require_once INSTALLDIR . '/lib/jabber.php';
-require_once INSTALLDIR . '/lib/daemon.php';
 
-# This is kind of clunky; we create a class to call the global functions
-# in jabber.php, which create a new XMPP class. A more elegant (?) solution
-# might be to use make this a subclass of XMPP.
-
-class XMPPDaemon extends Daemon
+class XMPPDaemon extends SpawningDaemon
 {
-    function __construct($resource=null, $daemonize=true)
-    {
-        parent::__construct($daemonize);
-
-        static $attrs = array('server', 'port', 'user', 'password', 'host');
-
-        foreach ($attrs as $attr)
-        {
-            $this->$attr = common_config('xmpp', $attr);
-        }
-
-        if ($resource) {
-            $this->resource = $resource . 'daemon';
-        } else {
-            $this->resource = common_config('xmpp', 'resource') . 'daemon';
-        }
-
-        $this->jid = $this->user.'@'.$this->server.'/'.$this->resource;
-
-        $this->log(LOG_INFO, "INITIALIZE XMPPDaemon {$this->jid}");
-    }
-
-    function connect()
-    {
-        $connect_to = ($this->host) ? $this->host : $this->server;
-
-        $this->log(LOG_INFO, "Connecting to $connect_to on port $this->port");
-
-        $this->conn = jabber_connect($this->resource);
-
-        if (!$this->conn) {
-            return false;
-        }
-
-        $this->log(LOG_INFO, "Connected");
-
-        $this->conn->setReconnectTimeout(600);
-
-        $this->log(LOG_INFO, "Sending initial presence.");
-
-        jabber_send_presence("Send me a message to post a notice", 'available',
-                             null, 'available', 100);
-
-        $this->log(LOG_INFO, "Done connecting.");
-
-        return !$this->conn->isDisconnected();
-    }
-
-    function name()
-    {
-        return strtolower('xmppdaemon.'.$this->resource);
-    }
-
-    function run()
+    function __construct($id=null, $daemonize=true, $threads=1)
     {
-        if ($this->connect()) {
-
-            $this->log(LOG_DEBUG, "Initializing stanza handlers.");
-
-            $this->conn->addEventHandler('message', 'handle_message', $this);
-            $this->conn->addEventHandler('presence', 'handle_presence', $this);
-            $this->conn->addEventHandler('reconnect', 'handle_reconnect', $this);
-
-            $this->log(LOG_DEBUG, "Beginning processing loop.");
-
-            while ($this->conn->processTime(60)) {
-                $this->sendPing();
-            }
+        if ($threads != 1) {
+            // This should never happen. :)
+            throw new Exception("XMPPDaemon can must run single-threaded");
         }
+        parent::__construct($id, $daemonize, $threads);
     }
 
-    function sendPing()
+    function runThread()
     {
-        if (!isset($this->pingid)) {
-            $this->pingid = 0;
-        } else {
-            $this->pingid++;
-        }
+        common_log(LOG_INFO, 'Waiting to listen to XMPP and queues');
 
-        $this->log(LOG_DEBUG, "Sending ping #{$this->pingid}");
+        $master = new XmppMaster($this->get_id());
+        $master->init();
+        $master->service();
 
-               $this->conn->send("<iq from='{$this->jid}' to='{$this->server}' id='ping_{$this->pingid}' type='get'><ping xmlns='urn:xmpp:ping'/></iq>");
-    }
+        common_log(LOG_INFO, 'terminating normally');
 
-    function handle_reconnect(&$pl)
-    {
-        $this->log(LOG_DEBUG, "Got reconnection callback.");
-        $this->conn->processUntil('session_start');
-        $this->log(LOG_DEBUG, "Sending reconnection presence.");
-        $this->conn->presence('Send me a message to post a notice', 'available', null, 'available', 100);
-        unset($pl['xml']);
-        $pl['xml'] = null;
-
-        $pl = null;
-        unset($pl);
-    }
-
-    function get_user($from)
-    {
-        $user = User::staticGet('jabber', jabber_normalize_jid($from));
-        return $user;
+        return true;
     }
 
-    function handle_message(&$pl)
-    {
-        $from = jabber_normalize_jid($pl['from']);
-
-        if ($pl['type'] != 'chat') {
-            $this->log(LOG_WARNING, "Ignoring message of type ".$pl['type']." from $from.");
-            return;
-        }
-
-        if (mb_strlen($pl['body']) == 0) {
-            $this->log(LOG_WARNING, "Ignoring message with empty body from $from.");
-            return;
-        }
-
-        # Forwarded from another daemon (probably a broadcaster) for
-        # us to handle
-
-        if ($this->is_self($from)) {
-            $this->log(LOG_INFO, "Got forwarded notice from self ($from).");
-            $from = $this->get_ofrom($pl);
-            $this->log(LOG_INFO, "Originally sent by $from.");
-            if (is_null($from) || $this->is_self($from)) {
-                $this->log(LOG_INFO, "Ignoring notice originally sent by $from.");
-                return;
-            }
-        }
-
-        $user = $this->get_user($from);
-
-        // For common_current_user to work
-        global $_cur;
-        $_cur = $user;
-
-        if (!$user) {
-            $this->from_site($from, 'Unknown user; go to ' .
-                             common_local_url('imsettings') .
-                             ' to add your address to your account');
-            $this->log(LOG_WARNING, 'Message from unknown user ' . $from);
-            return;
-        }
-        if ($this->handle_command($user, $pl['body'])) {
-            $this->log(LOG_INFO, "Command message by $from handled.");
-            return;
-        } else if ($this->is_autoreply($pl['body'])) {
-            $this->log(LOG_INFO, 'Ignoring auto reply from ' . $from);
-            return;
-        } else if ($this->is_otr($pl['body'])) {
-            $this->log(LOG_INFO, 'Ignoring OTR from ' . $from);
-            return;
-        } else {
-
-            $this->log(LOG_INFO, 'Posting a notice from ' . $user->nickname);
-
-            $this->add_notice($user, $pl);
-        }
-
-        $user->free();
-        unset($user);
-        unset($_cur);
-
-        unset($pl['xml']);
-        $pl['xml'] = null;
-
-        $pl = null;
-        unset($pl);
-    }
-
-    function is_self($from)
-    {
-        return preg_match('/^'.strtolower(jabber_daemon_address()).'/', strtolower($from));
-    }
-
-    function get_ofrom($pl)
-    {
-        $xml = $pl['xml'];
-        $addresses = $xml->sub('addresses');
-        if (!$addresses) {
-            $this->log(LOG_WARNING, 'Forwarded message without addresses');
-            return null;
-        }
-        $address = $addresses->sub('address');
-        if (!$address) {
-            $this->log(LOG_WARNING, 'Forwarded message without address');
-            return null;
-        }
-        if (!array_key_exists('type', $address->attrs)) {
-            $this->log(LOG_WARNING, 'No type for forwarded message');
-            return null;
-        }
-        $type = $address->attrs['type'];
-        if ($type != 'ofrom') {
-            $this->log(LOG_WARNING, 'Type of forwarded message is not ofrom');
-            return null;
-        }
-        if (!array_key_exists('jid', $address->attrs)) {
-            $this->log(LOG_WARNING, 'No jid for forwarded message');
-            return null;
-        }
-        $jid = $address->attrs['jid'];
-        if (!$jid) {
-            $this->log(LOG_WARNING, 'Could not get jid from address');
-            return null;
-        }
-        $this->log(LOG_DEBUG, 'Got message forwarded from jid ' . $jid);
-        return $jid;
-    }
-
-    function is_autoreply($txt)
-    {
-        if (preg_match('/[\[\(]?[Aa]uto[-\s]?[Rr]e(ply|sponse)[\]\)]/', $txt)) {
-            return true;
-        } else if (preg_match('/^System: Message wasn\'t delivered. Offline storage size was exceeded.$/', $txt)) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    function is_otr($txt)
-    {
-        if (preg_match('/^\?OTR/', $txt)) {
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    function from_site($address, $msg)
-    {
-        $text = '['.common_config('site', 'name') . '] ' . $msg;
-        jabber_send_message($address, $text);
-    }
-
-    function handle_command($user, $body)
-    {
-        $inter = new CommandInterpreter();
-        $cmd = $inter->handle_command($user, $body);
-        if ($cmd) {
-            $chan = new XMPPChannel($this->conn);
-            $cmd->execute($chan);
-            return true;
-        } else {
-            return false;
-        }
-    }
-
-    function add_notice(&$user, &$pl)
-    {
-        $body = trim($pl['body']);
-        $content_shortened = common_shorten_links($body);
-        if (Notice::contentTooLong($content_shortened)) {
-          $from = jabber_normalize_jid($pl['from']);
-          $this->from_site($from, sprintf(_('Message too long - maximum is %1$d characters, you sent %2$d.'),
-                                          Notice::maxContent(),
-                                          mb_strlen($content_shortened)));
-          return;
-        }
-
-        try {
-            $notice = Notice::saveNew($user->id, $content_shortened, 'xmpp');
-        } catch (Exception $e) {
-            $this->log(LOG_ERR, $e->getMessage());
-            $this->from_site($user->jabber, $e->getMessage());
-            return;
-        }
-
-        common_broadcast_notice($notice);
-        $this->log(LOG_INFO,
-                   'Added notice ' . $notice->id . ' from user ' . $user->nickname);
-        $notice->free();
-        unset($notice);
-    }
-
-    function handle_presence(&$pl)
-    {
-        $from = jabber_normalize_jid($pl['from']);
-        switch ($pl['type']) {
-         case 'subscribe':
-            # We let anyone subscribe
-            $this->subscribed($from);
-            $this->log(LOG_INFO,
-                       'Accepted subscription from ' . $from);
-            break;
-         case 'subscribed':
-         case 'unsubscribed':
-         case 'unsubscribe':
-            $this->log(LOG_INFO,
-                       'Ignoring  "' . $pl['type'] . '" from ' . $from);
-            break;
-         default:
-            if (!$pl['type']) {
-                $user = User::staticGet('jabber', $from);
-                if (!$user) {
-                    $this->log(LOG_WARNING, 'Presence from unknown user ' . $from);
-                    return;
-                }
-                if ($user->updatefrompresence) {
-                    $this->log(LOG_INFO, 'Updating ' . $user->nickname .
-                               ' status from presence.');
-                    $this->add_notice($user, $pl);
-                }
-                $user->free();
-                unset($user);
-            }
-            break;
-        }
-        unset($pl['xml']);
-        $pl['xml'] = null;
-
-        $pl = null;
-        unset($pl);
-    }
-
-    function log($level, $msg)
-    {
-        $text = 'XMPPDaemon('.$this->resource.'): '.$msg;
-        common_log($level, $text);
-        if (!$this->daemonize)
-        {
-            $line = common_log_line($level, $text);
-            echo $line;
-            echo "\n";
-        }
-    }
+}
 
-    function subscribed($to)
-    {
-        jabber_special_presence('subscribed', $to);
+class XmppMaster extends IoMaster
+{
+    /**
+     * Initialize IoManagers for the currently configured site
+     * which are appropriate to this instance.
+     */
+    function initManagers()
+    {
+        // @fixme right now there's a hack in QueueManager to determine
+        // which queues to subscribe to based on the master class.
+        $this->instantiate('QueueManager');
+        $this->instantiate('XmppManager');
     }
 }