X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;f=extlib%2FXMPPHP%2FXMLStream.php;h=04877fda4c8636672f1e91d5e8669f7f7e010dc0;hb=69a1cea319f22362d197cb8a0fc9ab19abdf8548;hp=4de23ef7fba951c9635b11e7138d8a90a1358281;hpb=b69b206c97a97e89b9a7d80a1b4f7b65447f8039;p=quix0rs-gnu-social.git diff --git a/extlib/XMPPHP/XMLStream.php b/extlib/XMPPHP/XMLStream.php index 4de23ef7fb..04877fda4c 100644 --- a/extlib/XMPPHP/XMLStream.php +++ b/extlib/XMPPHP/XMLStream.php @@ -153,6 +153,10 @@ class XMPPHP_XMLStream { * @var boolean */ protected $use_ssl = false; + /** + * @var integer + */ + protected $reconnectTimeout = 30; /** * Constructor @@ -253,29 +257,38 @@ class XMPPHP_XMLStream { * @param boolean $sendinit */ public function connect($timeout = 30, $persistent = false, $sendinit = true) { - $this->disconnected = false; - $this->sent_disconnect = false; - if($persistent) { - $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + $starttime = time(); + + do { + $this->disconnected = false; + $this->sent_disconnect = false; + if($persistent) { + $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + } else { + $conflag = STREAM_CLIENT_CONNECT; + } + $conntype = 'tcp'; + if($this->use_ssl) $conntype = 'ssl'; + $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); + try { + $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); + } catch (Exception $e) { + throw new XMPPHP_Exception($e->getMessage()); + } + if(!$this->socket) { + $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); + $this->disconnected = true; + # Take it easy for a few seconds + sleep(min($timeout, 5)); + } + } while (!$this->socket && (time() - $starttime) < $timeout); + + if ($this->socket) { + stream_set_blocking($this->socket, 1); + if($sendinit) $this->send($this->stream_start); } else { - $conflag = STREAM_CLIENT_CONNECT; - } - $conntype = 'tcp'; - if($this->use_ssl) $conntype = 'ssl'; - $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); - try { - $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); - } catch (Exception $e) { - throw new XMPPHP_Exception($e->getMessage()); - } - if(!$this->socket) { - $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); - $this->disconnected = true; - - throw new XMPPHP_Exception('Could not connect.'); + throw new XMPPHP_Exception("Could not connect before timeout."); } - stream_set_blocking($this->socket, 1); - if($sendinit) $this->send($this->stream_start); } /** @@ -283,12 +296,17 @@ class XMPPHP_XMLStream { */ public function doReconnect() { if(!$this->is_server) { - $this->log->log("Reconnecting...", XMPPHP_Log::LEVEL_WARNING); - $this->connect(30, false, false); + $this->log->log("Reconnecting ($this->reconnectTimeout)...", XMPPHP_Log::LEVEL_WARNING); + $this->connect($this->reconnectTimeout, false, false); $this->reset(); + $this->event('reconnect'); } } + public function setReconnectTimeout($timeout) { + $this->reconnectTimeout = $timeout; + } + /** * Disconnect from XMPP Host */ @@ -310,24 +328,64 @@ class XMPPHP_XMLStream { return $this->disconnected; } - private function __process() { - $read = array($this->socket); - $write = null; - $except = null; - $updated = @stream_select($read, $write, $except, 1); - if ($updated > 0) { - $buff = @fread($this->socket, 1024); - if(!$buff) { - if($this->reconnect) { + /** + * Core reading tool + * 0 -> only read if data is immediately ready + * NULL -> wait forever and ever + * integer -> process for this amount of time + */ + + private function __process($maximum=0) { + + $remaining = $maximum; + + do { + $starttime = (microtime(true) * 1000000); + $read = array($this->socket); + $write = array(); + $except = array(); + if (is_null($maximum)) { + $secs = NULL; + $usecs = NULL; + } else if ($maximum == 0) { + $secs = 0; + $usecs = 0; + } else { + $usecs = $remaining % 1000000; + $secs = floor(($remaining - $usecs) / 1000000); + } + $updated = @stream_select($read, $write, $except, $secs, $usecs); + if ($updated === false) { + $this->log->log("Error on stream_select()", XMPPHP_Log::LEVEL_VERBOSE); + if ($this->reconnect) { $this->doReconnect(); } else { fclose($this->socket); + $this->socket = NULL; return false; } + } else if ($updated > 0) { + # XXX: Is this big enough? + $buff = @fread($this->socket, 4096); + if(!$buff) { + if($this->reconnect) { + $this->doReconnect(); + } else { + fclose($this->socket); + $this->socket = NULL; + return false; + } + } + $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); + xml_parse($this->parser, $buff, false); + } else { + # $updated == 0 means no changes during timeout. } - $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); - xml_parse($this->parser, $buff, false); - } + $endtime = (microtime(true)*1000000); + $time_past = $endtime - $starttime; + $remaining = $remaining - $time_past; + } while (is_null($maximum) || $remaining > 0); + return true; } /** @@ -336,10 +394,7 @@ class XMPPHP_XMLStream { * @return string */ public function process() { - $updated = ''; - while(!$this->disconnect) { - $this->__process(); - } + $this->__process(NULL); } /** @@ -348,11 +403,11 @@ class XMPPHP_XMLStream { * @param integer $timeout * @return string */ - public function processTime($timeout = -1) { - $start = time(); - $updated = ''; - while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) { - $this->__process(); + public function processTime($timeout=NULL) { + if (is_null($timeout)) { + return $this->__process(NULL); + } else { + return $this->__process($timeout * 1000000); } } @@ -372,7 +427,7 @@ class XMPPHP_XMLStream { reset($this->until); $updated = ''; while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) { - $this->__process(); + $this->__process(0); } if(array_key_exists($event_key, $this->until_payload)) { $payload = $this->until_payload[$event_key]; @@ -556,34 +611,47 @@ class XMPPHP_XMLStream { * * @param string $msg */ - public function send($msg, $rec=false) { - if($this->time() - $this->last_send < .1) { - usleep(100000); + public function send($msg, $timeout=NULL) { + + if (is_null($timeout)) { + $secs = NULL; + $usecs = NULL; + } else if ($timeout == 0) { + $secs = 0; + $usecs = 0; + } else { + $maximum = $timeout * 1000000; + $usecs = $maximum % 1000000; + $secs = floor(($maximum - $usecs) / 1000000); } - $wait = true; - while($wait) { - $read = null; - $write = array($this->socket); - $except = null; - $select = @stream_select($read, $write, $except, 0, 0); - if($select === False) { - $this->doReconnect(); - return false; - } elseif ($select > 0) { - $wait = false; - } else { - usleep(100000); - //$this->processTime(.25); - } + + $read = array(); + $write = array($this->socket); + $except = array(); + + $select = @stream_select($read, $write, $except, $secs, $usecs); + + if($select === False) { + $this->log->log("ERROR sending message; reconnecting."); + $this->doReconnect(); + # TODO: retry send here + return false; + } elseif ($select > 0) { + $this->log->log("Socket is ready; send it."); + } else { + $this->log->log("Socket is not ready; break."); + return false; } - $sentbytes = @fwrite($this->socket, $msg, 1024); - $this->last_send = $this->time(); + + $sentbytes = @fwrite($this->socket, $msg); $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); if($sentbytes === FALSE) { + $this->log->log("ERROR sending message; reconnecting."); $this->doReconnect(); - } elseif ($sentbytes != mb_strlen($msg, '8bit')) { - $this->send(mb_substr($msg, $sentbytes, mb_strlen($msg, '8bit'), '8bit'), true); + return false; } + $this->log->log("Successfully sent $sentbytes bytes."); + return $sentbytes; } public function time() { @@ -616,4 +684,12 @@ class XMPPHP_XMLStream { xml_set_element_handler($this->parser, 'startXML', 'endXML'); xml_set_character_data_handler($this->parser, 'charXML'); } + + public function readyToProcess() { + $read = array($this->socket); + $write = array(); + $except = array(); + $updated = @stream_select($read, $write, $except, 0); + return (($updated !== false) && ($updated > 0)); + } }