]> git.mxchange.org Git - hub.git/commitdiff
Added unfinished support for publication of DHT entries (all entries in the DHT will...
authorRoland Haeder <roland@mxchange.org>
Sat, 8 Feb 2014 21:39:57 +0000 (22:39 +0100)
committerRoland Haeder <roland@mxchange.org>
Sat, 8 Feb 2014 21:39:57 +0000 (22:39 +0100)
Signed-off-by: Roland Haeder <roland@mxchange.org>
application/hub/config.php
application/hub/interfaces/distributable/class_Distributable.php
application/hub/main/database/wrapper/node/class_NodeDistributedHashTableDatabaseWrapper.php
application/hub/main/dht/class_BaseDht.php
application/hub/main/filter/task/node/class_NodeTaskHandlerInitializerFilter.php
application/hub/main/tasks/node/dht/class_NodeDhtPublicationCheckTask.php [new file with mode: 0644]
application/hub/main/tasks/node/dht/class_NodeDhtQueryTask.php

index a009c0af5b21123425915400a023080c6ca7409d..13901a71139ed3e650412afdd58a68d55cf88e52 100644 (file)
@@ -408,6 +408,9 @@ $cfg->setConfigEntry('stacker_dht_bootstrap_max_size', 10);
 // CFG: STACKER-DHT-INSERT-NODE-MAX-SIZE
 $cfg->setConfigEntry('stacker_dht_insert_node_max_size', 100);
 
+// CFG: STACKER-DHT-PENDING_PUBLISH-MAX-SIZE
+$cfg->setConfigEntry('stacker_dht_pending_publish_max_size', 100);
+
 // CFG: NEWS-MAIN-LIMIT
 $cfg->setConfigEntry('news_main_limit', 5);
 
@@ -510,6 +513,12 @@ $cfg->setConfigEntry('node_dht_bootstrap_task_class', 'NodeDhtBootstrapTask');
 // CFG: NODE-DHT-QUERY-TASK-CLASS
 $cfg->setConfigEntry('node_dht_query_task_class', 'NodeDhtQueryTask');
 
+// CFG: NODE-DHT-PUBLICATION-CHECK-TASK-CLASS
+$cfg->setConfigEntry('node_dht_publication_check_task_class', 'NodeDhtPublicationCheckTask');
+
+// CFG: NODE-DHT-PUBLICATION-TASK-CLASS
+$cfg->setConfigEntry('node_dht_publication_task_class', 'NodeDhtPublicationTask');
+
 // CFG: TASK-NETWORK-PACKAGE-WRITER-STARTUP-DELAY
 $cfg->setConfigEntry('task_network_package_writer_startup_delay', 2500);
 
@@ -582,6 +591,24 @@ $cfg->setConfigEntry('task_dht_query_interval_delay', 50);
 // CFG: TASK-DHT-QUERY-MAX-RUNS
 $cfg->setConfigEntry('task_dht_query_max_runs', 0);
 
+// CFG: TASK-DHT-CHECK-PUBLICATION-STATUP-DELAY
+$cfg->setConfigEntry('task_dht_check_publication_startup_delay', 10000);
+
+// CFG: TASK-DHT-CHECK-PUBLICATION-INTERVAL-DELAY
+$cfg->setConfigEntry('task_dht_check_publication_interval_delay', 1800000); // = 1/2 hour
+
+// CFG: TASK-DHT-CHECK-PUBLICATION-MAX-RUNS
+$cfg->setConfigEntry('task_dht_check_publication_max_runs', 0);
+
+// CFG: TASK-DHT-PUBLICATION-STATUP-DELAY
+$cfg->setConfigEntry('task_dht_publication_startup_delay', 8000);
+
+// CFG: TASK-DHT-PUBLICATION-INTERVAL-DELAY
+$cfg->setConfigEntry('task_dht_publication_interval_delay', 5000);
+
+// CFG: TASK-DHT-PUBLICATION-MAX-RUNS
+$cfg->setConfigEntry('task_dht_publication_max_runs', 0);
+
 // CFG: TASK-LIST-CLASS
 $cfg->setConfigEntry('task_list_class', 'TaskList');
 
index d7fa805b0c776ff81eb50cce13b8f869c895a59b..f3aadbea3d038cb4252220aaf1a938bc16bbbc46 100644 (file)
@@ -104,6 +104,22 @@ interface Distributable extends FrameworkInterface {
         * @return      void
         */
        function insertSingleNodeData ();
+
+       /**
+        * Checks whether there are unpublished entries
+        *
+        * @return      $hasUnpublished         Whether there are unpublished entries
+        */
+       function hasUnpublishedEntries ();
+
+       /**
+        * Initializes publication of DHT entries. This does only prepare
+        * publication. The next step is to pickup such prepared entries and publish
+        * them by uploading to other (recently appeared) DHT members.
+        *
+        * @return      void
+        */
+       function initEntryPublication ();
 }
 
 // [EOF]
index daea599f608bb99568d1d27a9814e21ed42177eb..3cc15d8e4ac4d8e5fbbe34cb2ec53d02cbf73a0b 100644 (file)
  * along with this program. If not, see <http://www.gnu.org/licenses/>.
  */
 class NodeDistributedHashTableDatabaseWrapper extends BaseDatabaseWrapper implements NodeDhtWrapper, Registerable {
+       /**
+        * "Cached" results for dabase for looking for unpublished entries
+        */
+       private $unpublishedEntriesInstance = NULL;
+
        // Constants for database table names
        const DB_TABLE_NODE_DHT = 'node_dht';
 
        // Constants for database column names
-       const DB_COLUMN_NODE_ID          = 'node_id';
-       const DB_COLUMN_SESSION_ID       = 'session_id';
-       const DB_COLUMN_EXTERNAL_IP      = 'external_ip';
-       const DB_COLUMN_LISTEN_PORT      = 'listen_port';
-       const DB_COLUMN_PRIVATE_KEY_HASH = 'private_key_hash';
-       const DB_COLUMN_NODE_MODE        = 'node_mode';
-       const DB_COLUMN_ACCEPTED_OBJECTS = 'accepted_object_types';
-       const DB_COLUMN_NODE_LIST        = 'node_list';
+       const DB_COLUMN_NODE_ID            = 'node_id';
+       const DB_COLUMN_SESSION_ID         = 'session_id';
+       const DB_COLUMN_EXTERNAL_IP        = 'external_ip';
+       const DB_COLUMN_LISTEN_PORT        = 'listen_port';
+       const DB_COLUMN_PRIVATE_KEY_HASH   = 'private_key_hash';
+       const DB_COLUMN_NODE_MODE          = 'node_mode';
+       const DB_COLUMN_ACCEPTED_OBJECTS   = 'accepted_object_types';
+       const DB_COLUMN_NODE_LIST          = 'node_list';
+       const DB_COLUMN_PUBLICATION_STATUS = 'publication_status';
+
+       // Publication status'
+       const PUBLICATION_STATUS_PENDING = 'PENDING';
 
        // Exception codes
        const EXCEPTION_NODE_ALREADY_REGISTERED = 0x800;
@@ -108,6 +117,15 @@ class NodeDistributedHashTableDatabaseWrapper extends BaseDatabaseWrapper implem
                return $searchInstance;
        }
 
+       /**
+        * Getter for result instance for unpublished entries
+        *
+        * @return      $unpublishedEntriesInstance             Result instance
+        */
+       public final function getUnpublishedEntriesInstance () {
+               return $this->unpublishedEntriesInstance;
+       }
+
        /**
         * Prepares a "local" instance of a StoreableCriteria class with all node
         * data for insert/update queries. This data set contains data from *this*
@@ -408,6 +426,71 @@ class NodeDistributedHashTableDatabaseWrapper extends BaseDatabaseWrapper implem
                // Run the "UPDATE" query
                $this->queryUpdateDataSet($dataSetInstance);
        }
+
+       /**
+        * Checks whether there are unpublished entries
+        *
+        * @return      $hasUnpublished         Whether there are unpublished entries
+        * @todo        Add minimum/maximum age limitations
+        */
+       public function hasUnpublishedEntries () {
+               // Get search instance
+               $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
+
+               // Add exclusion key which is the publish status
+               $searchInstance->addExcludeCriteria(NodeDistributedHashTableDatabaseWrapper::DB_COLUMN_PUBLICATION_STATUS, NodeDistributedHashTableDatabaseWrapper::PUBLICATION_STATUS_PENDING);
+
+               // Remember search instance
+               $this->setSearchInstance($searchInstance);
+
+               // Run the query
+               $this->unpublishedEntriesInstance = $this->doSelectByCriteria($searchInstance);
+
+               // Check pending entries
+               $hasUnpublished = $this->unpublishedEntriesInstance->valid();
+
+               // Return it
+               return $hasUnpublished;
+       }
+
+       /**
+        * Initializes publication of DHT entries. This does only prepare
+        * publication. The next step is to pickup such prepared entries and publish
+        * them by uploading to other (recently appeared) DHT members.
+        *
+        * @return      void
+        * @todo        Add timestamp to dataset instance
+        */
+       public function initEntryPublication () {
+               /*
+                * Make sure that hasUnpublishedEntries() has been called first by
+                * asserting on the "cached" object instance. This "caching" saves some
+                * needless queries as this method shall be called immediately after
+                * hasUnpublishedEntries() returns TRUE.
+                */
+               assert($this->unpublishedEntriesInstance instanceof SearchableResult);
+
+               // Result is still okay?
+               assert($this->unpublishedEntriesInstance->valid());
+
+               // Remove 'publication_status'
+               $this->getSearchInstance()->unsetCriteria(self::DB_COLUMN_PUBLICATION_STATUS);
+
+               // Make sure all entries are marked as pending, first get a dataset instance.
+               $dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));
+
+               // Add search instance
+               $dataSetInstance->setSearchInstance($this->getSearchInstance());
+
+               // Set primary key (node id)
+               $dataSetInstance->setUniqueKey(self::DB_COLUMN_NODE_ID);
+
+               // Add criteria (that should be set)
+               $dataSetInstance->addCriteria(self::DB_COLUMN_PUBLICATION_STATUS, self::PUBLICATION_STATUS_PENDING);
+
+               // Run the "UPDATE" query
+               $this->queryUpdateDataSet($dataSetInstance);
+       }
 }
 
 // [EOF]
