private $sentData = 0;
/**
- * Offset
+ * Difference
*/
- private $offset = 0;
+ private $diff = 0;
/**
* Connect retries for this connection
*/
private $shuttedDown = false;
+ /**
+ * Currently queued chunks
+ */
+ private $queuedChunks = array();
+
+ /**
+ * Current final hash
+ */
+ private $currentFinalHash = '';
+
/**
* Protected constructor
*
// Register this connection helper
Registry::getRegistry()->addInstance('connection', $this);
+
+ // Initialize output stream
+ $streamInstance = ObjectFactory::createObjectByConfiguredName('node_raw_data_output_stream_class');
+
+ // And add it to this connection helper
+ $this->setOutputStreamInstance($streamInstance);
}
/**
$this->address = $address;
}
- /**
- * "Accept" a visitor by simply calling it back
- *
- * @param $visitorInstance A Visitor instance
- * @return void
- */
- protected final function accept (Visitor $visitorInstance) {
- // Just call the visitor
- $visitorInstance->visitConnectionHelper($this);
- }
-
/**
* "Getter" for raw data from a package array. A fragmenter is used which
* will returns us only so many raw data which fits into the back buffer.
* The rest is being held in a back-buffer and waits there for the next
- * cycle and while be then sent. This is done by a FIFO.
+ * cycle and while be then sent.
*
* This method does 4 simple steps:
* 1) Aquire fragmenter object instance from the factory
* 2) Handle over the package data array to the fragmenter
- * 3) Request a chunk (which "pops" the chunk from the fragmenter's FIFO)
- * 4) Finally return the chunk to the caller
+ * 3) Request a chunk
+ * 4) Finally return the chunk (array) to the caller
*
* @param $packageData Raw package data array
- * @return $rawData Raw package data bytes
+ * @return $chunkData Raw data chunk
*/
private function getRawDataFromPackageArray (array $packageData) {
// Get the fragmenter instance
- $fragmenterInstance = ObjectFactory::createObjectByConfiguredName('package_fragmenter_class');
+ $fragmenterInstance = FragmenterFactory::createFragmenterInstance('package_fragmenter_class');
- // Implode the package data array and fragement the resulting string
- $fragmenterInstance->fragmentPackageArray($packageData, $this);
+ // Implode the package data array and fragement the resulting string, returns the final hash
+ $finalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this);
+ if ($finalHash !== true) {
+ $this->currentFinalHash = $finalHash;
+ } // END - if
- // Get the next raw data chunk from the fragmenter's FIFO
- $rawData = $fragmenterInstance->getNextRawDataChunk($packageData);
- /* DEBUG: */ $this->debugOutput('rawData['.strlen($rawData).']='.$rawData);
- /* DEBUG: */ die();
+ // Debug message
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: currentFinalHash=' . $this->currentFinalHash);
- // Return it
- return $rawData;
+ // Get the next raw data chunk from the fragmenter
+ $rawDataChunk = $fragmenterInstance->getNextRawDataChunk($this->currentFinalHash);
+
+ // Get chunk hashes and chunk data
+ $chunkHashes = array_keys($rawDataChunk);
+ $chunkData = array_values($rawDataChunk);
+
+ // Is the required data there?
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: chunkHashes[]=' . count($chunkHashes) . ',chunkData[]=' . count($chunkData));
+ if ((isset($chunkHashes[0])) && (isset($chunkData[0]))) {
+ // Remember this chunk as queued
+ $this->queuedChunks[$chunkHashes[0]] = $chunkData[0];
+
+ // Return the raw data
+ return $chunkData[0];
+ } else {
+ // Return zero string
+ return '';
+ }
+ }
+
+ /**
+ * "Accept" a visitor by simply calling it back
+ *
+ * @param $visitorInstance A Visitor instance
+ * @return void
+ */
+ protected final function accept (Visitor $visitorInstance) {
+ // Just call the visitor
+ $visitorInstance->visitConnectionHelper($this);
}
/**
* Sends raw package data to the recipient
*
- * @param $packageData Raw package data
- * @return $sentBytes Actual sent bytes to the peer
+ * @param $packageData Raw package data
+ * @return $totalSentBytes Total sent bytes to the peer
* @throws InvalidSocketException If we got a problem with this socket
*/
public function sendRawPackageData (array $packageData) {
- // Convert the package data array to a raw data stream
- $rawData = $this->getRawDataFromPackageArray($packageData);
+ // Cache buffer length
+ $bufferSize = $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length');
+
+ // Init variables
+ $rawData = '';
+ $dataStream = ' ';
+ $totalSentBytes = 0;
+
+ // Fill sending buffer with data
+ while ((strlen($rawData) < $bufferSize) && (strlen($dataStream) > 0)) {
+ // Convert the package data array to a raw data stream
+ $dataStream = $this->getRawDataFromPackageArray($packageData);
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Adding ' . strlen($dataStream) . ' bytes to the sending buffer ...');
+ $rawData .= $dataStream;
+ } // END - while
+
+ // Nothing to sent is bad news!
+ assert(strlen($rawData) > 0);
+
+ // Encode the raw data with our output-stream
+ $encodedData = $this->getOutputStreamInstance()->streamData($rawData);
+
+ // Calculate difference
+ $this->diff = $bufferSize - strlen($encodedData);
// Get socket resource
$socketResource = $this->getSocketResource();
- // And deliver it
- $sentBytes = @socket_write($socketResource, $rawData, $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length') - $this->offset);
+ // Init sent bytes
+ $sentBytes = 0;
- // If there was an error, we don't continue here
- if ($sentBytes === false) {
- // Get socket error code for verification
- $socketError = socket_last_error($socketResource);
+ // Deliver all data
+ while ($sentBytes !== false) {
+ // And deliver it
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sending out ' . strlen($encodedData) . ' bytes,bufferSize=' . $bufferSize . ',diff=' . $this->diff);
+ $sentBytes = @socket_write($socketResource, $encodedData, ($bufferSize - $this->diff));
- // Get error message
- $errorMessage = socket_strerror($socketError);
+ // If there was an error, we don't continue here
+ if ($sentBytes === false) {
+ // Get socket error code for verification
+ $socketError = socket_last_error($socketResource);
- // Shutdown this socket
- $this->shutdownSocket($socketResource);
+ // Get error message
+ $errorMessage = socket_strerror($socketError);
- // And throw it
- throw new InvalidSocketException(array($this, gettype($socketResource), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
- } // END - if
+ // Shutdown this socket
+ $this->shutdownSocket($socketResource);
+
+ // And throw it
+ throw new InvalidSocketException(array($this, gettype($socketResource), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
+ } elseif (($sentBytes == 0) && (strlen($encodedData) > 0)) {
+ // Nothing sent means we are done
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')');
+ break;
+ }
+
+ // The difference between sent bytes and length of raw data should not be below zero
+ assert((strlen($encodedData) - $sentBytes) >= 0);
+
+ // Add total sent bytes
+ $totalSentBytes += $sentBytes;
+
+ // Cut out the last unsent bytes
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sent out ' . $sentBytes . ' of ' . strlen($encodedData) . ' bytes ...');
+ $encodedData = substr($encodedData, $sentBytes);
+
+ // Calculate difference again
+ $this->diff = $bufferSize - strlen($encodedData);
- // The difference between sent bytes and length of raw data should not be below zero
- assert((strlen($rawData) - $sentBytes) >= 0);
+ // Can we abort?
+ if (strlen($encodedData) <= 0) {
+ // Abort here, all sent!
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')');
+ break;
+ } // END - if
+ } // END - while
// Return sent bytes
- return $sentBytes;
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: totalSentBytes=' . $totalSentBytes);
+ return $totalSentBytes;
}
/**