]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
First commit of message throttling code
authorLuke Fitzgerald <lw.fitzgerald@googlemail.com>
Wed, 11 Aug 2010 00:27:02 +0000 (17:27 -0700)
committerLuke Fitzgerald <lw.fitzgerald@googlemail.com>
Wed, 11 Aug 2010 00:27:02 +0000 (17:27 -0700)
plugins/Irc/IrcPlugin.php
plugins/Irc/Irc_waiting_message.php [new file with mode: 0644]
plugins/Irc/extlib/phergie/Phergie/Plugin/Statusnet.php
plugins/Irc/ircmanager.php

index 85c348d9b33200772b34a329897ded97529fbe27..54b7585211e288944b2d9afb0f60f08489b92600 100644 (file)
@@ -126,6 +126,7 @@ class IrcPlugin extends ImPlugin {
                 include_once $dir . '/'.strtolower($cls).'.php';
                 return false;
             case 'Fake_Irc':
+            case 'Irc_waiting_message':
             case 'ChannelResponseChannel':
                 include_once $dir . '/'. $cls .'.php';
                 return false;
@@ -150,6 +151,25 @@ class IrcPlugin extends ImPlugin {
         return true;
     }
 
+    /**
+    * Ensure the database table is present
+    *
+    */
+    public function onCheckSchema() {
+        $schema = Schema::get();
+
+        // For storing messages while sessions become ready
+        $schema->ensureTable('irc_waiting_message',
+                             array(new ColumnDef('id', 'integer', null,
+                                                 false, 'PRI', null, null, true),
+                                   new ColumnDef('data', 'blob', null, false),
+                                   new ColumnDef('prioritise', 'tinyint', 1, false),
+                                   new ColumnDef('created', 'datetime', null, false),
+                                   new ColumnDef('claimed', 'datetime')));
+
+        return true;
+    }
+
     /**
     * Get a microid URI for the given screenname
     *
@@ -171,7 +191,7 @@ class IrcPlugin extends ImPlugin {
         $lines = explode("\n", $body);
         foreach ($lines as $line) {
             $this->fake_irc->doPrivmsg($screenname, $line);
-            $this->enqueue_outgoing_raw(array('type' => 'message', 'data' => $this->fake_irc->would_be_sent));
+            $this->enqueue_outgoing_raw(array('type' => 'message', 'prioritise' => 0, 'data' => $this->fake_irc->would_be_sent));
         }
         return true;
     }
@@ -297,6 +317,7 @@ class IrcPlugin extends ImPlugin {
         $this->enqueue_outgoing_raw(
             array(
                 'type' => 'nickcheck',
+                'prioritise' => 1,
                 'data' => $this->fake_irc->would_be_sent,
                 'nickdata' =>
                     array(
diff --git a/plugins/Irc/Irc_waiting_message.php b/plugins/Irc/Irc_waiting_message.php
new file mode 100644 (file)
index 0000000..05f7275
--- /dev/null
@@ -0,0 +1,125 @@
+<?php\r
+/**\r
+ * Table Definition for irc_waiting_message\r
+ */\r
+require_once INSTALLDIR.'/classes/Memcached_DataObject.php';\r
+\r
+class Irc_waiting_message extends Memcached_DataObject {\r
+\r
+    public $__table = 'irc_waiting_message'; // table name\r
+    public $id;                              // int primary_key not_null auto_increment\r
+    public $data;                            // blob not_null\r
+    public $prioritise;                      // tinyint(1) not_null\r
+    public $created;                         // datetime() not_null\r
+    public $claimed;                         // datetime()\r
+\r
+    /* Static get */\r
+    public function staticGet($k, $v = null) {\r
+        return Memcached_DataObject::staticGet('Irc_waiting_message', $k, $v);\r
+    }\r
+\r
+    /**\r
+    * return table definition for DB_DataObject\r
+    *\r
+    * DB_DataObject needs to know something about the table to manipulate\r
+    * instances. This method provides all the DB_DataObject needs to know.\r
+    *\r
+    * @return array array of column definitions\r
+    */\r
+    public function table() {\r
+        return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,\r
+                     'data' => DB_DATAOBJECT_BLOB + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,\r
+                     'prioritise' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,\r
+                     'created' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,\r
+                     'claimed' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR);\r
+    }\r
+\r
+    /**\r
+    * return key definitions for DB_DataObject\r
+    *\r
+    * DB_DataObject needs to know about keys that the table has, since it\r
+    * won't appear in StatusNet's own keys list. In most cases, this will\r
+    * simply reference your keyTypes() function.\r
+    *\r
+    * @return array list of key field names\r
+    */\r
+    public function keys() {\r
+        return array_keys($this->keyTypes());\r
+    }\r
+\r
+    /**\r
+    * return key definitions for Memcached_DataObject\r
+    *\r
+    * Our caching system uses the same key definitions, but uses a different\r
+    * method to get them. This key information is used to store and clear\r
+    * cached data, so be sure to list any key that will be used for static\r
+    * lookups.\r
+    *\r
+    * @return array associative array of key definitions, field name to type:\r
+    *         'K' for primary key: for compound keys, add an entry for each component;\r
+    *         'U' for unique keys: compound keys are not well supported here.\r
+    */\r
+    public function keyTypes() {\r
+        return array('id' => 'K');\r
+    }\r
+\r
+    /**\r
+    * Magic formula for non-autoincrementing integer primary keys\r
+    *\r
+    * If a table has a single integer column as its primary key, DB_DataObject\r
+    * assumes that the column is auto-incrementing and makes a sequence table\r
+    * to do this incrementation. Since we don't need this for our class, we\r
+    * overload this method and return the magic formula that DB_DataObject needs.\r
+    *\r
+    * @return array magic three-false array that stops auto-incrementing.\r
+    */\r
+    public function sequenceKey() {\r
+        return array(false, false, false);\r
+    }\r
+\r
+    /**\r
+     * Get the next item in the queue\r
+     *\r
+     * @return Irc_waiting_message Next message if there is one\r
+     */\r
+    public static function top() {\r
+        $wm = new Irc_waiting_message();\r
+\r
+        $wm->orderBy('prioritise DESC, created');\r
+        $wm->whereAdd('claimed is null');\r
+\r
+        $wm->limit(1);\r
+\r
+        $cnt = $wm->find(true);\r
+\r
+        if ($cnt) {\r
+            # XXX: potential race condition\r
+            # can we force it to only update if claimed is still null\r
+            # (or old)?\r
+            common_log(LOG_INFO, 'claiming IRC waiting message id = ' . $wm->id);\r
+            $orig = clone($wm);\r
+            $wm->claimed = common_sql_now();\r
+            $result = $wm->update($orig);\r
+            if ($result) {\r
+                common_log(LOG_INFO, 'claim succeeded.');\r
+                return $wm;\r
+            } else {\r
+                common_log(LOG_INFO, 'claim failed.');\r
+            }\r
+        }\r
+        $wm = null;\r
+        return null;\r
+    }\r
+\r
+    /**\r
+     * Release a claimed item.\r
+     */\r
+    public function releaseClaim() {\r
+        // DB_DataObject doesn't let us save nulls right now\r
+        $sql = sprintf("UPDATE irc_waiting_message SET claimed=NULL WHERE id=%d", $this->id);\r
+        $this->query($sql);\r
+\r
+        $this->claimed = null;\r
+        $this->encache();\r
+    }\r
+}\r
index fb75f1d798a10bb44fd316c6173bceb6e601bd66..dc2680a6dfe49a036f63a2c5dc911c1d460a5b84 100644 (file)
@@ -41,6 +41,13 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract {
     */\r
     protected $regCallback;\r
 \r
+    /**\r
+    * Connection established callback details\r
+    *\r
+    * @var array\r
+    */\r
+    protected $connectedCallback;\r
+\r
     /**\r
     * Load callback from config\r
     */\r
@@ -59,6 +66,13 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract {
             $this->regCallback = NULL;\r
         }\r
 \r
+        $connectedCallback = $this->config['statusnet.connectedcallback'];\r
+        if (is_callable($connectedCallback)) {\r
+            $this->connectedCallback = $connectedCallback;\r
+        } else {\r
+            $this->connectedCallback = NULL;\r
+        }\r
+\r
         $this->unregRegexp = $this->getConfig('statusnet.unregregexp', '/\x02(.*?)\x02 (?:isn\'t|is not) registered/i');\r
         $this->regRegexp = $this->getConfig('statusnet.regregexp', '/(?:\A|\x02)(\w+?)\x02? (?:\(account|is \w+?\z)/i');\r
     }\r
