Again contunued on 'hub' project:
authorRoland Häder <roland@mxchange.org>
Fri, 27 Apr 2012 19:38:26 +0000 (19:38 +0000)
committerRoland Häder <roland@mxchange.org>
Fri, 27 Apr 2012 19:38:26 +0000 (19:38 +0000)
- Receiving (but not processing) of local received packages (not forwareded
  packages) is now basicly finished
- TODOs.txt updated

application/hub/config.php
application/hub/interfaces/handler/chunks/class_HandleableChunks.php
application/hub/interfaces/package/assembler/class_Assembler.php
application/hub/interfaces/package/class_Receivable.php
application/hub/main/class_BaseHubSystem.php
application/hub/main/factories/package/assembler/class_PackageAssemblerFactory.php
application/hub/main/handler/chunks/class_ChunkHandler.php
application/hub/main/package/assembler/class_PackageAssembler.php
application/hub/main/package/class_NetworkPackage.php
application/hub/main/tasks/network/class_NetworkPackageReaderTask.php
docs/TODOs.txt

index d1394fb6cdc31bdbd3007f1d2f26bfcf9557c06c..a67e18f8991a569ea44c69f519c3706a62a8da5c 100644 (file)
@@ -303,6 +303,9 @@ $cfg->setConfigEntry('stacker_decoded_data_max_size', 100);
 // CFG: STACKER-FINAL-CHUNKS-MAX-SIZE
 $cfg->setConfigEntry('stacker_final_chunks_max_size', 100);
 
+// CFG: STACKER-PENDING-CHUNKS-MAX-SIZE
+$cfg->setConfigEntry('stacker_pending_chunks_max_size', 500);
+
 // CFG: STACKER-CHUNK-RAW-DATA-MAX-SIZE
 $cfg->setConfigEntry('stacker_chunk_raw_data_max_size', 100);
 
index 4c3d66ee8977875572866d336c4abd3b64a066b4..ffb59539c218fb41a406bc37c9f3cdc606b62d02 100644 (file)
@@ -31,6 +31,14 @@ interface HandleableChunks extends Handleable {
         */
        function addAllChunksWithFinal (array $chunks);
 
+       /**
+        * Adds all chunks and wait for more (e.g. incomplete transmission)
+        *
+        * @param       $chunks         An array with chunks, the last one should be a 'final'
+        * @return      void
+        */
+       function addAllChunksWait (array $chunks);
+
        /**
         * Checks whether unhandled chunks are available
         *
index 5b18035fc0303249b36df28492c50be801c24b12..22ea21821ec43802d95f13cdf9c677575a167191 100644 (file)
@@ -30,6 +30,21 @@ interface Assembler extends FrameworkInterface {
         * @return      void
         */
        function chunkPackageContent (array $packageContent);
+
+       /**
+        * Checks whether the assembler's pending data is empty which means it has
+        * no pending data left for handling ... ;-)
+        *
+        * @return      $ifPendingDataIsEmpty   Whether pending data is empty
+        */
+       function isPendingDataEmpty ();
+
+       /**
+        * Handles the assembler's pending data
+        *
+        * @return      void
+        */
+       function handlePendingData ();
 }
 
 // [EOF]
index 869a710bfd2640231e06e400bdc2668c40235dc2..fe1be45b895eadb05c8e6c8c5454669fcdc6b74c 100644 (file)
@@ -77,6 +77,21 @@ interface Receivable extends FrameworkInterface {
         * @return      $decodedData            The real package data that the sender has sent
         */
        function decodeRawContent ($rawPackageContent);
+
+       /**
+        * Checks whether the assembler has pending data left
+        *
+        * @return      $isHandled      Whether the assembler has pending data left
+        */
+       function ifAssemblerHasPendingDataLeft ();
+
+       /**
+        * Handles the attached assemler's pending data queue to be finally
+        * assembled to the raw package data back.
+        *
+        * @return      void
+        */
+       function handleAssemblerPendingData ();
 }
 
 // [EOF]
