require_once(INSTALLDIR.'/plugins/Msn/extlib/phpmsnclass/msn.class.php');\r
return false;\r
case 'MsnManager':\r
+ case 'Msn_waiting_message':\r
include_once $dir . '/'.strtolower($cls).'.php';\r
return false;\r
default:\r
return true;\r
}\r
\r
+ /**\r
+ * Ensure the database table is present\r
+ *\r
+ */\r
+ public function onCheckSchema() {\r
+ $schema = Schema::get();\r
+\r
+ // For storing messages while sessions become ready\r
+ $schema->ensureTable('msn_waiting_message',\r
+ array(new ColumnDef('id', 'integer', null,\r
+ false, 'PRI', null, null, true),\r
+ new ColumnDef('screenname', 'integer', null, false),\r
+ new ColumnDef('message', 'text', null, false),\r
+ new ColumnDef('created', 'datetime', null, false),\r
+ new ColumnDef('claimed', 'datetime')));\r
+\r
+ return true;\r
+ }\r
+\r
/**\r
* Get a microid URI for the given screenname\r
*\r
// SB: <<< IRO {id} {rooster} {roostercount} {email} {alias} {clientid}\r
@list(/* IRO */, /* id */, $cur_num, $total, $email, $alias, $clientid) = @explode(' ', $data);\r
$this->debug_message("*** $email joined session");\r
- $session['joined'] = true;\r
+ if ($email == $session['to']) {\r
+ $session['joined'] = true;\r
+ $this->callHandler('SessionReady', array('to' => $email));\r
+ }\r
break;\r
case 'BYE':\r
$this->debug_message("*** Quit for BYE");\r
case 'JOI':\r
// SB: <<< JOI {user} {alias} {clientid?}\r
// someone join us\r
- // we don't need the data, just ignore it\r
- // no more user here\r
- $session['joined'] = true;\r
+ @list(/* JOI */, $email) = @explode(' ', $data);\r
+ if ($email == $session['to']) {\r
+ $session['joined'] = true;\r
+ $this->callHandler('SessionReady', array('to' => $email));\r
+ }\r
break;\r
case 'MSG':\r
// SB: <<< MSG {email} {alias} {len}\r
\r
if ($this->sb_writeln($socket, $id, "MSG $id N $len") === false ||\r
$this->sb_writedata($socket, $SendString) === false) {\r
+ $this->endSBSession($socket);\r
return false;\r
}\r
}\r
\r
if ($this->sb_writeln($socket, $id, "MSG $id N $len") === false ||\r
$this->sb_writedata($socket, $aMessage) === false) {\r
+ $this->endSBSession($socket);\r
return false;\r
}\r
\r
* where network is 1 for MSN, 32 for Yahoo\r
* and 'Offline' for offline messages\r
* @param string $message Message\r
+ * @param boolean &$waitForSession Boolean passed by reference,\r
+ * if set to true on return, message\r
+ * did not fail to send but is\r
+ * waiting for a valid session\r
+ *\r
+ * @return boolean true on success\r
*/\r
- public function sendMessage($to, $message) {\r
+ public function sendMessage($to, $message, &$waitForSession) {\r
if ($message != '') {\r
$toParts = explode('@', $to);\r
if(count($toParts) < 3) {\r
$this->debug_message("*** No existing SB session or request has timed out");\r
$this->reqSBSession($recipient);\r
}\r
+\r
+ $waitForSession = true;\r
return false;\r
} else {\r
$socket = $this->switchBoardSessionLookup[$recipient];\r
if ($this->switchBoardSessions[$intsocket]['offline']) {\r
$this->debug_message("*** Contact ($recipient) offline, sending OIM");\r
$this->endSBSession($socket);\r
+ $waitForSession = false;\r
return $this->sendMessage($recipient.'@Offline', $message);\r
} else {\r
if ($this->switchBoardSessions[$intsocket]['joined'] !== true) {\r
$this->debug_message("*** Recipient has not joined session, returning false");\r
+ $waitForSession = true;\r
return false;\r
}\r
\r
return true;\r
}\r
\r
+ $waitForSession = false;\r
return false;\r
}\r
}\r
* Registers a user handler\r
*\r
* Handler List\r
- * IMIn, Pong, ConnectFailed, Reconnect,\r
+ * IMIn, SessionReady, Pong, ConnectFailed, Reconnect,\r
* AddedToList, RemovedFromList, StatusChange\r
*\r
* @param string $event Event name\r
--- /dev/null
+<?php\r
+/**\r
+ * Table Definition for msn_plugin_message\r
+ */\r
+require_once INSTALLDIR.'/classes/Memcached_DataObject.php';\r
+\r
+class Msn_waiting_message extends Memcached_DataObject {\r
+\r
+ public $__table = 'msn_waiting_message'; // table name\r
+ public $id; // int primary_key not_null auto_increment\r
+ public $screenname; // varchar(255) not_null\r
+ public $message; // text not_null\r
+ public $created; // datetime() not_null\r
+ public $claimed; // datetime()\r
+\r
+ /* Static get */\r
+ public function staticGet($k, $v = null) {\r
+ return Memcached_DataObject::staticGet('Msn_waiting_message', $k, $v);\r
+ }\r
+\r
+ /**\r
+ * @param mixed $screenname screenname or array of screennames to pull from\r
+ * If not specified, checks all queues in the system.\r
+ */\r
+ public static function top($screenname = null) {\r
+ $wm = new Msn_waiting_message();\r
+ if ($screenname) {\r
+ if (is_array($screenname)) {\r
+ // @fixme use safer escaping\r
+ $list = implode("','", array_map('addslashes', $transports));\r
+ $wm->whereAdd("screename in ('$list')");\r
+ } else {\r
+ $wm->screenname = $screenname;\r
+ }\r
+ }\r
+ $wm->orderBy('created');\r
+ $wm->whereAdd('claimed is null');\r
+\r
+ $wm->limit(1);\r
+\r
+ $cnt = $wm->find(true);\r
+\r
+ if ($cnt) {\r
+ # XXX: potential race condition\r
+ # can we force it to only update if claimed is still null\r
+ # (or old)?\r
+ common_log(LOG_INFO, 'claiming msn waiting message id = ' . $wm->id);\r
+ $orig = clone($wm);\r
+ $wm->claimed = common_sql_now();\r
+ $result = $wm->update($orig);\r
+ if ($result) {\r
+ common_log(LOG_INFO, 'claim succeeded.');\r
+ return $wm;\r
+ } else {\r
+ common_log(LOG_INFO, 'claim failed.');\r
+ }\r
+ }\r
+ $wm = null;\r
+ return null;\r
+ }\r
+\r
+ /**\r
+ * Release a claimed item.\r
+ */\r
+ public function releaseClaim() {\r
+ // DB_DataObject doesn't let us save nulls right now\r
+ $sql = sprintf("UPDATE msn_waiting_message SET claimed=NULL WHERE id=%d", $this->id);\r
+ $this->query($sql);\r
+\r
+ $this->claimed = null;\r
+ $this->encache();\r
+ }\r
+}\r
)\r
);\r
$this->conn->registerHandler('IMin', array($this, 'handle_msn_message'));\r
+ $this->conn->registerHandler('SessionReady', array($this, 'handle_session_ready'));\r
$this->conn->registerHandler('Pong', array($this, 'update_ping_time'));\r
$this->conn->registerHandler('ConnectFailed', array($this, 'handle_connect_failed'));\r
$this->conn->registerHandler('Reconnect', array($this, 'handle_reconnect'));\r
\r
/**\r
* Update the time till the next ping\r
- * \r
+ *\r
* @param $data Time till next ping\r
* @return void\r
*/\r
return true;\r
}\r
\r
+ /**\r
+ * Called via a callback when a session becomes ready\r
+ *\r
+ * @param array $data Data\r
+ */\r
+ public function handle_session_ready($data) {\r
+ while (($wm = Msn_waiting_message::top($data['to']) != NULL)) {\r
+ if ($this->conn->sendMessage($wm->screenname, $wm->message, $ignore)) {\r
+ $wm->delete();\r
+ } else {\r
+ // Requeue the message in the regular queue\r
+ $this->plugin->send_message($wm->screenname, $wm->message);\r
+ }\r
+ }\r
+ }\r
+\r
/**\r
* Called by callback to log failure during connect\r
*\r
public function handle_reconnect($data) {\r
common_log(LOG_NOTICE, 'MSN reconnecting');\r
}\r
- \r
+\r
+ /**\r
+ * Enters a message into the database for sending via a callback\r
+ * when the session is established\r
+ *\r
+ * @param string $to Intended recipient\r
+ * @param string $message Message\r
+ */\r
+ private function enqueue_waiting_message($to, $message) {\r
+ $wm = new Msn_waiting_message();\r
+\r
+ $wm->screenname = $to;\r
+ $wm->message = $message;\r
+ $wm->created = common_sql_now();\r
+ $result = $wm->insert();\r
+\r
+ if (!$result) {\r
+ common_log_db_error($wm, 'INSERT', __FILE__);\r
+ throw new ServerException('DB error inserting queue item');\r
+ }\r
+\r
+ return true;\r
+ }\r
+\r
/**\r
* Send a message using the daemon\r
- * \r
- * @param $data Message\r
+ *\r
+ * @param $data Message data\r
* @return boolean true on success\r
*/\r
public function send_raw_message($data) {\r
return false;\r
}\r
\r
- if (!$this->conn->sendMessage($data['to'], $data['message'])) {\r
- return false;\r
+ $waitForSession = false;\r
+ if (!$this->conn->sendMessage($data['to'], $data['message'], $waitForSession)) {\r
+ if ($waitForSession) {\r
+ $this->enqueue_waiting_message($data['to'], $data['message']);\r
+ } else {\r
+ return false;\r
+ }\r
}\r
- \r
+\r
// Sending a command updates the time till next ping\r
$this->lastping = time();\r
$this->pingInterval = 50;\r