*/
private $shuttedDown = false;
+ /**
+ * Currently sent chunks
+ */
+ private $sentChunks = array();
+
+ /**
+ * Current final hash
+ */
+ private $currentFinalHash = '';
+
/**
* Protected constructor
*
* "Getter" for raw data from a package array. A fragmenter is used which
* will returns us only so many raw data which fits into the back buffer.
* The rest is being held in a back-buffer and waits there for the next
- * cycle and while be then sent. This is done by a FIFO.
+ * cycle and while be then sent.
*
* This method does 4 simple steps:
* 1) Aquire fragmenter object instance from the factory
* 2) Handle over the package data array to the fragmenter
- * 3) Request a chunk (which "pops" the chunk from the fragmenter's FIFO)
- * 4) Finally return the chunk to the caller
+ * 3) Request a chunk
+ * 4) Finally return the chunk (array) to the caller
*
* @param $packageData Raw package data array
- * @return $rawData Raw package data bytes
+ * @return $rawDataChunk An array with the raw data as value and chunk hash as key
*/
private function getRawDataFromPackageArray (array $packageData) {
- // Get the fragmenter instance
- $fragmenterInstance = ObjectFactory::createObjectByConfiguredName('package_fragmenter_class');
+ // If there is no fragmenter?
+ if (!Registry::getRegistry()->instanceExists('package_fragmenter')) {
+ // Get the fragmenter instance
+ $fragmenterInstance = ObjectFactory::createObjectByConfiguredName('package_fragmenter_class');
+
+ // Add it to the registry
+ Registry::getRegistry()->addInstance('package_fragmenter', $fragmenterInstance);
+ } else {
+ // Get fragmenter from registry
+ $fragmenterInstance = Registry::getRegistry()->getInstance('package_fragmenter');
+ }
- // Implode the package data array and fragement the resulting string
- $fragmenterInstance->fragmentPackageArray($packageData, $this);
+ // Implode the package data array and fragement the resulting string, returns the final hash
+ $this->currentFinalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this);
- // Get the next raw data chunk from the fragmenter's FIFO
- $rawData = $fragmenterInstance->getNextRawDataChunk($packageData);
- /* DEBUG: */ $this->debugOutput('rawData['.strlen($rawData).']='.$rawData);
- /* DEBUG: */ die();
+ // Get the next raw data chunk from the fragmenter
+ $rawDataChunk = $fragmenterInstance->getNextRawDataChunk($this->currentFinalHash);
// Return it
- return $rawData;
+ return $rawDataChunk;
}
/**
*/
public function sendRawPackageData (array $packageData) {
// Convert the package data array to a raw data stream
- $rawData = $this->getRawDataFromPackageArray($packageData);
+ $rawDataChunk = $this->getRawDataFromPackageArray($packageData);
// Get socket resource
$socketResource = $this->getSocketResource();
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
-class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable {
+class PackageFragmenter extends BaseFrameworkSystem implements Fragmentable, Registerable {
+ /**
+ * Cached chunk size in bits
+ */
+ private $chunkSize = 0;
+
+ /**
+ * Array for chunks
+ */
+ private $chunks = array();
+
+ /**
+ * Array for chunk hashes
+ */
+ private $chunkHashes = array();
+
+ /**
+ * Array for chunk pointers
+ */
+ private $chunkPointers = array();
+
+ /**
+ * Array for processed packages
+ */
+ private $processedPackages = array();
+
+ /**
+ * Serial number
+ */
+ private $serialNumber = 0x00000000;
+
+ /**
+ * Length of largest possible serial number
+ */
+ private $maxSerialLength = 8;
+
+ /**
+ * Maximum possible serial number
+ */
+ private $maxSerialNumber = 0;
+ /**
+ * Seperator between chunk data, serial number and chunk hash
+ */
+ const CHUNK_DATA_HASH_SEPERATOR = '@';
+
+ /**
+ * Seperator for all chunk hashes
+ */
+ const CHUNK_HASH_SEPERATOR = ';';
+
+ /**
+ * Identifier for hash chunk
+ */
+ const HASH_CHUNK_IDENTIFIER = 'HASH-CHUNK:';
+
/**
* Protected constructor
*
protected function __construct () {
// Call parent constructor
parent::__construct(__CLASS__);
+
+ // Init this fragmenter
+ $this->initFragmenter();
}
/**
// Get new instance
$fragmenterInstance = new PackageFragmenter();
+ // Get an output stream for all out-going packages
+ $streamInstance = ObjectFactory::createObjectByConfiguredName('node_raw_package_output_stream');
+
+ // And set it in this fragmenter
+ $fragmenterInstance->setOutputStreamInstance($streamInstance);
+
+ // And also a crypto instance (for our encrypted messages)
+ $cryptoInstance = ObjectFactory::createObjectByConfiguredName('crypto_class');
+ $fragmenterInstance->setCryptoInstance($cryptoInstance);
+
// Return the prepared instance
return $fragmenterInstance;
}
+ /**
+ * Initializes this fragmenter
+ *
+ * @return void
+ */
+ private function initFragmenter () {
+ // Load some configuration entries and "cache" them:
+ // - Chunk size in bits
+ $this->chunkSize = $this->getConfigInstance()->getConfigEntry('package_chunk_size');
+
+ // - Maximum serial number
+ $this->maxSerialNumber = $this->hex2dec(str_repeat('f', $this->maxSerialLength));
+ }
+
+ /**
+ * Initializes the pointer for given final hash
+ *
+ * @param $finalHash Final hash to initialize pointer for
+ * @return void
+ */
+ private function initPointer ($finalHash) {
+ $this->chunkPointers[$finalHash] = 0;
+ }
+
+ /**
+ * "Getter" for processedPackages array index
+ *
+ * @param $packageData Raw package data array
+ * @return $index Array index for processedPackages
+ */
+ private function getProcessedPackagesIndex (array $packageData) {
+ return (
+ $packageData['sender'] . NetworkPackage::PACKAGE_DATA_SEPERATOR .
+ $packageData['recipient'] . NetworkPackage::PACKAGE_DATA_SEPERATOR .
+ $packageData['content'] . NetworkPackage::PACKAGE_DATA_SEPERATOR
+ );
+ }
+
+ /**
+ * Checks wether the given package data is already processed by this fragmenter
+ *
+ * @param $packageData Raw package data array
+ * @return $isProcessed Wether the package has been fragmented
+ */
+ private function isPackageProcessed (array $packageData) {
+ // Get array index
+ $index = $this->getProcessedPackagesIndex($packageData);
+
+ // Is the array index there?
+ $isProcessed = (
+ (isset($this->processedPackages[$index])) &&
+ ($this->processedPackages[$index] === true)
+ );
+
+ // Return it
+ return $isProcessed;
+ }
+
+ /**
+ * Marks the given package data as processed by this fragmenter
+ *
+ * @param $packageData Raw package data array
+ * @return void
+ */
+ private function markPackageDataProcessed (array $packageData) {
+ // Remember it (until we may remove it)
+ $this->processedPackages[$this->getProcessedPackagesIndex($packageData)] = true;
+ }
+
+ /**
+ * Getter for final hash from given package data
+ *
+ * @param $packageData Raw package data array
+ * @return $finalHash Final hash for package data
+ */
+ private function getFinalHashFromPackageData (array $packageData) {
+ // Make sure it is there
+ assert(isset($this->processedPackages[$this->getProcessedPackagesIndex($packageData)]));
+
+ // Return it
+ return $this->processedPackages[$this->getProcessedPackagesIndex($packageData)];
+ }
+
+ /**
+ * Get next chunk pointer for given final hash
+ *
+ * @param $finalHash Final hash to get current pointer for
+ */
+ private function getCurrentChunkPointer ($finalHash) {
+ // Is the final hash valid?
+ assert(strlen($finalHash) > 0);
+
+ // Is the pointer already initialized?
+ //* NOISY-DEBUG: */ $this->debugOutput('FRAGMENTER: finalHash=' . $finalHash);
+ assert(isset($this->chunkPointers[$finalHash]));
+
+ // Return it
+ return $this->chunkPointers[$finalHash];
+ }
+
+ /**
+ * Advance the chunk pointer for given final hash
+ *
+ * @param $finalHash Final hash to advance the pointer for
+ */
+ private function nextChunkPointer ($finalHash) {
+ // Is the pointer already initialized?
+ assert(isset($this->chunkPointers[$finalHash]));
+
+ // Count one up
+ $this->chunkPointers[$finalHash]++;
+ }
+
+ /**
+ * "Getter" for data chunk size of given hash.
+ *
+ * @param $hash Hash to substract it's length
+ * @return $dataChunkSize The chunk size
+ */
+ private function getDataChunkSizeFromHash ($hash) {
+ // Calculate real (data) chunk size
+ $dataChunkSize = (
+ // Real chunk size
+ ($this->chunkSize / 8) -
+ // Hash size
+ strlen($hash) -
+ // Length of sperators
+ (strlen(self::CHUNK_DATA_HASH_SEPERATOR) * 2) -
+ // Length of max serial number
+ $this->maxSerialLength
+ );
+
+ // This should be larger than zero bytes
+ assert($dataChunkSize > 0);
+
+ // Return it
+ return $dataChunkSize;
+ }
+
+ /**
+ * Generates a hash from raw data
+ *
+ * @param $rawData Raw data bytes to hash
+ * @return $hash Hash from the raw data
+ */
+ private function generateHashFromRawData ($rawData) {
+ // Get the crypto instance and hash the data
+ $hash = $this->getCryptoInstance()->hashString($rawData);
+
+ // Return it
+ return $hash;
+ }
+
+ /**
+ * "Getter" for the next hexadecimal-encoded serial number
+ *
+ * @return $encodedSerialNumber The next hexadecimal-encoded serial number
+ */
+ private function getNextHexSerialNumber () {
+ // Assert on maximum serial number length
+ assert($this->serialNumber <= $this->maxSerialNumber);
+
+ // Encode the current serial number
+ $encodedSerialNumber = $this->dec2Hex($this->serialNumber, $this->maxSerialLength);
+
+ // Count one up
+ $this->serialNumber++;
+
+ // Return the encoded serial number
+ return $encodedSerialNumber;
+ }
+
+ /**
+ * Splits the given encoded data into smaller chunks, the size of the final
+ * and the seperator is being subtracted from chunk size to fit it into a
+ * TCP package (512 bytes).
+ *
+ * @param $encodedData Encoded data string
+ * @param $finalHash Final hash from the raw data
+ * @return void
+ */
+ private function splitEncodedDataIntoChunks ($encodedData, $finalHash) {
+ // Make sure final hashes with at least 32 bytes can pass
+ assert(strlen($finalHash) >= 32);
+
+ // Calculate real (data) chunk size
+ $dataChunkSize = $this->getDataChunkSizeFromHash($finalHash);
+
+ // Now split it up
+ for ($idx = 0; $idx < strlen($encodedData); $idx += $dataChunkSize) {
+ // Get the next chunk
+ $chunk = substr($encodedData, $idx, $dataChunkSize);
+
+ // Hash it and remember it in seperate array
+ $chunkHash = $this->getCryptoInstance()->hashString($chunk);
+ $this->chunkHashes[$finalHash][] = $chunkHash;
+
+ // Prepend the hash to the chunk
+ $chunk = $chunkHash . self::CHUNK_DATA_HASH_SEPERATOR . $this->getNextHexSerialNumber() . self::CHUNK_DATA_HASH_SEPERATOR . $chunk;
+
+ // Make sure the chunk is not larger than a TCP package can hold
+ assert(strlen($chunk) <= NetworkPackage::TCP_PACKAGE_SIZE);
+
+ // Add it to the array
+ $this->chunks[$finalHash][] = $chunk;
+ } // END - for
+
+ // Debug output
+ $this->debugOutput('FRAGMENTER: Encoded data of ' . strlen($encodedData) . ' bytes has been fragmented into ' . count($this->chunks[$finalHash]) . ' chunk(s).');
+ }
+
+ /**
+ * Prepends a chunk (or more) with all hashes from all chunks + final chunk.
+ *
+ * @param $encodedData Encoded data string
+ * @param $finalHash Final hash from the raw data
+ * @return void
+ */
+ private function prependHashChunk ($encodedData, $finalHash) {
+ // "Implode" the whole array of hashes into one string
+ $rawData = self::HASH_CHUNK_IDENTIFIER . implode(self::CHUNK_HASH_SEPERATOR, $this->chunkHashes[$finalHash]);
+
+ // Also get a hash from it
+ $chunkHash = $this->generateHashFromRawData($rawData);
+
+ // Also encode this one
+ $encodedData = $this->getOutputStreamInstance()->streamData($rawData);
+
+ // Calulcate chunk size
+ $dataChunkSize = $this->getDataChunkSizeFromHash($chunkHash);
+
+ // Now array_unshift() it to the two chunk arrays
+ for ($idx = 0; $idx < strlen($encodedData); $idx += $dataChunkSize) {
+ // Get the next chunk
+ $chunk = substr($encodedData, $idx, $dataChunkSize);
+
+ // Hash it and remember it in seperate array
+ $chunkHash = $this->getCryptoInstance()->hashString($chunk);
+ array_unshift($this->chunkHashes[$finalHash], $chunkHash);
+
+ // Prepend the hash to the chunk
+ $chunk = $chunkHash . self::CHUNK_DATA_HASH_SEPERATOR . $this->getNextHexSerialNumber() . self::CHUNK_DATA_HASH_SEPERATOR . $chunk;
+
+ // Make sure the chunk is not larger than a TCP package can hold
+ assert(strlen($chunk) <= NetworkPackage::TCP_PACKAGE_SIZE);
+
+ // Add it to the array
+ array_unshift($this->chunks[$finalHash], $chunk);
+ } // END - for
+ }
+
/**
* This method does "implode" the given package data array into one long
* string, splits it into small chunks, adds a serial number and checksum
- * to all chunks and prepends a final hashsum chunk.
+ * to all chunks and prepends a chunk with all hashes only in it. It will
+ * return the final hash for faster processing of packages.
*
* @param $packageData Raw package data array
* @param $connectionInstance A helper instance for connections
- * @return void
+ * @return $finalHash Final hash for faster processing
+ * @todo $connectionInstance is unused
*/
public function fragmentPackageArray (array $packageData, BaseConnectionHelper $connectionInstance) {
+ // Is this package already fragmented?
+ if (!$this->isPackageProcessed($packageData)) {
+ // First we need to "implode" the array
+ $rawData = implode(NetworkPackage::PACKAGE_DATA_SEPERATOR, $packageData);
+
+ // Generate the final hash from the raw data (not encoded!)
+ $finalHash = $this->generateHashFromRawData($rawData);
+
+ // Remember it
+ $this->processedPackages[$this->getProcessedPackagesIndex($packageData)] = $finalHash;
+
+ // Init pointer
+ $this->initPointer($finalHash);
+
+ // Encode the package for delivery
+ $encodedData = $this->getOutputStreamInstance()->streamData($rawData);
+
+ // Split the encoded data into smaller chunks
+ $this->splitEncodedDataIntoChunks($encodedData, $finalHash);
+
+ // Prepend a chunk with all hashes together
+ $this->prependHashChunk($encodedData, $finalHash);
+
+ // Mark the package as fragmented
+ $this->markPackageDataProcessed($packageData);
+ } else {
+ // Get the final hash from the package data
+ $finalHash = $this->getFinalHashFromPackageData($packageData);
+ }
+
+ // Return final hash
+ return $finalHash;
}
/**
* This method gets the next chunk from the internal FIFO which should be
- * sent to the given recipient.
+ * sent to the given recipient. It will return an associative array where
+ * the key is the chunk hash and value the raw chunk data.
*
- * @param $packageData Raw package data array
+ * @param $finalHash Final hash for faster lookup
* @return $rawDataChunk Raw package data chunk
*/
- public function getNextRawDataChunk (array $packageData) {
+ public function getNextRawDataChunk ($finalHash) {
+ try {
+ // Get current chunk index
+ $current = $this->getCurrentChunkPointer($finalHash);
+ } catch (AssertionException $e) {
+ // This may happen when the final hash is true
+ if ($finalHash === true) {
+ // Set current to null
+ $current = null;
+ } else {
+ // Throw the exception
+ throw $e;
+ }
+ }
+
+ // If there is no entry left, return an empty array
+ if ((!isset($this->chunkHashes[$finalHash][$current])) || (!isset($this->chunks[$finalHash][$current]))) {
+ // No more entries found
+ return array();
+ } // END - if
+
+ // Generate the array
+ $rawDataChunk = array(
+ $this->chunkHashes[$finalHash][$current] => $this->chunks[$finalHash][$current]
+ );
+
+ // Count one index up
+ $this->nextChunkPointer($finalHash);
+
+ // Return the chunk array
+ return $rawDataChunk;
}
}