return $this->disconnected;
}
- private function __process() {
- $read = array($this->socket);
- $write = null;
- $except = null;
- $updated = @stream_select($read, $write, $except, 1);
- if ($updated > 0) {
- $buff = @fread($this->socket, 1024);
- if(!$buff) {
- if($this->reconnect) {
- $this->doReconnect();
- } else {
- fclose($this->socket);
- return false;
+ /**
+ * Core reading tool
+ * 0 -> only read if data is immediately ready
+ * NULL -> wait forever and ever
+ * integer -> process for this amount of time
+ */
+
+ private function __process($maximum=0) {
+
+ $remaining = $maximum;
+
+ do {
+ $starttime = microtime();
+ $read = array($this->socket);
+ $write = array();
+ $except = array();
+ if (is_null($maximum)) {
+ $secs = NULL;
+ $usecs = NULL;
+ } else if ($maximum == 0) {
+ $secs = 0;
+ $usecs = 0;
+ } else {
+ $secs = $remaining / 1000000;
+ $usecs = $remaining % 1000000;
+ }
+ $updated = @stream_select($read, $write, $except, $secs, $usecs);
+ if ($updated > 0) {
+ # XXX: Is this big enough?
+ $buff = @fread($this->socket, 4096);
+ if(!$buff) {
+ if($this->reconnect) {
+ $this->doReconnect();
+ } else {
+ fclose($this->socket);
+ return false;
+ }
}
+ $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE);
+ xml_parse($this->parser, $buff, false);
}
- $this->log->log("RECV: $buff", XMPPHP_Log::LEVEL_VERBOSE);
- xml_parse($this->parser, $buff, false);
- }
+ $remaining -= (microtime() - $starttime);
+ } while (is_null($maximum) || $remaining > 0);
+ return true;
}
/**
* @return string
*/
public function process() {
- $updated = '';
- while(!$this->disconnect) {
- $this->__process();
- }
+ $this->__process(NULL);
}
/**
* @param integer $timeout
* @return string
*/
- public function processTime($timeout = -1) {
- $start = time();
- $updated = '';
- while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) {
- $this->__process();
+ public function processTime($timeout=NULL) {
+ if (is_null($timeout)) {
+ return $this->__process(NULL);
+ } else {
+ return $this->__process($timeout * 1000000);
}
}
reset($this->until);
$updated = '';
while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) {
- $this->__process();
+ $this->__process(0);
}
if(array_key_exists($event_key, $this->until_payload)) {
$payload = $this->until_payload[$event_key];
$this->log(LOG_WARNING, 'queue item for notice that does not exist');
}
$qi->delete();
- $this->idle();
+ $this->idle(0);
} else {
$this->clear_old_claims();
- $start = microtime();
- $this->idle();
- $used = microtime() - $start;
- if ($used < 1000000) {
- usleep(1000000 - $used);
- }
+ $this->idle(5);
}
} while (true);
}
- function idle() {
- return true;
+ function idle($timeout=0) {
+ if ($timeout>0) {
+ sleep($timeout);
+ }
}
function clear_old_claims() {
return jabber_public_notice($notice);
}
- function idle() {
- $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
- # Process the queue for a second
- if ($this->conn->readyToProcess()) {
- $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
- $this->conn->processTime(1);
- $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
- } else {
- $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
- }
+ function idle($timeout=0) {
+ $this->conn->processTime($timeout);
}
function forward_message(&$pl) {
continue;
}
}
- $this->idle();
+ $this->idle(0);
} else {
# $this->clear_old_confirm_claims();
- $start = microtime();
- $this->idle();
- $used = microtime() - $start;
- if ($used < 10000000) {
- usleep(10000000 - $used);
- }
- sleep(10);
+ $this->idle(10);
}
} while (true);
}
common_log($level, 'XmppConfirmHandler ('. $this->_id .'): '.$msg);
}
- function idle() {
- $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
- # Process the queue for a second
- if ($this->conn->readyToProcess()) {
- $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
- $this->conn->processTime(1);
- $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
- } else {
- $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
- }
+ function idle($timeout=0) {
+ $this->conn->processTime($timeout);
}
function forward_message(&$pl) {
return jabber_broadcast_notice($notice);
}
- function idle() {
- $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
+ function idle($timeout=0) {
# Process the queue for a second
- if ($this->conn->readyToProcess()) {
- $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
- $this->conn->processTime(1);
- $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
- } else {
- $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
- }
+ $this->conn->processTime($timeout);
}
function finish() {