]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
break up monolithic xmppdaemon into multiple queue handlers
authorEvan Prodromou <evan@prodromou.name>
Wed, 27 Aug 2008 20:54:07 +0000 (16:54 -0400)
committerEvan Prodromou <evan@prodromou.name>
Wed, 27 Aug 2008 20:54:07 +0000 (16:54 -0400)
Eventually, the poor xmppdaemon has become overloaded with extra
tasks. So, I've broken it up. Now, we have 5 background scripts, and
more coming:

* xmppdaemon.php - handles incoming XMPP messages only.
* xmppqueuehandler.php - sends notices from the queue out through XMPP.
* smsqueuehandler.php - sends notices from the queue out over SMS
* ombqueuehandler.php - sends notices from the queue out over OMB
* xmppconfirmhandler.php - sends confirmation requests out over XMPP.

This is in addition to maildaemon.php, which takes incoming messages.

None of these are "true" daemons -- they don't daemonize themselves
automatically. Use nohup or another tool to background them. monit can
also be useful to keep them running.

At some point, these might become fork()'ing daemons, able to handle
more than one notice at a time. For now, I'm just running multiple
instances, hoping they don't interfere.

darcs-hash:20080827205407-84dde-97884a12f5f4e54c93bc785bd280683d1ee7e749.gz

classes/Queue_item.php
lib/jabber.php
lib/queuehandler.php [new file with mode: 0644]
lib/util.php
scripts/ombqueuehandler.php [new file with mode: 0755]
scripts/smsqueuehandler.php [new file with mode: 0755]
scripts/xmppconfirmhandler.php [new file with mode: 0755]
scripts/xmppdaemon.php
scripts/xmppqueuehandler.php [new file with mode: 0755]

index a953e141b874e22c16c4c0dfbb15c2f7f030b584..2ae78d9f64e8284187adc666b71467115c28fc5d 100644 (file)
@@ -22,4 +22,34 @@ class Queue_item extends DB_DataObject
     ###END_AUTOCODE
 
     function sequenceKey() { return array(false, false); }
+       
+       static function top($transport) {
+
+               $qi = new Queue_item();
+               $qi->transport = $transport;
+               $qi->orderBy('created');
+               $qi->whereAdd('claimed is NULL');
+
+               $qi->limit(1);
+
+               $cnt = $qi->find(TRUE);
+
+               if ($cnt) {
+                       # XXX: potential race condition
+                       # can we force it to only update if claimed is still NULL
+                       # (or old)?
+                       $this->log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id);
+                       $orig = clone($qi);
+                       $qi->claimed = common_sql_now();
+                       $result = $qi->update($orig);
+                       if ($result) {
+                               $this->log(LOG_INFO, 'claim succeeded.');
+                               return $qi;
+                       } else {
+                               $this->log(LOG_INFO, 'claim failed.');
+                       }
+               }
+               $qi = NULL;
+               return NULL;
+       }
 }
