From d391ecd222ae868265735539d68a2c148711b5e3 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Mon, 18 Apr 2011 00:51:08 +0000 Subject: [PATCH] Added a new task for listener pools and network package readers (for abstract NetworkPackage class), socket_read() does only not block in binary mode? --- .gitattributes | 4 + application/hub/config.php | 38 ++++-- .../interfaces/package/class_Receivable.php | 41 ++++++ ...class_NodeTaskHandlerInitializerFilter.php | 11 +- .../tcp/class_TcpNetworkPackageHandler.php | 57 +-------- .../connection/class_BaseConnectionHelper.php | 118 +++++++++++++----- .../main/listener/tcp/class_TcpListener.php | 24 ++-- .../hub/main/package/class_NetworkPackage.php | 41 ++++-- .../fragmenter/class_PackageFragmenter.php | 4 + application/hub/main/pools/class_BasePool.php | 4 +- .../main/pools/peer/class_DefaultPeerPool.php | 4 +- .../input/class_PackageInputStream.php | 65 ++++++++++ .../output/class_PackageOutputStream.php | 2 +- .../class_CruncherKeyProducerTask.php | 2 +- .../class_CruncherTestUnitProducerTask.php | 2 +- .../class_CruncherWorkUnitFetcherTask.php | 2 +- .../class_HubSelfAnnouncementTask.php | 2 +- .../tasks/hub/class_HubSelfConnectTask.php | 2 +- .../tasks/hub/class_HubSocketListenerTask.php | 76 +++++++++++ .../main/tasks/hub/ping/class_HubPingTask.php | 4 +- .../hub/update/class_HubUpdateCheckTask.php | 4 +- .../main/tasks/idle/class_IdleLoopTask.php | 4 +- .../class_NetworkPackageReaderTask.php | 80 ++++++++++++ .../class_NetworkPackageWriterTask.php | 2 +- 24 files changed, 466 insertions(+), 127 deletions(-) create mode 100644 application/hub/interfaces/package/class_Receivable.php create mode 100644 application/hub/main/streams/package/input/class_PackageInputStream.php create mode 100644 application/hub/main/tasks/hub/class_HubSocketListenerTask.php create mode 100644 application/hub/main/tasks/network/class_NetworkPackageReaderTask.php diff --git a/.gitattributes b/.gitattributes index 7327360bc..f94e47e86 100644 --- a/.gitattributes +++ b/.gitattributes @@ -64,6 +64,7 @@ application/hub/interfaces/nodes/.htaccess -text svneol=unset#text/plain application/hub/interfaces/nodes/class_NodeHelper.php -text svneol=unset#text/plain application/hub/interfaces/package/.htaccess -text svneol=unset#text/plain application/hub/interfaces/package/class_Deliverable.php -text svneol=unset#text/plain +application/hub/interfaces/package/class_Receivable.php -text application/hub/interfaces/package/fragmenter/.htaccess -text svneol=unset#text/plain application/hub/interfaces/package/fragmenter/class_Fragmentable.php -text application/hub/interfaces/pool/.htaccess -text svneol=unset#text/plain @@ -462,6 +463,7 @@ application/hub/main/states/peer/new/class_NewConnectionPeerState.php svneol=nat application/hub/main/streams/.htaccess svneol=native#text/plain application/hub/main/streams/package/.htaccess svneol=native#text/plain application/hub/main/streams/package/input/.htaccess svneol=native#text/plain +application/hub/main/streams/package/input/class_PackageInputStream.php -text application/hub/main/streams/package/output/.htaccess svneol=native#text/plain application/hub/main/streams/package/output/class_PackageOutputStream.php svneol=native#text/plain application/hub/main/tags/.htaccess -text svneol=unset#text/plain @@ -481,6 +483,7 @@ application/hub/main/tasks/hub/.htaccess -text svneol=unset#text/plain application/hub/main/tasks/hub/announcement/.htaccess -text svneol=unset#text/plain application/hub/main/tasks/hub/announcement/class_HubSelfAnnouncementTask.php -text svneol=unset#text/plain application/hub/main/tasks/hub/class_HubSelfConnectTask.php -text svneol=unset#text/plain +application/hub/main/tasks/hub/class_HubSocketListenerTask.php -text application/hub/main/tasks/hub/ping/.htaccess -text svneol=unset#text/plain application/hub/main/tasks/hub/ping/class_HubPingTask.php -text svneol=unset#text/plain application/hub/main/tasks/hub/update/.htaccess -text svneol=unset#text/plain @@ -488,6 +491,7 @@ application/hub/main/tasks/hub/update/class_HubUpdateCheckTask.php -text svneol= application/hub/main/tasks/idle/.htaccess -text svneol=unset#text/plain application/hub/main/tasks/idle/class_IdleLoopTask.php -text svneol=unset#text/plain application/hub/main/tasks/network/.htaccess -text svneol=unset#text/plain +application/hub/main/tasks/network/class_NetworkPackageReaderTask.php -text application/hub/main/tasks/network/class_NetworkPackageWriterTask.php svneol=native#text/plain application/hub/main/template/.htaccess -text svneol=unset#text/plain application/hub/main/template/announcement/.htaccess -text svneol=unset#text/plain diff --git a/application/hub/config.php b/application/hub/config.php index 738f88b9e..56d04545b 100644 --- a/application/hub/config.php +++ b/application/hub/config.php @@ -315,15 +315,6 @@ $cfg->setConfigEntry('news_main_limit', 5); // CFG: TASK-HANDLER-CLASS $cfg->setConfigEntry('task_handler_class', 'TaskHandler'); -// CFG: TASK-NETWORK-PACKAGE-READER-STARTUP-DELAY -$cfg->setConfigEntry('task_network_package_reader_startup_delay', 2000); - -// CFG: TASK-NETWORK-PACKAGE-READER-INTERVAL-DELAY -$cfg->setConfigEntry('task_network_package_reader_interval_delay', 10); - -// CFG: TASK-NETWORK-PACKAGE-READER-MAX-RUNS -$cfg->setConfigEntry('task_network_package_reader_max_runs', 0); - // CFG: TASK-QUERY-HANDLER-STARTUP-DELAY $cfg->setConfigEntry('task_query_handler_startup_delay', 1000); @@ -405,6 +396,12 @@ $cfg->setConfigEntry('hub_self_announcement_task_class', 'HubSelfAnnouncementTas // CFG: HUB-PACKAGE-WRITER-TASK-CLASS $cfg->setConfigEntry('hub_package_writer_task_class', 'NetworkPackageWriterTask'); +// CFG: HUB-PACKAGE-READER-TASK-CLASS +$cfg->setConfigEntry('hub_package_reader_task_class', 'NetworkPackageReaderTask'); + +// CFG: HUB-SOCKET-LISTENER-TASK-CLASS +$cfg->setConfigEntry('hub_socket_listener_task_class', 'HubSocketListenerTask'); + // CFG: CRUNCHER-WORK-UNIT-FETCHER-TASK-CLASS $cfg->setConfigEntry('cruncher_work_unit_fetcher_task_class', 'CruncherWorkUnitFetcherTask'); @@ -423,6 +420,24 @@ $cfg->setConfigEntry('task_network_package_writer_interval_delay', 10); // CFG: TASK-NETWORK-PACKAGE-WRITER-MAX-RUNS $cfg->setConfigEntry('task_network_package_writer_max_runs', 0); +// CFG: TASK-NETWORK-PACKAGE-READER-STARTUP-DELAY +$cfg->setConfigEntry('task_network_package_reader_startup_delay', 2000); + +// CFG: TASK-NETWORK-PACKAGE-READER-INTERVAL-DELAY +$cfg->setConfigEntry('task_network_package_reader_interval_delay', 10); + +// CFG: TASK-NETWORK-PACKAGE-READER-MAX-RUNS +$cfg->setConfigEntry('task_network_package_reader_max_runs', 0); + +// CFG: TASK-SOCKET-LISTENER-STATUP-DELAY +$cfg->setConfigEntry('task_socket_listener_startup_delay', 2500); + +// CFG: TASK-SOCKET-LISTENER-INTERVAL-DELAY +$cfg->setConfigEntry('task_socket_listener_interval_delay', 10); + +// CFG: TASK-SOCKET-LISTENER-MAX-RUNS +$cfg->setConfigEntry('task_socket_listener_max_runs', 0); + // CFG: TASK-CRUNCHER-WORK-UNIT-FETCHER-STARTUP-DELAY $cfg->setConfigEntry('task_cruncher_work_unit_fetcher_startup_delay', 1000); @@ -555,8 +570,11 @@ $cfg->setConfigEntry('package_fragmenter_class', 'PackageFragmenter'); // CFG: NODE-RAW-PACKAGE-OUTPUT-STREAM $cfg->setConfigEntry('node_raw_package_output_stream', 'PackageOutputStream'); +// CFG: NODE-RAW-PACKAGE-INPUT-STREAM +$cfg->setConfigEntry('node_raw_package_input_stream', 'PackageInputStream'); + // CFG: PACKAGE-CHUNK-SIZE -$cfg->setConfigEntry('package_chunk_size', 8*512); +$cfg->setConfigEntry('package_chunk_size', 256*8); // CFG: CRUNCHER-TEST-UNITS-ENABLED $cfg->setConfigEntry('cruncher_test_units_enabled', 'Y'); diff --git a/application/hub/interfaces/package/class_Receivable.php b/application/hub/interfaces/package/class_Receivable.php new file mode 100644 index 000000000..9f52f77dd --- /dev/null +++ b/application/hub/interfaces/package/class_Receivable.php @@ -0,0 +1,41 @@ + + * @version 0.0.0 + * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team + * @license GNU GPL 3.0 or any newer version + * @link http://www.ship-simu.org + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +interface Receivable extends FrameworkInterface { + /** + * Checks wether new raw data from the socket has arrived + * + * @return $hasArrived Wether raw package data has arrived at a socket + */ + function isNewRawDataPending (); + + /** + * Checks wether a new package has arrived + * + * @return $hasArrived Wether a new package has arrived for processing + */ + function isNewPackageArrived (); +} + +// [EOF] +?> diff --git a/application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php b/application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php index 7e46b791d..ac4ffff0d 100644 --- a/application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php +++ b/application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php @@ -61,8 +61,17 @@ class NodeTaskHandlerInitializerFilter extends BaseFilter implements Filterable // Get a new task handler instance $handlerInstance = ObjectFactory::createObjectByConfiguredName('task_handler_class'); + // Generate socket listener task + $taskInstance = ObjectFactory::createObjectByConfiguredName('hub_socket_listener_task_class'); + + // Network package reader, needs to be delayed a little + $handlerInstance->registerTask('socket_listener', $taskInstance); + + // Generate package reader task + $taskInstance = ObjectFactory::createObjectByConfiguredName('hub_package_reader_task_class'); + // Network package reader, needs to be delayed a little - $handlerInstance->registerTask('network_package_reader', $nodeInstance->getListenerPoolInstance()); + $handlerInstance->registerTask('network_package_reader', $taskInstance); // Generate package writer task $taskInstance = ObjectFactory::createObjectByConfiguredName('hub_package_writer_task_class'); diff --git a/application/hub/main/handler/network/tcp/class_TcpNetworkPackageHandler.php b/application/hub/main/handler/network/tcp/class_TcpNetworkPackageHandler.php index 701e44497..61a9a941e 100644 --- a/application/hub/main/handler/network/tcp/class_TcpNetworkPackageHandler.php +++ b/application/hub/main/handler/network/tcp/class_TcpNetworkPackageHandler.php @@ -54,46 +54,13 @@ class TcpNetworkPackageHandler extends BaseNetworkPackageHandler implements Netw return $handlerInstance; } - /** - * Verifies the package data itself (only rudymentary check) - * - * @param $rawData Raw package data (as string - * @return $isValid Wether the package data is valid - */ - private function isPackageDataValid ($rawData) { - // Default is not valid - $isValid = false; - - // Convert it back into an array - $packageData = explode(NetworkPackage::PACKAGE_DATA_SEPERATOR, $rawData); - - // This should be at least three entries: sender|recipient|raw data - if (count($packageData) < 3) { - // Not enougth fields in $packageData! - $this->setErrorCode(self::PACKAGE_ERROR_INCOMPLETE_DATA); - } elseif (count(explode(NetworkPackage::PACKAGE_MASK_SEPERATOR, $packageData[NetworkPackage::INDEX_PACKAGE_CONTENT])) < 2) { - // Not entougth fields in content - $this->setErrorCode(self::PACKAGE_ERROR_INVALID_CONTENT); - } elseif (!$this->ifRecipientMatchesOwnAddress($packageData)) { - // Field 'recipient' doesn't match our address, this must always be the case - $this->setErrorCode(self::PACKAGE_ERROR_RECIPIENT_MISMATCH); - } else { - // This check went fine... - $isValid = true; - } - - // Return the result - return $isValid; - } - /** * Processes a package from given resource. This is mostly useful for TCP * package handling and is implemented in the TcpListener class * * @param $resource A valid resource identifier * @return void - * @throws InvalidResourceException If the given resource is invalid - * @todo ~10% done + * @todo We need to handle the raw data over to the NetworkPackage class if the state if fine */ public function processResourcePackage ($resource) { // Check the resource @@ -112,7 +79,10 @@ class TcpNetworkPackageHandler extends BaseNetworkPackageHandler implements Netw $this->debugOutput('HANDLER: Handling TCP package from peer ' . $resource); // Read the raw data from socket - $rawData = socket_read($resource, $this->getConfigInstance()->getConfigEntry('tcp_buffer_length'), PHP_NORMAL_READ); + $rawData = socket_read($resource, $this->getConfigInstance()->getConfigEntry('tcp_buffer_length'), PHP_BINARY_READ); + + // Debug output of read data length + $this->debugOutput('rawData[]=' . strlen($rawData)); // Is it valid? if (($rawData === false) || (socket_last_error($resource) > 0)) { @@ -121,24 +91,9 @@ class TcpNetworkPackageHandler extends BaseNetworkPackageHandler implements Netw } elseif (empty($rawData)) { // The peer did send nothing to us $this->setErrorCode(self::SOCKET_ERROR_EMPTY_DATA); - } elseif (!$this->isPackageDataValid($rawData)) { - // Invalid package data - if ($this->getErrorCode() == self::SOCKET_ERROR_UNHANDLED) { - // Set it to PACKAGE_ERROR_INVALID_DATA, because SOCKET_ERROR_UNHANDLED should not be used - $this->setErrorCode(self::PACKAGE_ERROR_INVALID_DATA); - } // END - if - } else { - // Prepare the package data - $packageData = explode(NetworkPackage::PACKAGE_DATA_SEPERATOR, $rawData); - - // Low-level checks are all green - $this->setErrorCode(self::PACKAGE_LEVEL_CHECK_OKAY); } - // Debug output of error code - $this->debugOutput('errorCode=' . $this->getErrorCode()); - - // Get a state from the resolver for this package + // Get a state from the resolver for this package data array $stateInstance = $this->getResolverInstance()->resolveStateByPackage($this, $packageData, $resource); die('UNFINISHED:'.$stateInstance->__toString()."\n"); } diff --git a/application/hub/main/helper/connection/class_BaseConnectionHelper.php b/application/hub/main/helper/connection/class_BaseConnectionHelper.php index 4771e4d66..c88e89e53 100644 --- a/application/hub/main/helper/connection/class_BaseConnectionHelper.php +++ b/application/hub/main/helper/connection/class_BaseConnectionHelper.php @@ -43,9 +43,9 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc private $sentData = 0; /** - * Offset + * Difference */ - private $offset = 0; + private $diff = 0; /** * Connect retries for this connection @@ -167,7 +167,13 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc } // Implode the package data array and fragement the resulting string, returns the final hash - $this->currentFinalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this); + $finalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this); + if ($finalHash !== true) { + $this->currentFinalHash = $finalHash; + } // END - if + + // Debug message + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: currentFinalHash=' . $this->currentFinalHash); // Get the next raw data chunk from the fragmenter $rawDataChunk = $fragmenterInstance->getNextRawDataChunk($this->currentFinalHash); @@ -176,11 +182,18 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc $chunkHashes = array_keys($rawDataChunk); $chunkData = array_values($rawDataChunk); - // Remember this chunk as queued - $this->queuedChunks[$chunkHashes[0]] = $chunkData[0]; + // Is the required data there? + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: chunkHashes[]=' . count($chunkHashes) . ',chunkData[]=' . count($chunkData)); + if ((isset($chunkHashes[0])) && (isset($chunkData[0]))) { + // Remember this chunk as queued + $this->queuedChunks[$chunkHashes[0]] = $chunkData[0]; - // Return the raw data - return $chunkData[0]; + // Return the raw data + return $chunkData[0]; + } else { + // Return zero string + return ''; + } } /** @@ -197,43 +210,88 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc /** * Sends raw package data to the recipient * - * @param $packageData Raw package data - * @return $sentBytes Actual sent bytes to the peer + * @param $packageData Raw package data + * @return $totalSentBytes Total sent bytes to the peer * @throws InvalidSocketException If we got a problem with this socket */ public function sendRawPackageData (array $packageData) { - // Convert the package data array to a raw data stream - $rawData = $this->getRawDataFromPackageArray($packageData); + // Cache buffer length + $bufferSize = $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length'); + + // Init variables + $rawData = ''; + $dataStream = ' '; + $totalSentBytes = 0; + + // Fill sending buffer with data + while ((strlen($rawData) < $bufferSize) && (strlen($dataStream) > 0)) { + // Convert the package data array to a raw data stream + $dataStream = $this->getRawDataFromPackageArray($packageData); + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Adding ' . strlen($dataStream) . ' bytes to the sending buffer ...'); + $rawData .= $dataStream; + } // END - while + + // Nothing to sent is bad news! + assert(strlen($rawData) > 0); + + // Calculate difference + $this->diff = $bufferSize - strlen($rawData); // Get socket resource $socketResource = $this->getSocketResource(); - // And deliver it - $sentBytes = @socket_write($socketResource, $rawData, $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length') - $this->offset); + // Init sent bytes + $sentBytes = 0; - // If there was an error, we don't continue here - if ($sentBytes === false) { - // Get socket error code for verification - $socketError = socket_last_error($socketResource); + // Deliver all data + while ($sentBytes !== false) { + // And deliver it + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sending out ' . strlen($rawData) . ' bytes,bufferSize=' . $bufferSize . ',diff=' . $this->diff); + $sentBytes = @socket_write($socketResource, $rawData, ($bufferSize - $this->diff)); - // Get error message - $errorMessage = socket_strerror($socketError); + // If there was an error, we don't continue here + if ($sentBytes === false) { + // Get socket error code for verification + $socketError = socket_last_error($socketResource); - // Shutdown this socket - $this->shutdownSocket($socketResource); + // Get error message + $errorMessage = socket_strerror($socketError); - // And throw it - throw new InvalidSocketException(array($this, gettype($socketResource), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET); - } elseif ($sentBytes == 0) { - // Nothing sent is bad news - die(__METHOD__.': Unhandled 0 sent bytes! rawData[]=' . strlen($rawData)); - } + // Shutdown this socket + $this->shutdownSocket($socketResource); + + // And throw it + throw new InvalidSocketException(array($this, gettype($socketResource), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET); + } elseif (($sentBytes == 0) && (strlen($rawData) > 0)) { + // Nothing sent means we are done + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')'); + break; + } + + // The difference between sent bytes and length of raw data should not be below zero + assert((strlen($rawData) - $sentBytes) >= 0); + + // Add total sent bytes + $totalSentBytes += $sentBytes; + + // Cut out the last unsent bytes + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sent out ' . $sentBytes . ' of ' . strlen($rawData) . ' bytes ...'); + $rawData = substr($rawData, $sentBytes); + + // Calculate difference again + $this->diff = $bufferSize - strlen($rawData); - // The difference between sent bytes and length of raw data should not be below zero - assert((strlen($rawData) - $sentBytes) >= 0); + // Can we abort? + if (strlen($rawData) <= 0) { + // Abort here, all sent! + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')'); + break; + } // END - if + } // END - while // Return sent bytes - return $sentBytes; + //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: totalSentBytes=' . $totalSentBytes); + return $totalSentBytes; } /** diff --git a/application/hub/main/listener/tcp/class_TcpListener.php b/application/hub/main/listener/tcp/class_TcpListener.php index b2325c418..54842ca6a 100644 --- a/application/hub/main/listener/tcp/class_TcpListener.php +++ b/application/hub/main/listener/tcp/class_TcpListener.php @@ -92,9 +92,11 @@ class TcpListener extends BaseListener implements Listenable { throw new InvalidSocketException(array($this, gettype($mainSocket), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET); } // END - if - // Now, we want non-blocking mode - $this->debugOutput('LISTENER: Setting non-blocking mode.'); - if (!socket_set_nonblock($mainSocket)) { + // "Bind" the socket to the given address, on given port so this means + // that all connections on this port are now our resposibility to + // send/recv data, disconnect, etc.. + $this->debugOutput('LISTENER: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort()); + if (!socket_bind($mainSocket, $this->getListenAddress(), $this->getListenPort())) { // Get socket error code for verification $socketError = socket_last_error($mainSocket); @@ -108,11 +110,9 @@ class TcpListener extends BaseListener implements Listenable { throw new InvalidSocketException(array($this, gettype($mainSocket), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET); } // END - if - // "Bind" the socket to the given address, on given port so this means - // that all connections on this port are now our resposibility to - // send/recv data, disconnect, etc.. - $this->debugOutput('LISTENER: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort()); - if (!socket_bind($mainSocket, $this->getListenAddress(), $this->getListenPort())) { + // Start listen for connections + $this->debugOutput('LISTENER: Listening for connections.'); + if (!socket_listen($mainSocket)) { // Get socket error code for verification $socketError = socket_last_error($mainSocket); @@ -126,9 +126,9 @@ class TcpListener extends BaseListener implements Listenable { throw new InvalidSocketException(array($this, gettype($mainSocket), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET); } // END - if - // Start listen for connections - $this->debugOutput('LISTENER: Listening for connections.'); - if (!socket_listen($mainSocket)) { + // Now, we want non-blocking mode + $this->debugOutput('LISTENER: Setting non-blocking mode.'); + if (!socket_set_nonblock($mainSocket)) { // Get socket error code for verification $socketError = socket_last_error($mainSocket); @@ -202,6 +202,7 @@ class TcpListener extends BaseListener implements Listenable { if (in_array($this->getSocketResource(), $readers)) { // Then accept it $newSocket = socket_accept($this->getSocketResource()); + //* NOISY-DEBUG: */ $this->debugOutput('LISTENER: newSocket=' . $newSocket); // We want non-blocking here, too if (!socket_set_nonblock($newSocket)) { @@ -234,6 +235,7 @@ class TcpListener extends BaseListener implements Listenable { // Handle it here, if not main socket if ($currentSocket != $this->getSocketResource()) { // ... or else it will raise warnings like 'Transport endpoint is not connected' + //* NOISY-DEBUG: */ $this->debugOutput('LISTENER: currentSocket=' . $currentSocket); $this->getPackageInstance()->processResourcePackage($currentSocket); } // END - if diff --git a/application/hub/main/package/class_NetworkPackage.php b/application/hub/main/package/class_NetworkPackage.php index 0f93a3946..9a4ce15df 100644 --- a/application/hub/main/package/class_NetworkPackage.php +++ b/application/hub/main/package/class_NetworkPackage.php @@ -1,9 +1,10 @@ . */ -class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registerable { +class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receivable, Registerable { /** * Package mask for compressing package data: * 0: Compressor extension @@ -106,7 +107,7 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe const NETWORK_TARGET_SELF = 'self'; /** - * TCP package size + * TCP package size in bytes */ const TCP_PACKAGE_SIZE = 512; @@ -167,6 +168,10 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe return $hash; } + /////////////////////////////////////////////////////////////////////////// + // Delivering packages / raw data + /////////////////////////////////////////////////////////////////////////// + /** * Delivers the given raw package data. * @@ -270,7 +275,7 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe return; } // END - if - // Sent it away (we catch exceptions one method above + // Sent it away (we catch exceptions one method above) $sentBytes = $connectionInstance->sendRawPackageData($packageData); // Remember unsent raw bytes in back-buffer, if any @@ -435,6 +440,28 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registe $this->debugOutput('PACKAGE: Package was not delivered: ' . $e->getMessage()); } } + + /////////////////////////////////////////////////////////////////////////// + // Receiving packages / raw data + /////////////////////////////////////////////////////////////////////////// + + /** + * Checks wether new raw package data has arrived at a socket + * + * @return $hasArrived Wether new raw package data has arrived for processing + */ + public function isNewRawDataPending () { + // @TODO Add some content here + } + + /** + * Checks wether a new package has arrived + * + * @return $hasArrived Wether a new package has arrived for processing + */ + public function isNewPackageArrived () { + // @TODO Add some content here + } } // [EOF] diff --git a/application/hub/main/package/fragmenter/class_PackageFragmenter.php b/application/hub/main/package/fragmenter/class_PackageFragmenter.php index e7d2349ff..1cf6db7ea 100644 --- a/application/hub/main/package/fragmenter/class_PackageFragmenter.php +++ b/application/hub/main/package/fragmenter/class_PackageFragmenter.php @@ -302,6 +302,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg // Calculate real (data) chunk size $dataChunkSize = $this->getDataChunkSizeFromHash($finalHash); + //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: dataChunkSize=' . $dataChunkSize); // Now split it up for ($idx = 0; $idx < strlen($encodedData); $idx += $dataChunkSize) { @@ -319,6 +320,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg assert(strlen($chunk) <= NetworkPackage::TCP_PACKAGE_SIZE); // Add it to the array + //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: Adding ' . strlen($chunk) . ' bytes of a chunk.'); $this->chunks[$finalHash][] = $chunk; } // END - for @@ -362,6 +364,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg assert(strlen($chunk) <= NetworkPackage::TCP_PACKAGE_SIZE); // Add it to the array + //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: Adding ' . strlen($chunk) . ' bytes of a chunk.'); array_unshift($this->chunks[$finalHash], $chunk); } // END - for } @@ -409,6 +412,7 @@ class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Reg } // Return final hash + //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: finalHash[' . gettype($finalHash) . ']=' . $finalHash); return $finalHash; } diff --git a/application/hub/main/pools/class_BasePool.php b/application/hub/main/pools/class_BasePool.php index 26178b84c..458dc656d 100644 --- a/application/hub/main/pools/class_BasePool.php +++ b/application/hub/main/pools/class_BasePool.php @@ -87,7 +87,7 @@ class BasePool extends BaseHubSystem implements Visitable { */ public function accept (Visitor $visitorInstance) { // Debug message - //* DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - START'); + //* NOISY-DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - START'); // Visit this pool $visitorInstance->visitPool($this); @@ -117,7 +117,7 @@ class BasePool extends BaseHubSystem implements Visitable { } // END - while // Debug message - //* DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - FINISHED'); + //* NOISY-DEBUG: */ $this->debugOutput('POOL: ' . $visitorInstance->__toString() . ' has visited - FINISHED'); } /** diff --git a/application/hub/main/pools/peer/class_DefaultPeerPool.php b/application/hub/main/pools/peer/class_DefaultPeerPool.php index fc62b4134..daab465ee 100644 --- a/application/hub/main/pools/peer/class_DefaultPeerPool.php +++ b/application/hub/main/pools/peer/class_DefaultPeerPool.php @@ -104,11 +104,11 @@ class DefaultPeerPool extends BasePool implements PoolablePeer { } // END - if } else { // Server sockets won't work with socket_getpeername() - $this->debugOutput('POOL: Socket resource is server socket. This is not a bug.'); + $this->debugOutput('POOL: Socket resource is server socket (' . $socketResource . '). This is not a bug.'); } // Output error message - $this->debugOutput('POOL: Adding peer ' . $peerName); + $this->debugOutput('POOL: Adding peer ' . $peerName . ', socketResource=' . $socketResource); // Add it finally to the pool $this->addPoolEntry($socketResource); diff --git a/application/hub/main/streams/package/input/class_PackageInputStream.php b/application/hub/main/streams/package/input/class_PackageInputStream.php new file mode 100644 index 000000000..aa9548f61 --- /dev/null +++ b/application/hub/main/streams/package/input/class_PackageInputStream.php @@ -0,0 +1,65 @@ + + * @version 0.0.0 + * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 Developer Team + * @license GNU GPL 3.0 or any newer version + * @link http://www.ship-simu.org + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +class PackageInputStream extends BaseStream implements InputStreamable { + /** + * Protected constructor + * + * @return void + */ + protected function __construct () { + // Call parent constructor + parent::__construct(__CLASS__); + } + + /** + * Creates an instance of this node class + * + * @return $streamInstance An instance of this node class + */ + public final static function createPackageInputStream () { + // Get a new instance + $streamInstance = new PackageInputStream(); + + // Return the instance + return $streamInstance; + } + + /** + * Streams the data and maybe does something to it + * + * @param $data The data (string mostly) to "stream" + * @return $data The data (string mostly) to "stream" + * @todo Do we need to do something more here? + */ + public function streamData ($data) { + // Encode the data with BASE64 encoding + $data = base64_decode($data); + + // Return it + return $data; + } +} + +// [EOF] +?> diff --git a/application/hub/main/streams/package/output/class_PackageOutputStream.php b/application/hub/main/streams/package/output/class_PackageOutputStream.php index af98e5eef..35b382ae3 100644 --- a/application/hub/main/streams/package/output/class_PackageOutputStream.php +++ b/application/hub/main/streams/package/output/class_PackageOutputStream.php @@ -50,7 +50,7 @@ class PackageOutputStream extends BaseStream implements OutputStreamable { * * @param $data The data (string mostly) to "stream" * @return $data The data (string mostly) to "stream" - * @throws UnsupportedOperationException If this method is called + * @todo Do we need to do something more here? */ public function streamData ($data) { // Encode the data with BASE64 encoding diff --git a/application/hub/main/tasks/cruncher/class_CruncherKeyProducerTask.php b/application/hub/main/tasks/cruncher/class_CruncherKeyProducerTask.php index 7e1c4a2d8..43673a23b 100644 --- a/application/hub/main/tasks/cruncher/class_CruncherKeyProducerTask.php +++ b/application/hub/main/tasks/cruncher/class_CruncherKeyProducerTask.php @@ -35,7 +35,7 @@ class CruncherKeyProducerTask extends BaseTask implements Taskable, Visitable { /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public final static function createCruncherKeyProducerTask () { // Get new instance diff --git a/application/hub/main/tasks/cruncher/class_CruncherTestUnitProducerTask.php b/application/hub/main/tasks/cruncher/class_CruncherTestUnitProducerTask.php index 5b62110b9..f47f633e3 100644 --- a/application/hub/main/tasks/cruncher/class_CruncherTestUnitProducerTask.php +++ b/application/hub/main/tasks/cruncher/class_CruncherTestUnitProducerTask.php @@ -35,7 +35,7 @@ class CruncherTestUnitProducerTask extends BaseTask implements Taskable, Visitab /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public final static function createCruncherTestUnitProducerTask () { // Get new instance diff --git a/application/hub/main/tasks/cruncher/class_CruncherWorkUnitFetcherTask.php b/application/hub/main/tasks/cruncher/class_CruncherWorkUnitFetcherTask.php index 5c7586069..d8bca71d9 100644 --- a/application/hub/main/tasks/cruncher/class_CruncherWorkUnitFetcherTask.php +++ b/application/hub/main/tasks/cruncher/class_CruncherWorkUnitFetcherTask.php @@ -36,7 +36,7 @@ class CruncherWorkUnitFetcherTask extends BaseTask implements Taskable, Visitabl /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public final static function createCruncherWorkUnitFetcherTask () { // Get new instance diff --git a/application/hub/main/tasks/hub/announcement/class_HubSelfAnnouncementTask.php b/application/hub/main/tasks/hub/announcement/class_HubSelfAnnouncementTask.php index d4c3bcc2f..316a986c9 100644 --- a/application/hub/main/tasks/hub/announcement/class_HubSelfAnnouncementTask.php +++ b/application/hub/main/tasks/hub/announcement/class_HubSelfAnnouncementTask.php @@ -35,7 +35,7 @@ class HubSelfAnnouncementTask extends BaseTask implements Taskable, Visitable { /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public static final function createHubSelfAnnouncementTask () { // Get new instance diff --git a/application/hub/main/tasks/hub/class_HubSelfConnectTask.php b/application/hub/main/tasks/hub/class_HubSelfConnectTask.php index 811a7c138..616c25a57 100644 --- a/application/hub/main/tasks/hub/class_HubSelfConnectTask.php +++ b/application/hub/main/tasks/hub/class_HubSelfConnectTask.php @@ -35,7 +35,7 @@ class HubSelfConnectTask extends BaseTask implements Taskable, Visitable { /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public static final function createHubSelfConnectTask () { // Get new instance diff --git a/application/hub/main/tasks/hub/class_HubSocketListenerTask.php b/application/hub/main/tasks/hub/class_HubSocketListenerTask.php new file mode 100644 index 000000000..139a2b4b2 --- /dev/null +++ b/application/hub/main/tasks/hub/class_HubSocketListenerTask.php @@ -0,0 +1,76 @@ + + * @version 0.0.0 + * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team + * @license GNU GPL 3.0 or any newer version + * @link http://www.ship-simu.org + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +class HubSocketListenerTask extends BaseTask implements Taskable, Visitable { + /** + * Protected constructor + * + * @return void + */ + protected function __construct () { + // Call parent constructor + parent::__construct(__CLASS__); + } + + /** + * Creates an instance of this class + * + * @return $taskInstance An instance of a Taskable/Visitable class + */ + public static final function createHubSocketListenerTask () { + // Get new instance + $taskInstance = new HubSocketListenerTask(); + + // Return the prepared instance + return $taskInstance; + } + + /** + * Accepts the visitor to process the visit "request" + * + * @param $visitorInstance An instance of a Visitor class + * @return void + */ + public function accept (Visitor $visitorInstance) { + // Get the node instance from registry + $nodeInstance = Registry::getRegistry()->getInstance('node'); + + // Visit the pool listener task + $nodeInstance->getListenerPoolInstance()->accept($visitorInstance); + + // Visit this task + // @TODO Do we need to visit this task? $visitorInstance->visitTask($this); + } + + /** + * Executes the task + * + * @return void + * @todo 0% done + */ + public function executeTask () { + } +} + +// [EOF] +?> diff --git a/application/hub/main/tasks/hub/ping/class_HubPingTask.php b/application/hub/main/tasks/hub/ping/class_HubPingTask.php index cda20d125..19ad14978 100644 --- a/application/hub/main/tasks/hub/ping/class_HubPingTask.php +++ b/application/hub/main/tasks/hub/ping/class_HubPingTask.php @@ -21,7 +21,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -class HubPingTask extends BaseTask implements Visitable, Taskable { +class HubPingTask extends BaseTask implements Taskable, Visitable { /** * Protected constructor * @@ -36,7 +36,7 @@ class HubPingTask extends BaseTask implements Visitable, Taskable { * Creates an instance of this class * * @param $listInstance A Listable instance - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public static final function createHubPingTask (Listable $listInstance) { // Get new instance diff --git a/application/hub/main/tasks/hub/update/class_HubUpdateCheckTask.php b/application/hub/main/tasks/hub/update/class_HubUpdateCheckTask.php index b733b13c4..e4ad02579 100644 --- a/application/hub/main/tasks/hub/update/class_HubUpdateCheckTask.php +++ b/application/hub/main/tasks/hub/update/class_HubUpdateCheckTask.php @@ -21,7 +21,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -class HubUpdateCheckTask extends BaseTask implements Visitable, Taskable { +class HubUpdateCheckTask extends BaseTask implements Taskable, Visitable { /** * Protected constructor * @@ -35,7 +35,7 @@ class HubUpdateCheckTask extends BaseTask implements Visitable, Taskable { /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public static final function createHubUpdateCheckTask () { // Get new instance diff --git a/application/hub/main/tasks/idle/class_IdleLoopTask.php b/application/hub/main/tasks/idle/class_IdleLoopTask.php index c355472e2..c4fc5d53e 100644 --- a/application/hub/main/tasks/idle/class_IdleLoopTask.php +++ b/application/hub/main/tasks/idle/class_IdleLoopTask.php @@ -21,7 +21,7 @@ * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ -class IdleLoopTask extends BaseTask implements Visitable, Taskable { +class IdleLoopTask extends BaseTask implements Taskable, Visitable { /** * Protected constructor * @@ -35,7 +35,7 @@ class IdleLoopTask extends BaseTask implements Visitable, Taskable { /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public static final function createIdleLoopTask () { // Get new instance diff --git a/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php b/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php new file mode 100644 index 000000000..3f847aa83 --- /dev/null +++ b/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php @@ -0,0 +1,80 @@ + + * @version 0.0.0 + * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team + * @license GNU GPL 3.0 or any newer version + * @link http://www.ship-simu.org + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + */ +class NetworkPackageReaderTask extends BaseTask implements Taskable, Visitable { + /** + * Protected constructor + * + * @return void + */ + protected function __construct () { + // Call parent constructor + parent::__construct(__CLASS__); + } + + /** + * Creates an instance of this class + * + * @return $taskInstance An instance of a Taskable/Visitable class + */ + public static final function createNetworkPackageReaderTask () { + // Get new instance + $taskInstance = new NetworkPackageReaderTask(); + + // Return the prepared instance + return $taskInstance; + } + + /** + * Accepts the visitor to process the visit "request" + * + * @param $visitorInstance An instance of a Visitor class + * @return void + */ + public function accept (Visitor $visitorInstance) { + // Visit this task + $visitorInstance->visitTask($this); + } + + /** + * Executes the task + * + * @return void + */ + public function executeTask () { + // Get a singleton network package instance + $packageInstance = NetworkPackageFactory::createNetworkPackageInstance(); + + // Do we have something to handle? + if ($packageInstance->isNewRawDataPending()) { + // We have to handle raw data from the socket + $packageInstance->handleIncomingSocketRawData(); + } elseif ($packageInstance->isNewPackageArrived()) { + // Okay, then handle newly arrived package + $packageInstance->handleNewlyArrivedPackage(); + } // END - if + } +} + +// [EOF] +?> diff --git a/application/hub/main/tasks/network/class_NetworkPackageWriterTask.php b/application/hub/main/tasks/network/class_NetworkPackageWriterTask.php index 2bba76f86..39874dedd 100644 --- a/application/hub/main/tasks/network/class_NetworkPackageWriterTask.php +++ b/application/hub/main/tasks/network/class_NetworkPackageWriterTask.php @@ -35,7 +35,7 @@ class NetworkPackageWriterTask extends BaseTask implements Taskable, Visitable { /** * Creates an instance of this class * - * @return $taskInstance An instance of a Visitable class + * @return $taskInstance An instance of a Taskable/Visitable class */ public static final function createNetworkPackageWriterTask () { // Get new instance -- 2.39.5