* @version 0.0.0 * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2011 Hub Developer Team * @license GNU GPL 3.0 or any newer version * @link http://www.ship-simu.org * @todo Needs to add functionality for handling the object's type * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receivable, Registerable { /** * Package mask for compressing package data: * 0: Compressor extension * 1: Raw package data * 2: Tags, seperated by semicolons, no semicolon is required if only one tag is needed * 3: Checksum * 0 1 2 3 */ const PACKAGE_MASK = '%s:%s:%s:%s'; /** * Seperator for the above mask */ const PACKAGE_MASK_SEPERATOR = ':'; /** * Seperator for checksum */ const PACKAGE_CHECKSUM_SEPERATOR = ':'; /** * Array indexes for above mask, start with zero */ const INDEX_COMPRESSOR_EXTENSION = 0; const INDEX_PACKAGE_DATA = 1; const INDEX_TAGS = 2; const INDEX_CHECKSUM = 3; /** * Array indexes for raw package array */ const INDEX_PACKAGE_SENDER = 0; const INDEX_PACKAGE_RECIPIENT = 1; const INDEX_PACKAGE_CONTENT = 2; /** * Named array elements for package data */ const PACKAGE_DATA_SENDER = 'sender'; const PACKAGE_DATA_RECIPIENT = 'recipient'; const PACKAGE_DATA_CONTENT = 'content'; /** * Tags seperator */ const PACKAGE_TAGS_SEPERATOR = ';'; /** * Raw package data seperator */ const PACKAGE_DATA_SEPERATOR = '#'; /** * Stacker name for "undeclared" packages */ const STACKER_NAME_UNDECLARED = 'package_undeclared'; /** * Stacker name for "declared" packages (which are ready to send out) */ const STACKER_NAME_DECLARED = 'package_declared'; /** * Stacker name for "out-going" packages */ const STACKER_NAME_OUTGOING = 'package_outgoing'; /** * Stacker name for "incoming" decoded raw data */ const STACKER_NAME_DECODED_INCOMING = 'package_decoded_data'; /** * Stacker name for handled decoded raw data */ const STACKER_NAME_DECODED_HANDLED = 'package_handled_decoded'; /** * Stacker name for "back-buffered" packages */ const STACKER_NAME_BACK_BUFFER = 'package_backbuffer'; /** * Network target (alias): 'upper hubs' */ const NETWORK_TARGET_UPPER_HUBS = 'upper'; /** * Network target (alias): 'self' */ const NETWORK_TARGET_SELF = 'self'; /** * TCP package size in bytes */ const TCP_PACKAGE_SIZE = 512; /** * Protected constructor * * @return void */ protected function __construct () { // Call parent constructor parent::__construct(__CLASS__); } /** * Creates an instance of this class * * @param $compressorInstance A Compressor instance for compressing the content * @return $packageInstance An instance of a Deliverable class */ public static final function createNetworkPackage (Compressor $compressorInstance) { // Get new instance $packageInstance = new NetworkPackage(); // Now set the compressor instance $packageInstance->setCompressorInstance($compressorInstance); /* * We need to initialize a stack here for our packages even for those * which have no recipient address and stamp... ;-) This stacker will * also be used for incoming raw data to handle it. */ $stackerInstance = ObjectFactory::createObjectByConfiguredName('network_package_stacker_class'); // At last, set it in this class $packageInstance->setStackerInstance($stackerInstance); // Init all stacker $packageInstance->initStackers(); // Get a visitor instance for speeding up things $visitorInstance = ObjectFactory::createObjectByConfiguredName('node_raw_data_monitor_visitor_class', array($packageInstance)); // Set it in this package $packageInstance->setVisitorInstance($visitorInstance); // Return the prepared instance return $packageInstance; } /** * Initialize all stackers * * @return void */ protected function initStackers () { // Initialize all foreach ( array( self::STACKER_NAME_UNDECLARED, self::STACKER_NAME_DECLARED, self::STACKER_NAME_OUTGOING, self::STACKER_NAME_DECODED_INCOMING, self::STACKER_NAME_DECODED_HANDLED, self::STACKER_NAME_BACK_BUFFER ) as $stackerName) { // Init this stacker $this->getStackerInstance()->initStacker($stackerName); } // END - foreach } /** * "Getter" for hash from given content and helper instance * * @param $content Raw package content * @param $helperInstance An instance of a HelpableHub class * @param $nodeInstance An instance of a NodeHelper class * @return $hash Hash for given package content * @todo $helperInstance is unused */ private function getHashFromContent ($content, HelpableHub $helperInstance, NodeHelper $nodeInstance) { // Create the hash // @TODO crc32() is not very strong, but it needs to be fast $hash = crc32( $content . self::PACKAGE_CHECKSUM_SEPERATOR . $nodeInstance->getSessionId() . self::PACKAGE_CHECKSUM_SEPERATOR . $this->getCompressorInstance()->getCompressorExtension() ); // And return it return $hash; } /////////////////////////////////////////////////////////////////////////// // Delivering packages / raw data /////////////////////////////////////////////////////////////////////////// /** * Delivers the given raw package data. * * @param $packageData Raw package data in an array * @return void */ private function declareRawPackageData (array $packageData) { /* * We need to disover every recipient, just in case we have a * multi-recipient entry like 'upper' is. 'all' may be a not so good * target because it causes an overload on the network and may be * abused for attacking the network with large packages. */ $discoveryInstance = PackageDiscoveryFactory::createPackageDiscoveryInstance(); // Discover all recipients, this may throw an exception $discoveryInstance->discoverRecipients($packageData); // Now get an iterator $iteratorInstance = $discoveryInstance->getIterator(); // ... and begin iteration while ($iteratorInstance->valid()) { // Get current entry $currentRecipient = $iteratorInstance->current(); // Debug message $this->debugOutput('PACKAGE: Package declared for recipient ' . $currentRecipient); // Set the recipient $packageData[self::PACKAGE_DATA_RECIPIENT] = $currentRecipient; // And enqueue it to the writer class $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECLARED, $packageData); // Skip to next entry $iteratorInstance->next(); } // END - while // Clean-up the list $discoveryInstance->clearRecipients(); } /** * Delivers raw package data. In short, this will discover the raw socket * resource through a discovery class (which will analyse the receipient of * the package), register the socket with the connection (handler/helper?) * instance and finally push the raw data on our outgoing queue. * * @param $packageData Raw package data in an array * @return void */ private function deliverRawPackageData (array $packageData) { /* * This package may become big, depending on the shared object size or * delivered message size which shouldn't be so long (to save * bandwidth). Because of the nature of the used protocol (TCP) we need * to split it up into smaller pieces to fit it into a TCP frame. * * So first we need (again) a discovery class but now a protocol * discovery to choose the right socket resource. The discovery class * should take a look at the raw package data itself and then decide * which (configurable!) protocol should be used for that type of * package. */ $discoveryInstance = SocketDiscoveryFactory::createSocketDiscoveryInstance(); // Now discover the right protocol $socketResource = $discoveryInstance->discoverSocket($packageData); // Debug message //* NOISY-DEBUG: */ $this->debugOutput('NETWORK-PACKAGE: Reached line ' . __LINE__ . ' after discoverSocket() has been called.'); // We have to put this socket in our registry, so get an instance $registryInstance = SocketRegistry::createSocketRegistry(); // Get the listener from registry $helperInstance = Registry::getRegistry()->getInstance('connection'); // Debug message //* NOISY-DEBUG: */ $this->debugOutput('NETWORK-PACKAGE: Reached line ' . __LINE__ . ' before isSocketRegistered() has been called.'); // Is it not there? if ((is_resource($socketResource)) && (!$registryInstance->isSocketRegistered($helperInstance, $socketResource))) { // Then register it $registryInstance->registerSocket($helperInstance, $socketResource, $packageData); } // END - if // Debug message //* NOISY-DEBUG: */ $this->debugOutput('NETWORK-PACKAGE: Reached line ' . __LINE__ . ' after isSocketRegistered() has been called.'); // Make sure the connection is up $helperInstance->getStateInstance()->validatePeerStateConnected(); // Debug message //* NOISY-DEBUG: */ $this->debugOutput('NETWORK-PACKAGE: Reached line ' . __LINE__ . ' after validatePeerStateConnected() has been called.'); // We enqueue it again, but now in the out-going queue $this->getStackerInstance()->pushNamed(self::STACKER_NAME_OUTGOING, $packageData); } /** * Sends waiting packages * * @param $packageData Raw package data * @return void */ private function sendOutgoingRawPackageData (array $packageData) { // Init sent bytes $sentBytes = 0; // Get the right connection instance $helperInstance = SocketRegistry::createSocketRegistry()->getHandlerInstanceFromPackageData($packageData); // Is this connection still alive? if ($helperInstance->isShuttedDown()) { // This connection is shutting down // @TODO We may want to do somthing more here? return; } // END - if // Sent out package data $sentBytes = $helperInstance->sendRawPackageData($packageData); // Remember unsent raw bytes in back-buffer, if any $this->storeUnsentBytesInBackBuffer($packageData, $sentBytes); } /** * "Enqueues" raw content into this delivery class by reading the raw content * from given template instance and pushing it on the 'undeclared' stack. * * @param $helperInstance An instance of a HelpableHub class * @param $nodeInstance An instance of a NodeHelper class * @return void */ public function enqueueRawDataFromTemplate (HelpableHub $helperInstance, NodeHelper $nodeInstance) { // Get the raw content ... $content = $helperInstance->getTemplateInstance()->getRawTemplateData(); // ... and compress it $content = $this->getCompressorInstance()->compressStream($content); // Add magic in front of it and hash behind it, including BASE64 encoding $content = sprintf(self::PACKAGE_MASK, // 1.) Compressor's extension $this->getCompressorInstance()->getCompressorExtension(), // 2.) Raw package content, encoded with BASE64 base64_encode($content), // 3.) Tags implode(self::PACKAGE_TAGS_SEPERATOR, $helperInstance->getPackageTags()), // 4.) Checksum $this->getHashFromContent($content, $helperInstance, $nodeInstance) ); // Now prepare the temporary array and push it on the 'undeclared' stack $this->getStackerInstance()->pushNamed(self::STACKER_NAME_UNDECLARED, array( self::PACKAGE_DATA_SENDER => $nodeInstance->getSessionId(), self::PACKAGE_DATA_RECIPIENT => $helperInstance->getRecipientType(), self::PACKAGE_DATA_CONTENT => $content, )); } /** * Checks wether a package has been enqueued for delivery. * * @return $isEnqueued Wether a package is enqueued */ public function isPackageEnqueued () { // Check wether the stacker is not empty $isEnqueued = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_UNDECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_UNDECLARED))); // Return the result return $isEnqueued; } /** * Checks wether a package has been declared * * @return $isDeclared Wether a package is declared */ public function isPackageDeclared () { // Check wether the stacker is not empty $isDeclared = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_DECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECLARED))); // Return the result return $isDeclared; } /** * Checks wether a package should be sent out * * @return $isWaitingDelivery Wether a package is waiting for delivery */ public function isPackageWaitingForDelivery () { // Check wether the stacker is not empty $isWaitingDelivery = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_OUTGOING)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_OUTGOING))); // Return the result return $isWaitingDelivery; } /** * Delivers an enqueued package to the stated destination. If a non-session * id is provided, recipient resolver is being asked (and instanced once). * This allows that a single package is being delivered to multiple targets * without enqueueing it for every target. If no target is provided or it * can't be determined a NoTargetException is being thrown. * * @return void * @throws NoTargetException If no target can't be determined */ public function declareEnqueuedPackage () { // Make sure this method isn't working if there is no package enqueued if (!$this->isPackageEnqueued()) { // This is not fatal but should be avoided // @TODO Add some logging here return; } // END - if // Now we know for sure there are packages to deliver, we can start // with the first one. $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_UNDECLARED); // Declare the raw package data for delivery $this->declareRawPackageData($packageData); // And remove it finally $this->getStackerInstance()->popNamed(self::STACKER_NAME_UNDECLARED); } /** * Delivers the next declared package. Only one package per time will be sent * because this may take time and slows down the whole delivery * infrastructure. * * @return void */ public function deliverDeclaredPackage () { // Sanity check if we have packages declared if (!$this->isPackageDeclared()) { // This is not fatal but should be avoided // @TODO Add some logging here return; } // END - if // Get the package again $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_DECLARED); try { // And try to send it $this->deliverRawPackageData($packageData); // And remove it finally $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECLARED); } catch (InvalidStateException $e) { // The state is not excepected (shall be 'connected') $this->debugOutput('PACKAGE: Caught exception ' . $e->__toString() . ' with message=' . $e->getMessage()); } } /** * Sends waiting packages out for delivery * * @return void */ public function sendWaitingPackage () { // Send any waiting bytes in the back-buffer before sending a new package $this->sendBackBufferBytes(); // Sanity check if we have packages waiting for delivery if (!$this->isPackageWaitingForDelivery()) { // This is not fatal but should be avoided $this->debugOutput('PACKAGE: No package is waiting for delivery, but ' . __METHOD__ . ' was called.'); return; } // END - if // Get the package again $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_OUTGOING); try { // Now try to send it $this->sendOutgoingRawPackageData($packageData); // And remove it finally $this->getStackerInstance()->popNamed(self::STACKER_NAME_OUTGOING); } catch (InvalidSocketException $e) { // Output exception message $this->debugOutput('PACKAGE: Package was not delivered: ' . $e->getMessage()); } } /////////////////////////////////////////////////////////////////////////// // Receiving packages / raw data /////////////////////////////////////////////////////////////////////////// /** * Checks wether decoded raw data is pending * * @return $isPending Wether decoded raw data is pending */ private function isDecodedDataPending () { // Just return wether the stack is not empty $isPending = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_INCOMING)); // Return the status return $isPending; } /** * Checks wether new raw package data has arrived at a socket * * @param $poolInstance An instance of a PoolableListener class * @return $hasArrived Wether new raw package data has arrived for processing */ public function isNewRawDataPending (PoolableListener $poolInstance) { // Visit the pool. This monitors the pool for incoming raw data. $poolInstance->accept($this->getVisitorInstance()); // Check for new data arrival $hasArrived = $this->isDecodedDataPending(); // Return the status return $hasArrived; } /** * Handles the incoming decoded raw data. This method does not "convert" the * decoded data back into a package array, it just "handles" it and pushs it * on the next stack. * * @return void */ public function handleIncomingDecodedData () { /* * This method should only be called if decoded raw data is pending, * so check it again. */ if (!$this->isDecodedDataPending()) { // This is not fatal but should be avoided // @TODO Add some logging here return; } // END - if // Very noisy debug message: /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: Stacker size is ' . $this->getStackerInstance()->getStackCount(self::STACKER_NAME_DECODED_INCOMING) . ' entries.'); // "Pop" the next entry (the same array again) from the stack $decodedData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_INCOMING); // Make sure both array elements are there assert((is_array($decodedData)) && (isset($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA])) && (isset($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE]))); /* * Also make sure the error code is SOCKET_ERROR_UNHANDLED because we * only want to handle unhandled packages here. */ assert($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE] == BaseRawDataHandler::SOCKET_ERROR_UNHANDLED); // Remove the last chunk seperator (because it is being added and we don't need it) if (substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], -1, 1) == PackageFragmenter::CHUNK_SEPERATOR) { // It is there and should be removed $decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA] = substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], 0, -1); } // END - if // This package is "handled" and can be pushed on the next stack $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_HANDLED, $decodedData); } /** * Adds raw decoded data from the given handler instance to this receiver * * @param $handlerInstance An instance of a Networkable class * @return void */ public function addDecodedDataToIncomingStack (Networkable $handlerInstance) { /* * Get the decoded data from the handler, this is an array with * 'decoded_data' and 'error_code' as elements. */ $decodedData = $handlerInstance->getNextDecodedData(); // Very noisy debug message: //* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: decodedData[' . gettype($decodedData) . ']=' . print_r($decodedData, true)); // And push it on our stack $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_INCOMING, $decodedData); } /** * Checks wether incoming decoded data is handled. * * @return $isHandled Wether incoming decoded data is handled */ public function isIncomingDecodedDataHandled () { // Determine if the stack is not empty $isHandled = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_HANDLED)); // Return it return $isHandled; } /** * Assembles incoming decoded data so it will become an abstract network * package again. * * @return void */ public function assembleDecodedDataToPackage () { $this->partialStub('Please implement this method.'); } /** * Checks wether a new package has arrived * * @return $hasArrived Wether a new package has arrived for processing */ public function isNewPackageArrived () { // @TODO Add some content here } } // [EOF] ?>