* @version 0.0.0
 * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2012 Hub Developer Team
 * @license GNU GPL 3.0 or any newer version
 * @link http://www.shipsimu.org
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation, either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */
class NodeDistributedHashTableDatabaseWrapper extends BaseDatabaseWrapper implements NodeDhtWrapper, Registerable {
	/**
	 * "Cached" result instance of the last query for unpublished entries.
	 * It is written by hasUnpublishedEntries() and read by
	 * initEntryPublication() and getUnpublishedEntriesInstance().
	 */
	private $unpublishedEntriesInstance = NULL;

	// Constants for database table names
	const DB_TABLE_NODE_DHT = 'node_dht';

	// Constants for database column names
	const DB_COLUMN_NODE_ID            = 'node_id';
	const DB_COLUMN_SESSION_ID         = 'session_id';
	const DB_COLUMN_EXTERNAL_IP        = 'external_ip';
	const DB_COLUMN_LISTEN_PORT        = 'listen_port';
	const DB_COLUMN_PRIVATE_KEY_HASH   = 'private_key_hash';
	const DB_COLUMN_NODE_MODE          = 'node_mode';
	const DB_COLUMN_ACCEPTED_OBJECTS   = 'accepted_object_types';
	const DB_COLUMN_NODE_LIST          = 'node_list';
	const DB_COLUMN_PUBLICATION_STATUS = 'publication_status';
	const DB_COLUMN_ANSWER_STATUS      = 'answer_status';
	const DB_COLUMN_ACCEPT_BOOTSTRAP   = 'accept_bootstrap';

	// Publication status'
	const PUBLICATION_STATUS_PENDING = 'PENDING';

	// Exception codes
	const EXCEPTION_NODE_ALREADY_REGISTERED = 0x800;
	const EXCEPTION_NODE_NOT_REGISTERED     = 0x801;

	/**
	 * Protected constructor, use the static factory method
	 * createNodeDistributedHashTableDatabaseWrapper() instead.
	 *
	 * @return	void
	 */
	protected function __construct () {
		// Call parent constructor
		parent::__construct(__CLASS__);
	}

	/**
	 * Creates an instance of this database wrapper and sets the DHT table as
	 * its (primary) table name.
	 *
	 * @return	$wrapperInstance	An instance of the created wrapper class
	 */
	public static final function createNodeDistributedHashTableDatabaseWrapper () {
		// Get a new instance
		$wrapperInstance = new NodeDistributedHashTableDatabaseWrapper();

		// Set (primary!) table name
		$wrapperInstance->setTableName(self::DB_TABLE_NODE_DHT);

		// Return the instance
		return $wrapperInstance;
	}

	/**
	 * Static getter for an array of DHT database column names. The columns
	 * for publication status, answer status and bootstrap acceptance are not
	 * part of this list.
	 *
	 * @return	$elements	Column names of the DHT database
	 */
	public static final function getAllElements () {
		// Create array and return it
		return array(
			self::DB_COLUMN_NODE_ID,
			self::DB_COLUMN_SESSION_ID,
			self::DB_COLUMN_EXTERNAL_IP,
			self::DB_COLUMN_LISTEN_PORT,
			self::DB_COLUMN_PRIVATE_KEY_HASH,
			self::DB_COLUMN_NODE_MODE,
			self::DB_COLUMN_ACCEPTED_OBJECTS,
			self::DB_COLUMN_NODE_LIST
		);
	}

