]> git.mxchange.org Git - hub.git/blobdiff - application/hub/main/dht/node/class_NodeDhtFacade.php
DHT data registration/update added to NodeDhtFacade, now every DHT facade must implem...
[hub.git] / application / hub / main / dht / node / class_NodeDhtFacade.php
index 5007c47f0ef041cf485e5da4db07ce2fecf58147..23e3abfe13596c28d52146fd1e08a15d93c9bd9a 100644 (file)
@@ -41,18 +41,266 @@ class NodeDhtFacade extends BaseDht implements Distributable, Registerable {
                // Get new instance
                $dhtInstance = new NodeDhtFacade();
 
+               // Get a database wrapper instance
+               $wrapperInstance = DatabaseWrapperFactory::createWrapperByConfiguredName('node_dht_db_wrapper_class');
+
+               // Set it in this class
+               $dhtInstance->setWrapperInstance($wrapperInstance);
+
                // Return the prepared instance
                return $dhtInstance;
        }
 
        /**
-        * Initializes the distributable hash table (DHT)
+        * Registers/updates an entry in the DHT with given data from $dhtData
+        * array. Different DHT implemtations may handle this differently as they
+        * may enrich the data with more meta data.
+        *
+        * @param       $dhtData        A valid array with DHT-related data (e.g. node/peer data)
+        * @return      void
+        * @todo        Does the data need to be enriched?
+        */
+       protected function insertDataIntoDht (array $dhtData) {
+               // Check if there is already an entry for given node_id
+               if ($this->getWrapperInstance()->isNodeRegisteredByNodeId($dhtData[NodeDistributedHashTableDatabaseWrapper::DB_COLUMN_NODE_ID])) {
+                       /*
+                        * Update existing record. Please note that this step is not secure
+                        * (e.g. DHT poisoning) it would be good to implement some checks if
+                        * the both node owner trust each other (see sub-project 'DSHT').
+                        */
+                       $this->getWrapperInstance()->updateNodeEntry($nodeData);
+               } else {
+                       /*
+                        * Inserts given node data into the DHT. As above, this step does
+                        * currently not perform any security checks.
+                        */
+                       $this->getWrapperInstance()->registerNodeByData($nodeData);
+               }
+       }
+
+       /**
+        * Initializes the distributed hash table (DHT)
         *
         * @return      void
-        * @todo        Please implement this method
         */
        public function initDht () {
-               $this->partialStub('Please implement this method.');
+               // Is the local node registered?
+               if ($this->getWrapperInstance()->isLocalNodeRegistered()) {
+                       // Then only update session id
+                       $this->getWrapperInstance()->updateLocalNode();
+               } else {
+                       // No, so register it
+                       $this->getWrapperInstance()->registerLocalNode();
+               }
+
+               // Change state
+               $this->getStateInstance()->dhtHasInitialized();
+       }
+
+       /**
+        * Bootstraps the DHT by sending out a message to all available nodes
+        * (including itself). This step helps the node to get to know more nodes
+        * which can be queried later for object distribution.
+        *
+        * @return      void
+        */
+       public function bootstrapDht () {
+               // Get a helper instance
+               $helperInstance = ObjectFactory::createObjectByConfiguredName('dht_bootstrap_helper_class');
+
+               // Load the announcement descriptor
+               $helperInstance->loadDescriptorXml($this);
+
+               // Compile all variables
+               $helperInstance->getTemplateInstance()->compileConfigInVariables();
+
+               // "Publish" the descriptor by sending it to the bootstrap/list nodes
+               $helperInstance->sendPackage($this);
+
+               // Change state
+               $this->getStateInstance()->dhtHasBooted();
+       }
+
+       /**
+        * Finds a node locally by given session id
+        *
+        * @param       $sessionId      Session id to lookup
+        * @return      $nodeData       Node-data array
+        */
+       public function findNodeLocalBySessionId ($sessionId) {
+               // Default is empty data array
+               $nodeData = array();
+
+               /*
+                * Call the wrapper to do the job and get back a result instance. There
+                * will come back zero or one entry from the wrapper.
+                */
+               $resultInstance = $this->getWrapperInstance()->findNodeLocalBySessionId($sessionId);
+
+               // Is the next entry valid?
+               if ($resultInstance->next()) {
+                       /*
+                        * Then load the first entry (more entries are being ignored and
+                        * should not happen).
+                        */
+                       $nodeData = $resultInstance->current();
+               } // END - if
+
+               // Return node data
+               return $nodeData;
+       }
+
+       /**
+        * Registers an other node with this node by given message data. The
+        * following data must always be present:
+        *
+        * - session-id  (for finding the node's record together with below data)
+        * - external-ip (hostname or IP number)
+        * - listen-port (TCP/UDP listen port for inbound connections)
+        *
+        * @param       $messageArray           An array with all minimum message data
+        * @param       $handlerInstance        An instance of a Handleable class
+        * @param       $forceUpdate            Optionally force update, don't register (default: register if not found)
+        * @return      void
+        * @throws      NodeSessionIdVerficationException       If the node was not found and update is forced
+        */
+       public function registerNodeByMessageData (array $messageData, Handleable $handlerInstance, $forceUpdate = FALSE) {
+               // Get a search criteria class
+               $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
+
+               // Debug message
+               /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',messageData=' . print_r($messageData, true) . ',handlerInstance=' . $handlerInstance->__toString() . ',forceUpdate=' . intval($forceUpdate) . ',count(getSearchData())=' . count($handlerInstance->getSearchData()));
+
+               // Search for the node's session id and external IP/hostname + TCP/UDP listen port
+               foreach ($handlerInstance->getSearchData() as $key) {
+                       // Debug message
+                       /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',key=' . $key);
+
+                       // Is it there?
+                       assert(isset($messageData[$key]));
+
+                       // Debug message
+                       /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',messageData[' . $key . ']=' . $messageData[$key]);
+
+                       // Add criteria
+                       $searchInstance->addCriteria(str_replace('my-', '', $key), $messageData[$key]);
+               } // END - foreach
+
+               // Only one entry is fine
+               $searchInstance->setLimit(1);
+
+               // Run the query
+               $resultInstance = $this->getWrapperInstance()->doSelectByCriteria($searchInstance);
+
+               // Is there already an entry?
+               if ($resultInstance->next()) {
+                       // Entry found, so update it
+                       $this->getWrapperInstance()->updateNodeByMessageData($messageData, $handlerInstance, $searchInstance);
+               } elseif ($forceUpdate === FALSE) {
+                       // Nothing found, so register it
+                       $this->getWrapperInstance()->registerNodeByMessageData($messageData, $handlerInstance);
+               } else {
+                       /*
+                        * Do not register non-existent nodes here. This is maybe fatal,
+                        * caused by "stolen" session id and/or not matching IP
+                        * number/port combination.
+                        */
+                       throw new NodeSessionIdVerficationException(array($this, $messageData), BaseHubSystem::EXCEPTION_NODE_SESSION_ID_NOT_VERIFYING);
+               }
+
+               // Save last exception
+               $handlerInstance->setLastException($this->getWrapperInstance()->getLastException());
+       }
+
+       /**
+        * Queries the local DHT data(base) for a node list with all supported
+        * object types except the node by given session id.
+        *
+        * @param       $messageData            An array with message data from a node_list request
+        * @param       $handlerInstance        An instance of a Handleable class
+        * @param       $excludeKey                     Array key which should be excluded
+        * @param       $andKey                         Array of $separator-separated list of elements which all must match
+        * @param       $separator                      Sepator char (1st parameter for explode() call)
+        * @return      $nodeList                       An array with all found nodes
+        */
+       public function queryLocalNodeListExceptByMessageData (array $messageData, Handleable $handlerInstance, $excludeKey, $andKey, $separator) {
+               // Make sure both keys are there
+               assert((isset($messageData[$excludeKey])) && (isset($messageData[$andKey])));
+
+               // Debug message
+               /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: state=' . $this->getPrintableState() . ',messageData=' . print_r($messageData, true));
+
+               // Get a search criteria class
+               $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
+
+               // Add all keys
+               foreach (explode($separator, $messageData[$andKey]) as $criteria) {
+                       // Debug message
+                       /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: andKey=' . $andKey . ',criteria=' . $criteria);
+
+                       // Add it and leave any 'my-' prefix out
+                       $searchInstance->addChoiceCriteria(str_replace('my-', '', $andKey), $criteria);
+               } // END - foreach
+
+               // Add exclusion key
+               $searchInstance->addExcludeCriteria(str_replace('my-', '', $excludeKey), $messageData[$excludeKey]);
+
+               // Only X entries are fine
+               $searchInstance->setLimit($this->getConfigInstance()->getConfigEntry('node_dht_list_limit'));
+
+               // Run the query
+               $resultInstance = $this->getWrapperInstance()->doSelectByCriteria($searchInstance);
+
+               // Get node list
+               $nodeList = array();
+               while ($resultInstance->next()) {
+                       // Get current element (it should be an array, and have at least 1 entry)
+                       $current = $resultInstance->current();
+                       assert(is_array($current));
+                       assert(count($current) > 0);
+
+                       // Debug message
+                       /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: current(' . count($current) . ')[' . gettype($current) . ']=' . print_r($current, TRUE));
+
+                       /*
+                        * Remove some keys as they should not be published.
+                        */
+                       unset($current[$this->getWrapperInstance()->getIndexKey()]);
+
+                       // Add this entry
+                       array_push($nodeList, $current);
+               } // END - while
+
+               // Save last exception
+               $handlerInstance->setLastException($this->getWrapperInstance()->getLastException());
+
+               // Return node list (array)
+               return $nodeList;
+       }
+
+       /**
+        * Inserts given node list array (from earlier database result produced by
+        * an other node) into the DHT. This array origins from above method
+        * queryLocalNodeListExceptByMessageData().
+        *
+        * @param       $nodeList       An array from an earlier database result instance
+        * @return      void
+        */
+       public function insertNodeList (array $nodeList) {
+               // If no node is in the list (array), skip the rest of this method
+               if (count($nodeList) == 0) {
+                       // Debug message
+                       self::createDebugInstance(__CLASS__)->debugOutput('DHT-FACADE[' . __LINE__ . ']: No node record has been returned.');
+
+                       // Abort here
+                       return;
+               } // END - if
+
+               // Put them all into a stack
+               foreach ($nodeList as $nodeData) {
+                       // Insert all entries
+                       $this->getStackerInstance()->pushNamed(self::STACKER_NAME_INSERT_NODE, $nodeData);
+               } // END - foreach
        }
 }