index 1202aa32227505f4d0682389063c695728f6fa73..e2f7ca3c8255a10928c6270b7c662e08c9a08c46 100644 (file)
@@ -26,23 +26,45 @@ require_once('XMPPHP/XMPP.php');
 class Laconica_XMPP extends XMPPHP_XMPP {
     
     function messageplus($to, $body, $type = 'chat', $subject = null, $payload = null) {
-       $to       = htmlspecialchars($to);
-       $body   = htmlspecialchars($body);
-       $subject = htmlspecialchars($subject);
-
-       $jid = jabber_daemon_address();
-       
-       $out = "<message from='$jid' to='$to' type='$type'>";
-       if($subject) $out .= "<subject>$subject</subject>";
-       $out .= "<body>$body</body>";
-       if($payload) $out .= $payload;
-       $out .= "</message>";
-
+               $to       = htmlspecialchars($to);
+               $body   = htmlspecialchars($body);
+               $subject = htmlspecialchars($subject);
+               
+               $jid = jabber_daemon_address();
+               
+               $out = "<message from='$jid' to='$to' type='$type'>";
+               if($subject) $out .= "<subject>$subject</subject>";
+               $out .= "<body>$body</body>";
+               if($payload) $out .= $payload;
+               $out .= "</message>";
+               
                $cnt = strlen($out);
                common_log(LOG_DEBUG, "Sending $cnt chars to $to");
-       $this->send($out);
+               $this->send($out);
                common_log(LOG_DEBUG, 'Done.');
     }
+       
+       public function presence($status = null, $show = 'available', $to = null, $type='available', $priority=NULL) {
+               if($type == 'available') $type = '';
+               $to      = htmlspecialchars($to);
+               $status = htmlspecialchars($status);
+               if($show == 'unavailable') $type = 'unavailable';
+               
+               $out = "<presence";
+               if($to) $out .= " to='$to'";
+               if($type) $out .= " type='$type'";
+               if($show == 'available' and !$status and is_null($priority)) {
+                       $out .= "/>";
+               } else {
+                       $out .= ">";
+                       if($show != 'available') $out .= "<show>$show</show>";
+                       if($status) $out .= "<status>$status</status>";
+                       if(!is_null($priority)) $out .= "<priority>$priority</priority>";
+                       $out .= "</presence>";
+               }
+               
+               $this->send($out);
+       }
 }
 
 function jabber_valid_base_jid($jid) {
@@ -64,7 +86,7 @@ function jabber_daemon_address() {
        return common_config('xmpp', 'user') . '@' . common_config('xmpp', 'server');
 }
 
-function jabber_connect($resource=NULL) {
+function jabber_connect($resource=NULL, $status=NULL, $priority=NULL) {
        static $conn = NULL;
        if (!$conn) {
                $conn = new Laconica_XMPP(common_config('xmpp', 'host') ?
@@ -92,6 +114,8 @@ function jabber_connect($resource=NULL) {
                        return false;
                }
        $conn->processUntil('session_start');
+               $conn->getRoster();
+               $conn->presence($presence, $priority);
        }
        return $conn;
 }
diff --git a/lib/queuehandler.php b/lib/queuehandler.php
new file mode 100644 (file)
index 0000000..f12b880
--- /dev/null
@@ -0,0 +1,105 @@
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+define('CLAIM_TIMEOUT', 1200);
+
+class QueueHandler {
+
+       var $_id = 'generic';
+
+       function QueueHandler($id=NULL) {
+               if ($id) {
+                       $this->set_id($id);
+               }
+       }
+       
+       function class_name() {
+               return ucfirst($this->transport()) . 'Handler';
+       }
+       
+       function get_id() {
+               return $this->_id;
+       }
+
+       function set_id($id) {
+               $this->_id = $id;
+       }
+       
+       function transport() {
+               return NULL;
+       }
+       
+       function start() {
+       }
+       
+       function finish() {
+       }
+
+       function handle_notice($notice) {
+               return true;
+       }
+       
+       function handle_queue() {
+               $this->clear_old_claims();
+               $this->log(LOG_INFO, 'checking for queued notices');
+               $cnt = 0;
+               $transport = $this->transport();
+               do {
+                       $qi = Queue_item::top($transport);
+                       if ($qi) {
+                               $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
+                               $notice = Notice::staticGet($qi->notice_id);
+                               if ($notice) {
+                                       $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
+                                       # XXX: what to do if broadcast fails?
+                                       $result = $this->handle_notice($notice);
+                                       if (!$result) {
+                                               $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
+                                               $orig = $qi;
+                                               $qi->claimed = NULL;
+                                               $qi->update($orig);
+                                               $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
+                                               continue;
+                                       }
+                                       $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
+                                       $notice = NULL;
+                               } else {
+                                       $this->log(LOG_WARNING, 'queue item for notice that does not exist');
+                               }
+                               $qi->delete();
+                               $cnt++;
+                       } else {
+                               $this->clear_old_claims();
+                               sleep(10);
+                       }       
+               } while (true);
+       }
+
+       function clear_old_claims() {
+               $qi = new Queue_item();
+               $qi->transport = $this->transport();
+               $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
+               $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
+       }
+       
+       function log($level, $msg) {
+               common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
+       }
+}
+       
\ No newline at end of file
index f06f49d71be682e9660a962237a646ddc9115b3a..cea0c3992f71b6a64c7afd0b8e3b2fb4dd8a4a24 100644 (file)
@@ -1070,17 +1070,20 @@ function common_broadcast_notice($notice, $remote=false) {
 # Stick the notice on the queue
 
 function common_enqueue_notice($notice) {
-       $qi = new Queue_item();
-       $qi->notice_id = $notice->id;
-       $qi->created = $notice->created;
+       foreach (array('jabber', 'oms', 'sms') as $transport) {
+               $qi = new Queue_item();
+               $qi->notice_id = $notice->id;
+               $qi->transport = $transport;
+               $qi->created = $notice->created;
+               if (!$result) {
         $result = $qi->insert();
-       if (!$result) {
-           $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
-           common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
-           return false;
+                       $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
+                       common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
+                       return false;
+               }
+               common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
+               return $result;
        }
-       common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id);
-       return $result;
 }
 
 function common_dequeue_notice($notice) {
@@ -1477,11 +1480,3 @@ function common_canonical_sms($sms) {
        preg_replace('/\D/', '', $sms);
        return $sms;
 }
-
-function common_session_token() {
-       common_ensure_session();
-       if (!array_key_exists('token', $_SESSION)) {
-               $_SESSION['token'] = common_good_rand(64);
-       }
-       return $_SESSION['token'];
-}
diff --git a/scripts/ombqueuehandler.php b/scripts/ombqueuehandler.php
new file mode 100755 (executable)
index 0000000..7bdad25
--- /dev/null
@@ -0,0 +1,70 @@
+#!/usr/bin/env php
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+# Abort if called from a web server
+if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
+       print "This script must be run from the command line\n";
+       exit();
+}
+
+define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
+define('LACONICA', true);
+
+require_once(INSTALLDIR . '/lib/common.php');
+require_once(INSTALLDIR . '/lib/omb.php');
+require_once(INSTALLDIR . '/lib/queuehandler.php');
+
+set_error_handler('common_error_handler');
+
+class OmbQueueHandler {
+       
+       function transport() {
+               return 'omb';
+       }
+       
+       function start() {
+       }
+
+       function handle_notice($notice) {
+               if (!$this->is_remote($notice)) {
+                       omb_broadcast_remote_subscribers($notice);
+               }
+       }
+       
+       function finish() {
+       }
+
+       function is_remote($notice) {
+               $user = User::staticGet($notice->profile_id);
+               return !$user;
+       }
+}
+
+mb_internal_encoding('UTF-8');
+
+$id = ($argc > 1) ? $argv[1] : NULL;
+
+$handler = new OmbQueueHandler($id);
+
+if ($handler->start()) {
+       $handler->handle_queue();
+}
+
+$handler->finish();
diff --git a/scripts/smsqueuehandler.php b/scripts/smsqueuehandler.php
new file mode 100755 (executable)
index 0000000..99dd9cf
--- /dev/null
@@ -0,0 +1,63 @@
+#!/usr/bin/env php
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+# Abort if called from a web server
+if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
+       print "This script must be run from the command line\n";
+       exit();
+}
+
+define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
+define('LACONICA', true);
+
+require_once(INSTALLDIR . '/lib/common.php');
+require_once(INSTALLDIR . '/lib/omb.php');
+require_once(INSTALLDIR . '/lib/queuehandler.php');
+
+set_error_handler('common_error_handler');
+
+class SmsQueueHandler {
+       
+       function transport() {
+               return 'omb';
+       }
+       
+       function start() {
+       }
+
+       function handle_notice($notice) {
+               mail_broadcast_notice_sms($notice);
+       }
+       
+       function finish() {
+       }
+}
+
+mb_internal_encoding('UTF-8');
+
+$id = ($argc > 1) ? $argv[1] : NULL;
+
+$handler = new SmsQueueHandler($id);
+
+if ($handler->start()) {
+       $handler->handle_queue();
+}
+
+$handler->finish();
diff --git a/scripts/xmppconfirmhandler.php b/scripts/xmppconfirmhandler.php
new file mode 100755 (executable)
index 0000000..0d6ee01
--- /dev/null
@@ -0,0 +1,135 @@
+#!/usr/bin/env php
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+# Abort if called from a web server
+if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
+       print "This script must be run from the command line\n";
+       exit();
+}
+
+define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
+define('LACONICA', true);
+
+require_once(INSTALLDIR . '/lib/common.php');
+require_once(INSTALLDIR . '/lib/jabber.php');
+require_once(INSTALLDIR . '/lib/queuehandler.php');
+
+set_error_handler('common_error_handler');
+
+define('CLAIM_TIMEOUT', 1200);
+
+class XmppConfirmHandler {
+
+       var $_id = 'generic';
+       
+       function XmppConfirmHandler($id=NULL) {
+               if ($id) {
+                       $this->_id = $id;
+               }
+       }
+                 
+       function handle_queue() {
+               $this->log(LOG_INFO, 'checking for queued confirmations');
+               $cnt = 0;
+               do {
+                       $confirm = $this->next_confirm();
+                       if ($confirm) {
+                               $this->log(LOG_INFO, 'Sending confirmation for ' . $confirm->address);
+                               $user = User::staticGet($confirm->user_id);
+                               if (!$user) {
+                                       $this->log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id);
+                                       continue;
+                               }
+                               $success = jabber_confirm_address($confirm->code,
+                                                                                                 $user->nickname,
+                                                                                                 $confirm->address);
+                               if (!$success) {
+                                       $this->log(LOG_ERR, 'Confirmation failed for ' . $confirm->address);
+                                       # Just let the claim age out; hopefully things work then
+                                       continue;
+                               } else {
+                                       $this->log(LOG_INFO, 'Confirmation sent for ' . $confirm->address);
+                                       # Mark confirmation sent
+                                       $original = clone($confirm);
+                                       $confirm->sent = $confirm->claimed;
+                                       $result = $confirm->update($original);
+                                       if (!$result) {
+                                               $this->log(LOG_ERR, 'Cannot mark sent for ' . $confirm->address);
+                                               # Just let the claim age out; hopefully things work then
+                                               continue;
+                                       }
+                               }
+                               $cnt++;
+                       } else {
+                               $this->clear_old_confirm_claims();
+                               sleep(10);
+                       }
+               } while (true);
+       }
+
+       function next_confirm() {
+               $confirm = new Confirm_address();
+               $confirm->whereAdd('claimed IS NULL');
+               $confirm->whereAdd('sent IS NULL');
+               # XXX: eventually we could do other confirmations in the queue, too
+               $confirm->address_type = 'jabber';
+               $confirm->orderBy('modified DESC');
+               $confirm->limit(1);
+               if ($confirm->find(TRUE)) {
+                       $this->log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address);
+                       # working around some weird DB_DataObject behaviour
+                       $confirm->whereAdd(''); # clears where stuff
+                       $original = clone($confirm);
+                       $confirm->claimed = common_sql_now();
+                       $result = $confirm->update($original);
+                       if ($result) {
+                               $this->log(LOG_INFO, 'Succeeded in claim! '. $result);
+                               return $confirm;
+                       } else {
+                               $this->log(LOG_INFO, 'Failed in claim!');
+                               return false;
+                       }
+               }
+               return NULL;
+       }
+
+       function clear_old_confirm_claims() {
+               $confirm = new Confirm();
+               $confirm->claimed = NULL;
+               $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
+               $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY);
+       }
+       
+       function log($level, $msg) {
+               common_log($level, 'XmppConfirmHandler ('. $this->_id .'): '.$msg);
+       }
+}
+
+mb_internal_encoding('UTF-8');
+
+$resource = ($argc > 1) ? $argv[1] : NULL;
+
+$handler = new XmppConfirmHandler($resource);
+
+if ($handler->start()) {
+       $handler->handle_queue();
+}
+
+$handler->finish();
index da1638011603f57250187d78e9679afd91a76d6e..2b21efa1e647cb011e97c8876145c7770b5d35e2 100755 (executable)
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 
-function xmppdaemon_error_handler($errno, $errstr, $errfile, $errline, $errcontext) {
-    switch ($errno) {
-     case E_USER_ERROR:
-               echo "ERROR: [$errno] $errstr ($errfile:$errline)\n";
-               echo "  Fatal error on line $errline in file $errfile";
-               echo ", PHP " . PHP_VERSION . " (" . PHP_OS . ")\n";
-               echo "Aborting...\n";
-               exit(1);
-               break;
-
-        case E_USER_WARNING:
-               echo "WARNING [$errno] $errstr ($errfile:$errline)\n";
-               break;
-
-     case E_USER_NOTICE:
-               echo "NOTICE [$errno] $errstr ($errfile:$errline)\n";
-               break;
-    }
-
-    /* Don't execute PHP internal error handler */
-    return true;
-}
-
-set_error_handler('xmppdaemon_error_handler');
-
 # Abort if called from a web server
 if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
        print "This script must be run from the command line\n";