index 18a448d7aebfcd33a6c1c0f1fee221566e021af2..b6431b158eb696dbc6f4b422ec3fd7dc21793df5 100644 (file)
@@ -76,6 +76,11 @@ class BaseHubSystem extends BaseFrameworkSystem {
         */
        private $decoderInstance = NULL;
 
+       /**
+        * Assembler instance
+        */
+       private $assemblerInstance = NULL;
+
        /**
         * Protected constructor
         *
@@ -258,6 +263,25 @@ class BaseHubSystem extends BaseFrameworkSystem {
                return $this->decoderInstance;
        }
 
+       /**
+        * Setter for assembler instance
+        *
+        * @param       $assemblerInstance      A Decodeable instance
+        * @return      void
+        */
+       protected final function setAssemblerInstance (Assembler $assemblerInstance) {
+               $this->assemblerInstance = $assemblerInstance;
+       }
+
+       /**
+        * Getter for assembler instance
+        *
+        * @return      $assemblerInstance      A Decodeable instance
+        */
+       protected final function getAssemblerInstance () {
+               return $this->assemblerInstance;
+       }
+
        /**
         * Constructs a callable method name from given socket error code. If the
         * method is not found, a generic one is used.
index 62fc0e52374747bbb2454a3c70a7c78f17a0073b..f76fc01e3b60bc15069a126b36d67b54e46d96b2 100644 (file)
@@ -37,16 +37,17 @@ class PackageAssemblerFactory extends ObjectFactory {
         * be generated and stored in registry, else the assembler from the
         * registry will be returned.
         *
-        * @return      $assemblerInstance      An assembler instance
+        * @param       $packageInstance        An instance of a Receivable instance
+        * @return      $assemblerInstance      An instance of a Assembler instance
         */
-       public static final function createAssemblerInstance () {
+       public static final function createAssemblerInstance (Receivable $packageInstance) {
                // If there is no assembler?
                if (Registry::getRegistry()->instanceExists('package_assembler')) {
                        // Get assembler from registry
                        $assemblerInstance = Registry::getRegistry()->getInstance('package_assembler');
                } else {
                        // Get the assembler instance
-                       $assemblerInstance = self::createObjectByConfiguredName('package_assembler_class');
+                       $assemblerInstance = self::createObjectByConfiguredName('package_assembler_class', array($packageInstance));
 
                        // Add it to the registry
                        Registry::getRegistry()->addInstance('package_assembler', $assemblerInstance);
index 04db917df4c647706fa1d977836999473c77e00e..ae17e9681680b8817afc245f13c1dcc1d9d28ec2 100644 (file)
@@ -26,6 +26,7 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable
         * Stacker for chunks with final EOP
         */
        const STACKER_NAME_CHUNKS_WITH_FINAL_EOP = 'final_chunks';
+       const STACKER_NAME_CHUNKS_WITHOUT_FINAL  = 'pending_chunks';
        const STACKER_NAME_ASSEMBLED_RAW_DATA    = 'chunk_raw_data';
 
        /**
@@ -91,6 +92,7 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable
 
                // Init all stacker
                $stackerInstance->initStacker(self::STACKER_NAME_CHUNKS_WITH_FINAL_EOP);
+               $stackerInstance->initStacker(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL);
                $stackerInstance->initStacker(self::STACKER_NAME_ASSEMBLED_RAW_DATA);
 
                // Set the stacker in this handler
@@ -192,6 +194,9 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable
                        throw new ChunkAlreadyAssembledException(array($this, $chunkSplits), self::EXCEPTION_CHUNK_ALREADY_ASSEMBLED);
                } // END - if
 
+               // Debug message
+               //* NOISY-DEBUG: */ $this->debugOutput('CHUNK-HANDLER: serialNumber=' . $chunkSplits[self::CHUNK_SPLITS_INDEX_SERIAL] . ',hash=' . $chunkSplits[self::CHUNK_SPLITS_INDEX_HASH]);
+
                // Add the chunk data (index 2) to the final array and use the serial number as index
                $this->finalPackageChunks['content'][$chunkSplits[self::CHUNK_SPLITS_INDEX_SERIAL]] = $chunkSplits[self::CHUNK_SPLITS_INDEX_RAW_DATA];
 
@@ -241,6 +246,7 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable
        private function preparePackageAssmble () {
                // Make sure both arrays have same count (this however should always be true)
                assert(count($this->finalPackageChunks['hashes']) == count($this->finalPackageChunks['content']));
+               //* DIE: */ die('finalPackageChunks='.print_r($this->finalPackageChunks['content'],true));
 
                /*
                 * Remove last element (hash chunk) from 'hashes'. This hash will never
@@ -306,6 +312,10 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable
 
                // That went well, so start assembling all chunks
                foreach ($this->finalPackageChunks['content'] as $serialNumber=>$content) {
+                       // Debug message
+                       //* NOISY-DEBUG: */ $this->debugOutput('CHUNK-HANDLER: serialNumber=' . $serialNumber . ' - validating ...');
+                       //* NOISY-DEBUG: */ $this->debugOutput('finalPackageChunks=' . print_r($this->finalPackageChunks,true) . 'chunkHashes=' . print_r($this->chunkHashes,true));
+
                        // Is this chunk valid? This should be the case
                        assert($this->isChunkHashValid(array(
                                self::CHUNK_SPLITS_INDEX_HASH     => $this->finalPackageChunks['hashes'][$serialNumber],
@@ -354,6 +364,12 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable
                        throw new FinalChunkVerificationException(array($this, $chunks), BaseListener::EXCEPTION_FINAL_CHUNK_VERIFICATION);
                } // END - if
 
+               // Do we have some pending chunks (no final)?
+               while (!$this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL)) {
+                       // Then get it first and add it before the EOP chunks
+                       array_unshift($chunks, $this->getStackerInstance()->popNamed(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL));
+               } // END - while
+
                // Add all chunks to the FIFO stacker
                foreach ($chunks as $chunk) {
                        // Add the chunk
@@ -361,6 +377,20 @@ class ChunkHandler extends BaseHandler implements HandleableChunks, Registerable
                } // END - foreach
        }
 
+       /**
+        * Adds all chunks and wait for more (e.g. incomplete transmission)
+        *
+        * @param       $chunks         An array with chunks, the last one should be a 'final'
+        * @return      void
+        */
+       public function addAllChunksWait (array $chunks) {
+               // Add all chunks to the FIFO stacker
+               foreach ($chunks as $chunk) {
+                       // Add the chunk
+                       $this->getStackerInstance()->pushNamed(self::STACKER_NAME_CHUNKS_WITHOUT_FINAL, $chunk);
+               } // END - foreach
+       }
+
        /**
         * Checks whether unhandled chunks are available
         *
index e518008aeeef57668d11e0dcd96cb10886df01db..b53cc50ff07f16cc48b4c127b4aadc19af1b19ab 100644 (file)
  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
  */
 class PackageAssembler extends BaseHubSystem implements Assembler, Registerable {
+       /**
+        * Pending data
+        */
+       private $pendingData = '';
+
        /**
         * Protected constructor
         *
@@ -36,16 +41,36 @@ class PackageAssembler extends BaseHubSystem implements Assembler, Registerable
        /**
         * Creates an instance of this class
         *
+        * @param       $packageInstance        An instance of a Receivable class
         * @return      $assemblerInstance      An instance of an Assembler class
         */
-       public static final function createPackageAssembler () {
+       public static final function createPackageAssembler (Receivable $packageInstance) {
                // Get new instance
                $assemblerInstance = new PackageAssembler();
 
+               // Set package instance here
+               $assemblerInstance->setPackageInstance($packageInstance);
+
                // Return the prepared instance
                return $assemblerInstance;
        }
 
+       /**
+        * Checks whether the input buffer (stacker to be more preceise) is empty.
+        *
+        * @return      $isInputBufferEmpty             Whether the input buffer is empty
+        */
+       private function ifInputBufferIsEmpty () {
+               // Check it
+               $isInputBufferEmpty = $this->getPackageInstance()->getStackerInstance()->isStackEmpty(NetworkPackage::STACKER_NAME_DECODED_HANDLED);
+
+               // Debug message
+               //* NOISY-DEBUG: */ $this->debugOutput('PACKAGE-ASSEMBLER: isInputBufferEmpty=' . intval($isInputBufferEmpty));
+
+               // Return it
+               return $isInputBufferEmpty;
+       }
+
        /**
         * Assembles the content from $packageContent. This method does only
         * initialize the whole process by creating a call-back which will then
@@ -80,6 +105,10 @@ class PackageAssembler extends BaseHubSystem implements Assembler, Registerable
                call_user_func(array($this, $methodName), $packageContent);
        }
 
+       /**************************************************************************
+        *                 Call-back methods for above method                     *
+        **************************************************************************/
+
        /**
         * Call-back handler to handle unhandled packages. This method "explodes"
         * the string with the chunk separator from PackageFragmenter class, does
@@ -91,25 +120,69 @@ class PackageAssembler extends BaseHubSystem implements Assembler, Registerable
         * @throws      FinalChunkVerificationException         If the final chunk does not start with 'EOP:'
         */
        private function handlePackageByUnhandledPackage (array $packageContent) {
-               /*
-                * "explode" the string from 'decoded_data' with chunk separator to
-                * get an array of chunks. These chunks must then be verified by
-                * their checksums. Also the final chunk must be handled.
-                */
-               $chunks = explode(PackageFragmenter::CHUNK_SEPARATOR, $packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]);
-               //* NOISY-DEBUG */ $this->debugOutput('chunks='.print_r($chunks,true));
-
-               // Validate final chunk
-               if (!$this->isValidFinalChunk($chunks)) {
-                       // Last chunk is not valid
-                       throw new FinalChunkVerificationException(array($this, $chunks), BaseListener::EXCEPTION_FINAL_CHUNK_VERIFICATION);
-               } // END - if
+               // Check for some conditions
+               if (!$this->ifInputBufferIsEmpty()) {
+                       // Last chunk is not valid, so wait for more
+                       $this->pendingData .= $packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA];
+
+                       // Debug message
+                       /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE-ASSEMBLER: Partial data received. Waiting for more ... ( ' . strlen($packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]) . ' bytes)');
+               } else {
+                       // Debug message
+                       //* NOISY-DEBUG */ $this->debugOutput('packageContent=' . print_r($packageContent,true) . ',chunks='.print_r($chunks,true));
+
+                       /*
+                        * "explode" the string from 'decoded_data' with chunk separator to
+                        * get an array of chunks. These chunks must then be verified by
+                        * their checksums. Also the final chunk must be handled.
+                        */
+                       $chunks = explode(PackageFragmenter::CHUNK_SEPARATOR, $packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]);
+
+                       // Now get a chunk handler instance
+                       $handlerInstance = ChunkHandlerFactory::createChunkHandlerInstance();
+
+                       // Add all chunks because the last final chunk is found
+                       $handlerInstance->addAllChunksWithFinal($chunks);
+               }
+       }
+
+       /**
+        * Checks whether the assembler's pending data is empty which means it has
+        * no pending data left for handling ... ;-)
+        *
+        * @return      $ifPendingDataIsEmpty   Whether pending data is empty
+        */
+       public function isPendingDataEmpty () {
+               // A simbple check
+               $ifPendingDataIsEmpty = empty($this->pendingData);
+
+               // Return it
+               return $ifPendingDataIsEmpty;
+       }
+
+       /**
+        * Handles the assembler's pending data
+        *
+        * @return      void
+        */
+       public function handlePendingData () {
+               // Assert on condition
+               assert(!$this->isPendingDataEmpty());
+
+               // Init fake array
+               $packageContent = array(
+                       BaseRawDataHandler::PACKAGE_DECODED_DATA => $this->pendingData,
+                       BaseRawDataHandler::PACKAGE_ERROR_CODE   => BaseRawDataHandler::SOCKET_ERROR_UNHANDLED
+               );
+
+               // Clear pending data
+               $this->pendingData = '';
 
-               // Now get a chunk handler instance
-               $handlerInstance = ChunkHandlerFactory::createChunkHandlerInstance();
+               // Debug message
+               /* NOISY-DEBUG: */ $this->debugOutput('PACKAGE-ASSEMBLER: Last block of partial data received. A total of ' . strlen($packageContent[BaseRawDataHandler::PACKAGE_DECODED_DATA]) . ' bytes has been received.');
 
-               // Add all chunks because the last final chunk is found
-               $handlerInstance->addAllChunksWithFinal($chunks);
+               // Call the real handler method
+               $this->handlePackageByUnhandledPackage($packageContent);
        }
 }
 
index 06869fcd148f6baac9e717b5587f2f11c3ae1fe5..8c649d37152a265c6fb590ced2342acef8bb4af7 100644 (file)
@@ -207,6 +207,10 @@ class NetworkPackage extends BaseHubSystem implements Deliverable, Receivable, R
                $cryptoInstance = ObjectFactory::createObjectByConfiguredName('crypto_class');
                $packageInstance->setCryptoInstance($cryptoInstance);
 
+               // Get a singleton package assembler instance from factory and set it here
+               $assemblerInstance = PackageAssemblerFactory::createAssemblerInstance($packageInstance);
+               $packageInstance->setAssemblerInstance($assemblerInstance);
+
                // Return the prepared instance
                return $packageInstance;
        }
@@ -751,6 +755,30 @@ class NetworkPackage extends BaseHubSystem implements Deliverable, Receivable, R
                return $isHandled;
        }
 
+       /**
+        * Checks whether the assembler has pending data left
+        *
+        * @return      $isHandled      Whether the assembler has pending data left
+        */
+       public function ifAssemblerHasPendingDataLeft () {
+               // Determine if the stack is not empty
+               $isHandled = (!$this->getAssemblerInstance()->isPendingDataEmpty());
+
+               // Return it
+               return $isHandled;
+       }
+
+       /**
+        * Handles the attached assemler's pending data queue to be finally
+        * assembled to the raw package data back.
+        *
+        * @return      void
+        */
+       public function handleAssemblerPendingData () {
+               // Handle it
+               $this->getAssemblerInstance()->handlePendingData();
+       }
+
        /**
         * Assembles incoming decoded data so it will become an abstract network
         * package again. The assembler does later do it's job by an other task,
@@ -765,11 +793,8 @@ class NetworkPackage extends BaseHubSystem implements Deliverable, Receivable, R
                // Get current package content (an array with two elements; see handleIncomingDecodedData() for details)
                $packageContent = $this->getStackerInstance()->getNamed(self::STACKER_NAME_DECODED_HANDLED);
 
-               // Get a singleton package assembler instance from factory
-               $assemblerInstance = PackageAssemblerFactory::createAssemblerInstance();
-
                // Start assembling the raw package data array by chunking it
-               $assemblerInstance->chunkPackageContent($packageContent);
+               $this->getAssemblerInstance()->chunkPackageContent($packageContent);
 
                // Remove the package from 'handled_decoded' stack ...
                $this->getStackerInstance()->popNamed(self::STACKER_NAME_DECODED_HANDLED);
index da672097a97d43e8b093c26151a11b500a10f36b..dcb3f9531684967ccd112ec666e39953278fb86d 100644 (file)
@@ -77,6 +77,9 @@ class NetworkPackageReaderTask extends BaseTask implements Taskable, Visitable {
                if ($this->getPackageInstance()->isNewPackageArrived()) {
                        // Okay, then handle newly arrived package
                        $this->getPackageInstance()->handleNewlyArrivedPackage();
+               } elseif ($this->getPackageInstance()->ifAssemblerHasPendingDataLeft()) {
+                       // Okay, handle it here
+                       $this->getPackageInstance()->handleAssemblerPendingData();
                } elseif ($this->getPackageInstance()->isIncomingDecodedDataHandled()) {
                        /*
                         * We have handled decoded data so we should validate it, if we have
index db2d967cee38b7b7c9bc4e49f395b971b697bdd5..8a625223338fb4eb81613098434cfadb565e39f7 100644 (file)
@@ -6,7 +6,7 @@
 ./application/hub/interfaces/helper/connections/class_ConnectionHelper.php:38:  * @todo        We may want to implement a filter for ease notification of other objects like our pool
 ./application/hub/interfaces/helper/messages/class_MessageHelper.php:10: * @todo               Please find another name for this interface
 ./application/hub/interfaces/nodes/class_NodeHelper.php:10: * @todo            We need to find a better name for this interface
-./application/hub/main/class_BaseHubSystem.php:376:                            // @TODO On some systems it is 134, on some 107?
+./application/hub/main/class_BaseHubSystem.php:400:                            // @TODO On some systems it is 134, on some 107?
 ./application/hub/main/commands/console/class_HubConsoleChatCommand.php:107:    * @todo        Should we add some more filters?
 ./application/hub/main/commands/console/class_HubConsoleChatCommand.php:58:     * @todo        Try to create a ChatActivationTask or so
 ./application/hub/main/commands/console/class_HubConsoleCruncherCommand.php:107:        * @todo        Should we add some more filters?
 ./application/hub/main/nodes/regular/class_HubRegularNode.php:68:       * @todo        Unfinished method
 ./application/hub/main/nodes/regular/class_HubRegularNode.php:91:              // @TODO Add some filters here
 ./application/hub/main/package/class_NetworkPackage.php:23: * @todo            Needs to add functionality for handling the object's type
-./application/hub/main/package/class_NetworkPackage.php:244:    * @todo        $helperInstance is unused
-./application/hub/main/package/class_NetworkPackage.php:248:           // @TODO crc32() is very weak, but it needs to be fast
-./application/hub/main/package/class_NetworkPackage.php:430:                   // @TODO We may want to do somthing more here?
-./application/hub/main/package/class_NetworkPackage.php:558:                   // @TODO Add some logging here
-./application/hub/main/package/class_NetworkPackage.php:688:                   // @TODO Add some logging here
-./application/hub/main/package/class_NetworkPackage.php:787:           // @TODO Add some content here
-./application/hub/main/package/class_NetworkPackage.php:826:    * @todo        This may be enchanced for outgoing packages?
+./application/hub/main/package/class_NetworkPackage.php:248:    * @todo        $helperInstance is unused
+./application/hub/main/package/class_NetworkPackage.php:252:           // @TODO crc32() is very weak, but it needs to be fast
+./application/hub/main/package/class_NetworkPackage.php:434:                   // @TODO We may want to do somthing more here?
+./application/hub/main/package/class_NetworkPackage.php:562:                   // @TODO Add some logging here
+./application/hub/main/package/class_NetworkPackage.php:692:                   // @TODO Add some logging here
+./application/hub/main/package/class_NetworkPackage.php:812:           // @TODO Add some content here
+./application/hub/main/package/class_NetworkPackage.php:851:    * @todo        This may be enchanced for outgoing packages?
 ./application/hub/main/package/fragmenter/class_PackageFragmenter.php:275:      * @todo        Implement a way to send non-announcement packages with extra-salt
 ./application/hub/main/package/fragmenter/class_PackageFragmenter.php:427:      * @todo        $helperInstance is unused
 ./application/hub/main/producer/cruncher/keys/class_CruncherKeyProducer.php:106:                       // @TODO Do something with it