// Get new instance
$dhtInstance = new NodeDhtFacade();
+ // Get a database wrapper instance
+ $wrapperInstance = DatabaseWrapperFactory::createWrapperByConfiguredName('node_dht_db_wrapper_class');
+
+ // Set it in this class
+ $dhtInstance->setWrapperInstance($wrapperInstance);
+
// Return the prepared instance
return $dhtInstance;
}
/**
- * Initializes the distributable hash table (DHT)
+ * Registers/updates an entry in the DHT with given data from $dhtData
+ * array. Different DHT implemtations may handle this differently as they
+ * may enrich the data with more meta data.
+ *
+ * @param $dhtData A valid array with DHT-related data (e.g. node/peer data)
+ * @return void
+ * @todo Does the data need to be enriched?
+ */
+ protected function insertDataIntoDht (array $dhtData) {
+ // Check if there is already an entry for given node_id
+ if ($this->getWrapperInstance()->isNodeRegisteredByNodeId($dhtData[NodeDistributedHashTableDatabaseWrapper::DB_COLUMN_NODE_ID])) {
+ /*
+ * Update existing record. Please note that this step is not secure
+ * (e.g. DHT poisoning) it would be good to implement some checks if
+ * the both node owner trust each other (see sub-project 'DSHT').
+ */
+ $this->getWrapperInstance()->updateNodeEntry($nodeData);
+ } else {
+ /*
+ * Inserts given node data into the DHT. As above, this step does
+ * currently not perform any security checks.
+ */
+ $this->getWrapperInstance()->registerNodeByData($nodeData);
+ }
+ }
+
+ /**
+ * Initializes the distributed hash table (DHT)
*
* @return void
- * @todo Please implement this method
*/
public function initDht () {
- $this->partialStub('Please implement this method.');
+ // Is the local node registered?
+ if ($this->getWrapperInstance()->isLocalNodeRegistered()) {
+ // Then only update session id
+ $this->getWrapperInstance()->updateLocalNode();
+ } else {
+ // No, so register it
+ $this->getWrapperInstance()->registerLocalNode();
+ }
+
+ // Change state
+ $this->getStateInstance()->dhtHasInitialized();
+ }
+
+ /**
+ * Bootstraps the DHT by sending out a message to all available nodes
+ * (including itself). This step helps the node to get to know more nodes
+ * which can be queried later for object distribution.
+ *
+ * @return void
+ */
+ public function bootstrapDht () {
+ // Get a helper instance
+ $helperInstance = ObjectFactory::createObjectByConfiguredName('dht_bootstrap_helper_class');
+
+ // Load the announcement descriptor
+ $helperInstance->loadDescriptorXml($this);
+
+ // Compile all variables
+ $helperInstance->getTemplateInstance()->compileConfigInVariables();
+
+ // "Publish" the descriptor by sending it to the bootstrap/list nodes
+ $helperInstance->sendPackage($this);
+
+ // Change state
+ $this->getStateInstance()->dhtHasBooted();
+ }
+
+ /**
+ * Finds a node locally by given session id
+ *
+ * @param $sessionId Session id to lookup
+ * @return $nodeData Node-data array
+ */
+ public function findNodeLocalBySessionId ($sessionId) {
+ // Default is empty data array
+ $nodeData = array();
+
+ /*
+ * Call the wrapper to do the job and get back a result instance. There
+ * will come back zero or one entry from the wrapper.
+ */
+ $resultInstance = $this->getWrapperInstance()->findNodeLocalBySessionId($sessionId);
+
+ // Is the next entry valid?
+ if ($resultInstance->next()) {
+ /*
+ * Then load the first entry (more entries are being ignored and
+ * should not happen).
+ */
+ $nodeData = $resultInstance->current();
+ } // END - if
+
+ // Return node data
+ return $nodeData;
+ }
+
+ /**
+ * Registers an other node with this node by given message data. The
+ * following data must always be present:
+ *
+ * - session-id (for finding the node's record together with below data)
+ * - external-ip (hostname or IP number)
+ * - listen-port (TCP/UDP listen port for inbound connections)
+ *
+ * @param $messageArray An array with all minimum message data
+ * @param $handlerInstance An instance of a Handleable class
+ * @param $forceUpdate Optionally force update, don't register (default: register if not found)
+ * @return void
+ * @throws NodeSessionIdVerficationException If the node was not found and update is forced
+ */
+ public function registerNodeByMessageData (array $messageData, Handleable $handlerInstance, $forceUpdate = FALSE) {
+ // Get a search criteria class
+ $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
+
+ // Debug message
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',messageData=' . print_r($messageData, true) . ',handlerInstance=' . $handlerInstance->__toString() . ',forceUpdate=' . intval($forceUpdate) . ',count(getSearchData())=' . count($handlerInstance->getSearchData()));
+
+ // Search for the node's session id and external IP/hostname + TCP/UDP listen port
+ foreach ($handlerInstance->getSearchData() as $key) {
+ // Debug message
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',key=' . $key);
+
+ // Is it there?
+ assert(isset($messageData[$key]));
+
+ // Debug message
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',messageData[' . $key . ']=' . $messageData[$key]);
+
+ // Add criteria
+ $searchInstance->addCriteria(str_replace('my-', '', $key), $messageData[$key]);
+ } // END - foreach
+
+ // Only one entry is fine
+ $searchInstance->setLimit(1);
+
+ // Run the query
+ $resultInstance = $this->getWrapperInstance()->doSelectByCriteria($searchInstance);
+
+ // Is there already an entry?
+ if ($resultInstance->next()) {
+ // Entry found, so update it
+ $this->getWrapperInstance()->updateNodeByMessageData($messageData, $handlerInstance, $searchInstance);
+ } elseif ($forceUpdate === FALSE) {
+ // Nothing found, so register it
+ $this->getWrapperInstance()->registerNodeByMessageData($messageData, $handlerInstance);
+ } else {
+ /*
+ * Do not register non-existent nodes here. This is maybe fatal,
+ * caused by "stolen" session id and/or not matching IP
+ * number/port combination.
+ */
+ throw new NodeSessionIdVerficationException(array($this, $messageData), BaseHubSystem::EXCEPTION_NODE_SESSION_ID_NOT_VERIFYING);
+ }
+
+ // Save last exception
+ $handlerInstance->setLastException($this->getWrapperInstance()->getLastException());
+ }
+
+ /**
+ * Queries the local DHT data(base) for a node list with all supported
+ * object types except the node by given session id.
+ *
+ * @param $messageData An array with message data from a node_list request
+ * @param $handlerInstance An instance of a Handleable class
+ * @param $excludeKey Array key which should be excluded
+ * @param $andKey Array of $separator-separated list of elements which all must match
+ * @param $separator Sepator char (1st parameter for explode() call)
+ * @return $nodeList An array with all found nodes
+ */
+ public function queryLocalNodeListExceptByMessageData (array $messageData, Handleable $handlerInstance, $excludeKey, $andKey, $separator) {
+ // Make sure both keys are there
+ assert((isset($messageData[$excludeKey])) && (isset($messageData[$andKey])));
+
+ // Debug message
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',messageData=' . print_r($messageData, true));
+
+ // Get a search criteria class
+ $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
+
+ // Add all keys
+ foreach (explode($separator, $messageData[$andKey]) as $criteria) {
+ // Debug message
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: andKey=' . $andKey . ',criteria=' . $criteria);
+
+ // Add it and leave any 'my-' prefix out
+ $searchInstance->addChoiceCriteria(str_replace('my-', '', $andKey), $criteria);
+ } // END - foreach
+
+ // Add exclusion key
+ $searchInstance->addExcludeCriteria(str_replace('my-', '', $excludeKey), $messageData[$excludeKey]);
+
+ // Only X entries are fine
+ $searchInstance->setLimit($this->getConfigInstance()->getConfigEntry('node_dht_list_limit'));
+
+ // Run the query
+ $resultInstance = $this->getWrapperInstance()->doSelectByCriteria($searchInstance);
+
+ // Get node list
+ $nodeList = array();
+ while ($resultInstance->next()) {
+ // Get current element (it should be an array, and have at least 1 entry)
+ $current = $resultInstance->current();
+ assert(is_array($current));
+ assert(count($current) > 0);
+
+ // Debug message
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: current(' . count($current) . ')[' . gettype($current) . ']=' . print_r($current, TRUE));
+
+ /*
+ * Remove some keys as they should not be published.
+ */
+ unset($current[$this->getWrapperInstance()->getIndexKey()]);
+
+ // Add this entry
+ array_push($nodeList, $current);
+ } // END - while
+
+ // Save last exception
+ $handlerInstance->setLastException($this->getWrapperInstance()->getLastException());
+
+ // Return node list (array)
+ return $nodeList;
+ }
+
+ /**
+ * Inserts given node list array (from earlier database result produced by
+ * an other node) into the DHT. This array origins from above method
+ * queryLocalNodeListExceptByMessageData().
+ *
+ * @param $nodeList An array from an earlier database result instance
+ * @return void
+ */
+ public function insertNodeList (array $nodeList) {
+ // If no node is in the list (array), skip the rest of this method
+ if (count($nodeList) == 0) {
+ // Debug message
+ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: No node record has been returned.');
+
+ // Abort here
+ return;
+ } // END - if
+
+ // Put them all into a stack
+ foreach ($nodeList as $nodeData) {
+ // Insert all entries
+ $this->getStackerInstance()->pushNamed(self::STACKER_NAME_INSERT_NODE, $nodeData);
+ } // END - foreach
}
}