X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=extlib%2FXMPPHP%2FXMLStream.php;h=04877fda4c8636672f1e91d5e8669f7f7e010dc0;hb=69a1cea319f22362d197cb8a0fc9ab19abdf8548;hp=d23585c14ed9f79d49b3d21c0aac100fc3948bcd;hpb=f7865b1d410e303bb5dc2549d12823f37149b8e5;p=quix0rs-gnu-social.git diff --git a/extlib/XMPPHP/XMLStream.php b/extlib/XMPPHP/XMLStream.php index d23585c14e..04877fda4c 100644 --- a/extlib/XMPPHP/XMLStream.php +++ b/extlib/XMPPHP/XMLStream.php @@ -153,6 +153,10 @@ class XMPPHP_XMLStream { * @var boolean */ protected $use_ssl = false; + /** + * @var integer + */ + protected $reconnectTimeout = 30; /** * Constructor @@ -253,29 +257,38 @@ class XMPPHP_XMLStream { * @param boolean $sendinit */ public function connect($timeout = 30, $persistent = false, $sendinit = true) { - $this->disconnected = false; - $this->sent_disconnect = false; - if($persistent) { - $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + $starttime = time(); + + do { + $this->disconnected = false; + $this->sent_disconnect = false; + if($persistent) { + $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + } else { + $conflag = STREAM_CLIENT_CONNECT; + } + $conntype = 'tcp'; + if($this->use_ssl) $conntype = 'ssl'; + $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); + try { + $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); + } catch (Exception $e) { + throw new XMPPHP_Exception($e->getMessage()); + } + if(!$this->socket) { + $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); + $this->disconnected = true; + # Take it easy for a few seconds + sleep(min($timeout, 5)); + } + } while (!$this->socket && (time() - $starttime) < $timeout); + + if ($this->socket) { + stream_set_blocking($this->socket, 1); + if($sendinit) $this->send($this->stream_start); } else { - $conflag = STREAM_CLIENT_CONNECT; - } - $conntype = 'tcp'; - if($this->use_ssl) $conntype = 'ssl'; - $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); - try { - $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); - } catch (Exception $e) { - throw new XMPPHP_Exception($e->getMessage()); + throw new XMPPHP_Exception("Could not connect before timeout."); } - if(!$this->socket) { - $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); - $this->disconnected = true; - - throw new XMPPHP_Exception('Could not connect.'); - } - stream_set_blocking($this->socket, 1); - if($sendinit) $this->send($this->stream_start); } /** @@ -283,12 +296,17 @@ class XMPPHP_XMLStream { */ public function doReconnect() { if(!$this->is_server) { - $this->log->log("Reconnecting...", XMPPHP_Log::LEVEL_WARNING); - $this->connect(30, false, false); + $this->log->log("Reconnecting ($this->reconnectTimeout)...", XMPPHP_Log::LEVEL_WARNING); + $this->connect($this->reconnectTimeout, false, false); $this->reset(); + $this->event('reconnect'); } } + public function setReconnectTimeout($timeout) { + $this->reconnectTimeout = $timeout; + } + /** * Disconnect from XMPP Host */ @@ -319,12 +337,10 @@ class XMPPHP_XMLStream { private function __process($maximum=0) { - $this->log->log("__process($maximum)", XMPPHP_Log::LEVEL_VERBOSE); - $remaining = $maximum; do { - $starttime = microtime(); + $starttime = (microtime(true) * 1000000); $read = array($this->socket); $write = array(); $except = array(); @@ -335,30 +351,28 @@ class XMPPHP_XMLStream { $secs = 0; $usecs = 0; } else { - $secs = $remaining / 1000000; $usecs = $remaining % 1000000; + $secs = floor(($remaining - $usecs) / 1000000); } - $this->log->log("stream_select(read, write, except, $secs, $usecs)", XMPPHP_Log::LEVEL_VERBOSE); $updated = @stream_select($read, $write, $except, $secs, $usecs); if ($updated === false) { $this->log->log("Error on stream_select()", XMPPHP_Log::LEVEL_VERBOSE); if ($this->reconnect) { $this->doReconnect(); } else { - $this->log->log("Giving up", XMPPHP_Log::LEVEL_VERBOSE); fclose($this->socket); + $this->socket = NULL; return false; } } else if ($updated > 0) { # XXX: Is this big enough? - $this->log->log("Reading from socket", XMPPHP_Log::LEVEL_VERBOSE); $buff = @fread($this->socket, 4096); if(!$buff) { if($this->reconnect) { $this->doReconnect(); } else { - $this->log->log("Error on fread(), reconnect", XMPPHP_Log::LEVEL_VERBOSE); fclose($this->socket); + $this->socket = NULL; return false; } } @@ -367,7 +381,9 @@ class XMPPHP_XMLStream { } else { # $updated == 0 means no changes during timeout. } - $remaining -= (microtime() - $starttime); + $endtime = (microtime(true)*1000000); + $time_past = $endtime - $starttime; + $remaining = $remaining - $time_past; } while (is_null($maximum) || $remaining > 0); return true; } @@ -595,34 +611,47 @@ class XMPPHP_XMLStream { * * @param string $msg */ - public function send($msg, $rec=false) { - if($this->time() - $this->last_send < .1) { - usleep(100000); + public function send($msg, $timeout=NULL) { + + if (is_null($timeout)) { + $secs = NULL; + $usecs = NULL; + } else if ($timeout == 0) { + $secs = 0; + $usecs = 0; + } else { + $maximum = $timeout * 1000000; + $usecs = $maximum % 1000000; + $secs = floor(($maximum - $usecs) / 1000000); } - $wait = true; - while($wait) { - $read = null; - $write = array($this->socket); - $except = null; - $select = @stream_select($read, $write, $except, 0, 0); - if($select === False) { - $this->doReconnect(); - return false; - } elseif ($select > 0) { - $wait = false; - } else { - usleep(100000); - //$this->processTime(.25); - } + + $read = array(); + $write = array($this->socket); + $except = array(); + + $select = @stream_select($read, $write, $except, $secs, $usecs); + + if($select === False) { + $this->log->log("ERROR sending message; reconnecting."); + $this->doReconnect(); + # TODO: retry send here + return false; + } elseif ($select > 0) { + $this->log->log("Socket is ready; send it."); + } else { + $this->log->log("Socket is not ready; break."); + return false; } - $sentbytes = @fwrite($this->socket, $msg, 1024); - $this->last_send = $this->time(); + + $sentbytes = @fwrite($this->socket, $msg); $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); if($sentbytes === FALSE) { + $this->log->log("ERROR sending message; reconnecting."); $this->doReconnect(); - } elseif ($sentbytes != mb_strlen($msg, '8bit')) { - $this->send(mb_substr($msg, $sentbytes, mb_strlen($msg, '8bit'), '8bit'), true); + return false; } + $this->log->log("Successfully sent $sentbytes bytes."); + return $sentbytes; } public function time() {