@@ -110,4 +124,20 @@ class Phergie_Plugin_Statusnet extends Phergie_Plugin_Abstract {
             }\r
         }\r
     }\r
+\r
+    /**\r
+     * Intercepts the end of the "message of the day" response and tells\r
+     * StatusNet we're connected\r
+     *\r
+     * @return void\r
+     */\r
+    public function onResponse() {\r
+        switch ($this->getEvent()->getCode()) {\r
+        case Phergie_Event_Response::RPL_ENDOFMOTD:\r
+        case Phergie_Event_Response::ERR_NOMOTD:\r
+            if ($this->connectedCallback !== NULL) {\r
+                call_user_func($this->connectedCallback);\r
+            }\r
+        }\r
+    }\r
 }\r
index 96c019b7fc0400718e43e3664fafa8d5761d1dd7..7e2b32b23e7aa1e50c76d45a428a44a10a8cc65a 100644 (file)
@@ -32,10 +32,14 @@ if (!defined('STATUSNET') && !defined('LACONICA')) { exit(1); }
 class IrcManager extends ImManager {
     protected $conn = null;
     protected $lastPing = null;
+    protected $messageWaiting = true;
+    protected $lastMessage = null;
 
     protected $regChecks = array();
     protected $regChecksLookup = array();
 
+    protected $connected = false;
+
     /**
      * Initialize connection to server.
      *
@@ -65,16 +69,38 @@ class IrcManager extends ImManager {
         }
     }
 
+    public function timeout() {
+        return 1;
+    }
+
     /**
      * Idle processing for io manager's execution loop.
-     * Send keepalive pings to server.
      *
      * @return void
      */
     public function idle() {
+        // Send a ping if necessary
         if (empty($this->lastPing) || time() - $this->lastPing > 120) {
             $this->sendPing();
         }
+
+        if ($this->connected) {
+            // Send a waiting message if appropriate
+            if ($this->messageWaiting && time() - $this->lastMessage > 1) {
+                $wm = Irc_waiting_message::top();
+                if ($wm === NULL) {
+                    $this->messageWaiting = false;
+                    return;
+                }
+                $data = unserialize($wm->data);
+
+                if (!$this->send_raw_message($data)) {
+                    $this->plugin->enqueue_outgoing_raw($data);
+                }
+
+                $wm->delete();
+            }
+        }
     }
 
     /**
@@ -90,6 +116,7 @@ class IrcManager extends ImManager {
         try {
             $this->conn->handleEvents();
         } catch (Phergie_Driver_Exception $e) {
+            $this->connected = false;
             $this->conn->reconnect();
         }
     }
@@ -142,6 +169,7 @@ class IrcManager extends ImManager {
 
                     'statusnet.messagecallback' => array($this, 'handle_irc_message'),
                     'statusnet.regcallback' => array($this, 'handle_reg_response'),
+                    'statusnet.connectedcallback' => array($this, 'handle_connected'),
                     'statusnet.unregregexp' => $this->plugin->unregregexp,
                     'statusnet.regregexp' => $this->plugin->regregexp
                 )
@@ -150,6 +178,7 @@ class IrcManager extends ImManager {
             $this->conn->setConfig($config);
             $this->conn->connect();
             $this->lastPing = time();
+            $this->lastMessage = time();
         }
         return $this->conn;
     }
@@ -211,6 +240,38 @@ class IrcManager extends ImManager {
         }
     }
 
+    /**
+    * Called when the connection is established
+    *
+    * @return void
+    */
+    public function handle_connected() {
+        $this->connected = true;
+    }
+
+    /**
+    * Enters a message into the database for sending when ready
+    *
+    * @param string $command Command
+    * @param array $args Arguments
+    * @return boolean
+    */
+    protected function enqueue_waiting_message($data) {
+        $wm = new Irc_waiting_message();
+
+        $wm->data       = serialize($data);
+        $wm->prioritise = $data['prioritise'];
+        $wm->created    = common_sql_now();
+        $result         = $wm->insert();
+
+        if (!$result) {
+            common_log_db_error($wm, 'INSERT', __FILE__);
+            throw new ServerException('DB error inserting IRC waiting queue item');
+        }
+
+        return true;
+    }
+
     /**
      * Send a message using the daemon
      *
@@ -223,28 +284,45 @@ class IrcManager extends ImManager {
             return false;
         }
 
-        if ($data['type'] != 'message') {
-            // Nick checking
-            $nickdata = $data['nickdata'];
-            $usernick = $nickdata['user']->nickname;
-            $screenname = $nickdata['screenname'];
+        if ($data['type'] != 'delayedmessage') {
+            if ($data['type'] != 'message') {
+                // Nick checking
+                $nickdata = $data['nickdata'];
+                $usernick = $nickdata['user']->nickname;
+                $screenname = $nickdata['screenname'];
 
-            // Cancel any existing checks for this user
-            if (isset($this->regChecksLookup[$usernick])) {
-                unset($this->regChecks[$this->regChecksLookup[$usernick]]);
+                // Cancel any existing checks for this user
+                if (isset($this->regChecksLookup[$usernick])) {
+                    unset($this->regChecks[$this->regChecksLookup[$usernick]]);
+                }
+
+                $this->regChecks[$screenname] = $nickdata;
+                $this->regChecksLookup[$usernick] = $screenname;
             }
 
-            $this->regChecks[$screenname] = $nickdata;
-            $this->regChecksLookup[$usernick] = $screenname;
+            // If there is a backlog or we need to wait, queue the message
+            if ($this->messageWaiting || time() - $this->lastMessage < 1) {
+                $this->enqueue_waiting_message(
+                    array(
+                        'type' => 'delayedmessage',
+                        'prioritise' => $data['prioritise'],
+                        'data' => $data['data']
+                    )
+                );
+                $this->messageWaiting = true;
+                return true;
+            }
         }
 
         try {
             $this->conn->send($data['data']['command'], $data['data']['args']);
         } catch (Phergie_Driver_Exception $e) {
+            $this->connected = false;
             $this->conn->reconnect();
             return false;
         }
 
+        $this->lastMessage = time();
         return true;
     }