From: Roland Häder Date: Fri, 27 Apr 2012 19:38:26 +0000 (+0000) Subject: Again contunued on 'hub' project: X-Git-Url: https://git.mxchange.org/?a=commitdiff_plain;h=3979e1f4e74de760fcde7e972e1d0d0900c4b7b4;p=hub.git Again contunued on 'hub' project: - Receiving (but not processing) of local received packages (not forwareded packages) is now basicly finished - TODOs.txt updated --- diff --git a/application/hub/config.php b/application/hub/config.php index d1394fb6c..a67e18f89 100644 --- a/application/hub/config.php +++ b/application/hub/config.php @@ -303,6 +303,9 @@ $cfg->setConfigEntry('stacker_decoded_data_max_size', 100); // CFG: STACKER-FINAL-CHUNKS-MAX-SIZE $cfg->setConfigEntry('stacker_final_chunks_max_size', 100); +// CFG: STACKER-PENDING-CHUNKS-MAX-SIZE +$cfg->setConfigEntry('stacker_pending_chunks_max_size', 500); + // CFG: STACKER-CHUNK-RAW-DATA-MAX-SIZE $cfg->setConfigEntry('stacker_chunk_raw_data_max_size', 100); diff --git a/application/hub/interfaces/handler/chunks/class_HandleableChunks.php b/application/hub/interfaces/handler/chunks/class_HandleableChunks.php index 4c3d66ee8..ffb59539c 100644 --- a/application/hub/interfaces/handler/chunks/class_HandleableChunks.php +++ b/application/hub/interfaces/handler/chunks/class_HandleableChunks.php @@ -31,6 +31,14 @@ interface HandleableChunks extends Handleable { */ function addAllChunksWithFinal (array $chunks); + /** + * Adds all chunks and wait for more (e.g. incomplete transmission) + * + * @param $chunks An array with chunks, the last one should be a 'final' + * @return void + */ + function addAllChunksWait (array $chunks); + /** * Checks whether unhandled chunks are available * diff --git a/application/hub/interfaces/package/assembler/class_Assembler.php b/application/hub/interfaces/package/assembler/class_Assembler.php index 5b18035fc..22ea21821 100644 --- a/application/hub/interfaces/package/assembler/class_Assembler.php +++ b/application/hub/interfaces/package/assembler/class_Assembler.php @@ -30,6 +30,21 @@ interface Assembler extends FrameworkInterface { * @return void */ function chunkPackageContent (array $packageContent); + + /** + * Checks whether the assembler's pending data is empty which means it has + * no pending data left for handling ... ;-) + * + * @return $ifPendingDataIsEmpty Whether pending data is empty + */ + function isPendingDataEmpty (); + + /** + * Handles the assembler's pending data + * + * @return void + */ + function handlePendingData (); } // [EOF] diff --git a/application/hub/interfaces/package/class_Receivable.php b/application/hub/interfaces/package/class_Receivable.php index 869a710bf..fe1be45b8 100644 --- a/application/hub/interfaces/package/class_Receivable.php +++ b/application/hub/interfaces/package/class_Receivable.php @@ -77,6 +77,21 @@ interface Receivable extends FrameworkInterface { * @return $decodedData The real package data that the sender has sent */ function decodeRawContent ($rawPackageContent); + + /** + * Checks whether the assembler has pending data left + * + * @return $isHandled Whether the assembler has pending data left + */ + function ifAssemblerHasPendingDataLeft (); + + /** + * Handles the attached assemler's pending data queue to be finally + * assembled to the raw package data back. + * + * @return void + */ + function handleAssemblerPendingData (); } // [EOF] diff --git a/application/hub/main/class_BaseHubSystem.php b/application/hub/main/class_BaseHubSystem.php index 18a448d7a..b6431b158 100644 --- a/application/hub/main/class_BaseHubSystem.php +++ b/application/hub/main/class_BaseHubSystem.php @@ -76,6 +76,11 @@ class BaseHubSystem extends BaseFrameworkSystem { */ private $decoderInstance = NULL; + /** + * Assembler instance + */ + private $assemblerInstance = NULL; + /** * Protected constructor * @@ -258,6 +263,25 @@ class BaseHubSystem extends BaseFrameworkSystem { return $this->decoderInstance; } + /** + * Setter for assembler instance + * + * @param $assemblerInstance A Decodeable instance + * @return void + */ + protected final function setAssemblerInstance (Assembler $assemblerInstance) { + $this->assemblerInstance = $assemblerInstance; + } + + /** + * Getter for assembler instance + * + * @return $assemblerInstance A Decodeable instance + */ + protected final function getAssemblerInstance () { + return $this->assemblerInstance; + } + /** * Constructs a callable method name from given socket error code. If the * method is not found, a generic one is used. diff --git a/application/hub/main/factories/package/assembler/class_PackageAssemblerFactory.php b/application/hub/main/factories/package/assembler/class_PackageAssemblerFactory.php index 62fc0e523..f76fc01e3 100644 --- a/application/hub/main/factories/package/assembler/class_PackageAssemblerFactory.php +++ b/application/hub/main/factories/package/assembler/class_PackageAssemblerFactory.php @@ -37,16 +37,17 @@ class PackageAssemblerFactory extends ObjectFactory { * be generated and stored in registry, else the assembler from the * registry will be returned. * - * @return $assemblerInstance An assembler instance + * @param $packageInstance An instance of a Receivable instance + * @return $assemblerInstance An instance of a Assembler instance */ - public static final function createAssemblerInstance () { + public static final function createAssemblerInstance (Receivable $packageInstance) { // If there is no assembler? if (Registry::getRegistry()->instanceExists('package_assembler')) { // Get assembler from registry $assemblerInstance = Registry::getRegistry()->getInstance('package_assembler'); } else { // Get the assembler instance - $assemblerInstance = self::createObjectByConfiguredName('package_assembler_class'); + $assemblerInstance = self::createObjectByConfiguredName('package_assembler_class', array($packageInstance)); // Add it to the registry Registry::getRegistry()->addInstance('package_assembler', $assemblerInstance); diff --git a/application/hub/main/handler/chunks/class_ChunkHandler.php b/application/hub/main/handler/chunks/class_ChunkHandler.php index 04db917df..ae17e9681 100644 --- a/application/hub/main/handler/chunks/class_ChunkHandler.php +++ b/application/hub/main/handler/chunks/class_ChunkHandler.php @@ -26,6 +26,7 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable * Stacker for chunks with final EOP */ const STACKER_NAME_CHUNKS_WITH_FINAL_EOP = 'final_chunks'; + const STACKER_NAME_CHUNKS_WITHOUT_FINAL = 'pending_chunks'; const STACKER_NAME_ASSEMBLED_RAW_DATA = 'chunk_raw_data'; /** @@ -91,6 +92,7 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable // Init all stacker $stackerInstance->initStacker(self::STACKER_NAME_CHUNKS_WITH_FINAL_EOP); + $stackerInstance->initStacker(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL); $stackerInstance->initStacker(self::STACKER_NAME_ASSEMBLED_RAW_DATA); // Set the stacker in this handler @@ -192,6 +194,9 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable throw new ChunkAlreadyAssembledException(array($this, $chunkSplits), self::EXCEPTION_CHUNK_ALREADY_ASSEMBLED); } // END - if + // Debug message + //* NOISY-DEBUG: */ $this->debugOutput('CHUNK-HANDLER: serialNumber=' . $chunkSplits[self::CHUNK_SPLITS_INDEX_SERIAL] . ',hash=' . $chunkSplits[self::CHUNK_SPLITS_INDEX_HASH]); + // Add the chunk data (index 2) to the final array and use the serial number as index $this->finalPackageChunks['content'][$chunkSplits[self::CHUNK_SPLITS_INDEX_SERIAL]] = $chunkSplits[self::CHUNK_SPLITS_INDEX_RAW_DATA]; @@ -241,6 +246,7 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable private function preparePackageAssmble () { // Make sure both arrays have same count (this however should always be true) assert(count($this->finalPackageChunks['hashes']) == count($this->finalPackageChunks['content'])); + //* DIE: */ die('finalPackageChunks='.print_r($this->finalPackageChunks['content'],true)); /* * Remove last element (hash chunk) from 'hashes'. This hash will never @@ -306,6 +312,10 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable // That went well, so start assembling all chunks foreach ($this->finalPackageChunks['content'] as $serialNumber=>$content) { + // Debug message + //* NOISY-DEBUG: */ $this->debugOutput('CHUNK-HANDLER: serialNumber=' . $serialNumber . ' - validating ...'); + //* NOISY-DEBUG: */ $this->debugOutput('finalPackageChunks=' . print_r($this->finalPackageChunks,true) . 'chunkHashes=' . print_r($this->chunkHashes,true)); + // Is this chunk valid? This should be the case assert($this->isChunkHashValid(array( self::CHUNK_SPLITS_INDEX_HASH => $this->finalPackageChunks['hashes'][$serialNumber], @@ -354,6 +364,12 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable throw new FinalChunkVerificationException(array($this, $chunks), BaseListener::EXCEPTION_FINAL_CHUNK_VERIFICATION); } // END - if + // Do we have some pending chunks (no final)? + while (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL)) { + // Then get it first and add it before the EOP chunks + array_unshift($chunks, $this->getStackerInstance()->popNamed(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL)); + } // END - while + // Add all chunks to the FIFO stacker foreach ($chunks as $chunk) { // Add the chunk @@ -361,6 +377,20 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable } // END - foreach } + /** + * Adds all chunks and wait for more (e.g. incomplete transmission) + * + * @param $chunks An array with chunks, the last one should be a 'final' + * @return void + */ + public function addAllChunksWait (array $chunks) { + // Add all chunks to the FIFO stacker + foreach ($chunks as $chunk) { + // Add the chunk + $this->getStackerInstance()->pushNamed(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL, $chunk); + } // END - foreach + } + /** * Checks whether unhandled chunks are available * diff --git a/application/hub/main/package/assembler/class_PackageAssembler.php b/application/hub/main/package/assembler/class_PackageAssembler.php index e518008ae..b53cc50ff 100644 --- a/application/hub/main/package/assembler/class_PackageAssembler.php +++ b/application/hub/main/package/assembler/class_PackageAssembler.php @@ -23,6 +23,11 @@ * along with this program. If not, see . */ class PackageAssembler extends BaseHubSystem implements Assembler, Registerable { + /** + * Pending data + */ + private $pendingData = ''; + /** * Protected constructor * @@ -36,16 +41,36 @@ class PackageAssembler extends BaseHubSystem implements Assembler, Registerable /** * Creates an instance of this class * + * @param $packageInstance An instance of a Receivable class * @return $assemblerInstance An instance of an Assembler class */ - public static final function createPackageAssembler () { + public static final function createPackageAssembler (Receivable $packageInstance) { // Get new instance $assemblerInstance = new PackageAssembler(); + // Set package instance here + $assemblerInstance->setPackageInstance($packageInstance); + // Return the prepared instance return $assemblerInstance; } + /** + * Checks whether the input buffer (stacker to be more preceise) is empty. + * + * @return $isInputBufferEmpty Whether the input buffer is empty + */ + private function ifInputBufferIsEmpty () { + // Check it + $isInputBufferEmpty = $this->getPackageInstance()->getStackerInstance()->isStackEmpty(NetworkPackage::STACKER_NAME_DECODED_HANDLED); + + // Debug message + //* NOISY-DEBUG: */ $this->debugOutput('PACKAGE-ASSEMBLER: isInputBufferEmpty=' . intval($isInputBufferEmpty)); + + // Return it + return $isInputBufferEmpty; + } + /** * Assembles the content from $packageContent. This method does only * initialize the whole process by creating a call-back which will then @@ -80,6 +105,10 @@ class PackageAssembler extends BaseHubSystem implements Assembler, Registerable call_user_func(array($this, $methodName), $packageContent); } + /************************************************************************** + * Call-back methods for above method * + **************************************************************************/ + /** * Call-back handler to handle unhandled packages. This method "explodes" * the string with the chunk separator from PackageFragmenter class, does @@ -91,25 +120,69 @@ class PackageAssembler extends BaseHubSystem implements Assembler, Registerable * @throws FinalChunkVerificationException If the final chunk does not start with 'EOP:' */ private function handlePackageByUnhandledPackage (array $packageContent) { - /* - * "explode" the string from 'decoded_data' with chunk separator to - * get an array of chunks. These chunks must then be verified by - * their checksums. Also the final chunk must be handled. - */ - $chunks = explode(PackageFragmenter::CHUNK_SEPARATOR, $packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]); - //* NOISY-DEBUG */ $this->debugOutput('chunks='.print_r($chunks,true)); - - // Validate final chunk - if (!$this->isValidFinalChunk($chunks)) { - // Last chunk is not valid - throw new FinalChunkVerificationException(array($this, $chunks), BaseListener::EXCEPTION_FINAL_CHUNK_VERIFICATION); - } // END - if + // Check for some conditions + if (!$this->ifInputBufferIsEmpty()) { + // Last chunk is not valid, so wait for more + $this->pendingData .= $packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]; + + // Debug message + /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE-ASSEMBLER: Partial data received. Waiting for more ... ( ' . strlen($packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]) . ' bytes)'); + } else { + // Debug message + //* NOISY-DEBUG */ $this->debugOutput('packageContent=' . print_r($packageContent,true) . ',chunks='.print_r($chunks,true)); + + /* + * "explode" the string from 'decoded_data' with chunk separator to + * get an array of chunks. These chunks must then be verified by + * their checksums. Also the final chunk must be handled. + */ + $chunks = explode(PackageFragmenter::CHUNK_SEPARATOR, $packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]); + + // Now get a chunk handler instance + $handlerInstance = ChunkHandlerFactory::createChunkHandlerInstance(); + + // Add all chunks because the last final chunk is found + $handlerInstance->addAllChunksWithFinal($chunks); + } + } + + /** + * Checks whether the assembler's pending data is empty which means it has + * no pending data left for handling ... ;-) + * + * @return $ifPendingDataIsEmpty Whether pending data is empty + */ + public function isPendingDataEmpty () { + // A simbple check + $ifPendingDataIsEmpty = empty($this->pendingData); + + // Return it + return $ifPendingDataIsEmpty; + } + + /** + * Handles the assembler's pending data + * + * @return void + */ + public function handlePendingData () { + // Assert on condition + assert(!$this->isPendingDataEmpty()); + + // Init fake array + $packageContent = array( + BaseRawDataHandler::PACKAGE_DECODED_DATA => $this->pendingData, + BaseRawDataHandler::PACKAGE_ERROR_CODE => BaseRawDataHandler::SOCKET_ERROR_UNHANDLED + ); + + // Clear pending data + $this->pendingData = ''; - // Now get a chunk handler instance - $handlerInstance = ChunkHandlerFactory::createChunkHandlerInstance(); + // Debug message + /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE-ASSEMBLER: Last block of partial data received. A total of ' . strlen($packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]) . ' bytes has been received.'); - // Add all chunks because the last final chunk is found - $handlerInstance->addAllChunksWithFinal($chunks); + // Call the real handler method + $this->handlePackageByUnhandledPackage($packageContent); } } diff --git a/application/hub/main/package/class_NetworkPackage.php b/application/hub/main/package/class_NetworkPackage.php index 06869fcd1..8c649d371 100644 --- a/application/hub/main/package/class_NetworkPackage.php +++ b/application/hub/main/package/class_NetworkPackage.php @@ -207,6 +207,10 @@ class NetworkPackage extends BaseHubSystem implements Deliverable, Receivable, R $cryptoInstance = ObjectFactory::createObjectByConfiguredName('crypto_class'); $packageInstance->setCryptoInstance($cryptoInstance); + // Get a singleton package assembler instance from factory and set it here + $assemblerInstance = PackageAssemblerFactory::createAssemblerInstance($packageInstance); + $packageInstance->setAssemblerInstance($assemblerInstance); + // Return the prepared instance return $packageInstance; } @@ -751,6 +755,30 @@ class NetworkPackage extends BaseHubSystem implements Deliverable, Receivable, R return $isHandled; } + /** + * Checks whether the assembler has pending data left + * + * @return $isHandled Whether the assembler has pending data left + */ + public function ifAssemblerHasPendingDataLeft () { + // Determine if the stack is not empty + $isHandled = (!$this->getAssemblerInstance()->isPendingDataEmpty()); + + // Return it + return $isHandled; + } + + /** + * Handles the attached assemler's pending data queue to be finally + * assembled to the raw package data back. + * + * @return void + */ + public function handleAssemblerPendingData () { + // Handle it + $this->getAssemblerInstance()->handlePendingData(); + } + /** * Assembles incoming decoded data so it will become an abstract network * package again. The assembler does later do it's job by an other task, @@ -765,11 +793,8 @@ class NetworkPackage extends BaseHubSystem implements Deliverable, Receivable, R // Get current package content (an array with two elements; see handleIncomingDecodedData() for details) $packageContent = $this->getStackerInstance()->getNamed(self::STACKER_NAME_DECODED_HANDLED); - // Get a singleton package assembler instance from factory - $assemblerInstance = PackageAssemblerFactory::createAssemblerInstance(); - // Start assembling the raw package data array by chunking it - $assemblerInstance->chunkPackageContent($packageContent); + $this->getAssemblerInstance()->chunkPackageContent($packageContent); // Remove the package from 'handled_decoded' stack ... $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_HANDLED); diff --git a/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php b/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php index da672097a..dcb3f9531 100644 --- a/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php +++ b/application/hub/main/tasks/network/class_NetworkPackageReaderTask.php @@ -77,6 +77,9 @@ class NetworkPackageReaderTask extends BaseTask implements Taskable, Visitable { if ($this->getPackageInstance()->isNewPackageArrived()) { // Okay, then handle newly arrived package $this->getPackageInstance()->handleNewlyArrivedPackage(); + } elseif ($this->getPackageInstance()->ifAssemblerHasPendingDataLeft()) { + // Okay, handle it here + $this->getPackageInstance()->handleAssemblerPendingData(); } elseif ($this->getPackageInstance()->isIncomingDecodedDataHandled()) { /* * We have handled decoded data so we should validate it, if we have diff --git a/docs/TODOs.txt b/docs/TODOs.txt index db2d967ce..8a6252233 100644 --- a/docs/TODOs.txt +++ b/docs/TODOs.txt @@ -6,7 +6,7 @@ ./application/hub/interfaces/helper/connections/class_ConnectionHelper.php:38: * @todo We may want to implement a filter for ease notification of other objects like our pool ./application/hub/interfaces/helper/messages/class_MessageHelper.php:10: * @todo Please find another name for this interface ./application/hub/interfaces/nodes/class_NodeHelper.php:10: * @todo We need to find a better name for this interface -./application/hub/main/class_BaseHubSystem.php:376: // @TODO On some systems it is 134, on some 107? +./application/hub/main/class_BaseHubSystem.php:400: // @TODO On some systems it is 134, on some 107? ./application/hub/main/commands/console/class_HubConsoleChatCommand.php:107: * @todo Should we add some more filters? ./application/hub/main/commands/console/class_HubConsoleChatCommand.php:58: * @todo Try to create a ChatActivationTask or so ./application/hub/main/commands/console/class_HubConsoleCruncherCommand.php:107: * @todo Should we add some more filters? @@ -88,13 +88,13 @@ ./application/hub/main/nodes/regular/class_HubRegularNode.php:68: * @todo Unfinished method ./application/hub/main/nodes/regular/class_HubRegularNode.php:91: // @TODO Add some filters here ./application/hub/main/package/class_NetworkPackage.php:23: * @todo Needs to add functionality for handling the object's type -./application/hub/main/package/class_NetworkPackage.php:244: * @todo $helperInstance is unused -./application/hub/main/package/class_NetworkPackage.php:248: // @TODO crc32() is very weak, but it needs to be fast -./application/hub/main/package/class_NetworkPackage.php:430: // @TODO We may want to do somthing more here? -./application/hub/main/package/class_NetworkPackage.php:558: // @TODO Add some logging here -./application/hub/main/package/class_NetworkPackage.php:688: // @TODO Add some logging here -./application/hub/main/package/class_NetworkPackage.php:787: // @TODO Add some content here -./application/hub/main/package/class_NetworkPackage.php:826: * @todo This may be enchanced for outgoing packages? +./application/hub/main/package/class_NetworkPackage.php:248: * @todo $helperInstance is unused +./application/hub/main/package/class_NetworkPackage.php:252: // @TODO crc32() is very weak, but it needs to be fast +./application/hub/main/package/class_NetworkPackage.php:434: // @TODO We may want to do somthing more here? +./application/hub/main/package/class_NetworkPackage.php:562: // @TODO Add some logging here +./application/hub/main/package/class_NetworkPackage.php:692: // @TODO Add some logging here +./application/hub/main/package/class_NetworkPackage.php:812: // @TODO Add some content here +./application/hub/main/package/class_NetworkPackage.php:851: * @todo This may be enchanced for outgoing packages? ./application/hub/main/package/fragmenter/class_PackageFragmenter.php:275: * @todo Implement a way to send non-announcement packages with extra-salt ./application/hub/main/package/fragmenter/class_PackageFragmenter.php:427: * @todo $helperInstance is unused ./application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php:106: // @TODO Do something with it