index 6802de9e5d1c9fd72e67ebc21d061394ba8ab1d7..9f5cb4c799cfdc03295d231fec9db6599e46d5ba 100644 (file)
@@ -25,7 +25,8 @@ abstract class BaseDht extends BaseHubSystem {
        /**
         * Stacker name for "INSERT" node data
         */
-       const STACKER_NAME_INSERT_NODE = 'dht_insert_node';
+       const STACKER_NAME_INSERT_NODE        = 'dht_insert_node';
+       const STACKER_NAME_PENDING_PUBLISHING = 'dht_pending_publish';
 
        /**
         * Protected constructor
@@ -62,6 +63,7 @@ abstract class BaseDht extends BaseHubSystem {
                // Initialize all stacker
                $this->getStackerInstance()->initStacks(array(
                        self::STACKER_NAME_INSERT_NODE,
+                       self::STACKER_NAME_PENDING_PUBLISHING,
                ));
        }
 
@@ -114,6 +116,52 @@ abstract class BaseDht extends BaseHubSystem {
                // Insert the data
                $this->insertDataIntoDht($nodeData);
        }
+
+       /**
+        * Checks whether there are unpublished entries
+        *
+        * @return      $hasUnpublished         Whether there are unpublished entries
+        * @todo        Add minimum/maximum age limitations
+        */
+       public function hasUnpublishedEntries () {
+               // Call method on database wrapper
+               $hasUnpublished = $this->getWrapperInstance()->hasUnpublishedEntries();
+
+               // Return it
+               return $hasUnpublished;
+       }
+
+       /**
+        * Initializes publication of DHT entries. This does only prepare
+        * publication. The next step is to pickup such prepared entries and publish
+        * them by uploading to other (recently appeared) DHT members.
+        *
+        * @return      void
+        */
+       public function initEntryPublication () {
+               // Call method on database wrapper
+               $this->getWrapperInstance()->initEntryPublication();
+
+               // Get result instance
+               $resultInstance = $this->getWrapperInstance()->getUnpublishedEntriesInstance();
+
+               // Must still be valid
+               assert($resultInstance->valid());
+
+               // "Walk" through all entries
+               while ($resultInstance->next()) {
+                       // Get current entry
+                       $current = $resultInstance->current();
+
+                       // Make sure only valid entries pass
+                       // @TODO Maybe add more small checks?
+                       assert(is_array($current));
+
+                       // ... and push it to the next stack
+                       /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('[' . __METHOD__ . ':' . __LINE__ . '] Pushing entry with ' . count($current) . ' elements to stack ' . self::STACKER_NAME_PENDING_PUBLISHING . ' ...');
+                       $this->getStackerInstance()->pushNamed(self::STACKER_NAME_PENDING_PUBLISHING, $current);
+               } // END - while
+       }
 }
 
 // [EOF]
