X-Git-Url: https://git.mxchange.org/?a=blobdiff_plain;ds=sidebyside;f=extlib%2FXMPPHP%2FXMLStream.php;h=d33411ec54140dafc1c527c946d05cbc931807c9;hb=1a86bf9c65d2579d9245c6edcc968fed3d674f39;hp=1190167681cffbc720a00e821093790163f7f930;hpb=e4d42b2355690c09550a478d5c4106390ddc91b0;p=quix0rs-gnu-social.git diff --git a/extlib/XMPPHP/XMLStream.php b/extlib/XMPPHP/XMLStream.php index 1190167681..d33411ec54 100644 --- a/extlib/XMPPHP/XMLStream.php +++ b/extlib/XMPPHP/XMLStream.php @@ -22,17 +22,18 @@ * @package XMPPHP * @author Nathanael C. Fritz * @author Stephan Wentz + * @author Michael Garvin * @copyright 2008 Nathanael C. Fritz */ /** XMPPHP_Exception */ -require_once 'Exception.php'; +require_once dirname(__FILE__) . '/Exception.php'; /** XMPPHP_XMLObj */ -require_once 'XMLObj.php'; +require_once dirname(__FILE__) . '/XMLObj.php'; /** XMPPHP_Log */ -require_once 'Log.php'; +require_once dirname(__FILE__) . '/Log.php'; /** * XMPPHP XML Stream @@ -41,6 +42,7 @@ require_once 'Log.php'; * @package XMPPHP * @author Nathanael C. Fritz * @author Stephan Wentz + * @author Michael Garvin * @copyright 2008 Nathanael C. Fritz * @version $Id$ */ @@ -101,6 +103,10 @@ class XMPPHP_XMLStream { * @var array */ protected $nshandlers = array(); + /** + * @var array + */ + protected $xpathhandlers = array(); /** * @var array */ @@ -121,6 +127,10 @@ class XMPPHP_XMLStream { * @var string */ protected $until = ''; + /** + * @var string + */ + protected $until_count = ''; /** * @var array */ @@ -153,6 +163,10 @@ class XMPPHP_XMLStream { * @var boolean */ protected $use_ssl = false; + /** + * @var integer + */ + protected $reconnectTimeout = 30; /** * Constructor @@ -224,18 +238,44 @@ class XMPPHP_XMLStream { /** * Add Handler * - * @param integer $id + * @param string $name * @param string $ns * @param string $pointer * @param string $obj * @param integer $depth */ public function addHandler($name, $ns, $pointer, $obj = null, $depth = 1) { + #TODO deprication warning $this->nshandlers[] = array($name,$ns,$pointer,$obj, $depth); } /** - * Add Evemt Handler + * Add XPath Handler + * + * @param string $xpath + * @param string $pointer + * @param + */ + public function addXPathHandler($xpath, $pointer, $obj = null) { + if (preg_match_all("/\(?{[^\}]+}\)?(\/?)[^\/]+/", $xpath, $regs)) { + $ns_tags = $regs[0]; + } else { + $ns_tags = array($xpath); + } + foreach($ns_tags as $ns_tag) { + list($l, $r) = split("}", $ns_tag); + if ($r != null) { + $xpart = array(substr($l, 1), $r); + } else { + $xpart = array(null, $l); + } + $xpath_array[] = $xpart; + } + $this->xpathhandlers[] = array($xpath_array, $pointer, $obj); + } + + /** + * Add Event Handler * * @param integer $id * @param string $pointer @@ -253,29 +293,39 @@ class XMPPHP_XMLStream { * @param boolean $sendinit */ public function connect($timeout = 30, $persistent = false, $sendinit = true) { - $this->disconnected = false; $this->sent_disconnect = false; - if($persistent) { - $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + $starttime = time(); + + do { + $this->disconnected = false; + $this->sent_disconnect = false; + if($persistent) { + $conflag = STREAM_CLIENT_CONNECT | STREAM_CLIENT_PERSISTENT; + } else { + $conflag = STREAM_CLIENT_CONNECT; + } + $conntype = 'tcp'; + if($this->use_ssl) $conntype = 'ssl'; + $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); + try { + $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); + } catch (Exception $e) { + throw new XMPPHP_Exception($e->getMessage()); + } + if(!$this->socket) { + $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); + $this->disconnected = true; + # Take it easy for a few seconds + sleep(min($timeout, 5)); + } + } while (!$this->socket && (time() - $starttime) < $timeout); + + if ($this->socket) { + stream_set_blocking($this->socket, 1); + if($sendinit) $this->send($this->stream_start); } else { - $conflag = STREAM_CLIENT_CONNECT; - } - $conntype = 'tcp'; - if($this->use_ssl) $conntype = 'ssl'; - $this->log->log("Connecting to $conntype://{$this->host}:{$this->port}"); - try { - $this->socket = @stream_socket_client("$conntype://{$this->host}:{$this->port}", $errno, $errstr, $timeout, $conflag); - } catch (Exception $e) { - throw new XMPPHP_Exception($e->getMessage()); + throw new XMPPHP_Exception("Could not connect before timeout."); } - if(!$this->socket) { - $this->log->log("Could not connect.", XMPPHP_Log::LEVEL_ERROR); - $this->disconnected = true; - - throw new XMPPHP_Exception('Could not connect.'); - } - stream_set_blocking($this->socket, 1); - if($sendinit) $this->send($this->stream_start); } /** @@ -283,17 +333,25 @@ class XMPPHP_XMLStream { */ public function doReconnect() { if(!$this->is_server) { - $this->log->log("Reconnecting...", XMPPHP_Log::LEVEL_WARNING); - $this->connect(30, false, false); + $this->log->log("Reconnecting ($this->reconnectTimeout)...", XMPPHP_Log::LEVEL_WARNING); + $this->connect($this->reconnectTimeout, false, false); $this->reset(); + $this->event('reconnect'); } } + public function setReconnectTimeout($timeout) { + $this->reconnectTimeout = $timeout; + } + /** * Disconnect from XMPP Host */ public function disconnect() { $this->log->log("Disconnecting...", XMPPHP_Log::LEVEL_VERBOSE); + if(false == (bool) $this->socket) { + return; + } $this->reconnect = false; $this->send($this->stream_end); $this->sent_disconnect = true; @@ -310,24 +368,64 @@ class XMPPHP_XMLStream { return $this->disconnected; } - private function __process() { - $read = array($this->socket); - $write = null; - $except = null; - $updated = @stream_select($read, $write, $except, 1); - if ($updated > 0) { - $buff = @fread($this->socket, 1024); - if(!$buff) { - if($this->reconnect) { + /** + * Core reading tool + * 0 -> only read if data is immediately ready + * NULL -> wait forever and ever + * integer -> process for this amount of time + */ + + private function __process($maximum=5) { + + $remaining = $maximum; + + do { + $starttime = (microtime(true) * 1000000); + $read = array($this->socket); + $write = array(); + $except = array(); + if (is_null($maximum)) { + $secs = NULL; + $usecs = NULL; + } else if ($maximum == 0) { + $secs = 0; + $usecs = 0; + } else { + $usecs = $remaining % 1000000; + $secs = floor(($remaining - $usecs) / 1000000); + } + $updated = @stream_select($read, $write, $except, $secs, $usecs); + if ($updated === false) { + $this->log->log("Error on stream_select()", XMPPHP_Log::LEVEL_VERBOSE); + if ($this->reconnect) { $this->doReconnect(); } else { fclose($this->socket); + $this->socket = NULL; return false; } + } else if ($updated > 0) { + # XXX: Is this big enough? + $buff = @fread($this->socket, 4096); + if(!$buff) { + if($this->reconnect) { + $this->doReconnect(); + } else { + fclose($this->socket); + $this->socket = NULL; + return false; + } + } + $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); + xml_parse($this->parser, $buff, false); + } else { + # $updated == 0 means no changes during timeout. } - $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE); - xml_parse($this->parser, $buff, false); - } + $endtime = (microtime(true)*1000000); + $time_past = $endtime - $starttime; + $remaining = $remaining - $time_past; + } while (is_null($maximum) || $remaining > 0); + return true; } /** @@ -336,10 +434,7 @@ class XMPPHP_XMLStream { * @return string */ public function process() { - $updated = ''; - while(!$this->disconnect) { - $this->__process(); - } + $this->__process(NULL); } /** @@ -348,11 +443,11 @@ class XMPPHP_XMLStream { * @param integer $timeout * @return string */ - public function processTime($timeout = -1) { - $start = time(); - $updated = ''; - while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) { - $this->__process(); + public function processTime($timeout=NULL) { + if (is_null($timeout)) { + return $this->__process(NULL); + } else { + return $this->__process($timeout * 1000000); } } @@ -370,16 +465,19 @@ class XMPPHP_XMLStream { end($this->until); $event_key = key($this->until); reset($this->until); + $this->until_count[$event_key] = 0; $updated = ''; - while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) { + while(!$this->disconnected and $this->until_count[$event_key] < 1 and (time() - $start < $timeout or $timeout == -1)) { $this->__process(); } if(array_key_exists($event_key, $this->until_payload)) { $payload = $this->until_payload[$event_key]; + unset($this->until_payload[$event_key]); + unset($this->until_count[$event_key]); + unset($this->until[$event_key]); } else { $payload = array(); } - unset($this->until_payload[$event_key]); return $payload; } @@ -449,9 +547,30 @@ class XMPPHP_XMLStream { $this->xml_depth--; if($this->xml_depth == 1) { #clean-up old objects - $found = false; + #$found = false; #FIXME This didn't appear to be in use --Gar + foreach($this->xpathhandlers as $handler) { + if (is_array($this->xmlobj) && array_key_exists(2, $this->xmlobj)) { + $searchxml = $this->xmlobj[2]; + $nstag = array_shift($handler[0]); + if (($nstag[0] == null or $searchxml->ns == $nstag[0]) and ($nstag[1] == "*" or $nstag[1] == $searchxml->name)) { + foreach($handler[0] as $nstag) { + if ($searchxml !== null and $searchxml->hasSub($nstag[1], $ns=$nstag[0])) { + $searchxml = $searchxml->sub($nstag[1], $ns=$nstag[0]); + } else { + $searchxml = null; + break; + } + } + if ($searchxml !== null) { + if($handler[2] === null) $handler[2] = $this; + $this->log->log("Calling {$handler[1]}", XMPPHP_Log::LEVEL_DEBUG); + $handler[2]->$handler[1]($this->xmlobj[2]); + } + } + } + } foreach($this->nshandlers as $handler) { - if($handler[4] != 1 and $this->xmlobj[2]->hasSub($handler[0])) { + if($handler[4] != 1 and array_key_exists(2, $this->xmlobj) and $this->xmlobj[2]->hasSub($handler[0])) { $searchxml = $this->xmlobj[2]->sub($handler[0]); } elseif(is_array($this->xmlobj) and array_key_exists(2, $this->xmlobj)) { $searchxml = $this->xmlobj[2]; @@ -528,7 +647,11 @@ class XMPPHP_XMLStream { if(is_array($until)) { if(in_array($name, $until)) { $this->until_payload[$key][] = array($name, $payload); - $this->until[$key] = false; + if(!isset($this->until_count[$key])) { + $this->until_count[$key] = 0; + } + $this->until_count[$key] += 1; + #$this->until[$key] = false; } } } @@ -556,34 +679,47 @@ class XMPPHP_XMLStream { * * @param string $msg */ - public function send($msg, $rec=false) { - if($this->time() - $this->last_send < .1) { - usleep(100000); + public function send($msg, $timeout=NULL) { + + if (is_null($timeout)) { + $secs = NULL; + $usecs = NULL; + } else if ($timeout == 0) { + $secs = 0; + $usecs = 0; + } else { + $maximum = $timeout * 1000000; + $usecs = $maximum % 1000000; + $secs = floor(($maximum - $usecs) / 1000000); } - $wait = true; - while($wait) { - $read = null; - $write = array($this->socket); - $except = null; - $select = @stream_select($read, $write, $except, 0, 0); - if($select === False) { - $this->doReconnect(); - return false; - } elseif ($select > 0) { - $wait = false; - } else { - usleep(100000); - //$this->processTime(.25); - } + + $read = array(); + $write = array($this->socket); + $except = array(); + + $select = @stream_select($read, $write, $except, $secs, $usecs); + + if($select === False) { + $this->log->log("ERROR sending message; reconnecting."); + $this->doReconnect(); + # TODO: retry send here + return false; + } elseif ($select > 0) { + $this->log->log("Socket is ready; send it.", XMPPHP_Log::LEVEL_VERBOSE); + } else { + $this->log->log("Socket is not ready; break.", XMPPHP_Log::LEVEL_ERROR); + return false; } - $sentbytes = @fwrite($this->socket, $msg, 1024); - $this->last_send = $this->time(); - $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); + + $sentbytes = @fwrite($this->socket, $msg); + $this->log->log("SENT: " . mb_substr($msg, 0, $sentbytes, '8bit'), XMPPHP_Log::LEVEL_VERBOSE); if($sentbytes === FALSE) { + $this->log->log("ERROR sending message; reconnecting.", XMPPHP_Log::LEVEL_ERROR); $this->doReconnect(); - } elseif ($sentbytes != mb_strlen($msg, '8bit')) { - $this->send(mb_substr($msg, $sentbytes, mb_strlen($msg, '8bit'), '8bit'), true); + return false; } + $this->log->log("Successfully sent $sentbytes bytes.", XMPPHP_Log::LEVEL_VERBOSE); + return $sentbytes; } public function time() {