From 205a409d2e4ed84dda8e033431c5c61be06e9858 Mon Sep 17 00:00:00 2001 From: =?utf8?q?Roland=20H=C3=A4der?= Date: Thu, 28 Apr 2011 16:02:32 +0000 Subject: [PATCH] Handling of decoded raw data continued: - Decoded raw data is now being handled over to the NetworkPackage class - Therefore the Receivable interface has been extended with some more methods - Further assembling of the handled decoded raw data is currently unfinished - TODOs.txt updated --- application/hub/config.php | 23 ++- .../handler/network/class_Networkable.php | 14 ++ .../interfaces/package/class_Receivable.php | 32 ++++ .../class_PackageDiscoveryFactory.php | 2 +- .../class_SocketDiscoveryFactory.php | 2 +- .../fragmenter/class_FragmenterFactory.php | 12 +- .../lists/class_RecipientListFactory.php | 2 +- .../package/class_NetworkPackageFactory.php | 6 +- .../producer/class_ProducerFactory.php | 2 +- .../states/peer/class_PeerStateFactory.php | 4 +- .../tags/class_PackageTagsFactory.php | 2 +- .../class_CruncherInitializationFilter.php | 2 +- .../node/class_NodeInitializationFilter.php | 2 +- .../network/class_BaseRawDataHandler.php | 35 +++- .../connection/class_BaseConnectionHelper.php | 2 +- .../listener/class_BaseListenerDecorator.php | 22 ++- .../hub/main/package/class_NetworkPackage.php | 159 ++++++++++++++++-- .../class_NetworkPackageReaderTask.php | 9 +- docs/TODOs.txt | 18 +- 19 files changed, 298 insertions(+), 52 deletions(-) diff --git a/application/hub/config.php b/application/hub/config.php index 6dca9179f..08db0c4df 100644 --- a/application/hub/config.php +++ b/application/hub/config.php @@ -256,13 +256,16 @@ $cfg->setConfigEntry('stacker_announcement_max_size', 20); $cfg->setConfigEntry('stacker_self_connect_max_size', 10); // CFG: STACKER-UNDECLARED-MAX-SIZE -$cfg->setConfigEntry('stacker_undeclared_max_size', 10000); +$cfg->setConfigEntry('stacker_package_undeclared_max_size', 10000); -// CFG: STACKER-DECLARED-MAX-SIZE -$cfg->setConfigEntry('stacker_declared_max_size', 1000); +// CFG: STACKER-PACKAGE-DECLARED-MAX-SIZE +$cfg->setConfigEntry('stacker_package_declared_max_size', 1000); -// CFG: STACKER-OUTGOING-MAX-SIZE -$cfg->setConfigEntry('stacker_outgoing_max_size', 100); +// CFG: STACKER-PACKAGE-OUTGOING-MAX-SIZE +$cfg->setConfigEntry('stacker_package_outgoing_max_size', 100); + +// CFG: STACKER-PACKAGE-BACKBUFFER-MAX-SIZE +$cfg->setConfigEntry('stacker_package_backbuffer_max_size', 1000); // CFG: STACKER-IN-QUEUE-MAX-SIZE $cfg->setConfigEntry('stacker_in_queue_max_size', 10000); @@ -279,8 +282,14 @@ $cfg->setConfigEntry('stacker_outgoing_queue_max_size', 100000); // CFG: STACKER-INCOMING-QUEUE-MAX-SIZE $cfg->setConfigEntry('stacker_incoming_queue_max_size', 100000); -// CFG: STACKER-RAW-DATA-MAX-SIZE -$cfg->setConfigEntry('stacker_raw_data_max_size', 100); +// CFG: STACKER-DECODED-DATA-MAX-SIZE +$cfg->setConfigEntry('stacker_decoded_data_max_size', 100); + +// CFG: STACKER-PACKAGE-DECODED-DATA-MAX-SIZE +$cfg->setConfigEntry('stacker_package_decoded_data_max_size', 200); + +// CFG: STACKER-PACKAGE-HANDLED-DECODED-MAX-SIZE +$cfg->setConfigEntry('stacker_package_handled_decoded_max_size', 200); // CFG: NEWS-MAIN-LIMIT $cfg->setConfigEntry('news_main_limit', 5); diff --git a/application/hub/interfaces/handler/network/class_Networkable.php b/application/hub/interfaces/handler/network/class_Networkable.php index 0e88879ed..adea6832e 100644 --- a/application/hub/interfaces/handler/network/class_Networkable.php +++ b/application/hub/interfaces/handler/network/class_Networkable.php @@ -31,6 +31,20 @@ interface Networkable extends Handleable { * @throws InvalidResourceException If the given resource is invalid */ function processRawDataFromResource ($resource); + + /** + * Checks wether decoded data is pending for further processing. + * + * @return $isPending Wether decoded data is pending + */ + function isDecodedDataPending (); + + /** + * "Getter" for next decoded data from the stacker + * + * @return $decodedData Decoded data from the stacker + */ + function getNextDecodedData (); } // [EOF] diff --git a/application/hub/interfaces/package/class_Receivable.php b/application/hub/interfaces/package/class_Receivable.php index e8156d2ae..adc15b112 100644 --- a/application/hub/interfaces/package/class_Receivable.php +++ b/application/hub/interfaces/package/class_Receivable.php @@ -30,12 +30,44 @@ interface Receivable extends FrameworkInterface { */ function isNewRawDataPending (PoolableListener $poolInstance); + /** + * Handles the incoming decoded raw data. This method does not "convert" the + * decoded data back into a package array, it just "handles" it and pushs it + * on the next stack. + * + * @return void + */ + function handleIncomingDecodedData (); + + /** + * Checks wether incoming decoded data is handled. + * + * @return $isHandled Wether incoming decoded data is handled + */ + function isIncomingDecodedDataHandled (); + + /** + * Assembles incoming decoded data so it will become an abstract network + * package again. + * + * @return void + */ + function assembleDecodedDataToPackage (); + /** * Checks wether a new package has arrived * * @return $hasArrived Wether a new package has arrived for processing */ function isNewPackageArrived (); + + /** + * Adds raw decoded data from the given handler instance to this receiver + * + * @param $handlerInstance An instance of a Networkable class + * @return void + */ + function addDecodedDataToIncomingStack (Networkable $handlerInstance); } // [EOF] diff --git a/application/hub/main/factories/discovery/class_PackageDiscoveryFactory.php b/application/hub/main/factories/discovery/class_PackageDiscoveryFactory.php index 644fe28a9..db42524ed 100644 --- a/application/hub/main/factories/discovery/class_PackageDiscoveryFactory.php +++ b/application/hub/main/factories/discovery/class_PackageDiscoveryFactory.php @@ -46,7 +46,7 @@ class PackageDiscoveryFactory extends ObjectFactory { $discoveryInstance = Registry::getRegistry()->getInstance('package_discovery'); } else { // Create a new instance - $discoveryInstance = ObjectFactory::createObjectByConfiguredName('package_recipient_discovery_class'); + $discoveryInstance = self::createObjectByConfiguredName('package_recipient_discovery_class'); // Set the instance in registry for further use Registry::getRegistry()->addInstance('package_discovery', $discoveryInstance); diff --git a/application/hub/main/factories/discovery/class_SocketDiscoveryFactory.php b/application/hub/main/factories/discovery/class_SocketDiscoveryFactory.php index 10c2f8cd4..ca9261910 100644 --- a/application/hub/main/factories/discovery/class_SocketDiscoveryFactory.php +++ b/application/hub/main/factories/discovery/class_SocketDiscoveryFactory.php @@ -46,7 +46,7 @@ class SocketDiscoveryFactory extends ObjectFactory { $discoveryInstance = Registry::getRegistry()->getInstance('socket_discovery'); } else { // Create a new instance - $discoveryInstance = ObjectFactory::createObjectByConfiguredName('socket_discovery_class'); + $discoveryInstance = self::createObjectByConfiguredName('socket_discovery_class'); // Set the instance in registry for further use Registry::getRegistry()->addInstance('socket_discovery', $discoveryInstance); diff --git a/application/hub/main/factories/fragmenter/class_FragmenterFactory.php b/application/hub/main/factories/fragmenter/class_FragmenterFactory.php index 7e1a0152f..0436b76f1 100644 --- a/application/hub/main/factories/fragmenter/class_FragmenterFactory.php +++ b/application/hub/main/factories/fragmenter/class_FragmenterFactory.php @@ -37,20 +37,20 @@ class FragmenterFactory extends ObjectFactory { * be generated and stored in registry, else the fragmenter from the * registry will be returned. * - * @param $configEntry A configuration entry naming the real class' name + * @param $fragmenterType The fragmenter's type * @return $fragmenterInstance A fragmenter instance */ - public static final function createFragmenterInstance ($configEntry) { + public static final function createFragmenterInstance ($fragmenterType) { // If there is no fragmenter? - if (Registry::getRegistry()->instanceExists('fragmenter')) { + if (Registry::getRegistry()->instanceExists($fragmenterType . '_fragmenter')) { // Get fragmenter from registry - $fragmenterInstance = Registry::getRegistry()->getInstance('fragmenter'); + $fragmenterInstance = Registry::getRegistry()->getInstance($fragmenterType . '_fragmenter'); } else { // Get the fragmenter instance - $fragmenterInstance = ObjectFactory::createObjectByConfiguredName($configEntry); + $fragmenterInstance = self::createObjectByConfiguredName($fragmenterType . '_fragmenter_class'); // Add it to the registry - Registry::getRegistry()->addInstance('fragmenter', $fragmenterInstance); + Registry::getRegistry()->addInstance($fragmenterType . '_fragmenter', $fragmenterInstance); } // Return the instance diff --git a/application/hub/main/factories/lists/class_RecipientListFactory.php b/application/hub/main/factories/lists/class_RecipientListFactory.php index 873e771f3..8d4c84f63 100644 --- a/application/hub/main/factories/lists/class_RecipientListFactory.php +++ b/application/hub/main/factories/lists/class_RecipientListFactory.php @@ -46,7 +46,7 @@ class RecipientListFactory extends ObjectFactory { $listInstance = Registry::getRegistry()->getInstance('recipient_list'); } else { // Create a new instance - $listInstance = ObjectFactory::createObjectByConfiguredName('recipient_list_class'); + $listInstance = self::createObjectByConfiguredName('recipient_list_class'); // Set the instance in registry for further use Registry::getRegistry()->addInstance('recipient_list', $listInstance); diff --git a/application/hub/main/factories/package/class_NetworkPackageFactory.php b/application/hub/main/factories/package/class_NetworkPackageFactory.php index d4d5b380a..79c8a3cf9 100644 --- a/application/hub/main/factories/package/class_NetworkPackageFactory.php +++ b/application/hub/main/factories/package/class_NetworkPackageFactory.php @@ -50,14 +50,14 @@ class NetworkPackageFactory extends ObjectFactory { * keep it open here so you can experiment with the settings and don't * need to touch any code. */ - $compressorInstance = ObjectFactory::createObjectByConfiguredName('raw_package_compressor_class'); + $compressorInstance = self::createObjectByConfiguredName('raw_package_compressor_class'); // Prepare the decorator compressor (for later flawless and easy updates) - $compressorInstance = ObjectFactory::createObjectByConfiguredName('deco_package_compressor_class', array($compressorInstance)); + $compressorInstance = self::createObjectByConfiguredName('deco_package_compressor_class', array($compressorInstance)); // Now prepare the network package for delivery so only need to do this // once just before the "big announcement loop". - $packageInstance = ObjectFactory::createObjectByConfiguredName('network_package_class', array($compressorInstance)); + $packageInstance = self::createObjectByConfiguredName('network_package_class', array($compressorInstance)); // Set the instance in registry for further use Registry::getRegistry()->addInstance('network_package', $packageInstance); diff --git a/application/hub/main/factories/producer/class_ProducerFactory.php b/application/hub/main/factories/producer/class_ProducerFactory.php index d952b25e0..e376ce52e 100644 --- a/application/hub/main/factories/producer/class_ProducerFactory.php +++ b/application/hub/main/factories/producer/class_ProducerFactory.php @@ -48,7 +48,7 @@ class FragmenterFactory extends ObjectFactory { $producerInstance = Registry::getRegistry()->getInstance($producerType . '_producer'); } else { // Get the producer instance - $producerInstance = ObjectFactory::createObjectByConfiguredName($configEntry); + $producerInstance = self::createObjectByConfiguredName($configEntry); // Add it to the registry Registry::getRegistry()->addInstance($producerType . '_producer', $producerInstance); diff --git a/application/hub/main/factories/states/peer/class_PeerStateFactory.php b/application/hub/main/factories/states/peer/class_PeerStateFactory.php index eff9b8922..5cbcc9a2e 100644 --- a/application/hub/main/factories/states/peer/class_PeerStateFactory.php +++ b/application/hub/main/factories/states/peer/class_PeerStateFactory.php @@ -63,7 +63,7 @@ class PeerStateFactory extends ObjectFactory { $tableInstance->registerPeerByPackageData($packageData, $socketResource); // Then get it - $stateInstance = ObjectFactory::createObjectByConfiguredName($configEntry); + $stateInstance = self::createObjectByConfiguredName($configEntry); // And register it with the lookup table $tableInstance->registerPeerState($stateInstance, $packageData); @@ -86,7 +86,7 @@ class PeerStateFactory extends ObjectFactory { // Is the instance null? if (is_null(self::$tableInstance)) { // Get a new one - self::$tableInstance = ObjectFactory::createObjectByConfiguredName('node_state_lookup_table_class'); + self::$tableInstance = self::createObjectByConfiguredName('node_state_lookup_table_class'); } // END - if // Return it diff --git a/application/hub/main/factories/tags/class_PackageTagsFactory.php b/application/hub/main/factories/tags/class_PackageTagsFactory.php index e39835931..fb8634abf 100644 --- a/application/hub/main/factories/tags/class_PackageTagsFactory.php +++ b/application/hub/main/factories/tags/class_PackageTagsFactory.php @@ -46,7 +46,7 @@ class PackageTagsFactory extends ObjectFactory { $packageInstance = Registry::getRegistry()->getInstance('package_tags'); } else { // Now prepare the tags instance - $packageInstance = ObjectFactory::createObjectByConfiguredName('package_tags_class'); + $packageInstance = self::createObjectByConfiguredName('package_tags_class'); // Set the instance in registry for further use Registry::getRegistry()->addInstance('package_tags', $packageInstance); diff --git a/application/hub/main/filter/cruncher/class_CruncherInitializationFilter.php b/application/hub/main/filter/cruncher/class_CruncherInitializationFilter.php index 57ade35fd..bc9252401 100644 --- a/application/hub/main/filter/cruncher/class_CruncherInitializationFilter.php +++ b/application/hub/main/filter/cruncher/class_CruncherInitializationFilter.php @@ -85,7 +85,7 @@ class CruncherInitializationFilter extends BaseFilter implements Filterable { } catch (ClassNotFoundException $e) { // This exception means, the cruncher mode is invalid. // @TODO Can we rewrite this to app_die() ? - die('Cruncher mode ' . $cruncherMode . ' is invalid.' . "\n"); + die(__METHOD__ . ': cruncher mode ' . $cruncherMode . ' is invalid.' . "\n"); } // Set the cruncher instance in registry diff --git a/application/hub/main/filter/node/class_NodeInitializationFilter.php b/application/hub/main/filter/node/class_NodeInitializationFilter.php index 8b90f65fd..bf0cc5cb2 100644 --- a/application/hub/main/filter/node/class_NodeInitializationFilter.php +++ b/application/hub/main/filter/node/class_NodeInitializationFilter.php @@ -85,7 +85,7 @@ class NodeInitializationFilter extends BaseFilter implements Filterable { } catch (ClassNotFoundException $e) { // This exception means, the node mode is invalid. // @TODO Can we rewrite this to app_die() ? - die('Node mode ' . $nodeMode . ' is invalid.' . "\n"); + die(__METHOD__ . ': node mode ' . $nodeMode . ' is invalid.' . "\n"); } // Set the node instance in registry diff --git a/application/hub/main/handler/network/class_BaseRawDataHandler.php b/application/hub/main/handler/network/class_BaseRawDataHandler.php index 71c15525b..ef5790823 100644 --- a/application/hub/main/handler/network/class_BaseRawDataHandler.php +++ b/application/hub/main/handler/network/class_BaseRawDataHandler.php @@ -42,6 +42,11 @@ class BaseRawDataHandler extends BaseHandler { const PACKAGE_DECODED_DATA = 'decoded_data'; const PACKAGE_ERROR_CODE = 'error_code'; + /** + * Stacker for decoded data + */ + const STACKER_NAME_DECODED_DATA = 'decoded_data'; + /** * Error code from socket */ @@ -82,7 +87,7 @@ class BaseRawDataHandler extends BaseHandler { * @return void */ protected function initStacker () { - $this->getStackerInstance()->initStacker('raw_data'); + $this->getStackerInstance()->initStacker(self::STACKER_NAME_DECODED_DATA); } /** @@ -96,12 +101,38 @@ class BaseRawDataHandler extends BaseHandler { * Add the deocoded data and error code to the stacker so other classes * (e.g. NetworkPackage) can "pop" it from the stacker. */ - $this->getStackerInstance()->pushNamed('raw_data', array( + $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_DATA, array( self::PACKAGE_DECODED_DATA => $decodedData, self::PACKAGE_ERROR_CODE => $this->getErrorCode() )); } + /** + * Checks wether decoded data is pending for further processing. + * + * @return $isPending Wether decoded data is pending + */ + public function isDecodedDataPending () { + // Does the stacker have some entries (not empty)? + $isPending = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_DATA)); + + // Return it + return $isPending; + } + + /** + * "Getter" for next decoded data from the stacker + * + * @return $decodedData Decoded data from the stacker + */ + public function getNextDecodedData () { + // "Pop" the decoded data from the stacker + $decodedData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_DATA); + + // And return it + return $decodedData; + } + /** * Checks wether the 'recipient' field matches our own address:port * combination. diff --git a/application/hub/main/helper/connection/class_BaseConnectionHelper.php b/application/hub/main/helper/connection/class_BaseConnectionHelper.php index c1a21b6ca..f6c4debed 100644 --- a/application/hub/main/helper/connection/class_BaseConnectionHelper.php +++ b/application/hub/main/helper/connection/class_BaseConnectionHelper.php @@ -161,7 +161,7 @@ class BaseConnectionHelper extends BaseHubHelper implements Registerable, Protoc */ private function getRawDataFromPackageArray (array $packageData) { // Get the fragmenter instance - $fragmenterInstance = FragmenterFactory::createFragmenterInstance('package_fragmenter_class'); + $fragmenterInstance = FragmenterFactory::createFragmenterInstance('package'); // Implode the package data array and fragement the resulting string, returns the final hash $finalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this); diff --git a/application/hub/main/listener/class_BaseListenerDecorator.php b/application/hub/main/listener/class_BaseListenerDecorator.php index e71c66f7c..e102c530e 100644 --- a/application/hub/main/listener/class_BaseListenerDecorator.php +++ b/application/hub/main/listener/class_BaseListenerDecorator.php @@ -110,7 +110,7 @@ class BaseListenerDecorator extends BaseDecorator implements Visitable { /** * Getter for peer pool instance * - * @return $poolInstance The peer pool instance we shall set + * @return $poolInstance A peer pool instance */ public final function getPoolInstance () { return $this->getListenerInstance()->getPoolInstance(); @@ -124,16 +124,34 @@ class BaseListenerDecorator extends BaseDecorator implements Visitable { * @return void */ public function monitorIncomingRawData (Receivable $receiverInstance) { + // Get the handler instance + $handlerInstance = $this->getListenerInstance()->getHandlerInstance(); + /* * Does our deocorated listener (or even a decorator again) have a * handler assigned? Remember that a handler will hold all incoming raw * data and not a listener. */ - if (!$this->getListenerInstance()->getHandlerInstance() instanceof Networkable) { + if (!$handlerInstance instanceof Networkable) { // Skip this silently for now. Later on, this will become mandatory! //* NOISY-DEBUG: */ $this->debugOutput('No handler assigned to this listener decorator. this=' . $this->__toString() . ', listenerInstance=' . $this->getListenerInstance()->__toString()); return; } // END - if + + // Does the handler have some decoded data pending? + if (!$handlerInstance->isDecodedDataPending()) { + // No data is pending so skip further code silently + return; + } // END - if + + /* + * We have some pending decoded data. The receiver instance is an + * abstract network package (which can be received and sent out) so + * handle the decoded data over. At this moment we don't need to know + * if the decoded data origins from a TCP or UDP connection so we can + * just pass it over to the network package receiver + */ + $receiverInstance->addDecodedDataToIncomingStack($handlerInstance); } } diff --git a/application/hub/main/package/class_NetworkPackage.php b/application/hub/main/package/class_NetworkPackage.php index d54d99b04..e0d6b5c18 100644 --- a/application/hub/main/package/class_NetworkPackage.php +++ b/application/hub/main/package/class_NetworkPackage.php @@ -86,27 +86,37 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receiva /** * Raw package data seperator */ - const PACKAGE_DATA_SEPERATOR = '|'; + const PACKAGE_DATA_SEPERATOR = '#'; /** * Stacker name for "undeclared" packages */ - const STACKER_NAME_UNDECLARED = 'undeclared'; + const STACKER_NAME_UNDECLARED = 'package_undeclared'; /** * Stacker name for "declared" packages (which are ready to send out) */ - const STACKER_NAME_DECLARED = 'declared'; + const STACKER_NAME_DECLARED = 'package_declared'; /** * Stacker name for "out-going" packages */ - const STACKER_NAME_OUTGOING = 'outgoing'; + const STACKER_NAME_OUTGOING = 'package_outgoing'; + + /** + * Stacker name for "incoming" decoded raw data + */ + const STACKER_NAME_DECODED_INCOMING = 'package_decoded_data'; + + /** + * Stacker name for handled decoded raw data + */ + const STACKER_NAME_DECODED_HANDLED = 'package_handled_decoded'; /** * Stacker name for "back-buffered" packages */ - const STACKER_NAME_BACK_BUFFER = 'backbuffer'; + const STACKER_NAME_BACK_BUFFER = 'package_backbuffer'; /** * Network target (alias): 'upper hubs' @@ -146,13 +156,19 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receiva // Now set the compressor instance $packageInstance->setCompressorInstance($compressorInstance); - // We need to initialize a stack here for our packages even those - // which have no recipient address and stamp... ;-) + /* + * We need to initialize a stack here for our packages even for those + * which have no recipient address and stamp... ;-) This stacker will + * also be used for incoming raw data to handle it. + */ $stackerInstance = ObjectFactory::createObjectByConfiguredName('network_package_stacker_class'); // At last, set it in this class $packageInstance->setStackerInstance($stackerInstance); + // Init all stacker + $packageInstance->initStackers(); + // Get a visitor instance for speeding up things $visitorInstance = ObjectFactory::createObjectByConfiguredName('node_raw_data_monitor_visitor_class', array($packageInstance)); @@ -163,6 +179,27 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receiva return $packageInstance; } + /** + * Initialize all stackers + * + * @return void + */ + protected function initStackers () { + // Initialize all + foreach ( + array( + self::STACKER_NAME_UNDECLARED, + self::STACKER_NAME_DECLARED, + self::STACKER_NAME_OUTGOING, + self::STACKER_NAME_DECODED_INCOMING, + self::STACKER_NAME_DECODED_HANDLED, + self::STACKER_NAME_BACK_BUFFER + ) as $stackerName) { + // Init this stacker + $this->getStackerInstance()->initStacker($stackerName); + } // END - foreach + } + /** * "Getter" for hash from given content and helper instance * @@ -464,6 +501,19 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receiva // Receiving packages / raw data /////////////////////////////////////////////////////////////////////////// + /** + * Checks wether decoded raw data is pending + * + * @return $isPending Wether decoded raw data is pending + */ + private function isDecodedDataPending () { + // Just return wether the stack is not empty + $isPending = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_INCOMING)); + + // Return the status + return $isPending; + } + /** * Checks wether new raw package data has arrived at a socket * @@ -471,17 +521,102 @@ class NetworkPackage extends BaseFrameworkSystem implements Deliverable, Receiva * @return $hasArrived Wether new raw package data has arrived for processing */ public function isNewRawDataPending (PoolableListener $poolInstance) { - // By default no new data has arrived - $hasArrived = false; - - // Visit the pool + // Visit the pool. This monitors the pool for incoming raw data. $poolInstance->accept($this->getVisitorInstance()); - // @TODO Check for if new data has arrived + + // Check for new data arrival + $hasArrived = $this->isDecodedDataPending(); // Return the status return $hasArrived; } + /** + * Handles the incoming decoded raw data. This method does not "convert" the + * decoded data back into a package array, it just "handles" it and pushs it + * on the next stack. + * + * @return void + */ + public function handleIncomingDecodedData () { + /* + * This method should only be called if decoded raw data is pending, + * so check it again. + */ + if (!$this->isDecodedDataPending()) { + // This is not fatal but should be avoided + // @TODO Add some logging here + return; + } // END - if + + // Very noisy debug message: + /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: Stacker size is ' . $this->getStackerInstance()->getStackCount(self::STACKER_NAME_DECODED_INCOMING) . ' entries.'); + + // "Pop" the next entry (the same array again) from the stack + $decodedData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_INCOMING); + + // Make sure both array elements are there + assert((isset($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA])) && (isset($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE]))); + + /* + * Also make sure the error code is SOCKET_ERROR_UNHANDLED because we + * only want to handle unhandled packages here. + */ + assert($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE] == BaseRawDataHandler::SOCKET_ERROR_UNHANDLED); + + // Remove the last chunk seperator (because it is being added and we don't need it) + if (substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], -1, 1) == PackageFragmenter::CHUNK_SEPERATOR) { + // It is there and should be removed + $decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA] = substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], 0, -1); + } // END - if + + // This package is "handled" and can be pushed on the next stack + $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_HANDLED, $decodedData); + } + + /** + * Adds raw decoded data from the given handler instance to this receiver + * + * @param $handlerInstance An instance of a Networkable class + * @return void + */ + public function addDecodedDataToIncomingStack (Networkable $handlerInstance) { + /* + * Get the decoded data from the handler, this is an array with + * 'decoded_data' and 'error_code' as elements. + */ + $decodedData = $handlerInstance->getNextDecodedData(); + + // Very noisy debug message: + //* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: decodedData[' . gettype($decodedData) . ']=' . print_r($decodedData, true)); + + // And push it on our stack + $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_INCOMING, $decodedData); + } + + /** + * Checks wether incoming decoded data is handled. + * + * @return $isHandled Wether incoming decoded data is handled + */ + public function isIncomingDecodedDataHandled () { + // Determine if the stack is not empty + $isHandled = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_HANDLED)); + + // Return it + return $isHandled; + } + + /** + * Assembles incoming decoded data so it will become an abstract network + * package again. + * + * @return void + */ + public function assembleDecodedDataToPackage () { + $this->partialStub('Please implement this method.'); + } + /** * Checks wether a new package has arrived * diff --git a/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php b/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php index 18b730ff0..fc7d85168 100644 --- a/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php +++ b/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php @@ -72,7 +72,14 @@ class NetworkPackageReaderTask extends BaseTask implements Taskable, Visitable { // Do we have something to handle? if ($packageInstance->isNewRawDataPending($this->getListenerPoolInstance())) { // We have to handle raw data from the socket - $packageInstance->handleIncomingSocketRawData(); + $packageInstance->handleIncomingDecodedData(); + } elseif ($packageInstance->isIncomingDecodedDataHandled()) { + /* + * We have handled decoded data so we should validate it, if we have + * all chunks/fragments together, and assemble it into an abstract + * network package. + */ + $packageInstance->assembleDecodedDataToPackage(); } elseif ($packageInstance->isNewPackageArrived()) { // Okay, then handle newly arrived package $packageInstance->handleNewlyArrivedPackage(); diff --git a/docs/TODOs.txt b/docs/TODOs.txt index fade89043..0c067a866 100644 --- a/docs/TODOs.txt +++ b/docs/TODOs.txt @@ -31,8 +31,8 @@ ./application/hub/main/filter/shutdown/node/class_NodeShutdownTaskHandlerFilter.php:55: * @todo 0% done ./application/hub/main/filter/task/cruncher/class_CruncherTaskHandlerInitializerFilter.php:55: * @todo 5% done ./application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php:55: * @todo Maybe some more tasks needs to be added? -./application/hub/main/handler/network/class_BaseRawDataHandler.php:111: * @todo This method will be moved to a better place -./application/hub/main/handler/network/class_BaseRawDataHandler.php:118: // @TODO Numeric or alpha-numeric index? +./application/hub/main/handler/network/class_BaseRawDataHandler.php:142: * @todo This method will be moved to a better place +./application/hub/main/handler/network/class_BaseRawDataHandler.php:149: // @TODO Numeric or alpha-numeric index? ./application/hub/main/handler/network/udp/class_UdpRawDataHandler.php:58: * @todo 0% ./application/hub/main/handler/tasks/class_TaskHandler.php:140: // @TODO Messurement can be added around this call ./application/hub/main/helper/connection/tcp/class_TcpConnectionHelper.php:10: * @todo Find an interface for hub helper @@ -73,14 +73,14 @@ ./application/hub/main/nodes/regular/class_HubRegularNode.php:58: * @todo Implement this method ./application/hub/main/nodes/regular/class_HubRegularNode.php:68: * @todo Unfinished method ./application/hub/main/nodes/regular/class_HubRegularNode.php:91: // @TODO Add some filters here -./application/hub/main/package/class_NetworkPackage.php:173: * @todo $helperInstance is unused -./application/hub/main/package/class_NetworkPackage.php:177: // @TODO crc32 is not very strong, but it needs to be fast +./application/hub/main/package/class_NetworkPackage.php:210: * @todo $helperInstance is unused +./application/hub/main/package/class_NetworkPackage.php:214: // @TODO crc32 is not very strong, but it needs to be fast ./application/hub/main/package/class_NetworkPackage.php:23: * @todo Needs to add functionality for handling the object's type -./application/hub/main/package/class_NetworkPackage.php:293: // @TODO We may want to do somthing more here? -./application/hub/main/package/class_NetworkPackage.php:392: // @TODO Add some logging here -./application/hub/main/package/class_NetworkPackage.php:418: // @TODO Add some logging here -./application/hub/main/package/class_NetworkPackage.php:479: // @TODO Check for if new data has arrived -./application/hub/main/package/class_NetworkPackage.php:491: // @TODO Add some content here +./application/hub/main/package/class_NetworkPackage.php:330: // @TODO We may want to do somthing more here? +./application/hub/main/package/class_NetworkPackage.php:429: // @TODO Add some logging here +./application/hub/main/package/class_NetworkPackage.php:455: // @TODO Add some logging here +./application/hub/main/package/class_NetworkPackage.php:548: // @TODO Add some logging here +./application/hub/main/package/class_NetworkPackage.php:626: // @TODO Add some content here ./application/hub/main/package/fragmenter/class_PackageFragmenter.php:426: * @todo $connectionInstance is unused ./application/hub/main/pools/peer/class_DefaultPeerPool.php:148: // @TODO Check for IP ./application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php:106: // @TODO Do something with it -- 2.39.5