$cfg->setConfigEntry('stacker_self_connect_max_size', 10);
// CFG: STACKER-UNDECLARED-MAX-SIZE
-$cfg->setConfigEntry('stacker_undeclared_max_size', 10000);
+$cfg->setConfigEntry('stacker_package_undeclared_max_size', 10000);
-// CFG: STACKER-DECLARED-MAX-SIZE
-$cfg->setConfigEntry('stacker_declared_max_size', 1000);
+// CFG: STACKER-PACKAGE-DECLARED-MAX-SIZE
+$cfg->setConfigEntry('stacker_package_declared_max_size', 1000);
-// CFG: STACKER-OUTGOING-MAX-SIZE
-$cfg->setConfigEntry('stacker_outgoing_max_size', 100);
+// CFG: STACKER-PACKAGE-OUTGOING-MAX-SIZE
+$cfg->setConfigEntry('stacker_package_outgoing_max_size', 100);
+
+// CFG: STACKER-PACKAGE-BACKBUFFER-MAX-SIZE
+$cfg->setConfigEntry('stacker_package_backbuffer_max_size', 1000);
// CFG: STACKER-IN-QUEUE-MAX-SIZE
$cfg->setConfigEntry('stacker_in_queue_max_size', 10000);
// CFG: STACKER-INCOMING-QUEUE-MAX-SIZE
$cfg->setConfigEntry('stacker_incoming_queue_max_size', 100000);
-// CFG: STACKER-RAW-DATA-MAX-SIZE
-$cfg->setConfigEntry('stacker_raw_data_max_size', 100);
+// CFG: STACKER-DECODED-DATA-MAX-SIZE
+$cfg->setConfigEntry('stacker_decoded_data_max_size', 100);
+
+// CFG: STACKER-PACKAGE-DECODED-DATA-MAX-SIZE
+$cfg->setConfigEntry('stacker_package_decoded_data_max_size', 200);
+
+// CFG: STACKER-PACKAGE-HANDLED-DECODED-MAX-SIZE
+$cfg->setConfigEntry('stacker_package_handled_decoded_max_size', 200);
// CFG: NEWS-MAIN-LIMIT
$cfg->setConfigEntry('news_main_limit', 5);
* @throws InvalidResourceException If the given resource is invalid
*/
function processRawDataFromResource ($resource);
+
+ /**
+ * Checks wether decoded data is pending for further processing.
+ *
+ * @return $isPending Wether decoded data is pending
+ */
+ function isDecodedDataPending ();
+
+ /**
+ * "Getter" for next decoded data from the stacker
+ *
+ * @return $decodedData Decoded data from the stacker
+ */
+ function getNextDecodedData ();
}
// [EOF]
*/
function isNewRawDataPending (PoolableListener $poolInstance);
+ /**
+ * Handles the incoming decoded raw data. This method does not "convert" the
+ * decoded data back into a package array, it just "handles" it and pushs it
+ * on the next stack.
+ *
+ * @return void
+ */
+ function handleIncomingDecodedData ();
+
+ /**
+ * Checks wether incoming decoded data is handled.
+ *
+ * @return $isHandled Wether incoming decoded data is handled
+ */
+ function isIncomingDecodedDataHandled ();
+
+ /**
+ * Assembles incoming decoded data so it will become an abstract network
+ * package again.
+ *
+ * @return void
+ */
+ function assembleDecodedDataToPackage ();
+
/**
* Checks wether a new package has arrived
*
* @return $hasArrived Wether a new package has arrived for processing
*/
function isNewPackageArrived ();
+
+ /**
+ * Adds raw decoded data from the given handler instance to this receiver
+ *
+ * @param $handlerInstance An instance of a Networkable class
+ * @return void
+ */
+ function addDecodedDataToIncomingStack (Networkable $handlerInstance);
}
// [EOF]
$discoveryInstance = Registry::getRegistry()->getInstance('package_discovery');
} else {
// Create a new instance
- $discoveryInstance = ObjectFactory::createObjectByConfiguredName('package_recipient_discovery_class');
+ $discoveryInstance = self::createObjectByConfiguredName('package_recipient_discovery_class');
// Set the instance in registry for further use
Registry::getRegistry()->addInstance('package_discovery', $discoveryInstance);
$discoveryInstance = Registry::getRegistry()->getInstance('socket_discovery');
} else {
// Create a new instance
- $discoveryInstance = ObjectFactory::createObjectByConfiguredName('socket_discovery_class');
+ $discoveryInstance = self::createObjectByConfiguredName('socket_discovery_class');
// Set the instance in registry for further use
Registry::getRegistry()->addInstance('socket_discovery', $discoveryInstance);
* be generated and stored in registry, else the fragmenter from the
* registry will be returned.
*
- * @param $configEntry A configuration entry naming the real class' name
+ * @param $fragmenterType The fragmenter's type
* @return $fragmenterInstance A fragmenter instance
*/
- public static final function createFragmenterInstance ($configEntry) {
+ public static final function createFragmenterInstance ($fragmenterType) {
// If there is no fragmenter?
- if (Registry::getRegistry()->instanceExists('fragmenter')) {
+ if (Registry::getRegistry()->instanceExists($fragmenterType . '_fragmenter')) {
// Get fragmenter from registry
- $fragmenterInstance = Registry::getRegistry()->getInstance('fragmenter');
+ $fragmenterInstance = Registry::getRegistry()->getInstance($fragmenterType . '_fragmenter');
} else {
// Get the fragmenter instance
- $fragmenterInstance = ObjectFactory::createObjectByConfiguredName($configEntry);
+ $fragmenterInstance = self::createObjectByConfiguredName($fragmenterType . '_fragmenter_class');
// Add it to the registry
- Registry::getRegistry()->addInstance('fragmenter', $fragmenterInstance);
+ Registry::getRegistry()->addInstance($fragmenterType . '_fragmenter', $fragmenterInstance);
}
// Return the instance
$listInstance = Registry::getRegistry()->getInstance('recipient_list');
} else {
// Create a new instance
- $listInstance = ObjectFactory::createObjectByConfiguredName('recipient_list_class');
+ $listInstance = self::createObjectByConfiguredName('recipient_list_class');
// Set the instance in registry for further use
Registry::getRegistry()->addInstance('recipient_list', $listInstance);
* keep it open here so you can experiment with the settings and don't
* need to touch any code.
*/
- $compressorInstance = ObjectFactory::createObjectByConfiguredName('raw_package_compressor_class');
+ $compressorInstance = self::createObjectByConfiguredName('raw_package_compressor_class');
// Prepare the decorator compressor (for later flawless and easy updates)
- $compressorInstance = ObjectFactory::createObjectByConfiguredName('deco_package_compressor_class', array($compressorInstance));
+ $compressorInstance = self::createObjectByConfiguredName('deco_package_compressor_class', array($compressorInstance));
// Now prepare the network package for delivery so only need to do this
// once just before the "big announcement loop".
- $packageInstance = ObjectFactory::createObjectByConfiguredName('network_package_class', array($compressorInstance));
+ $packageInstance = self::createObjectByConfiguredName('network_package_class', array($compressorInstance));
// Set the instance in registry for further use
Registry::getRegistry()->addInstance('network_package', $packageInstance);
$producerInstance = Registry::getRegistry()->getInstance($producerType . '_producer');
} else {
// Get the producer instance
- $producerInstance = ObjectFactory::createObjectByConfiguredName($configEntry);
+ $producerInstance = self::createObjectByConfiguredName($configEntry);
// Add it to the registry
Registry::getRegistry()->addInstance($producerType . '_producer', $producerInstance);
$tableInstance->registerPeerByPackageData($packageData, $socketResource);
// Then get it
- $stateInstance = ObjectFactory::createObjectByConfiguredName($configEntry);
+ $stateInstance = self::createObjectByConfiguredName($configEntry);
// And register it with the lookup table
$tableInstance->registerPeerState($stateInstance, $packageData);
// Is the instance null?
if (is_null(self::$tableInstance)) {
// Get a new one
- self::$tableInstance = ObjectFactory::createObjectByConfiguredName('node_state_lookup_table_class');
+ self::$tableInstance = self::createObjectByConfiguredName('node_state_lookup_table_class');
} // END - if
// Return it
$packageInstance = Registry::getRegistry()->getInstance('package_tags');
} else {
// Now prepare the tags instance
- $packageInstance = ObjectFactory::createObjectByConfiguredName('package_tags_class');
+ $packageInstance = self::createObjectByConfiguredName('package_tags_class');
// Set the instance in registry for further use
Registry::getRegistry()->addInstance('package_tags', $packageInstance);
} catch (ClassNotFoundException $e) {
// This exception means, the cruncher mode is invalid.
// @TODO Can we rewrite this to app_die() ?
- die('Cruncher mode ' . $cruncherMode . ' is invalid.' . "\n");
+ die(__METHOD__ . ': cruncher mode ' . $cruncherMode . ' is invalid.' . "\n");
}
// Set the cruncher instance in registry
} catch (ClassNotFoundException $e) {
// This exception means, the node mode is invalid.
// @TODO Can we rewrite this to app_die() ?
- die('Node mode ' . $nodeMode . ' is invalid.' . "\n");
+ die(__METHOD__ . ': node mode ' . $nodeMode . ' is invalid.' . "\n");
}
// Set the node instance in registry
const PACKAGE_DECODED_DATA = 'decoded_data';
const PACKAGE_ERROR_CODE = 'error_code';
+ /**
+ * Stacker for decoded data
+ */
+ const STACKER_NAME_DECODED_DATA = 'decoded_data';
+
/**
* Error code from socket
*/
* @return void
*/
protected function initStacker () {
- $this->getStackerInstance()->initStacker('raw_data');
+ $this->getStackerInstance()->initStacker(self::STACKER_NAME_DECODED_DATA);
}
/**
* Add the deocoded data and error code to the stacker so other classes
* (e.g. NetworkPackage) can "pop" it from the stacker.
*/
- $this->getStackerInstance()->pushNamed('raw_data', array(
+ $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_DATA, array(
self::PACKAGE_DECODED_DATA => $decodedData,
self::PACKAGE_ERROR_CODE => $this->getErrorCode()
));
}
+ /**
+ * Checks wether decoded data is pending for further processing.
+ *
+ * @return $isPending Wether decoded data is pending
+ */
+ public function isDecodedDataPending () {
+ // Does the stacker have some entries (not empty)?
+ $isPending = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_DATA));
+
+ // Return it
+ return $isPending;
+ }
+
+ /**
+ * "Getter" for next decoded data from the stacker
+ *
+ * @return $decodedData Decoded data from the stacker
+ */
+ public function getNextDecodedData () {
+ // "Pop" the decoded data from the stacker
+ $decodedData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_DATA);
+
+ // And return it
+ return $decodedData;
+ }
+
/**
* Checks wether the 'recipient' field matches our own address:port
* combination.
*/
private function getRawDataFromPackageArray (array $packageData) {
// Get the fragmenter instance
- $fragmenterInstance = FragmenterFactory::createFragmenterInstance('package_fragmenter_class');
+ $fragmenterInstance = FragmenterFactory::createFragmenterInstance('package');
// Implode the package data array and fragement the resulting string, returns the final hash
$finalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this);
/**
* Getter for peer pool instance
*
- * @return $poolInstance The peer pool instance we shall set
+ * @return $poolInstance A peer pool instance
*/
public final function getPoolInstance () {
return $this->getListenerInstance()->getPoolInstance();
* @return void
*/
public function monitorIncomingRawData (Receivable $receiverInstance) {
+ // Get the handler instance
+ $handlerInstance = $this->getListenerInstance()->getHandlerInstance();
+
/*
* Does our deocorated listener (or even a decorator again) have a
* handler assigned? Remember that a handler will hold all incoming raw
* data and not a listener.
*/
- if (!$this->getListenerInstance()->getHandlerInstance() instanceof Networkable) {
+ if (!$handlerInstance instanceof Networkable) {
// Skip this silently for now. Later on, this will become mandatory!
//* NOISY-DEBUG: */ $this->debugOutput('No handler assigned to this listener decorator. this=' . $this->__toString() . ', listenerInstance=' . $this->getListenerInstance()->__toString());
return;
} // END - if
+
+ // Does the handler have some decoded data pending?
+ if (!$handlerInstance->isDecodedDataPending()) {
+ // No data is pending so skip further code silently
+ return;
+ } // END - if
+
+ /*
+ * We have some pending decoded data. The receiver instance is an
+ * abstract network package (which can be received and sent out) so
+ * handle the decoded data over. At this moment we don't need to know
+ * if the decoded data origins from a TCP or UDP connection so we can
+ * just pass it over to the network package receiver
+ */
+ $receiverInstance->addDecodedDataToIncomingStack($handlerInstance);
}
}
/**
* Raw package data seperator
*/
- const PACKAGE_DATA_SEPERATOR = '|';
+ const PACKAGE_DATA_SEPERATOR = '#';
/**
* Stacker name for "undeclared" packages
*/
- const STACKER_NAME_UNDECLARED = 'undeclared';
+ const STACKER_NAME_UNDECLARED = 'package_undeclared';
/**
* Stacker name for "declared" packages (which are ready to send out)
*/
- const STACKER_NAME_DECLARED = 'declared';
+ const STACKER_NAME_DECLARED = 'package_declared';
/**
* Stacker name for "out-going" packages
*/
- const STACKER_NAME_OUTGOING = 'outgoing';
+ const STACKER_NAME_OUTGOING = 'package_outgoing';
+
+ /**
+ * Stacker name for "incoming" decoded raw data
+ */
+ const STACKER_NAME_DECODED_INCOMING = 'package_decoded_data';
+
+ /**
+ * Stacker name for handled decoded raw data
+ */
+ const STACKER_NAME_DECODED_HANDLED = 'package_handled_decoded';
/**
* Stacker name for "back-buffered" packages
*/
- const STACKER_NAME_BACK_BUFFER = 'backbuffer';
+ const STACKER_NAME_BACK_BUFFER = 'package_backbuffer';
/**
* Network target (alias): 'upper hubs'
// Now set the compressor instance
$packageInstance->setCompressorInstance($compressorInstance);
- // We need to initialize a stack here for our packages even those
- // which have no recipient address and stamp... ;-)
+ /*
+ * We need to initialize a stack here for our packages even for those
+ * which have no recipient address and stamp... ;-) This stacker will
+ * also be used for incoming raw data to handle it.
+ */
$stackerInstance = ObjectFactory::createObjectByConfiguredName('network_package_stacker_class');
// At last, set it in this class
$packageInstance->setStackerInstance($stackerInstance);
+ // Init all stacker
+ $packageInstance->initStackers();
+
// Get a visitor instance for speeding up things
$visitorInstance = ObjectFactory::createObjectByConfiguredName('node_raw_data_monitor_visitor_class', array($packageInstance));
return $packageInstance;
}
+ /**
+ * Initialize all stackers
+ *
+ * @return void
+ */
+ protected function initStackers () {
+ // Initialize all
+ foreach (
+ array(
+ self::STACKER_NAME_UNDECLARED,
+ self::STACKER_NAME_DECLARED,
+ self::STACKER_NAME_OUTGOING,
+ self::STACKER_NAME_DECODED_INCOMING,
+ self::STACKER_NAME_DECODED_HANDLED,
+ self::STACKER_NAME_BACK_BUFFER
+ ) as $stackerName) {
+ // Init this stacker
+ $this->getStackerInstance()->initStacker($stackerName);
+ } // END - foreach
+ }
+
/**
* "Getter" for hash from given content and helper instance
*
// Receiving packages / raw data
///////////////////////////////////////////////////////////////////////////
+ /**
+ * Checks wether decoded raw data is pending
+ *
+ * @return $isPending Wether decoded raw data is pending
+ */
+ private function isDecodedDataPending () {
+ // Just return wether the stack is not empty
+ $isPending = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_INCOMING));
+
+ // Return the status
+ return $isPending;
+ }
+
/**
* Checks wether new raw package data has arrived at a socket
*
* @return $hasArrived Wether new raw package data has arrived for processing
*/
public function isNewRawDataPending (PoolableListener $poolInstance) {
- // By default no new data has arrived
- $hasArrived = false;
-
- // Visit the pool
+ // Visit the pool. This monitors the pool for incoming raw data.
$poolInstance->accept($this->getVisitorInstance());
- // @TODO Check for if new data has arrived
+
+ // Check for new data arrival
+ $hasArrived = $this->isDecodedDataPending();
// Return the status
return $hasArrived;
}
+ /**
+ * Handles the incoming decoded raw data. This method does not "convert" the
+ * decoded data back into a package array, it just "handles" it and pushs it
+ * on the next stack.
+ *
+ * @return void
+ */
+ public function handleIncomingDecodedData () {
+ /*
+ * This method should only be called if decoded raw data is pending,
+ * so check it again.
+ */
+ if (!$this->isDecodedDataPending()) {
+ // This is not fatal but should be avoided
+ // @TODO Add some logging here
+ return;
+ } // END - if
+
+ // Very noisy debug message:
+ /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: Stacker size is ' . $this->getStackerInstance()->getStackCount(self::STACKER_NAME_DECODED_INCOMING) . ' entries.');
+
+ // "Pop" the next entry (the same array again) from the stack
+ $decodedData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_INCOMING);
+
+ // Make sure both array elements are there
+ assert((isset($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA])) && (isset($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE])));
+
+ /*
+ * Also make sure the error code is SOCKET_ERROR_UNHANDLED because we
+ * only want to handle unhandled packages here.
+ */
+ assert($decodedData[BaseRawDataHandler::PACKAGE_ERROR_CODE] == BaseRawDataHandler::SOCKET_ERROR_UNHANDLED);
+
+ // Remove the last chunk seperator (because it is being added and we don't need it)
+ if (substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], -1, 1) == PackageFragmenter::CHUNK_SEPERATOR) {
+ // It is there and should be removed
+ $decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA] = substr($decodedData[BaseRawDataHandler::PACKAGE_DECODED_DATA], 0, -1);
+ } // END - if
+
+ // This package is "handled" and can be pushed on the next stack
+ $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_HANDLED, $decodedData);
+ }
+
+ /**
+ * Adds raw decoded data from the given handler instance to this receiver
+ *
+ * @param $handlerInstance An instance of a Networkable class
+ * @return void
+ */
+ public function addDecodedDataToIncomingStack (Networkable $handlerInstance) {
+ /*
+ * Get the decoded data from the handler, this is an array with
+ * 'decoded_data' and 'error_code' as elements.
+ */
+ $decodedData = $handlerInstance->getNextDecodedData();
+
+ // Very noisy debug message:
+ //* NOISY-DEBUG: */ $this->debugOutput('PACKAGE: decodedData[' . gettype($decodedData) . ']=' . print_r($decodedData, true));
+
+ // And push it on our stack
+ $this->getStackerInstance()->pushNamed(self::STACKER_NAME_DECODED_INCOMING, $decodedData);
+ }
+
+ /**
+ * Checks wether incoming decoded data is handled.
+ *
+ * @return $isHandled Wether incoming decoded data is handled
+ */
+ public function isIncomingDecodedDataHandled () {
+ // Determine if the stack is not empty
+ $isHandled = (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_DECODED_HANDLED));
+
+ // Return it
+ return $isHandled;
+ }
+
+ /**
+ * Assembles incoming decoded data so it will become an abstract network
+ * package again.
+ *
+ * @return void
+ */
+ public function assembleDecodedDataToPackage () {
+ $this->partialStub('Please implement this method.');
+ }
+
/**
* Checks wether a new package has arrived
*
// Do we have something to handle?
if ($packageInstance->isNewRawDataPending($this->getListenerPoolInstance())) {
// We have to handle raw data from the socket
- $packageInstance->handleIncomingSocketRawData();
+ $packageInstance->handleIncomingDecodedData();
+ } elseif ($packageInstance->isIncomingDecodedDataHandled()) {
+ /*
+ * We have handled decoded data so we should validate it, if we have
+ * all chunks/fragments together, and assemble it into an abstract
+ * network package.
+ */
+ $packageInstance->assembleDecodedDataToPackage();
} elseif ($packageInstance->isNewPackageArrived()) {
// Okay, then handle newly arrived package
$packageInstance->handleNewlyArrivedPackage();
./application/hub/main/filter/shutdown/node/class_NodeShutdownTaskHandlerFilter.php:55: * @todo 0% done
./application/hub/main/filter/task/cruncher/class_CruncherTaskHandlerInitializerFilter.php:55: * @todo 5% done
./application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php:55: * @todo Maybe some more tasks needs to be added?
-./application/hub/main/handler/network/class_BaseRawDataHandler.php:111: * @todo This method will be moved to a better place
-./application/hub/main/handler/network/class_BaseRawDataHandler.php:118: // @TODO Numeric or alpha-numeric index?
+./application/hub/main/handler/network/class_BaseRawDataHandler.php:142: * @todo This method will be moved to a better place
+./application/hub/main/handler/network/class_BaseRawDataHandler.php:149: // @TODO Numeric or alpha-numeric index?
./application/hub/main/handler/network/udp/class_UdpRawDataHandler.php:58: * @todo 0%
./application/hub/main/handler/tasks/class_TaskHandler.php:140: // @TODO Messurement can be added around this call
./application/hub/main/helper/connection/tcp/class_TcpConnectionHelper.php:10: * @todo Find an interface for hub helper
./application/hub/main/nodes/regular/class_HubRegularNode.php:58: * @todo Implement this method
./application/hub/main/nodes/regular/class_HubRegularNode.php:68: * @todo Unfinished method
./application/hub/main/nodes/regular/class_HubRegularNode.php:91: // @TODO Add some filters here
-./application/hub/main/package/class_NetworkPackage.php:173: * @todo $helperInstance is unused
-./application/hub/main/package/class_NetworkPackage.php:177: // @TODO crc32 is not very strong, but it needs to be fast
+./application/hub/main/package/class_NetworkPackage.php:210: * @todo $helperInstance is unused
+./application/hub/main/package/class_NetworkPackage.php:214: // @TODO crc32 is not very strong, but it needs to be fast
./application/hub/main/package/class_NetworkPackage.php:23: * @todo Needs to add functionality for handling the object's type
-./application/hub/main/package/class_NetworkPackage.php:293: // @TODO We may want to do somthing more here?
-./application/hub/main/package/class_NetworkPackage.php:392: // @TODO Add some logging here
-./application/hub/main/package/class_NetworkPackage.php:418: // @TODO Add some logging here
-./application/hub/main/package/class_NetworkPackage.php:479: // @TODO Check for if new data has arrived
-./application/hub/main/package/class_NetworkPackage.php:491: // @TODO Add some content here
+./application/hub/main/package/class_NetworkPackage.php:330: // @TODO We may want to do somthing more here?
+./application/hub/main/package/class_NetworkPackage.php:429: // @TODO Add some logging here
+./application/hub/main/package/class_NetworkPackage.php:455: // @TODO Add some logging here
+./application/hub/main/package/class_NetworkPackage.php:548: // @TODO Add some logging here
+./application/hub/main/package/class_NetworkPackage.php:626: // @TODO Add some content here
./application/hub/main/package/fragmenter/class_PackageFragmenter.php:426: * @todo $connectionInstance is unused
./application/hub/main/pools/peer/class_DefaultPeerPool.php:148: // @TODO Check for IP
./application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php:106: // @TODO Do something with it