* @version 0.0.0 * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009, 2010 Hub Developer Team * @license GNU GPL 3.0 or any newer version * @link http://www.ship-simu.org * @todo Needs to add functionality for handling the object's type * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Registerable { /** * Package mask for compressing package data: * 1.) Compressor extension * 2.) Raw package data * 3.) Tags, seperated by semicolons, no semicolon is required if only one tag is needed * 4.) Checksum */ const PACKAGE_MASK = '%s:%s:%s:%s'; /** * Seperator for the above mask */ const PACKAGE_MASK_SEPERATOR = ':'; /** * Array indexes for above mask, start with zero */ const INDEX_COMPRESSOR_EXTENSION = 0; const INDEX_PACKAGE_DATA = 1; const INDEX_TAGS = 2; const INDEX_CHECKSUM = 3; /** * Array indexes for raw package array */ const INDEX_PACKAGE_SENDER = 0; const INDEX_PACKAGE_RECIPIENT = 1; const INDEX_PACKAGE_CONTENT = 2; /** * Tags seperator */ const PACKAGE_TAGS_SEPERATOR = ';'; /** * Raw package data seperator */ const PACKAGE_DATA_SEPERATOR = '|'; /** * Stacker name for "undeclared" packages */ const STACKER_NAME_UNDECLARED = 'undeclared'; /** * Stacker name for "declared" packages (which are ready to send out) */ const STACKER_NAME_DECLARED = 'declared'; /** * Stacker name for "out-going" packages */ const STACKER_NAME_OUTGOING = 'outgoing'; /** * Stacker name for "back-buffered" packages */ const STACKER_NAME_BACK_BUFFER = 'backbuffer'; /** * Network target (alias): 'upper hubs' */ const NETWORK_TARGET_UPPER_HUBS = 'upper'; /** * Network target (alias): 'self' */ const NETWORK_TARGET_SELF = 'self'; /** * Protected constructor * * @return void */ protected function __construct () { // Call parent constructor parent::__construct(__CLASS__); // We need to initialize a stack here for our packages even those // which have no recipient address and stamp... ;-) $stackerInstance = ObjectFactory::createObjectByConfiguredName('package_stacker_class'); // At last, set it in this class $this->setStackerInstance($stackerInstance); } /** * Creates an instance of this class * * @param $compressorInstance A Compressor instance for compressing the content * @return $packageInstance An instance of a Deliverable class */ public static final function createNetworkPackage (Compressor $compressorInstance) { // Get new instance $packageInstance = new NetworkPackage(); // Now set the compressor instance $packageInstance->setCompressorInstance($compressorInstance); // Return the prepared instance return $packageInstance; } /** * "Getter" for hash from given content and helper instance * * @param $content Raw package content * @param $helperInstance A BaseHubHelper instance * @return $hash Hash for given package content */ private function getHashFromContent ($content, BaseHubHelper $helperInstance) { // Create the hash // @TODO crc32 is not good, but it needs to be fast $hash = crc32( $content . ':' . $helperInstance->getNodeInstance()->getSessionId() . ':' . $this->getCompressorInstance()->getCompressorExtension() ); // And return it return $hash; } /** * Delivers the given raw package data. * * @param $packageData Raw package data in an array * @return void */ private function deliverPackage (array $packageData) { /* * We need to disover every recipient, just in case we have a * multi-recipient entry like 'upper' is. 'all' may be a not so good * target because it causes an overload on the network and may be * abused for attacking the network with large packages. */ $discoveryInstance = PackageDiscoveryFactory::createPackageDiscoveryInstance(); // Discover all recipients, this may throw an exception $discoveryInstance->discoverRecipients($packageData); // Now get an iterator $iteratorInstance = $discoveryInstance->getIterator(); // ... and begin iteration while ($iteratorInstance->valid()) { // Get current entry $currentRecipient = $iteratorInstance->current(); // Debug message $this->debugOutput('PACKAGE: Package declared for recipient ' . $currentRecipient); // Set the recipient $packageData['recipient'] = $currentRecipient; // And enqueue it to the writer class $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECLARED, $packageData); // Skip to next entry $iteratorInstance->next(); } // END - while // Clean-up the list $discoveryInstance->clearRecipients(); } /** * Sends a raw package out * * @param $packageData Raw package data in an array * @return void */ private function sendRawPackage (array $packageData) { /* * This package may become big, depending on the shared object size or * delivered message size which shouldn't be so long (to save * bandwidth). Because of the nature of the used protocol (TCP) we need * to split it up into smaller pieces to fit it into a TCP frame. * * So first we need (again) a discovery class but now a protocol * discovery to choose the right socket resource. The discovery class * should take a look at the raw package data itself and then decide * which (configurable!) protocol should be used for that type of * package. */ $discoveryInstance = SocketDiscoveryFactory::createSocketDiscoveryInstance(); // Now discover the right protocol $socketResource = $discoveryInstance->discoverSocket($packageData); // We have to put this socket in our registry, so get an instance $registryInstance = SocketRegistry::createSocketRegistry(); // Get the listener from registry $connectionInstance = Registry::getRegistry()->getInstance('connection'); // Is it not there? if (!$registryInstance->isSocketRegistered($connectionInstance, $socketResource)) { // Then register it $registryInstance->registerSocket($connectionInstance, $socketResource, $packageData); } // END - if // We enqueue it again, but now in the out-going queue $this->getStackerInstance()->pushNamed(self::STACKER_NAME_OUTGOING, $packageData); } /** * Sends waiting packages * * @param $packageData Raw package data * @return void */ private function sendOutgoingPackage (array $packageData) { // Get the right connection instance $connectionInstance = SocketRegistry::createSocketRegistry()->getHandlerInstanceFromPackageData($packageData); // Is this connection still alive? if ($connectionInstance->isShuttedDown()) { // This connection is shutting down // @TODO We may want to do somthing more here? return; } // END - if // Sent it away (we catch exceptions one method above $sentBytes = $connectionInstance->sendRawPackageData($packageData); // Remember unsent raw bytes in back-buffer, if any $this->storeUnsentBytesInBackBuffer($packageData, $sentBytes); } /** * "Enqueues" raw content into this delivery class by reading the raw content * from given template instance and pushing it on the 'undeclared' stack. * * @param $helperInstance A BaseHubHelper instance * @return void */ public function enqueueRawDataFromTemplate (BaseHubHelper $helperInstance) { // Get the raw content ... $content = $helperInstance->getTemplateInstance()->getRawTemplateData(); // ... and compress it $content = $this->getCompressorInstance()->compressStream($content); // Add magic in front of it and hash behind it, including BASE64 encoding $content = sprintf(self::PACKAGE_MASK, // 1.) Compressor's extension $this->getCompressorInstance()->getCompressorExtension(), // 2.) Raw package content, encoded with BASE64 base64_encode($content), // 3.) Tags implode(self::PACKAGE_TAGS_SEPERATOR, $helperInstance->getPackageTags()), // 4.) Checksum $this->getHashFromContent($content, $helperInstance) ); // Now prepare the temporary array and push it on the 'undeclared' stack $this->getStackerInstance()->pushNamed(self::STACKER_NAME_UNDECLARED, array( 'sender' => $helperInstance->getNodeInstance()->getSessionId(), 'recipient' => $helperInstance->getRecipientType(), 'content' => $content, )); } /** * Checks wether a package has been enqueued for delivery. * * @return $isEnqueued Wether a package is enqueued */ public function isPackageEnqueued () { // Check wether the stacker is not empty $isEnqueued = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_UNDECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_UNDECLARED))); // Return the result return $isEnqueued; } /** * Checks wether a package has been declared * * @return $isDeclared Wether a package is declared */ public function isPackageDeclared () { // Check wether the stacker is not empty $isDeclared = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_DECLARED)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECLARED))); // Return the result return $isDeclared; } /** * Checks wether a package should be sent out * * @return $isWaitingDelivery Wether a package is waiting for delivery */ public function isPackageWaitingDelivery () { // Check wether the stacker is not empty $isWaitingDelivery = (($this->getStackerInstance()->isStackInitialized(self::STACKER_NAME_OUTGOING)) && (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_OUTGOING))); // Return the result return $isWaitingDelivery; } /** * Delivers an enqueued package to the stated destination. If a non-session * id is provided, recipient resolver is being asked (and instanced once). * This allows that a single package is being delivered to multiple targets * without enqueueing it for every target. If no target is provided or it * can't be determined a NoTargetException is being thrown. * * @return void * @throws NoTargetException If no target can't be determined */ public function declareEnqueuedPackage () { // Make sure this method isn't working if there is no package enqueued if (!$this->isPackageEnqueued()) { // This is not fatal but should be avoided // @TODO Add some logging here return; } // END - if // Now we know for sure there are packages to deliver, we can start // with the first one. $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_UNDECLARED); // Finally, deliver the package $this->deliverPackage($packageData); // And remove it finally $this->getStackerInstance()->popNamed(self::STACKER_NAME_UNDECLARED); } /** * Delivers the next declared package. Only one package per time will be sent * because this may take time and slows down the whole delivery * infrastructure. * * @return void */ public function deliverDeclaredPackage () { // Sanity check if we have packages declared if (!$this->isPackageDeclared()) { // This is not fatal but should be avoided // @TODO Add some logging here return; } // END - if // Get the package again $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_DECLARED); // And send it $this->sendRawPackage($packageData); // And remove it finally $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECLARED); } /** * Sends waiting packages out for delivery * * @return void */ public function sendWaitingPackage () { // Sent any waiting bytes in the back-buffer $this->sendBackBufferBytes(); // Sanity check if we have packages waiting for delivery if (!$this->isPackageWaitingDelivery()) { // This is not fatal but should be avoided // @TODO Add some logging here return; } // END - if // Get the package again $packageData = $this->getStackerInstance()->getNamed(self::STACKER_NAME_OUTGOING); try { // Now try to send it $this->sendOutgoingPackage($packageData); die("O!\n"); // And remove it finally $this->getStackerInstance()->popNamed(self::STACKER_NAME_OUTGOING); } catch (InvalidSocketException $e) { // Output exception message $this->debugOutput('PACKAGE: Package was not delivered: ' . $e->getMessage()); } } } // [EOF] ?>