@@ -51,14 +26,12 @@ if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
 
 define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
 define('LACONICA', true);
-define('CLAIM_TIMEOUT', 100000);
-
-define('MAX_BROADCAST_COUNT', 20);
-define('MAX_CONFIRM_COUNT', 20);
 
 require_once(INSTALLDIR . '/lib/common.php');
 require_once(INSTALLDIR . '/lib/jabber.php');
 
+set_error_handler('common_error_handler');
+
 # This is kind of clunky; we create a class to call the global functions
 # in jabber.php, which create a new XMPP class. A more elegant (?) solution
 # might be to use make this a subclass of XMPP.
@@ -88,7 +61,7 @@ class XMPPDaemon {
 
                $this->log(LOG_INFO, "Connecting to $connect_to on port $this->port");
 
-               $this->conn = jabber_connect($this->resource);
+               $this->conn = jabber_connect($this->resource, "Send me a message to post a notice", 100);
 
                if (!$this->conn) {
                        return false;
@@ -101,7 +74,6 @@ class XMPPDaemon {
 
                $this->conn->addEventHandler('message', 'handle_message', $this);
                $this->conn->addEventHandler('presence', 'handle_presence', $this);
-               $this->conn->addEventHandler('session_start', 'handle_session_start', $this);
                
                while(!$this->conn->isDisconnected()) {
                        $this->conn->processTime(5);
@@ -110,27 +82,11 @@ class XMPPDaemon {
                }
        }
 
-       function handle_session_start(&$pl) {
-               $this->conn->getRoster();
-               $this->set_status("Send me a message to post a notice");
-       }
-       
        function get_user($from) {
                $user = User::staticGet('jabber', jabber_normalize_jid($from));
                return $user;
        }
 
-       function get_confirmation($from) {
-               $confirm = new Confirm_address();
-               $confirm->address = $from;
-               $confirm->address_type = 'jabber';
-               if ($confirm->find(TRUE)) {
-                       return $confirm;
-               } else {
-                       return NULL;
-               }
-       }
-
        function handle_message(&$pl) {
                if ($pl['type'] != 'chat') {
                        return;
@@ -260,7 +216,7 @@ class XMPPDaemon {
                        $this->log(LOG_ERR, $notice);
                        return;
                }
-               common_real_broadcast($notice);
+               common_broadcast_notice($notice);
                $this->log(LOG_INFO,
                                   'Added notice ' . $notice->id . ' from user ' . $user->nickname);
        }
@@ -304,155 +260,6 @@ class XMPPDaemon {
        function subscribed($to) {
                jabber_special_presence('subscribed', $to);
        }
-
-       function set_status($status) {
-               $this->log(LOG_INFO, 'Setting status to "' . $status . '"');
-               jabber_send_presence($status);
-       }
-
-       function top_queue_item() {
-
-               $qi = new Queue_item();
-               $qi->orderBy('created');
-               $qi->whereAdd('claimed is NULL');
-
-               $qi->limit(1);
-
-               $cnt = $qi->find(TRUE);
-
-               if ($cnt) {
-                       # XXX: potential race condition
-                       # can we force it to only update if claimed is still NULL
-                       # (or old)?
-                       $this->log(LOG_INFO, 'claiming queue item = ' . $qi->notice_id);
-                       $orig = clone($qi);
-                       $qi->claimed = common_sql_now();
-                       $result = $qi->update($orig);
-                       if ($result) {
-                               $this->log(LOG_INFO, 'claim succeeded.');
-                               return $qi;
-                       } else {
-                               $this->log(LOG_INFO, 'claim failed.');
-                       }
-               }
-               $qi = NULL;
-               return NULL;
-       }
-
-       function broadcast_queue() {
-               $this->clear_old_claims();
-               $this->log(LOG_INFO, 'checking for queued notices');
-               $cnt = 0;
-               do {
-                       $qi = $this->top_queue_item();
-                       if ($qi) {
-                               $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
-                               $notice = Notice::staticGet($qi->notice_id);
-                               if ($notice) {
-                                       $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
-                                       # XXX: what to do if broadcast fails?
-                                       $result = common_real_broadcast($notice, $this->is_remote($notice));
-                                       if (!$result) {
-                                               $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
-                                               $orig = $qi;
-                                               $qi->claimed = NULL;
-                                               $qi->update($orig);
-                                               $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
-                                               continue;
-                                       }
-                                       $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
-                                       $notice = NULL;
-                               } else {
-                                       $this->log(LOG_WARNING, 'queue item for notice that does not exist');
-                               }
-                               $qi->delete();
-                               $cnt++;
-                       }
-               } while ($qi && $cnt < MAX_BROADCAST_COUNT);
-       }
-
-       function clear_old_claims() {
-               $qi = new Queue_item();
-               $qi->claimed = NULL;
-               $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
-               $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
-       }
-
-       function is_remote($notice) {
-               $user = User::staticGet($notice->profile_id);
-               return !$user;
-       }
-
-       function confirmation_queue() {
-           # $this->clear_old_confirm_claims();
-               $this->log(LOG_INFO, 'checking for queued confirmations');
-               $cnt = 0;
-               do {
-                       $confirm = $this->next_confirm();
-                       if ($confirm) {
-                               $this->log(LOG_INFO, 'Sending confirmation for ' . $confirm->address);
-                               $user = User::staticGet($confirm->user_id);
-                               if (!$user) {
-                                       $this->log(LOG_WARNING, 'Confirmation for unknown user ' . $confirm->user_id);
-                                       continue;
-                               }
-                               $success = jabber_confirm_address($confirm->code,
-                                                                 $user->nickname,
-                                                                 $confirm->address);
-                               if (!$success) {
-                                       $this->log(LOG_ERR, 'Confirmation failed for ' . $confirm->address);
-                                       # Just let the claim age out; hopefully things work then
-                                       continue;
-                               } else {
-                                       $this->log(LOG_INFO, 'Confirmation sent for ' . $confirm->address);
-                                       # Mark confirmation sent
-                                       $original = clone($confirm);
-                                       $confirm->sent = $confirm->claimed;
-                                       $result = $confirm->update($original);
-                                       if (!$result) {
-                                               $this->log(LOG_ERR, 'Cannot mark sent for ' . $confirm->address);
-                                               # Just let the claim age out; hopefully things work then
-                                               continue;
-                                       }
-                               }
-                               $cnt++;
-                       }
-               } while ($confirm && $cnt < MAX_CONFIRM_COUNT);
-       }
-
-       function next_confirm() {
-               $confirm = new Confirm_address();
-               $confirm->whereAdd('claimed IS NULL');
-               $confirm->whereAdd('sent IS NULL');
-               # XXX: eventually we could do other confirmations in the queue, too
-               $confirm->address_type = 'jabber';
-               $confirm->orderBy('modified DESC');
-               $confirm->limit(1);
-               if ($confirm->find(TRUE)) {
-                       $this->log(LOG_INFO, 'Claiming confirmation for ' . $confirm->address);
-                       # working around some weird DB_DataObject behaviour
-                       $confirm->whereAdd(''); # clears where stuff
-                       $original = clone($confirm);
-                       $confirm->claimed = common_sql_now();
-                       $result = $confirm->update($original);
-                       if ($result) {
-                               $this->log(LOG_INFO, 'Succeeded in claim! '. $result);
-                               return $confirm;
-                       } else {
-                               $this->log(LOG_INFO, 'Failed in claim!');
-                               return false;
-                       }
-               }
-               return NULL;
-       }
-
-       function clear_old_confirm_claims() {
-               $confirm = new Confirm();
-               $confirm->claimed = NULL;
-               $confirm->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
-               $confirm->update(DB_DATAOBJECT_WHEREADD_ONLY);
-       }
-
 }
 
 mb_internal_encoding('UTF-8');
@@ -462,7 +269,6 @@ $resource = ($argc > 1) ? $argv[1] : NULL;
 $daemon = new XMPPDaemon($resource);
 
 if ($daemon->connect()) {
-       $daemon->set_status("Send me a message to post a notice");
        $daemon->handle();
 }
 
diff --git a/scripts/xmppqueuehandler.php b/scripts/xmppqueuehandler.php
new file mode 100755 (executable)
index 0000000..626db85
--- /dev/null
@@ -0,0 +1,65 @@
+#!/usr/bin/env php
+<?php
+/*
+ * Laconica - a distributed open-source microblogging tool
+ * Copyright (C) 2008, Controlez-Vous, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+# Abort if called from a web server
+if (isset($_SERVER) && array_key_exists('REQUEST_METHOD', $_SERVER)) {
+       print "This script must be run from the command line\n";
+       exit();
+}
+
+define('INSTALLDIR', realpath(dirname(__FILE__) . '/..'));
+define('LACONICA', true);
+
+require_once(INSTALLDIR . '/lib/common.php');
+require_once(INSTALLDIR . '/lib/jabber.php');
+require_once(INSTALLDIR . '/lib/queuehandler.php');
+
+set_error_handler('common_error_handler');
+
+class XmppQueueHandler {
+       
+       function transport() {
+               return 'jabber';
+       }
+       
+       function start() {
+               # Low priority; we don't want to receive messages
+               $this->conn = jabber_connect($this->resource, NULL, -100);
+       }
+
+       function handle_notice($notice) {
+               jabber_broadcast_notice($notice);
+       }
+       
+       function finish() {
+       }
+}
+
+mb_internal_encoding('UTF-8');
+
+$resource = ($argc > 1) ? $argv[1] : NULL;
+
+$handler = new XmppQueueHandler($resource);
+
+if ($handler->start()) {
+       $handler->handle_queue();
+}
+
+$handler->finish();