* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
class BaseConnectionHelper extends BaseHubHelper implements Registerable, ProtocolHandler {
+ // Exception codes
+ const EXCEPTION_UNSUPPORTED_ERROR_HANDLER = 0x900;
+
/**
* Protocol used
*/
private $sentData = 0;
/**
- * Offset
- */
- private $offset = 0;
-
- /**
- * Connect retries for this connection
+ * Difference
*/
- private $retryCount = 0;
+ private $diff = 0;
/**
* Wether this connection is shutted down
private $shuttedDown = false;
/**
- * Currently sent chunks
+ * Currently queued chunks
*/
- private $sentChunks = array();
+ private $queuedChunks = array();
/**
* Current final hash
// Call parent constructor
parent::__construct($className);
+ // Initialize output stream
+ $streamInstance = ObjectFactory::createObjectByConfiguredName('node_raw_data_output_stream_class');
+
+ // And add it to this connection helper
+ $this->setOutputStreamInstance($streamInstance);
+
+ // Init state which sets the state to 'init'
+ $this->initState();
+
// Register this connection helper
Registry::getRegistry()->addInstance('connection', $this);
}
+ /**
+ * Getter for real class name, overwrites generic method and is final
+ *
+ * @return $class Name of this class
+ */
+ public final function __toString () {
+ // Class name representation
+ $class = self::getConnectionClassName($this->getAddress(), $this->getPort(), parent::__toString());
+
+ // Return it
+ return $class;
+ }
+
+ /**
+ * Static "getter" for this connection class' name
+ *
+ * @param $address IP address
+ * @param $port Port number
+ * @param $className Original class name
+ * @return $class Expanded class name
+ */
+ public static function getConnectionClassName ($address, $port, $className) {
+ // Construct it
+ $class = $address . ':' . $port . ':' . $className;
+
+ // ... and return it
+ return $class;
+ }
+
+ /**
+ * Initializes the peer's state which sets it to 'init'
+ *
+ * @return void
+ */
+ private function initState() {
+ /*
+ * Get the state factory and create the initial state, we don't need
+ * the state instance here
+ */
+ PeerStateFactory::createPeerStateInstanceByName('init', $this);
+ }
+
+ /**
+ * "Getter" for raw data from a package array. A fragmenter is used which
+ * will returns us only so many raw data which fits into the back buffer.
+ * The rest is being held in a back-buffer and waits there for the next
+ * cycle and while be then sent.
+ *
+ * This method does 4 simple steps:
+ * 1) Aquire fragmenter object instance from the factory
+ * 2) Handle over the package data array to the fragmenter
+ * 3) Request a chunk
+ * 4) Finally return the chunk (array) to the caller
+ *
+ * @param $packageData Raw package data array
+ * @return $chunkData Raw data chunk
+ */
+ private function getRawDataFromPackageArray (array $packageData) {
+ // Get the fragmenter instance
+ $fragmenterInstance = FragmenterFactory::createFragmenterInstance('package');
+
+ // Implode the package data array and fragement the resulting string, returns the final hash
+ $finalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this);
+ if ($finalHash !== true) {
+ $this->currentFinalHash = $finalHash;
+ } // END - if
+
+ // Debug message
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: currentFinalHash=' . $this->currentFinalHash);
+
+ // Get the next raw data chunk from the fragmenter
+ $rawDataChunk = $fragmenterInstance->getNextRawDataChunk($this->currentFinalHash);
+
+ // Get chunk hashes and chunk data
+ $chunkHashes = array_keys($rawDataChunk);
+ $chunkData = array_values($rawDataChunk);
+
+ // Is the required data there?
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: chunkHashes[]=' . count($chunkHashes) . ',chunkData[]=' . count($chunkData));
+ if ((isset($chunkHashes[0])) && (isset($chunkData[0]))) {
+ // Remember this chunk as queued
+ $this->queuedChunks[$chunkHashes[0]] = $chunkData[0];
+
+ // Return the raw data
+ return $chunkData[0];
+ } else {
+ // Return zero string
+ return '';
+ }
+ }
+
/**
* Getter for port number to satify ProtocolHandler
*
$visitorInstance->visitConnectionHelper($this);
}
- /**
- * "Getter" for raw data from a package array. A fragmenter is used which
- * will returns us only so many raw data which fits into the back buffer.
- * The rest is being held in a back-buffer and waits there for the next
- * cycle and while be then sent.
- *
- * This method does 4 simple steps:
- * 1) Aquire fragmenter object instance from the factory
- * 2) Handle over the package data array to the fragmenter
- * 3) Request a chunk
- * 4) Finally return the chunk (array) to the caller
- *
- * @param $packageData Raw package data array
- * @return $rawDataChunk An array with the raw data as value and chunk hash as key
- */
- private function getRawDataFromPackageArray (array $packageData) {
- // If there is no fragmenter?
- if (!Registry::getRegistry()->instanceExists('package_fragmenter')) {
- // Get the fragmenter instance
- $fragmenterInstance = ObjectFactory::createObjectByConfiguredName('package_fragmenter_class');
-
- // Add it to the registry
- Registry::getRegistry()->addInstance('package_fragmenter', $fragmenterInstance);
- } else {
- // Get fragmenter from registry
- $fragmenterInstance = Registry::getRegistry()->getInstance('package_fragmenter');
- }
-
- // Implode the package data array and fragement the resulting string, returns the final hash
- $this->currentFinalHash = $fragmenterInstance->fragmentPackageArray($packageData, $this);
-
- // Get the next raw data chunk from the fragmenter
- $rawDataChunk = $fragmenterInstance->getNextRawDataChunk($this->currentFinalHash);
-
- // Return it
- return $rawDataChunk;
- }
-
/**
* Sends raw package data to the recipient
*
- * @param $packageData Raw package data
- * @return $sentBytes Actual sent bytes to the peer
+ * @param $packageData Raw package data
+ * @return $totalSentBytes Total sent bytes to the peer
* @throws InvalidSocketException If we got a problem with this socket
*/
public function sendRawPackageData (array $packageData) {
- // Convert the package data array to a raw data stream
- $rawDataChunk = $this->getRawDataFromPackageArray($packageData);
+ // The helper's state must be 'connected'
+ $this->getStateInstance()->validatePeerStateConnected();
- // Get socket resource
- $socketResource = $this->getSocketResource();
-
- // And deliver it
- $sentBytes = @socket_write($socketResource, $rawData, $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length') - $this->offset);
-
- // If there was an error, we don't continue here
- if ($sentBytes === false) {
- // Get socket error code for verification
- $socketError = socket_last_error($socketResource);
-
- // Get error message
- $errorMessage = socket_strerror($socketError);
-
- // Shutdown this socket
- $this->shutdownSocket($socketResource);
-
- // And throw it
- throw new InvalidSocketException(array($this, gettype($socketResource), $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
- } // END - if
+ // Cache buffer length
+ $bufferSize = $this->getConfigInstance()->getConfigEntry($this->getProtocol() . '_buffer_length');
- // The difference between sent bytes and length of raw data should not be below zero
- assert((strlen($rawData) - $sentBytes) >= 0);
+ // Init variables
+ $rawData = '';
+ $dataStream = ' ';
+ $totalSentBytes = 0;
- // Return sent bytes
- return $sentBytes;
- }
+ // Fill sending buffer with data
+ while ((strlen($rawData) < $bufferSize) && (strlen($dataStream) > 0)) {
+ // Convert the package data array to a raw data stream
+ $dataStream = $this->getRawDataFromPackageArray($packageData);
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Adding ' . strlen($dataStream) . ' bytes to the sending buffer ...');
+ $rawData .= $dataStream;
+ } // END - while
- /**
- * Getter for real class name
- *
- * @return $class Name of this class
- */
- public function __toString () {
- // Class name representation
- $class = $this->getAddress() . ':' . $this->getPort() . ':' . parent::__toString();
+ // Nothing to sent is bad news, so assert on it
+ assert(strlen($rawData) > 0);
- // Return it
- return $class;
- }
+ // Encode the raw data with our output-stream
+ $encodedData = $this->getOutputStreamInstance()->streamData($rawData);
- /**
- * Checks wether the connect retry is exhausted
- *
- * @return $isExhaused Wether connect retry is exchausted
- */
- public final function isConnectRetryExhausted () {
- // Construct config entry
- $configEntry = $this->getProtocol() . '_connect_retry_max';
+ // Calculate difference
+ $this->diff = $bufferSize - strlen($encodedData);
- // Check it out
- $isExhausted = ($this->retryCount >= $this->getConfigInstance()->getConfigEntry($configEntry));
+ // Get socket resource
+ $socketResource = $this->getSocketResource();
- // Return it
- return $isExhausted;
- }
+ // Init sent bytes
+ $sentBytes = 0;
+
+ // Deliver all data
+ while ($sentBytes !== false) {
+ // And deliver it
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sending out ' . strlen($encodedData) . ' bytes,bufferSize=' . $bufferSize . ',diff=' . $this->diff);
+ $sentBytes = @socket_write($socketResource, $encodedData, ($bufferSize - $this->diff));
+
+ // If there was an error, we don't continue here
+ if ($sentBytes === false) {
+ // Handle the error with a faked recipientData array
+ $this->handleSocketError($socketResource, array('0.0.0.0', '0'));
+
+ // And throw it
+ throw new InvalidSocketException(array($this, $socketResource, $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
+ } elseif (($sentBytes == 0) && (strlen($encodedData) > 0)) {
+ // Nothing sent means we are done
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')');
+ break;
+ }
+
+ // The difference between sent bytes and length of raw data should not go below zero
+ assert((strlen($encodedData) - $sentBytes) >= 0);
+
+ // Add total sent bytes
+ $totalSentBytes += $sentBytes;
+
+ // Cut out the last unsent bytes
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: Sent out ' . $sentBytes . ' of ' . strlen($encodedData) . ' bytes ...');
+ $encodedData = substr($encodedData, $sentBytes);
+
+ // Calculate difference again
+ $this->diff = $bufferSize - strlen($encodedData);
+
+ // Can we abort?
+ if (strlen($encodedData) <= 0) {
+ // Abort here, all sent!
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: All sent! (' . __LINE__ . ')');
+ break;
+ } // END - if
+ } // END - while
- /**
- * Increases the connect retry count
- *
- * @return void
- */
- public final function increaseConnectRetry () {
- $this->retryCount++;
+ // Return sent bytes
+ //* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: totalSentBytes=' . $totalSentBytes);
+ return $totalSentBytes;
}
/**
*
* @return void
*/
- protected final function markConnectionShutdown () {
+ protected final function markConnectionShuttedDown () {
+ /* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: ' . $this->__toString() . ' has been marked as shutted down');
$this->shuttedDown = true;
}
* @return $shuttedDown Wether this connection is shutted down
*/
public final function isShuttedDown () {
+ /* NOISY-DEBUG: */ $this->debugOutput('CONNECTION: ' . $this->__toString() . ',shuttedDown=' . intval($this->shuttedDown));
return $this->shuttedDown;
}
+
+ // ************************************************************************
+ // Socket error handler call-back methods
+ // ************************************************************************
+
+ /**
+ * Handles socket error 'connection timed out', but does not clear it for
+ * later debugging purposes.
+ *
+ * @param $socketResource A valid socket resource
+ * @throws SocketConnectionException The connection attempts fails with a time-out
+ */
+ private function socketErrorConnectionTimedOutHandler ($socketResource) {
+ // Get socket error code for verification
+ $socketError = socket_last_error($socketResource);
+
+ // Get error message
+ $errorMessage = socket_strerror($socketError);
+
+ // Shutdown this socket
+ $this->shutdownSocket($socketResource);
+
+ // Throw it again
+ throw new SocketConnectionException(array($this, $socketResource, $socketError, $errorMessage), BaseListener::EXCEPTION_INVALID_SOCKET);
+ }
}
// [EOF]