]> git.mxchange.org Git - hub.git/commitdiff
Added a new task for listener pools and network package readers (for abstract Network...
authorRoland Häder <roland@mxchange.org>
Mon, 18 Apr 2011 00:51:08 +0000 (00:51 +0000)
committerRoland Häder <roland@mxchange.org>
Mon, 18 Apr 2011 00:51:08 +0000 (00:51 +0000)
24 files changed:
.gitattributes
application/hub/config.php
application/hub/interfaces/package/class_Receivable.php [new file with mode: 0644]
application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php
application/hub/main/handler/network/tcp/class_TcpNetworkPackageHandler.php
application/hub/main/helper/connection/class_BaseConnectionHelper.php
application/hub/main/listener/tcp/class_TcpListener.php
application/hub/main/package/class_NetworkPackage.php
application/hub/main/package/fragmenter/class_PackageFragmenter.php
application/hub/main/pools/class_BasePool.php
application/hub/main/pools/peer/class_DefaultPeerPool.php
application/hub/main/streams/package/input/class_PackageInputStream.php [new file with mode: 0644]
application/hub/main/streams/package/output/class_PackageOutputStream.php
application/hub/main/tasks/cruncher/class_CruncherKeyProducerTask.php
application/hub/main/tasks/cruncher/class_CruncherTestUnitProducerTask.php
application/hub/main/tasks/cruncher/class_CruncherWorkUnitFetcherTask.php
application/hub/main/tasks/hub/announcement/class_HubSelfAnnouncementTask.php
application/hub/main/tasks/hub/class_HubSelfConnectTask.php
application/hub/main/tasks/hub/class_HubSocketListenerTask.php [new file with mode: 0644]
application/hub/main/tasks/hub/ping/class_HubPingTask.php
application/hub/main/tasks/hub/update/class_HubUpdateCheckTask.php
application/hub/main/tasks/idle/class_IdleLoopTask.php
application/hub/main/tasks/network/class_NetworkPackageReaderTask.php [new file with mode: 0644]
application/hub/main/tasks/network/class_NetworkPackageWriterTask.php

index 7327360bcc98e1c146179b8ae24b55956fcd01b7..f94e47e869566167d15d3405c53949697fcf1f48 100644 (file)
@@ -64,6 +64,7 @@ application/hub/interfaces/nodes/.htaccess -text svneol=unset#text/plain
 application/hub/interfaces/nodes/class_NodeHelper.php -text svneol=unset#text/plain
 application/hub/interfaces/package/.htaccess -text svneol=unset#text/plain
 application/hub/interfaces/package/class_Deliverable.php -text svneol=unset#text/plain
+application/hub/interfaces/package/class_Receivable.php -text
 application/hub/interfaces/package/fragmenter/.htaccess -text svneol=unset#text/plain
 application/hub/interfaces/package/fragmenter/class_Fragmentable.php -text
 application/hub/interfaces/pool/.htaccess -text svneol=unset#text/plain
@@ -462,6 +463,7 @@ application/hub/main/states/peer/new/class_NewConnectionPeerState.php svneol=nat
 application/hub/main/streams/.htaccess svneol=native#text/plain
 application/hub/main/streams/package/.htaccess svneol=native#text/plain
 application/hub/main/streams/package/input/.htaccess svneol=native#text/plain
+application/hub/main/streams/package/input/class_PackageInputStream.php -text
 application/hub/main/streams/package/output/.htaccess svneol=native#text/plain
 application/hub/main/streams/package/output/class_PackageOutputStream.php svneol=native#text/plain
 application/hub/main/tags/.htaccess -text svneol=unset#text/plain
@@ -481,6 +483,7 @@ application/hub/main/tasks/hub/.htaccess -text svneol=unset#text/plain
 application/hub/main/tasks/hub/announcement/.htaccess -text svneol=unset#text/plain
 application/hub/main/tasks/hub/announcement/class_HubSelfAnnouncementTask.php -text svneol=unset#text/plain
 application/hub/main/tasks/hub/class_HubSelfConnectTask.php -text svneol=unset#text/plain
+application/hub/main/tasks/hub/class_HubSocketListenerTask.php -text
 application/hub/main/tasks/hub/ping/.htaccess -text svneol=unset#text/plain
 application/hub/main/tasks/hub/ping/class_HubPingTask.php -text svneol=unset#text/plain
 application/hub/main/tasks/hub/update/.htaccess -text svneol=unset#text/plain
@@ -488,6 +491,7 @@ application/hub/main/tasks/hub/update/class_HubUpdateCheckTask.php -text svneol=
 application/hub/main/tasks/idle/.htaccess -text svneol=unset#text/plain
 application/hub/main/tasks/idle/class_IdleLoopTask.php -text svneol=unset#text/plain
 application/hub/main/tasks/network/.htaccess -text svneol=unset#text/plain
+application/hub/main/tasks/network/class_NetworkPackageReaderTask.php -text
 application/hub/main/tasks/network/class_NetworkPackageWriterTask.php svneol=native#text/plain
 application/hub/main/template/.htaccess -text svneol=unset#text/plain
 application/hub/main/template/announcement/.htaccess -text svneol=unset#text/plain
index 738f88b9e46c8fe2711a4f649ab7b19975691972..56d04545bc5485efc0eb7e9420de0542602edde7 100644 (file)
@@ -315,15 +315,6 @@ $cfg->setConfigEntry('news_main_limit', 5);
 // CFG: TASK-HANDLER-CLASS
 $cfg->setConfigEntry('task_handler_class', 'TaskHandler');
 
-// CFG: TASK-NETWORK-PACKAGE-READER-STARTUP-DELAY
-$cfg->setConfigEntry('task_network_package_reader_startup_delay', 2000);
-
-// CFG: TASK-NETWORK-PACKAGE-READER-INTERVAL-DELAY
-$cfg->setConfigEntry('task_network_package_reader_interval_delay', 10);
-
-// CFG: TASK-NETWORK-PACKAGE-READER-MAX-RUNS
-$cfg->setConfigEntry('task_network_package_reader_max_runs', 0);
-
 // CFG: TASK-QUERY-HANDLER-STARTUP-DELAY
 $cfg->setConfigEntry('task_query_handler_startup_delay', 1000);
 
@@ -405,6 +396,12 @@ $cfg->setConfigEntry('hub_self_announcement_task_class', 'HubSelfAnnouncementTas
 // CFG: HUB-PACKAGE-WRITER-TASK-CLASS
 $cfg->setConfigEntry('hub_package_writer_task_class', 'NetworkPackageWriterTask');
 
+// CFG: HUB-PACKAGE-READER-TASK-CLASS
+$cfg->setConfigEntry('hub_package_reader_task_class', 'NetworkPackageReaderTask');
+
+// CFG: HUB-SOCKET-LISTENER-TASK-CLASS
+$cfg->setConfigEntry('hub_socket_listener_task_class', 'HubSocketListenerTask');
+
 // CFG: CRUNCHER-WORK-UNIT-FETCHER-TASK-CLASS
 $cfg->setConfigEntry('cruncher_work_unit_fetcher_task_class', 'CruncherWorkUnitFetcherTask');
 
@@ -423,6 +420,24 @@ $cfg->setConfigEntry('task_network_package_writer_interval_delay', 10);
 // CFG: TASK-NETWORK-PACKAGE-WRITER-MAX-RUNS
 $cfg->setConfigEntry('task_network_package_writer_max_runs', 0);
 
+// CFG: TASK-NETWORK-PACKAGE-READER-STARTUP-DELAY
+$cfg->setConfigEntry('task_network_package_reader_startup_delay', 2000);
+
+// CFG: TASK-NETWORK-PACKAGE-READER-INTERVAL-DELAY
+$cfg->setConfigEntry('task_network_package_reader_interval_delay', 10);
+
+// CFG: TASK-NETWORK-PACKAGE-READER-MAX-RUNS
+$cfg->setConfigEntry('task_network_package_reader_max_runs', 0);
+
+// CFG: TASK-SOCKET-LISTENER-STATUP-DELAY
+$cfg->setConfigEntry('task_socket_listener_startup_delay', 2500);
+
+// CFG: TASK-SOCKET-LISTENER-INTERVAL-DELAY
+$cfg->setConfigEntry('task_socket_listener_interval_delay', 10);
+
+// CFG: TASK-SOCKET-LISTENER-MAX-RUNS
+$cfg->setConfigEntry('task_socket_listener_max_runs', 0);
+
 // CFG: TASK-CRUNCHER-WORK-UNIT-FETCHER-STARTUP-DELAY
 $cfg->setConfigEntry('task_cruncher_work_unit_fetcher_startup_delay', 1000);
 
@@ -555,8 +570,11 @@ $cfg->setConfigEntry('package_fragmenter_class', 'PackageFragmenter');
 // CFG: NODE-RAW-PACKAGE-OUTPUT-STREAM
 $cfg->setConfigEntry('node_raw_package_output_stream', 'PackageOutputStream');
 
+// CFG: NODE-RAW-PACKAGE-INPUT-STREAM
+$cfg->setConfigEntry('node_raw_package_input_stream', 'PackageInputStream');
+
 // CFG: PACKAGE-CHUNK-SIZE
-$cfg->setConfigEntry('package_chunk_size', 8*512);
+$cfg->setConfigEntry('package_chunk_size', 256*8);
 
 // CFG: CRUNCHER-TEST-UNITS-ENABLED
 $cfg->setConfigEntry('cruncher_test_units_enabled', 'Y');
diff --git a/application/hub/interfaces/package/class_Receivable.php b/application/hub/interfaces/package/class_Receivable.php
new file mode 100644 (file)
index 0000000..9f52f77
--- /dev/null
@@ -0,0 +1,41 @@
+<?php
+/**
+ * An interface for package receivers
+ *
+ * @author             Roland Haeder <webmaster@ship-simu.org>
+ * @version            0.0.0
+ * @copyright  Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team
+ * @license            GNU GPL 3.0 or any newer version
+ * @link               http://www.ship-simu.org
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+interface Receivable extends FrameworkInterface {
+       /**
+        * Checks wether new raw data from the socket has arrived
+        *
+        * @return      $hasArrived             Wether raw package data has arrived at a socket
+        */
+       function isNewRawDataPending ();
+
+       /**
+        * Checks wether a new package has arrived
+        *
+        * @return      $hasArrived             Wether a new package has arrived for processing
+        */
+       function isNewPackageArrived ();
+}
+
+// [EOF]
+?>
index 7e46b791d9ac4ec7de6f8ae78798bcc6ccfbdd3d..ac4ffff0d3e3fb7cd034965fa77b9956f8026509 100644 (file)
@@ -61,8 +61,17 @@ class NodeTaskHandlerInitializerFilter extends BaseFilter implements Filterable
                // Get a new task handler instance
                $handlerInstance = ObjectFactory::createObjectByConfiguredName('task_handler_class');
 
+               // Generate socket listener task
+               $taskInstance = ObjectFactory::createObjectByConfiguredName('hub_socket_listener_task_class');
+
+               // Network package reader, needs to be delayed a little
+               $handlerInstance->registerTask('socket_listener', $taskInstance);
+
+               // Generate package reader task
+               $taskInstance = ObjectFactory::createObjectByConfiguredName('hub_package_reader_task_class');
+
                // Network package reader, needs to be delayed a little
-               $handlerInstance->registerTask('network_package_reader', $nodeInstance->getListenerPoolInstance());
+               $handlerInstance->registerTask('network_package_reader', $taskInstance);
 
                // Generate package writer task
                $taskInstance = ObjectFactory::createObjectByConfiguredName('hub_package_writer_task_class');
index 701e44497504fa120825a2c4edf393afcf71f8f6..61a9a941ec7707504d14e73dc8ddb036c4329a9c 100644 (file)
@@ -54,46 +54,13 @@ class TcpNetworkPackageHandler extends BaseNetworkPackageHandler implements Netw
                return $handlerInstance;
        }
 