	/**
	 * Prepares a search instance that looks up a single entry by the node id
	 * found in given node data.
	 *
	 * @param	$nodeData			An array with valid node data, element 'node_id' is required
	 * @return	$searchInstance		An instance of the configured 'search_criteria_class'
	 */
	private function prepareSearchInstance (array $nodeData) {
		// Assert on array elements
		assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));

		// Get instance
		$searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');

		// Search for node id and limit it to one entry
		$searchInstance->addCriteria(self::DB_COLUMN_NODE_ID, $nodeData[self::DB_COLUMN_NODE_ID]);
		$searchInstance->setLimit(1);

		// Return it
		return $searchInstance;
	}

	/**
	 * Getter for result instance for unpublished entries. This is NULL until
	 * hasUnpublishedEntries() has been called.
	 *
	 * @return	$unpublishedEntriesInstance		Result instance
	 */
	public final function getUnpublishedEntriesInstance () {
		return $this->unpublishedEntriesInstance;
	}

	/**
	 * Prepares a "local" instance of the configured 'dataset_criteria_class'
	 * with all node data for insert/update queries. This data set contains
	 * data from *this* (local) node.
	 *
	 * @return	$dataSetInstance	A data set instance with node id as unique key
	 */
	private function prepareLocalDataSetInstance () {
		// Get node/request instances
		$nodeInstance    = NodeObjectFactory::createNodeInstance();
		$requestInstance = ApplicationHelper::getSelfInstance()->getRequestInstance();

		// Get a dataset instance
		$dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));

		// Set the primary key
		$dataSetInstance->setUniqueKey(self::DB_COLUMN_NODE_ID);

		// Get ip/port as array: element 0 is used as ip, element 1 as port
		$ipPort = $nodeInstance->getAddressPortArray();

		// Make sure both are valid
		assert(($ipPort[0] !== 'invalid') && ($ipPort[1] !== 'invalid'));

		// Get an array of all accepted object types
		$objectList = $nodeInstance->getListFromAcceptedObjectTypes();

		// Make sure this is an array
		assert(is_array($objectList));

		// Add public node data
		$dataSetInstance->addCriteria(self::DB_COLUMN_NODE_MODE       , $requestInstance->getRequestElement('mode'));
		$dataSetInstance->addCriteria(self::DB_COLUMN_EXTERNAL_IP     , $ipPort[0]);
		$dataSetInstance->addCriteria(self::DB_COLUMN_LISTEN_PORT     , $ipPort[1]);
		$dataSetInstance->addCriteria(self::DB_COLUMN_NODE_ID         , $nodeInstance->getNodeId());
		$dataSetInstance->addCriteria(self::DB_COLUMN_SESSION_ID      , $nodeInstance->getSessionId());
		$dataSetInstance->addCriteria(self::DB_COLUMN_PRIVATE_KEY_HASH, $nodeInstance->getPrivateKeyHash());
		$dataSetInstance->addCriteria(self::DB_COLUMN_ACCEPTED_OBJECTS, implode(BaseHubNode::OBJECT_LIST_SEPARATOR, $objectList));
		$dataSetInstance->addCriteria(self::DB_COLUMN_ACCEPT_BOOTSTRAP, $this->translateBooleanToYesNo($nodeInstance->isAcceptingDhtBootstrap()));

		// Return it
		return $dataSetInstance;
	}

	/**
	 * Checks whether the local (*this*) node is registered in the DHT by
	 * looking for one entry matching its external ip, listen port, node id
	 * and session id.
	 *
	 * @return	$isRegistered	Whether *this* node is registered in the DHT
	 */
	public function isLocalNodeRegistered () {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: CALLED!');

		// Get a search criteria instance
		$searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');

		// Get node instance
		$nodeInstance = NodeObjectFactory::createNodeInstance();

		// Get ip/port as array: element 0 is used as ip, element 1 as port
		$ipPort = $nodeInstance->getAddressPortArray();

		/*
		 * Make sure both are not 'invalid' which means that the resolver
		 * didn't work.
		 */
		assert(($ipPort[0] !== 'invalid') && ($ipPort[1] !== 'invalid'));

		// Add ip:port/node id/session id as criteria
		$searchInstance->addCriteria(self::DB_COLUMN_EXTERNAL_IP, $ipPort[0]);
		$searchInstance->addCriteria(self::DB_COLUMN_LISTEN_PORT, $ipPort[1]);
		$searchInstance->addCriteria(self::DB_COLUMN_NODE_ID    , $nodeInstance->getNodeId());
		$searchInstance->addCriteria(self::DB_COLUMN_SESSION_ID , $nodeInstance->getSessionId());
		$searchInstance->setLimit(1);

		// Query database and get a result instance back
		$resultInstance = $this->doSelectByCriteria($searchInstance);

		// Remember whether there is an entry, valid() will tell us if an entry is there
		$isRegistered = $resultInstance->valid();

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: isRegistered=' . intval($isRegistered) . ' - EXIT!');

		// Return result
		return $isRegistered;
	}

	/**
	 * Registers the local (*this*) node with its data in the DHT. The node
	 * must not be registered yet (asserted).
	 *
	 * @return	void
	 */
	public function registerLocalNode () {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: CALLED!');

		// Assert to make sure this method is called with no record in DB (the actual backend of the DHT)
		assert(!$this->isLocalNodeRegistered());

		// Get prepared data set instance
		$dataSetInstance = $this->prepareLocalDataSetInstance();

		// "Insert" this dataset instance completely into the database
		$this->queryInsertDataSet($dataSetInstance);

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: EXIT!');
	}

	/**
	 * Updates local (*this*) node's data in DHT, this is but not limited to the
	 * session id, ip number (and/or hostname) and port number. The node must
	 * already be registered (asserted); its record is found by node id.
	 *
	 * @return	void
	 */
	public function updateLocalNode () {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: CALLED!');

		// Assert to make sure this method is called with one record in DB (the actual backend of the DHT)
		assert($this->isLocalNodeRegistered());

		// Get node instance
		$nodeInstance = NodeObjectFactory::createNodeInstance();

		// Get search criteria
		$searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');

		// Search for node id and limit it to one entry
		$searchInstance->addCriteria(self::DB_COLUMN_NODE_ID, $nodeInstance->getNodeId());
		$searchInstance->setLimit(1);

		// Get a prepared dataset instance
		$dataSetInstance = $this->prepareLocalDataSetInstance();

		// Set search instance
		$dataSetInstance->setSearchInstance($searchInstance);

		// Update DHT database record
		$this->queryUpdateDataSet($dataSetInstance);

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: EXIT!');
	}

	/**
	 * Finds a node locally by given session id. At most one entry is
	 * looked up.
	 *
	 * @param	$sessionId			Session id to lookup
	 * @return	$resultInstance		Result instance of the query, check valid() for a match
	 */
	public function findNodeLocalBySessionId ($sessionId) {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: sessionId=' . $sessionId . ' - CALLED!');

		// Get search criteria
		$searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');

		// Search for session id and limit it to one entry
		$searchInstance->addCriteria(self::DB_COLUMN_SESSION_ID, $sessionId);
		$searchInstance->setLimit(1);

		// Query database and get a result instance back
		$resultInstance = $this->doSelectByCriteria($searchInstance);

		// Return result instance
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: resultInstance->valid()=' . intval($resultInstance->valid()) . ' - EXIT!');
		return $resultInstance;
	}

	/**
	 * Registers a node by given message data. The handler copies the message
	 * data into the data set, the 'node_list' element is not stored.
	 *
	 * @param	$messageData		An array of all message data
	 * @param	$handlerInstance	An instance of a HandleableDataSet class
	 * @return	void
	 */
	public function registerNodeByMessageData (array $messageData, HandleableDataSet $handlerInstance) {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: handlerInstance=' . $handlerInstance->__toString() . ' - CALLED!');

		// Get a data set instance
		$dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));

		// Set primary key (session id)
		$dataSetInstance->setUniqueKey(self::DB_COLUMN_SESSION_ID);

		// Add all array elements
		$handlerInstance->addArrayToDataSet($dataSetInstance, $messageData);

		// Remove 'node_list'
		$dataSetInstance->unsetCriteria(self::DB_COLUMN_NODE_LIST);

		// Run the "INSERT" query
		$this->queryInsertDataSet($dataSetInstance);

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . '] - EXIT!');
	}

	/**
	 * Updates an existing entry in node list. The entry to update is
	 * selected by given search instance, the 'node_list' element is not
	 * stored.
	 *
	 * @param	$messageData		An array of all message data
	 * @param	$handlerInstance	An instance of a HandleableDataSet class
	 * @param	$searchInstance		An instance of LocalSearchCriteria class
	 * @return	void
	 */
	public function updateNodeByMessageData (array $messageData, HandleableDataSet $handlerInstance, LocalSearchCriteria $searchInstance) {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: CALLED!');

		// Get a data set instance
		$dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));

		// Add search instance
		$dataSetInstance->setSearchInstance($searchInstance);

		// Set primary key (session id)
		$dataSetInstance->setUniqueKey(self::DB_COLUMN_SESSION_ID);

		// Add all array elements
		$handlerInstance->addArrayToDataSet($dataSetInstance, $messageData);

		// Remove 'node_list'
		$dataSetInstance->unsetCriteria(self::DB_COLUMN_NODE_LIST);

		// Run the "UPDATE" query
		$this->queryUpdateDataSet($dataSetInstance);

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: EXIT!');
	}

	/**
	 * Determines whether the given node data is already inserted in the DHT.
	 * The lookup is done by node id only.
	 *
	 * @param	$nodeData		An array with valid node data, element 'node_id' is required
	 * @return	$isRegistered	Whether the given node data is already inserted
	 */
	public function isNodeRegistered (array $nodeData) {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: CALLED!');

		// Assert on array elements
		assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: node-id=' . $nodeData[self::DB_COLUMN_NODE_ID]);

		// Get search criteria
		$searchInstance = $this->prepareSearchInstance($nodeData);

		// Query database and get a result instance back
		$resultInstance = $this->doSelectByCriteria(
			// Search instance
			$searchInstance,
			// Only look for these array elements ("keys")
			array(
				self::DB_COLUMN_NODE_ID     => TRUE,
				self::DB_COLUMN_EXTERNAL_IP => TRUE,
				self::DB_COLUMN_LISTEN_PORT => TRUE,
			)
		);

		// Check if there is an entry
		$isRegistered = $resultInstance->valid();

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: isRegistered=' . intval($isRegistered) . ' - EXIT!');

		// Return registration status
		return $isRegistered;
	}

	/**
	 * Registers a node with given data in the DHT. If the node is already
	 * registered this method throws an exception. The actual registration is
	 * not implemented yet, only partialStub() is called.
	 *
	 * @param	$nodeData	An array with valid node data, element 'node_id' is required
	 * @return	void
	 * @throws	NodeAlreadyRegisteredException	If the node is already registered
	 */
	public function registerNode (array $nodeData) {
		// Assert on array elements
		assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));

		// Is the node registered?
		if ($this->isNodeRegistered($nodeData)) {
			// Throw an exception
			throw new NodeAlreadyRegisteredException(array($this, $nodeData), self::EXCEPTION_NODE_ALREADY_REGISTERED);
		} // END - if

		// @TODO Unimplemented part
		$this->partialStub('nodeData=' . print_r($nodeData, TRUE));
	}

	/**
	 * Updates a node's entry in the DHT with given data. This will enrich or
	 * just update already existing data. If the node is not found this method
	 * throws an exception.
	 *
	 * @param	$nodeData	An array with valid node data, element 'node_id' is required
	 * @return	void
	 * @throws	NodeDataMissingException	If the node's data is missing
	 */
	public function updateNode (array $nodeData) {
		// Assert on array elements
		assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));

		// Debug message
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: Updating DHT entry for node-id=' . $nodeData[self::DB_COLUMN_NODE_ID] . ' ...');

		// Is the node registered?
		if (!$this->isNodeRegistered($nodeData)) {
			// No, then throw an exception
			throw new NodeDataMissingException(array($this, $nodeData), self::EXCEPTION_NODE_NOT_REGISTERED);
		} // END - if

		// Get a search instance
		$searchInstance = $this->prepareSearchInstance($nodeData);

		// Get a data set instance
		$dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));

		// Add search instance
		$dataSetInstance->setSearchInstance($searchInstance);

		/*
		 * Set primary key (session id).
		 * NOTE(review): the search above is done by node id while the unique
		 * key is the session id - confirm this is intended.
		 */
		$dataSetInstance->setUniqueKey(self::DB_COLUMN_SESSION_ID);

		// Get node instance
		$nodeInstance = NodeObjectFactory::createNodeInstance();

		// Debug message
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: nodeData=' . print_r($nodeData, TRUE));

		// Add all array elements
		$nodeInstance->addArrayToDataSet($dataSetInstance, $nodeData);

		// Remove 'node_list'
		$dataSetInstance->unsetCriteria(self::DB_COLUMN_NODE_LIST);

		// Run the "UPDATE" query
		$this->queryUpdateDataSet($dataSetInstance);
	}

	/**
	 * Checks whether there are unpublished entries, these are entries whose
	 * publication status is excluded from being 'PENDING'. The used search
	 * instance and the result instance are both remembered in this wrapper
	 * for a following initEntryPublication() call.
	 *
	 * @return	$hasUnpublished	Whether there are unpublished entries
	 * @todo	Add minimum/maximum age limitations
	 */
	public function hasUnpublishedEntries () {
		// Get search instance
		$searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');

		// Add exclusion key which is the publish status
		$searchInstance->addExcludeCriteria(self::DB_COLUMN_PUBLICATION_STATUS, self::PUBLICATION_STATUS_PENDING);

		// Remember search instance
		$this->setSearchInstance($searchInstance);

		// Run the query and "cache" its result instance
		$this->unpublishedEntriesInstance = $this->doSelectByCriteria($searchInstance);

		// Check pending entries and return the result
		return $this->unpublishedEntriesInstance->valid();
	}

	/**
	 * Initializes publication of DHT entries. This does only prepare
	 * publication. The next step is to pickup such prepared entries and publish
	 * them by uploading to other (recently appeared) DHT members.
	 *
	 * @return	void
	 * @todo	Add timestamp to dataset instance
	 */
	public function initEntryPublication () {
		/*
		 * Make sure that hasUnpublishedEntries() has been called first by
		 * asserting on the "cached" object instance. This "caching" saves some
		 * needless queries as this method shall be called immediately after
		 * hasUnpublishedEntries() returns TRUE.
		 */
		assert($this->unpublishedEntriesInstance instanceof SearchableResult);

		// Result is still okay?
		assert($this->unpublishedEntriesInstance->valid());

		/*
		 * Remove 'publication_status'.
		 * NOTE(review): hasUnpublishedEntries() registered this key through
		 * addExcludeCriteria() - confirm that unsetCriteria() also removes
		 * exclude criteria.
		 */
		$this->getSearchInstance()->unsetCriteria(self::DB_COLUMN_PUBLICATION_STATUS);

		// Make sure all entries are marked as pending, first get a dataset instance.
		$dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));

		// Add search instance
		$dataSetInstance->setSearchInstance($this->getSearchInstance());

		// Set primary key (node id)
		$dataSetInstance->setUniqueKey(self::DB_COLUMN_NODE_ID);

		// Add criteria (that should be set)
		$dataSetInstance->addCriteria(self::DB_COLUMN_PUBLICATION_STATUS, self::PUBLICATION_STATUS_PENDING);

		// Run the "UPDATE" query
		$this->queryUpdateDataSet($dataSetInstance);
	}

	/**
	 * Removes non-public data from given array. Currently this only
	 * delegates to the parent's implementation.
	 *
	 * @param	$data	An array with possible non-public data that needs to be removed.
	 * @return	$data	The array as returned by the parent's implementation.
	 */
	public function removeNonPublicDataFromArray(array $data) {
		// Currently call only inner method
		//* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: Calling parent::removeNonPublicDataFromArray(data) ...');
		$data = parent::removeNonPublicDataFromArray($data);

		// Return cleaned data
		//* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: data[]=' . gettype($data));
		return $data;
	}

	/**
	 * Finds recipients for given package data and excludes the sender. The
	 * number of entries is limited to config entry 'max_dht_recipients'.
	 *
	 * @param	$packageData		An array of valid package data, the sender element is required
	 * @return	$resultInstance		Result instance of the query for DHT recipients
	 */
	public function getResultFromExcludedSender (array $packageData) {
		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: CALLED!');

		// Assert on required array field
		assert(isset($packageData[NetworkPackage::PACKAGE_DATA_SENDER]));

		// Get max recipients
		$maxRecipients = $this->getConfigInstance()->getConfigEntry('max_dht_recipients');

		// First create a search instance
		$searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');

		// Then exclude 'sender' field as the sender is the current (*this*) node
		$searchInstance->addExcludeCriteria(self::DB_COLUMN_SESSION_ID, $packageData[NetworkPackage::PACKAGE_DATA_SENDER]);

		// Set limit to maximum DHT recipients
		$searchInstance->setLimit($maxRecipients);

		// Get a result instance back from DHT database wrapper.
		$resultInstance = $this->doSelectByCriteria($searchInstance);

		/* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: EXIT!');

		// Return result instance
		return $resultInstance;
	}

	/**
	 * Finds recipients by given key/value pair. The number of entries is
	 * limited to config entry 'max_dht_recipients', the same way as
	 * getResultFromExcludedSender() does it.
	 *
	 * @param	$key				Key (column name) to look for
	 * @param	$value				Value the key must match
	 * @return	$resultInstance		Result instance of the query for DHT recipients
	 */
	public function getResultFromKeyValue ($key, $value) {
		// Get max recipients
		$maxRecipients = $this->getConfigInstance()->getConfigEntry('max_dht_recipients');

		// First create a search instance
		$searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');

		// Find the key/value pair
		$searchInstance->addCriteria($key, $value);

		// Set limit to maximum DHT recipients (was read before but never applied)
		$searchInstance->setLimit($maxRecipients);

		// Get a result instance back from DHT database wrapper and return it
		return $this->doSelectByCriteria($searchInstance);
	}

	/**
	 * Enable DHT bootstrap request acceptance for local node. The local
	 * node's record is updated if it is already registered, else the node
	 * gets registered. The written 'accept_bootstrap' value is taken from
	 * the node instance in prepareLocalDataSetInstance().
	 *
	 * @return	void
	 */
	public function enableAcceptDhtBootstrap () {
		// Debug message
		/* DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . __METHOD__ . ':' . __LINE__ . ']: Enabling DHT bootstrap requests ...');

		// Is the node already registered?
		if ($this->isLocalNodeRegistered()) {
			// Just update our record
			$this->updateLocalNode();
		} else {
			// Register it
			$this->registerLocalNode();
		}
	}
}

// [EOF]
?>