include_once $dir . '/'.strtolower($cls).'.php';
return false;
case 'Fake_Irc':
+ case 'Irc_waiting_message':
case 'ChannelResponseChannel':
include_once $dir . '/'. $cls .'.php';
return false;
return true;
}
+ /**
+ * Ensure the database table is present
+ *
+ */
+ public function onCheckSchema() {
+ $schema = Schema::get();
+
+ // For storing messages while sessions become ready
+ $schema->ensureTable('irc_waiting_message',
+ array(new ColumnDef('id', 'integer', null,
+ false, 'PRI', null, null, true),
+ new ColumnDef('data', 'blob', null, false),
+ new ColumnDef('prioritise', 'tinyint', 1, false),
+ new ColumnDef('created', 'datetime', null, false),
+ new ColumnDef('claimed', 'datetime')));
+
+ return true;
+ }
+
/**
* Get a microid URI for the given screenname
*
$lines = explode("\n", $body);
foreach ($lines as $line) {
$this->fake_irc->doPrivmsg($screenname, $line);
- $this->enqueue_outgoing_raw(array('type' => 'message', 'data' => $this->fake_irc->would_be_sent));
+ $this->enqueue_outgoing_raw(array('type' => 'message', 'prioritise' => 0, 'data' => $this->fake_irc->would_be_sent));
}
return true;
}
$this->enqueue_outgoing_raw(
array(
'type' => 'nickcheck',
+ 'prioritise' => 1,
'data' => $this->fake_irc->would_be_sent,
'nickdata' =>
array(
--- /dev/null
+<?php\r
+/**\r
+ * Table Definition for irc_waiting_message\r
+ */\r
+require_once INSTALLDIR.'/classes/Memcached_DataObject.php';\r
+\r
+class Irc_waiting_message extends Memcached_DataObject {\r
+\r
+ public $__table = 'irc_waiting_message'; // table name\r
+ public $id; // int primary_key not_null auto_increment\r
+ public $data; // blob not_null\r
+ public $prioritise; // tinyint(1) not_null\r
+ public $created; // datetime() not_null\r
+ public $claimed; // datetime()\r
+\r
+ /* Static get */\r
+ public function staticGet($k, $v = null) {\r
+ return Memcached_DataObject::staticGet('Irc_waiting_message', $k, $v);\r
+ }\r
+\r
+ /**\r
+ * return table definition for DB_DataObject\r
+ *\r
+ * DB_DataObject needs to know something about the table to manipulate\r
+ * instances. This method provides all the DB_DataObject needs to know.\r
+ *\r
+ * @return array array of column definitions\r
+ */\r
+ public function table() {\r
+ return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,\r
+ 'data' => DB_DATAOBJECT_BLOB + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,\r
+ 'prioritise' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,\r
+ 'created' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,\r
+ 'claimed' => DB_DATAOBJECT_TIME + DB_DATAOBJECT_STR);\r
+ }\r
+\r
+ /**\r
+ * return key definitions for DB_DataObject\r
+ *\r
+ * DB_DataObject needs to know about keys that the table has, since it\r
+ * won't appear in StatusNet's own keys list. In most cases, this will\r
+ * simply reference your keyTypes() function.\r
+ *\r
+ * @return array list of key field names\r
+ */\r
+ public function keys() {\r
+ return array_keys($this->keyTypes());\r
+ }\r
+\r
+ /**\r
+ * return key definitions for Memcached_DataObject\r
+ *\r
+ * Our caching system uses the same key definitions, but uses a different\r
+ * method to get them. This key information is used to store and clear\r
+ * cached data, so be sure to list any key that will be used for static\r
+ * lookups.\r
+ *\r
+ * @return array associative array of key definitions, field name to type:\r
+ * 'K' for primary key: for compound keys, add an entry for each component;\r
+ * 'U' for unique keys: compound keys are not well supported here.\r
+ */\r
+ public function keyTypes() {\r
+ return array('id' => 'K');\r
+ }\r
+\r
+ /**\r
+ * Magic formula for non-autoincrementing integer primary keys\r
+ *\r
+ * If a table has a single integer column as its primary key, DB_DataObject\r
+ * assumes that the column is auto-incrementing and makes a sequence table\r
+ * to do this incrementation. Since we don't need this for our class, we\r
+ * overload this method and return the magic formula that DB_DataObject needs.\r
+ *\r
+ * @return array magic three-false array that stops auto-incrementing.\r
+ */\r
+ public function sequenceKey() {\r
+ return array(false, false, false);\r
+ }\r
+\r
+ /**\r
+ * Get the next item in the queue\r
+ *\r
+ * @return Irc_waiting_message Next message if there is one\r
+ */\r
+ public static function top() {\r
+ $wm = new Irc_waiting_message();\r
+\r
+ $wm->orderBy('prioritise DESC, created');\r
+ $wm->whereAdd('claimed is null');\r
+\r
+ $wm->limit(1);\r
+\r
+ $cnt = $wm->find(true);\r
+\r
+ if ($cnt) {\r
+ # XXX: potential race condition\r
+ # can we force it to only update if claimed is still null\r
+ # (or old)?\r
+ common_log(LOG_INFO, 'claiming IRC waiting message id = ' . $wm->id);\r
+ $orig = clone($wm);\r
+ $wm->claimed = common_sql_now();\r
+ $result = $wm->update($orig);\r
+ if ($result) {\r
+ common_log(LOG_INFO, 'claim succeeded.');\r
+ return $wm;\r
+ } else {\r
+ common_log(LOG_INFO, 'claim failed.');\r
+ }\r
+ }\r
+ $wm = null;\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ * Release a claimed item.\r
+ */\r
+ public function releaseClaim() {\r
+ // DB_DataObject doesn't let us save nulls right now\r
+ $sql = sprintf("UPDATE irc_waiting_message SET claimed=NULL WHERE id=%d", $this->id);\r
+ $this->query($sql);\r
+\r
+ $this->claimed = null;\r
+ $this->encache();\r
+ }\r
+}\r
*/\r
protected $regCallback;\r
\r
+ /**\r
+ * Connection established callback details\r
+ *\r
+ * @var array\r
+ */\r
+ protected $connectedCallback;\r
+\r
/**\r
* Load callback from config\r
*/\r
$this->regCallback = NULL;\r
}\r
\r
+ $connectedCallback = $this->config['statusnet.connectedcallback'];\r
+ if (is_callable($connectedCallback)) {\r
+ $this->connectedCallback = $connectedCallback;\r
+ } else {\r
+ $this->connectedCallback = NULL;\r
+ }\r
+\r
$this->unregRegexp = $this->getConfig('statusnet.unregregexp', '/\x02(.*?)\x02 (?:isn\'t|is not) registered/i');\r
$this->regRegexp = $this->getConfig('statusnet.regregexp', '/(?:\A|\x02)(\w+?)\x02? (?:\(account|is \w+?\z)/i');\r
}\r
}\r
}\r
}\r
+\r
+ /**\r
+ * Intercepts the end of the "message of the day" response and tells\r
+ * StatusNet we're connected\r
+ *\r
+ * @return void\r
+ */\r
+ public function onResponse() {\r
+ switch ($this->getEvent()->getCode()) {\r
+ case Phergie_Event_Response::RPL_ENDOFMOTD:\r
+ case Phergie_Event_Response::ERR_NOMOTD:\r
+ if ($this->connectedCallback !== NULL) {\r
+ call_user_func($this->connectedCallback);\r
+ }\r
+ }\r
+ }\r
}\r
class IrcManager extends ImManager {
protected $conn = null;
protected $lastPing = null;
+ protected $messageWaiting = true;
+ protected $lastMessage = null;
protected $regChecks = array();
protected $regChecksLookup = array();
+ protected $connected = false;
+
/**
* Initialize connection to server.
*
}
}
+ public function timeout() {
+ return 1;
+ }
+
/**
* Idle processing for io manager's execution loop.
- * Send keepalive pings to server.
*
* @return void
*/
public function idle() {
+ // Send a ping if necessary
if (empty($this->lastPing) || time() - $this->lastPing > 120) {
$this->sendPing();
}
+
+ if ($this->connected) {
+ // Send a waiting message if appropriate
+ if ($this->messageWaiting && time() - $this->lastMessage > 1) {
+ $wm = Irc_waiting_message::top();
+ if ($wm === NULL) {
+ $this->messageWaiting = false;
+ return;
+ }
+ $data = unserialize($wm->data);
+
+ if (!$this->send_raw_message($data)) {
+ $this->plugin->enqueue_outgoing_raw($data);
+ }
+
+ $wm->delete();
+ }
+ }
}
/**
try {
$this->conn->handleEvents();
} catch (Phergie_Driver_Exception $e) {
+ $this->connected = false;
$this->conn->reconnect();
}
}
'statusnet.messagecallback' => array($this, 'handle_irc_message'),
'statusnet.regcallback' => array($this, 'handle_reg_response'),
+ 'statusnet.connectedcallback' => array($this, 'handle_connected'),
'statusnet.unregregexp' => $this->plugin->unregregexp,
'statusnet.regregexp' => $this->plugin->regregexp
)
$this->conn->setConfig($config);
$this->conn->connect();
$this->lastPing = time();
+ $this->lastMessage = time();
}
return $this->conn;
}
}
}
+ /**
+ * Called when the connection is established
+ *
+ * @return void
+ */
+ public function handle_connected() {
+ $this->connected = true;
+ }
+
+ /**
+ * Enters a message into the database for sending when ready
+ *
+ * @param string $command Command
+ * @param array $args Arguments
+ * @return boolean
+ */
+ protected function enqueue_waiting_message($data) {
+ $wm = new Irc_waiting_message();
+
+ $wm->data = serialize($data);
+ $wm->prioritise = $data['prioritise'];
+ $wm->created = common_sql_now();
+ $result = $wm->insert();
+
+ if (!$result) {
+ common_log_db_error($wm, 'INSERT', __FILE__);
+ throw new ServerException('DB error inserting IRC waiting queue item');
+ }
+
+ return true;
+ }
+
/**
* Send a message using the daemon
*
return false;
}
- if ($data['type'] != 'message') {
- // Nick checking
- $nickdata = $data['nickdata'];
- $usernick = $nickdata['user']->nickname;
- $screenname = $nickdata['screenname'];
+ if ($data['type'] != 'delayedmessage') {
+ if ($data['type'] != 'message') {
+ // Nick checking
+ $nickdata = $data['nickdata'];
+ $usernick = $nickdata['user']->nickname;
+ $screenname = $nickdata['screenname'];
- // Cancel any existing checks for this user
- if (isset($this->regChecksLookup[$usernick])) {
- unset($this->regChecks[$this->regChecksLookup[$usernick]]);
+ // Cancel any existing checks for this user
+ if (isset($this->regChecksLookup[$usernick])) {
+ unset($this->regChecks[$this->regChecksLookup[$usernick]]);
+ }
+
+ $this->regChecks[$screenname] = $nickdata;
+ $this->regChecksLookup[$usernick] = $screenname;
}
- $this->regChecks[$screenname] = $nickdata;
- $this->regChecksLookup[$usernick] = $screenname;
+ // If there is a backlog or we need to wait, queue the message
+ if ($this->messageWaiting || time() - $this->lastMessage < 1) {
+ $this->enqueue_waiting_message(
+ array(
+ 'type' => 'delayedmessage',
+ 'prioritise' => $data['prioritise'],
+ 'data' => $data['data']
+ )
+ );
+ $this->messageWaiting = true;
+ return true;
+ }
}
try {
$this->conn->send($data['data']['command'], $data['data']['args']);
} catch (Phergie_Driver_Exception $e) {
+ $this->connected = false;
$this->conn->reconnect();
return false;
}
+ $this->lastMessage = time();
return true;
}