]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
switch around how XMLStream does processing
authorEvan Prodromou <evan@prodromou.name>
Sun, 31 Aug 2008 00:32:10 +0000 (20:32 -0400)
committerEvan Prodromou <evan@prodromou.name>
Sun, 31 Aug 2008 00:32:10 +0000 (20:32 -0400)
darcs-hash:20080831003210-84dde-92ccffd5b2e1d50963b18babd93c70fb1d20cdba.gz

extlib/XMPPHP/XMLStream.php
lib/queuehandler.php
scripts/publicqueuehandler.php
scripts/xmppconfirmhandler.php
scripts/xmppqueuehandler.php

index 1190167681cffbc720a00e821093790163f7f930..3f85ed0f86c179203aef7480ec84ff347c3f010a 100644 (file)
@@ -310,24 +310,50 @@ class XMPPHP_XMLStream {
                return $this->disconnected;
        }
 
-       private function __process() {
-               $read = array($this->socket);
-               $write = null;
-               $except = null;
-               $updated = @stream_select($read, $write, $except, 1);
-               if ($updated > 0) {
-                       $buff = @fread($this->socket, 1024);
-                       if(!$buff) { 
-                               if($this->reconnect) {
-                                       $this->doReconnect();
-                               } else {
-                                       fclose($this->socket);
-                                       return false;
+       /**
+        * Core reading tool
+        * 0 -> only read if data is immediately ready
+        * NULL -> wait forever and ever
+        * integer -> process for this amount of time 
+        */
+       
+       private function __process($maximum=0) {
+               
+               $remaining = $maximum;
+               
+               do {
+                       $starttime = microtime();
+                       $read = array($this->socket);
+                       $write = array();
+                       $except = array();
+                       if (is_null($maximum)) {
+                               $secs = NULL;
+                               $usecs = NULL;
+                       } else if ($maximum == 0) {
+                               $secs = 0;
+                               $usecs = 0;
+                       } else {
+                               $secs = $remaining / 1000000;
+                               $usecs = $remaining % 1000000;
+                       }
+                       $updated = @stream_select($read, $write, $except, $secs, $usecs);
+                       if ($updated > 0) {
+                               # XXX: Is this big enough?
+                               $buff = @fread($this->socket, 4096);
+                               if(!$buff) { 
+                                       if($this->reconnect) {
+                                               $this->doReconnect();
+                                       } else {
+                                               fclose($this->socket);
+                                               return false;
+                                       }
                                }
+                               $this->log->log("RECV: $buff",  XMPPHP_Log::LEVEL_VERBOSE);
+                               xml_parse($this->parser, $buff, false);
                        }
-                       $this->log->log("RECV: $buff",  XMPPHP_Log::LEVEL_VERBOSE);
-                       xml_parse($this->parser, $buff, false);
-               }
+                       $remaining -= (microtime() - $starttime);
+               } while (is_null($maximum) || $remaining > 0);
+               return true;
        }
        
        /**
@@ -336,10 +362,7 @@ class XMPPHP_XMLStream {
         * @return string
         */
        public function process() {
-               $updated = '';
-               while(!$this->disconnect) {
-                       $this->__process();
-               }
+               $this->__process(NULL);
        }
 
        /**
@@ -348,11 +371,11 @@ class XMPPHP_XMLStream {
         * @param integer $timeout
         * @return string
         */
-       public function processTime($timeout = -1) {
-               $start = time();
-               $updated = '';
-               while(!$this->disconnected and ($timeout == -1 or time() - $start < $timeout)) {
-                       $this->__process();
+       public function processTime($timeout=NULL) {
+               if (is_null($timeout)) {
+                       return $this->__process(NULL);
+               } else {
+                       return $this->__process($timeout * 1000000);
                }
        }
 
@@ -372,7 +395,7 @@ class XMPPHP_XMLStream {
                reset($this->until);
                $updated = '';
                while(!$this->disconnected and $this->until[$event_key] and (time() - $start < $timeout or $timeout == -1)) {
-                       $this->__process();
+                       $this->__process(0);
                }
                if(array_key_exists($event_key, $this->until_payload)) {
                        $payload = $this->until_payload[$event_key];
index ba7a93ab29b909ed7c9d219734da80a8b5d3d61b..3115ea38d2a470334ee5dd8783d606fd383e13b8 100644 (file)
@@ -81,21 +81,18 @@ class QueueHandler {
                                        $this->log(LOG_WARNING, 'queue item for notice that does not exist');
                                }
                                $qi->delete();
-                               $this->idle();
+                               $this->idle(0);
                        } else {
                                $this->clear_old_claims();
-                               $start = microtime();
-                               $this->idle();
-                               $used = microtime() - $start;
-                               if ($used < 1000000) {
-                                       usleep(1000000 - $used);
-                               }
+                               $this->idle(5);
                        }       
                } while (true);
        }
 
-       function idle() {
-               return true;
+       function idle($timeout=0) {
+               if ($timeout>0) {
+                       sleep($timeout);
+               }
        }
        
        function clear_old_claims() {
index 9a7b6df5faf6af2373dc4674176806d245f94cba..555298f6a9240674d9e4693b4d98fabfa388ff84 100755 (executable)
@@ -54,16 +54,8 @@ class PublicQueueHandler extends QueueHandler {
                return jabber_public_notice($notice);
        }
        
-       function idle() {
-           $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
-               # Process the queue for a second
-               if ($this->conn->readyToProcess()) {
-                       $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
-                       $this->conn->processTime(1);
-                       $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
-               } else {
-                       $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
-               }
+       function idle($timeout=0) {
+               $this->conn->processTime($timeout);
        }
 
        function forward_message(&$pl) {
index 08a397fc431bc96bd5bb19907be69613a733811b..7971198b1e370cabacad98189d80f25d8494ed05 100755 (executable)
@@ -86,16 +86,10 @@ class XmppConfirmHandler {
                                                continue;
                                        }
                                }
-                               $this->idle();
+                               $this->idle(0);
                        } else {
 #                              $this->clear_old_confirm_claims();
-                               $start = microtime();
-                               $this->idle();
-                               $used = microtime() - $start;
-                               if ($used < 10000000) {
-                                       usleep(10000000 - $used);
-                               }
-                               sleep(10);
+                               $this->idle(10);
                        }
                } while (true);
        }
@@ -137,16 +131,8 @@ class XmppConfirmHandler {
                common_log($level, 'XmppConfirmHandler ('. $this->_id .'): '.$msg);
        }
        
-       function idle() {
-           $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
-               # Process the queue for a second
-               if ($this->conn->readyToProcess()) {
-                       $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
-                       $this->conn->processTime(1);
-                       $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
-               } else {
-                       $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
-               }
+       function idle($timeout=0) {
+               $this->conn->processTime($timeout);
        }
 
        function forward_message(&$pl) {
index c6f5c3f1210654152be7937e0fd153fbe8fc96a9..7f1e6c28f4629e5a2a1741a28e51fcd7b11f1a72 100755 (executable)
@@ -56,16 +56,9 @@ class XmppQueueHandler extends QueueHandler {
                return jabber_broadcast_notice($notice);
        }
 
-       function idle() {
-           $this->log(LOG_DEBUG, 'Checking the incoming message queue.');
+       function idle($timeout=0) {
                # Process the queue for a second
-               if ($this->conn->readyToProcess()) {
-                       $this->log(LOG_DEBUG, 'Something in the incoming message queue; processing it.');
-                       $this->conn->processTime(1);
-                       $this->log(LOG_DEBUG, 'Done processing incoming message queue.');
-               } else {
-                       $this->log(LOG_DEBUG, 'Nothing in the incoming message queue; skipping it.');
-               }
+               $this->conn->processTime($timeout);
        }
 
        function finish() {