-       /**
-        * Verifies the package data itself (only rudymentary check)
-        *
-        * @param       $rawData        Raw package data (as string
-        * @return      $isValid        Wether the package data is valid
-        */
-       private function isPackageDataValid ($rawData) {
-               // Default is not valid
-               $isValid = false;
-
-               // Convert it back into an array
-               $packageData = explode(NetworkPackage::PACKAGE_DATA_SEPERATOR, $rawData);
-
-               // This should be at least three entries: sender|recipient|raw data
-               if (count($packageData) < 3) {
-                       // Not enougth fields in $packageData!
-                       $this->setErrorCode(self::PACKAGE_ERROR_INCOMPLETE_DATA);
-               } elseif (count(explode(NetworkPackage::PACKAGE_MASK_SEPERATOR, $packageData[NetworkPackage::INDEX_PACKAGE_CONTENT])) < 2) {
-                       // Not entougth fields in content
-                       $this->setErrorCode(self::PACKAGE_ERROR_INVALID_CONTENT);
-               } elseif (!$this->ifRecipientMatchesOwnAddress($packageData)) {
-                       // Field 'recipient' doesn't match our address, this must always be the case
-                       $this->setErrorCode(self::PACKAGE_ERROR_RECIPIENT_MISMATCH);
-               } else {
-                       // This check went fine...
-                       $isValid = true;
-               }
-
-               // Return the result
-               return $isValid;
-       }
-
        /**
         * Processes a package from given resource. This is mostly useful for TCP
         * package handling and is implemented in the TcpListener class
         *
         * @param       $resource       A valid resource identifier
         * @return      void
-        * @throws      InvalidResourceException        If the given resource is invalid
-        * @todo        ~10% done
+        * @todo        We need to handle the raw data over to the NetworkPackage class if the state if fine
         */
        public function processResourcePackage ($resource) {
                // Check the resource
@@ -112,7 +79,10 @@ class TcpNetworkPackageHandler extends BaseNetworkPackageHandler implements Netw
                $this->debugOutput('HANDLER: Handling TCP package from peer ' . $resource);
 
                // Read the raw data from socket
-               $rawData = socket_read($resource, $this->getConfigInstance()->getConfigEntry('tcp_buffer_length'), PHP_NORMAL_READ);
+               $rawData = socket_read($resource, $this->getConfigInstance()->getConfigEntry('tcp_buffer_length'), PHP_BINARY_READ);
+
+               // Debug output of read data length
+               $this->debugOutput('rawData[]=' . strlen($rawData));
 
                // Is it valid?
                if (($rawData === false) || (socket_last_error($resource) > 0)) {
@@ -121,24 +91,9 @@ class TcpNetworkPackageHandler extends BaseNetworkPackageHandler implements Netw
                } elseif (empty($rawData)) {
                        // The peer did send nothing to us
                        $this->setErrorCode(self::SOCKET_ERROR_EMPTY_DATA);
-               } elseif (!$this->isPackageDataValid($rawData)) {
-                       // Invalid package data
-                       if ($this->getErrorCode() == self::SOCKET_ERROR_UNHANDLED) {
-                               // Set it to PACKAGE_ERROR_INVALID_DATA, because SOCKET_ERROR_UNHANDLED should not be used
-                               $this->setErrorCode(self::PACKAGE_ERROR_INVALID_DATA);
-                       } // END - if
-               } else {
-                       // Prepare the package data
-                       $packageData = explode(NetworkPackage::PACKAGE_DATA_SEPERATOR, $rawData);
-
-                       // Low-level checks are all green
-                       $this->setErrorCode(self::PACKAGE_LEVEL_CHECK_OKAY);
                }
 
-               // Debug output of error code
-               $this->debugOutput('errorCode=' . $this->getErrorCode());
-
-               // Get a state from the resolver for this package
+               // Get a state from the resolver for this package data array
                $stateInstance = $this->getResolverInstance()->resolveStateByPackage($this, $packageData, $resource);
                die('UNFINISHED:'.$stateInstance->__toString()."\n");
        }