index 5fb366c881249930d08ed3b0b573d3c1bd5b3a03..9f540061304065de8a4bb8f8e748a76edc0298d0 100644 (file)
@@ -103,6 +103,18 @@ class NodeTaskHandlerInitializerFilter extends BaseNodeFilter implements Filtera
                // Register it as well
                $handlerInstance->registerTask('dht_query', $taskInstance);
 
+               // Generate DHT publication-check task
+               $taskInstance = ObjectFactory::createObjectByConfiguredName('node_dht_publication_check_task_class');
+
+               // Register it as well
+               $handlerInstance->registerTask('dht_check_publication', $taskInstance);
+
+               // Generate DHT publication task
+               $taskInstance = ObjectFactory::createObjectByConfiguredName('node_dht_publication_task_class');
+
+               // Register it as well
+               $handlerInstance->registerTask('dht_publication', $taskInstance);
+
                // Prepare a package-tags initialization task for the listeners
                $taskInstance = ObjectFactory::createObjectByConfiguredName('node_package_tags_init_task_class');
 
diff --git a/application/hub/main/tasks/node/dht/class_NodeDhtPublicationCheckTask.php b/application/hub/main/tasks/node/dht/class_NodeDhtPublicationCheckTask.php
new file mode 100644 (file)
index 0000000..a37351c
--- /dev/null
@@ -0,0 +1,84 @@
+<?php
+/**
+ * A DhtPublicationCheck node-task
+ *
+ * @author             Roland Haeder <webmaster@shipsimu.org>
+ * @version            0.0.0
+ * @copyright  Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2012 Hub Developer Team
+ * @license            GNU GPL 3.0 or any newer version
+ * @link               http://www.shipsimu.org
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+class NodeDhtPublicationCheckTask extends BaseTask implements Taskable, Visitable {
+       /**
+        * Protected constructor
+        *
+        * @return      void
+        */
+       protected function __construct () {
+               // Call parent constructor
+               parent::__construct(__CLASS__);
+       }
+
+       /**
+        * Creates an instance of this class
+        *
+        * @return      $taskInstance   An instance of a Visitable class
+        */
+       public final static function createNodeDhtPublicationCheckTask () {
+               // Get new instance
+               $taskInstance = new NodeDhtPublicationCheckTask();
+
+               // Get a DHT instance
+               $dhtInstance = DhtObjectFactory::createDhtObjectInstance('node');
+
+               // Set the DHT instance here
+               $taskInstance->setDhtInstance($dhtInstance);
+
+               // Return the prepared instance
+               return $taskInstance;
+       }
+
+       /**
+        * Accepts the visitor to process the visitor
+        *
+        * @param       $visitorInstance        An instance of a Visitor class
+        * @return      void
+        */
+       public function accept (Visitor $visitorInstance) {
+               // Visit this task
+               $visitorInstance->visitTask($this);
+       }
+
+       /**
+        * Executes the task
+        *
+        * @return      void
+        * @todo        Add more?
+        */
+       public function executeTask () {
+               // Get DHT instance
+               $dhtInstance = $this->getDhtInstance();
+
+               // Has the DHT some unpublished entries?
+               if ($dhtInstance->hasUnpublishedEntries()) {
+                       // Then initiate publishing them
+                       $dhtInstance->initEntryPublication();
+               } // END - if
+       }
+}
+
+// [EOF]
+?>
index 177c7a33c46a206c50a1d19a31007948f1a18ac0..f8daa16d34c0aafc0412a0c223c98339af95fe96 100644 (file)
@@ -69,7 +69,7 @@ class NodeDhtQueryTask extends BaseTask implements Taskable, Visitable {
         * @todo        ~5% done
         */
        public function executeTask () {
-               // "Cache" the DHT instance
+               // Get DHT instance
                $dhtInstance = $this->getDhtInstance();
 
                // Are there "INSERT" node data entries?