index 4771e4d669b0531472f802200c76efaa38760488..c88e89e53d49d298ac1acf3e3a053c424034cece 100644 (file)
@@ -43,9 +43,9 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc
        private $sentData = 0;
 
        /**
-        * Offset
+        * Difference
         */
-       private $offset = 0;
+       private $diff = 0;
 
        /**
         * Connect retries for this connection
@@ -167,7 +167,13 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc
                }
 
                // Implode the package data array and fragement the resulting string, returns the final hash
-               $this->currentFinalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this);
+               $finalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this);
+               if ($finalHash !== true) {
+                       $this->currentFinalHash = $finalHash;
+               } // END - if
+
+               // Debug message
+               //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: currentFinalHash=' . $this->currentFinalHash);
 
                // Get the next raw data chunk from the fragmenter
                $rawDataChunk = $fragmenterInstance->getNextRawDataChunk($this->currentFinalHash);
@@ -176,11 +182,18 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc
                $chunkHashes = array_keys($rawDataChunk);
                $chunkData   = array_values($rawDataChunk);
 
-               // Remember this chunk as queued
-               $this->queuedChunks[$chunkHashes[0]] = $chunkData[0];
+               // Is the required data there?
+               //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: chunkHashes[]=' . count($chunkHashes) . ',chunkData[]=' . count($chunkData));
+               if ((isset($chunkHashes[0])) && (isset($chunkData[0]))) {
+                       // Remember this chunk as queued
+                       $this->queuedChunks[$chunkHashes[0]] = $chunkData[0];
 
-               // Return the raw data
-               return $chunkData[0];
+                       // Return the raw data
+                       return $chunkData[0];
+               } else {
+                       // Return zero string
+                       return '';
+               }
        }
 
        /**
@@ -197,43 +210,88 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc
        /**
         * Sends raw package data to the recipient
         *
-        * @param       $packageData    Raw package data
-        * @return      $sentBytes              Actual sent bytes to the peer
+        * @param       $packageData            Raw package data
+        * @return      $totalSentBytes         Total sent bytes to the peer
         * @throws      InvalidSocketException  If we got a problem with this socket
         */
        public function sendRawPackageData (array $packageData) {
-               // Convert the package data array to a raw data stream
-               $rawData = $this->getRawDataFromPackageArray($packageData);
+               // Cache buffer length
+               $bufferSize = $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length');
+
+               // Init variables
+               $rawData = '';
+               $dataStream = ' ';
+               $totalSentBytes = 0;
+
+               // Fill sending buffer with data
+               while ((strlen($rawData) < $bufferSize) && (strlen($dataStream) > 0)) {
+                       // Convert the package data array to a raw data stream
+                       $dataStream = $this->getRawDataFromPackageArray($packageData);
+                       //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Adding ' . strlen($dataStream) . ' bytes to the sending buffer ...');
+                       $rawData .= $dataStream;
+               } // END - while
+
+               // Nothing to sent is bad news!
+               assert(strlen($rawData) > 0);
+
+               // Calculate difference
+               $this->diff = $bufferSize - strlen($rawData);
 
                // Get socket resource
                $socketResource = $this->getSocketResource();
 
-               // And deliver it
-               $sentBytes = @socket_write($socketResource, $rawData, $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length') - $this->offset);
+               // Init sent bytes
+               $sentBytes = 0;
 
-               // If there was an error, we don't continue here
-               if ($sentBytes === false) {
-                       // Get socket error code for verification
-                       $socketError = socket_last_error($socketResource);
+               // Deliver all data
+               while ($sentBytes !== false) {
+                       // And deliver it
+                       //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sending out ' . strlen($rawData) . ' bytes,bufferSize=' . $bufferSize . ',diff=' . $this->diff);
+                       $sentBytes = @socket_write($socketResource, $rawData, ($bufferSize - $this->diff));
 
-                       // Get error message
-                       $errorMessage = socket_strerror($socketError);
+                       // If there was an error, we don't continue here
+                       if ($sentBytes === false) {
+                               // Get socket error code for verification
+                               $socketError = socket_last_error($socketResource);
 
-                       // Shutdown this socket
-                       $this->shutdownSocket($socketResource);
+                               // Get error message
+                               $errorMessage = socket_strerror($socketError);
 
-                       // And throw it
-                       throw new InvalidSocketException(array($this, gettype($socketResource), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
-               } elseif ($sentBytes == 0) {
-                       // Nothing sent is bad news
-                       die(__METHOD__.': Unhandled 0 sent bytes! rawData[]=' . strlen($rawData));
-               }
+                               // Shutdown this socket
+                               $this->shutdownSocket($socketResource);
+
+                               // And throw it
+                               throw new InvalidSocketException(array($this, gettype($socketResource), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
+                       } elseif (($sentBytes == 0) && (strlen($rawData) > 0)) {
+                               // Nothing sent means we are done
+                               //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')');
+                               break;
+                       }
+
+                       // The difference between sent bytes and length of raw data should not be below zero
+                       assert((strlen($rawData) - $sentBytes) >= 0);
+
+                       // Add total sent bytes
+                       $totalSentBytes += $sentBytes;
+
+                       // Cut out the last unsent bytes
+                       //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sent out ' . $sentBytes . ' of ' . strlen($rawData) . ' bytes ...');
+                       $rawData = substr($rawData, $sentBytes);
+
+                       // Calculate difference again
+                       $this->diff = $bufferSize - strlen($rawData);
 
-               // The difference between sent bytes and length of raw data should not be below zero
-               assert((strlen($rawData) - $sentBytes) >= 0);
+                       // Can we abort?
+                       if (strlen($rawData) <= 0) {
+                               // Abort here, all sent!
+                               //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')');
+                               break;
+                       } // END - if
+               } // END - while
 
                // Return sent bytes
-               return $sentBytes;
+               //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: totalSentBytes=' . $totalSentBytes);
+               return $totalSentBytes;
        }
 
        /**
index b2325c4182091ad68b29def5438bc1e9584a295a..54842ca6adea9ad504825348a4b420a089406b23 100644 (file)
@@ -92,9 +92,11 @@ class TcpListener extends BaseListener implements Listenable {
                        throw new InvalidSocketException(array($this, gettype($mainSocket), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
                } // END - if
 
-               // Now, we want non-blocking mode
-               $this->debugOutput('LISTENER: Setting non-blocking mode.');
-               if (!socket_set_nonblock($mainSocket)) {
+               // "Bind" the socket to the given address, on given port so this means
+               // that all connections on this port are now our resposibility to
+               // send/recv data, disconnect, etc..
+               $this->debugOutput('LISTENER: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort());
+               if (!socket_bind($mainSocket, $this->getListenAddress(), $this->getListenPort())) {
                        // Get socket error code for verification
                        $socketError = socket_last_error($mainSocket);
 
@@ -108,11 +110,9 @@ class TcpListener extends BaseListener implements Listenable {
                        throw new InvalidSocketException(array($this, gettype($mainSocket), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
                } // END - if
 
-               // "Bind" the socket to the given address, on given port so this means
-               // that all connections on this port are now our resposibility to
-               // send/recv data, disconnect, etc..
-               $this->debugOutput('LISTENER: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort());
-               if (!socket_bind($mainSocket, $this->getListenAddress(), $this->getListenPort())) {
+               // Start listen for connections
+               $this->debugOutput('LISTENER: Listening for connections.');
+               if (!socket_listen($mainSocket)) {
                        // Get socket error code for verification
                        $socketError = socket_last_error($mainSocket);
 
@@ -126,9 +126,9 @@ class TcpListener extends BaseListener implements Listenable {
                        throw new InvalidSocketException(array($this, gettype($mainSocket), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
                } // END - if
 
-               // Start listen for connections
-               $this->debugOutput('LISTENER: Listening for connections.');
-               if (!socket_listen($mainSocket)) {
+               // Now, we want non-blocking mode
+               $this->debugOutput('LISTENER: Setting non-blocking mode.');
+               if (!socket_set_nonblock($mainSocket)) {
                        // Get socket error code for verification
                        $socketError = socket_last_error($mainSocket);
 
@@ -202,6 +202,7 @@ class TcpListener extends BaseListener implements Listenable {
                if (in_array($this->getSocketResource(), $readers)) {
                        // Then accept it
                        $newSocket = socket_accept($this->getSocketResource());
+                       //* NOISY-DEBUG: */ $this->debugOutput('LISTENER: newSocket=' . $newSocket);
 
                        // We want non-blocking here, too
                        if (!socket_set_nonblock($newSocket)) {
@@ -234,6 +235,7 @@ class TcpListener extends BaseListener implements Listenable {
                // Handle it here, if not main socket
                if ($currentSocket != $this->getSocketResource()) {
                        // ... or else it will raise warnings like 'Transport endpoint is not connected'
+                       //* NOISY-DEBUG: */ $this->debugOutput('LISTENER: currentSocket=' . $currentSocket);
                        $this->getPackageInstance()->processResourcePackage($currentSocket);
                } // END - if
 
index 0f93a3946d5723a0324aefffcc26a798edcdead7..9a4ce15df1299ad6ad4c3ec5ec91a52ea0435269 100644 (file)
@@ -1,9 +1,10 @@
 <?php
 /**
- * A NetworkPackage class. This class implements Deliverable because all network
- * packages should be deliverable to other nodes. It further provides methods
- * for reading raw content from template engines and feeding it to the stacker
- * for undeclared packages.
+ * A NetworkPackage class. This class implements Deliverable and Receivable
+ * because all network packages should be deliverable to other nodes and
+ * receivable from other nodes. It further provides methods for reading raw
+ * content from template engines and feeding it to the stacker for undeclared
+ * packages.
  *
  * The factory method requires you to provide a compressor class (which must
  * implement the Compressor interface). If you don't want any compression (not
@@ -34,7 +35,7 @@
  * You should have received a copy of the GNU General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
-class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registerable {
+class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receivable, Registerable {
        /**
         * Package mask for compressing package data:
         * 0: Compressor extension
@@ -106,7 +107,7 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe
        const NETWORK_TARGET_SELF = 'self';
 
        /**
-        * TCP package size
+        * TCP package size in bytes
         */
        const TCP_PACKAGE_SIZE = 512;
 
@@ -167,6 +168,10 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe
                return $hash;
        }
 
+       ///////////////////////////////////////////////////////////////////////////
+       //                   Delivering packages / raw data
+       ///////////////////////////////////////////////////////////////////////////
+
        /**
         * Delivers the given raw package data.
         *
@@ -270,7 +275,7 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe
                        return;
                } // END - if
 
-               // Sent it away (we catch exceptions one method above
+               // Sent it away (we catch exceptions one method above)
                $sentBytes = $connectionInstance->sendRawPackageData($packageData);
 
                // Remember unsent raw bytes in back-buffer, if any
@@ -435,6 +440,28 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe
                        $this->debugOutput('PACKAGE: Package was not delivered: ' . $e->getMessage());
                }
        }
+
+       ///////////////////////////////////////////////////////////////////////////
+       //                   Receiving packages / raw data
+       ///////////////////////////////////////////////////////////////////////////
+
+       /**
+        * Checks wether new raw package data has arrived at a socket
+        *
+        * @return      $hasArrived             Wether new raw package data has arrived for processing
+        */
+       public function isNewRawDataPending () {
+               // @TODO Add some content here
+       }
+
+       /**
+        * Checks wether a new package has arrived
+        *
+        * @return      $hasArrived             Wether a new package has arrived for processing
+        */
+       public function isNewPackageArrived () {
+               // @TODO Add some content here
+       }
 }
 
 // [EOF]
index e7d2349ff1ac9fbb61754b27057187a4fb0ca7e9..1cf6db7ea45fbfe2e0fe52638c5eab910119d5dd 100644 (file)
@@ -302,6 +302,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg
 
                // Calculate real (data) chunk size
                $dataChunkSize = $this->getDataChunkSizeFromHash($finalHash);
+               //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: dataChunkSize=' . $dataChunkSize);
 
                // Now split it up
                for ($idx = 0; $idx < strlen($encodedData); $idx += $dataChunkSize) {
@@ -319,6 +320,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg
                        assert(strlen($chunk) <= NetworkPackage::TCP_PACKAGE_SIZE);
 
                        // Add it to the array
+                       //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: Adding ' . strlen($chunk) . ' bytes of a chunk.');
                        $this->chunks[$finalHash][] = $chunk;
                } // END - for
 
@@ -362,6 +364,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg
                        assert(strlen($chunk) <= NetworkPackage::TCP_PACKAGE_SIZE);
 
                        // Add it to the array
+                       //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: Adding ' . strlen($chunk) . ' bytes of a chunk.');
                        array_unshift($this->chunks[$finalHash], $chunk);
                } // END - for
        }
@@ -409,6 +412,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg
                }
 
                // Return final hash
+               //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: finalHash[' . gettype($finalHash) . ']=' . $finalHash);
                return $finalHash;
        }
 
index 26178b84cd4639d27aea162d03ffcfb09ee2405a..458dc656dac16e5d977308a95d64dfb866403ff3 100644 (file)
@@ -87,7 +87,7 @@ class BasePool extends BaseHubSystem implements Visitable {
         */
        public function accept (Visitor $visitorInstance) {
                // Debug message
-               //* DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - START');
+               //* NOISY-DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - START');
 
                // Visit this pool
                $visitorInstance->visitPool($this);
@@ -117,7 +117,7 @@ class BasePool extends BaseHubSystem implements Visitable {
                } // END - while
 
                // Debug message
-               //* DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - FINISHED');
+               //* NOISY-DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - FINISHED');
        }
 
        /**
index fc62b4134b4f321dfa56a4be8a430444e50d732d..daab465ee1cf9e348b03d6f3862d649f90c785bf 100644 (file)
@@ -104,11 +104,11 @@ class DefaultPeerPool extends BasePool implements PoolablePeer {
                        } // END - if
                } else {
                        // Server sockets won't work with socket_getpeername()
-                       $this->debugOutput('POOL: Socket resource is server socket. This is not a bug.');
+                       $this->debugOutput('POOL: Socket resource is server socket (' . $socketResource . '). This is not a bug.');
                }
 
                // Output error message
-               $this->debugOutput('POOL: Adding peer ' . $peerName);
+               $this->debugOutput('POOL: Adding peer ' . $peerName . ', socketResource=' . $socketResource);
 
                // Add it finally to the pool
                $this->addPoolEntry($socketResource);
diff --git a/application/hub/main/streams/package/input/class_PackageInputStream.php b/application/hub/main/streams/package/input/class_PackageInputStream.php
new file mode 100644 (file)
index 0000000..aa9548f
--- /dev/null
@@ -0,0 +1,65 @@
+<?php
+/**
+ * A PackageInputStream class
+ *
+ * @author             Roland Haeder <webmaster@ship-simu.org>
+ * @version            0.0.0
+ * @copyright  Copyright (c) 2007, 2008 Roland Haeder, 2009  Developer Team
+ * @license            GNU GPL 3.0 or any newer version
+ * @link               http://www.ship-simu.org
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+class PackageInputStream extends BaseStream implements InputStreamable {
+       /**
+        * Protected constructor
+        *
+        * @return      void
+        */
+       protected function __construct () {
+               // Call parent constructor
+               parent::__construct(__CLASS__);
+       }
+
+       /**
+        * Creates an instance of this node class
+        *
+        * @return      $streamInstance         An instance of this node class
+        */
+       public final static function createPackageInputStream () {
+               // Get a new instance
+               $streamInstance = new PackageInputStream();
+
+               // Return the instance
+               return $streamInstance;
+       }
+
+       /**
+        * Streams the data and maybe does something to it
+        *
+        * @param       $data   The data (string mostly) to "stream"
+        * @return      $data   The data (string mostly) to "stream"
+        * @todo        Do we need to do something more here?
+        */
+       public function streamData ($data) {
+               // Encode the data with BASE64 encoding
+               $data = base64_decode($data);
+
+               // Return it
+               return $data;
+       }
+}
+
+// [EOF]
+?>
index af98e5eef81093c19241c8f5e5e596fd872ea85c..35b382ae31b19b8c29c0e8676e3d3ccd94e92a1b 100644 (file)
@@ -50,7 +50,7 @@ class PackageOutputStream extends BaseStream implements OutputStreamable {
         *
         * @param       $data   The data (string mostly) to "stream"
         * @return      $data   The data (string mostly) to "stream"
-        * @throws      UnsupportedOperationException   If this method is called
+        * @todo        Do we need to do something more here?
         */
        public function streamData ($data) {
                // Encode the data with BASE64 encoding
index 7e1c4a2d823955bc351b1fc64b8ef8bf0c92a7de..43673a23b01bd088b592f2435e61d3c98db22404 100644 (file)
@@ -35,7 +35,7 @@ class CruncherKeyProducerTask extends BaseTask implements Taskable, Visitable {
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance   An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public final static function createCruncherKeyProducerTask () {
                // Get new instance
index 5b62110b9ad53cd8c01b3af748d6ceefa4153405..f47f633e320c823fece58500797be538b77fb8c3 100644 (file)
@@ -35,7 +35,7 @@ class CruncherTestUnitProducerTask extends BaseTask implements Taskable, Visitab
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance   An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public final static function createCruncherTestUnitProducerTask () {
                // Get new instance
index 5c75860694ea27dd910fe9382258a7e8bdbc7ea7..d8bca71d9073dfdd4acecaa62d870a625935cf35 100644 (file)
@@ -36,7 +36,7 @@ class CruncherWorkUnitFetcherTask extends BaseTask implements Taskable, Visitabl
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance   An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public final static function createCruncherWorkUnitFetcherTask () {
                // Get new instance
index d4c3bcc2ff50679aaec8b394da101e28ee6f6525..316a986c9fc67cb0b9e05f745c11af180001ef83 100644 (file)
@@ -35,7 +35,7 @@ class HubSelfAnnouncementTask extends BaseTask implements Taskable, Visitable {
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance           An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public static final function createHubSelfAnnouncementTask () {
                // Get new instance
index 811a7c138bb4a0cde03adcb1751ba3766fe82bd5..616c25a57377e6d0f2c6a26a6b012a84214ec0f8 100644 (file)
@@ -35,7 +35,7 @@ class HubSelfConnectTask extends BaseTask implements Taskable, Visitable {
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance           An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public static final function createHubSelfConnectTask () {
                // Get new instance
diff --git a/application/hub/main/tasks/hub/class_HubSocketListenerTask.php b/application/hub/main/tasks/hub/class_HubSocketListenerTask.php
new file mode 100644 (file)
index 0000000..139a2b4
--- /dev/null
@@ -0,0 +1,76 @@
+<?php
+/**
+ * A HubSocketListener task
+ *
+ * @author             Roland Haeder <webmaster@ship-simu.org>
+ * @version            0.0.0
+ * @copyright  Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team
+ * @license            GNU GPL 3.0 or any newer version
+ * @link               http://www.ship-simu.org
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+class HubSocketListenerTask extends BaseTask implements Taskable, Visitable {
+       /**
+        * Protected constructor
+        *
+        * @return      void
+        */
+       protected function __construct () {
+               // Call parent constructor
+               parent::__construct(__CLASS__);
+       }
+
+       /**
+        * Creates an instance of this class
+        *
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
+        */
+       public static final function createHubSocketListenerTask () {
+               // Get new instance
+               $taskInstance = new HubSocketListenerTask();
+
+               // Return the prepared instance
+               return $taskInstance;
+       }
+
+       /**
+        * Accepts the visitor to process the visit "request"
+        *
+        * @param       $visitorInstance        An instance of a Visitor class
+        * @return      void
+        */
+       public function accept (Visitor $visitorInstance) {
+               // Get the node instance from registry
+               $nodeInstance = Registry::getRegistry()->getInstance('node');
+
+               // Visit the pool listener task
+               $nodeInstance->getListenerPoolInstance()->accept($visitorInstance);
+
+               // Visit this task
+               // @TODO Do we need to visit this task? $visitorInstance->visitTask($this);
+       }
+
+       /**
+        * Executes the task
+        *
+        * @return      void
+        * @todo        0% done
+        */
+       public function executeTask () {
+       }
+}
+
+// [EOF]
+?>
index cda20d1256bea0ab8ae64f7aa0c4b08759a18018..19ad14978ede3406fdc7f3e27d8a4cc31a95296c 100644 (file)
@@ -21,7 +21,7 @@
  * You should have received a copy of the GNU General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
-class HubPingTask extends BaseTask implements Visitable, Taskable {
+class HubPingTask extends BaseTask implements Taskable, Visitable {
        /**
         * Protected constructor
         *
@@ -36,7 +36,7 @@ class HubPingTask extends BaseTask implements Visitable, Taskable {
         * Creates an instance of this class
         *
         * @param       $listInstance   A Listable instance
-        * @return      $taskInstance   An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public static final function createHubPingTask (Listable $listInstance) {
                // Get new instance
index b733b13c4ababefca338408f983b469b7e5dc928..e4ad02579ae4c5229adb80ff29e2bb4ea3cfa596 100644 (file)
@@ -21,7 +21,7 @@
  * You should have received a copy of the GNU General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
-class HubUpdateCheckTask extends BaseTask implements Visitable, Taskable {
+class HubUpdateCheckTask extends BaseTask implements Taskable, Visitable {
        /**
         * Protected constructor
         *
@@ -35,7 +35,7 @@ class HubUpdateCheckTask extends BaseTask implements Visitable, Taskable {
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance           An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public static final function createHubUpdateCheckTask () {
                // Get new instance
index c355472e292c18807346a862f9d5444081bf091e..c4fc5d53e76ef26fce61f268ffe43b0adff43e3e 100644 (file)
@@ -21,7 +21,7 @@
  * You should have received a copy of the GNU General Public License
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
-class IdleLoopTask extends BaseTask implements Visitable, Taskable {
+class IdleLoopTask extends BaseTask implements Taskable, Visitable {
        /**
         * Protected constructor
         *
@@ -35,7 +35,7 @@ class IdleLoopTask extends BaseTask implements Visitable, Taskable {
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance           An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public static final function createIdleLoopTask () {
                // Get new instance
diff --git a/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php b/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php
new file mode 100644 (file)
index 0000000..3f847aa
--- /dev/null
@@ -0,0 +1,80 @@
+<?php
+/**
+ * A NetworkPackageReader task
+ *
+ * @author             Roland Haeder <webmaster@ship-simu.org>
+ * @version            0.0.0
+ * @copyright  Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team
+ * @license            GNU GPL 3.0 or any newer version
+ * @link               http://www.ship-simu.org
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+class NetworkPackageReaderTask extends BaseTask implements Taskable, Visitable {
+       /**
+        * Protected constructor
+        *
+        * @return      void
+        */
+       protected function __construct () {
+               // Call parent constructor
+               parent::__construct(__CLASS__);
+       }
+
+       /**
+        * Creates an instance of this class
+        *
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
+        */
+       public static final function createNetworkPackageReaderTask () {
+               // Get new instance
+               $taskInstance = new NetworkPackageReaderTask();
+
+               // Return the prepared instance
+               return $taskInstance;
+       }
+
+       /**
+        * Accepts the visitor to process the visit "request"
+        *
+        * @param       $visitorInstance        An instance of a Visitor class
+        * @return      void
+        */
+       public function accept (Visitor $visitorInstance) {
+               // Visit this task
+               $visitorInstance->visitTask($this);
+       }
+
+       /**
+        * Executes the task
+        *
+        * @return      void
+        */
+       public function executeTask () {
+               // Get a singleton network package instance
+               $packageInstance = NetworkPackageFactory::createNetworkPackageInstance();
+
+               // Do we have something to handle?
+               if ($packageInstance->isNewRawDataPending()) {
+                       // We have to handle raw data from the socket
+                       $packageInstance->handleIncomingSocketRawData();
+               } elseif ($packageInstance->isNewPackageArrived()) {
+                       // Okay, then handle newly arrived package
+                       $packageInstance->handleNewlyArrivedPackage();
+               } // END - if
+       }
+}
+
+// [EOF]
+?>
index 2bba76f86a8b6df6ce7940e8a7aa1a43463b2070..39874dedde1ec896fa2f6b41b97b8c9b0862f866 100644 (file)
@@ -35,7 +35,7 @@ class NetworkPackageWriterTask extends BaseTask implements Taskable, Visitable {
        /**
         * Creates an instance of this class
         *
-        * @return      $taskInstance   An instance of a Visitable class
+        * @return      $taskInstance   An instance of a Taskable/Visitable class
         */
        public static final function createNetworkPackageWriterTask () {
                // Get new instance