// CFG: STACKER-DHT-INSERT-NODE-MAX-SIZE
$cfg->setConfigEntry('stacker_dht_insert_node_max_size', 100);
+// CFG: STACKER-DHT-PENDING_PUBLISH-MAX-SIZE
+$cfg->setConfigEntry('stacker_dht_pending_publish_max_size', 100);
+
// CFG: NEWS-MAIN-LIMIT
$cfg->setConfigEntry('news_main_limit', 5);
// CFG: NODE-DHT-QUERY-TASK-CLASS
$cfg->setConfigEntry('node_dht_query_task_class', 'NodeDhtQueryTask');
+// CFG: NODE-DHT-PUBLICATION-CHECK-TASK-CLASS
+$cfg->setConfigEntry('node_dht_publication_check_task_class', 'NodeDhtPublicationCheckTask');
+
+// CFG: NODE-DHT-PUBLICATION-TASK-CLASS
+$cfg->setConfigEntry('node_dht_publication_task_class', 'NodeDhtPublicationTask');
+
// CFG: TASK-NETWORK-PACKAGE-WRITER-STARTUP-DELAY
$cfg->setConfigEntry('task_network_package_writer_startup_delay', 2500);
// CFG: TASK-DHT-QUERY-MAX-RUNS
$cfg->setConfigEntry('task_dht_query_max_runs', 0);
+// CFG: TASK-DHT-CHECK-PUBLICATION-STATUP-DELAY
+$cfg->setConfigEntry('task_dht_check_publication_startup_delay', 10000);
+
+// CFG: TASK-DHT-CHECK-PUBLICATION-INTERVAL-DELAY
+$cfg->setConfigEntry('task_dht_check_publication_interval_delay', 1800000); // = 1/2 hour
+
+// CFG: TASK-DHT-CHECK-PUBLICATION-MAX-RUNS
+$cfg->setConfigEntry('task_dht_check_publication_max_runs', 0);
+
+// CFG: TASK-DHT-PUBLICATION-STATUP-DELAY
+$cfg->setConfigEntry('task_dht_publication_startup_delay', 8000);
+
+// CFG: TASK-DHT-PUBLICATION-INTERVAL-DELAY
+$cfg->setConfigEntry('task_dht_publication_interval_delay', 5000);
+
+// CFG: TASK-DHT-PUBLICATION-MAX-RUNS
+$cfg->setConfigEntry('task_dht_publication_max_runs', 0);
+
// CFG: TASK-LIST-CLASS
$cfg->setConfigEntry('task_list_class', 'TaskList');
* @return void
*/
function insertSingleNodeData ();
+
+ /**
+ * Checks whether there are unpublished entries
+ *
+ * @return $hasUnpublished Whether there are unpublished entries
+ */
+ function hasUnpublishedEntries ();
+
+ /**
+ * Initializes publication of DHT entries. This does only prepare
+ * publication. The next step is to pickup such prepared entries and publish
+ * them by uploading to other (recently appeared) DHT members.
+ *
+ * @return void
+ */
+ function initEntryPublication ();
}
// [EOF]
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
class NodeDistributedHashTableDatabaseWrapper extends BaseDatabaseWrapper implements NodeDhtWrapper, Registerable {
+ /**
+ * "Cached" results for dabase for looking for unpublished entries
+ */
+ private $unpublishedEntriesInstance = NULL;
+
// Constants for database table names
const DB_TABLE_NODE_DHT = 'node_dht';
// Constants for database column names
- const DB_COLUMN_NODE_ID = 'node_id';
- const DB_COLUMN_SESSION_ID = 'session_id';
- const DB_COLUMN_EXTERNAL_IP = 'external_ip';
- const DB_COLUMN_LISTEN_PORT = 'listen_port';
- const DB_COLUMN_PRIVATE_KEY_HASH = 'private_key_hash';
- const DB_COLUMN_NODE_MODE = 'node_mode';
- const DB_COLUMN_ACCEPTED_OBJECTS = 'accepted_object_types';
- const DB_COLUMN_NODE_LIST = 'node_list';
+ const DB_COLUMN_NODE_ID = 'node_id';
+ const DB_COLUMN_SESSION_ID = 'session_id';
+ const DB_COLUMN_EXTERNAL_IP = 'external_ip';
+ const DB_COLUMN_LISTEN_PORT = 'listen_port';
+ const DB_COLUMN_PRIVATE_KEY_HASH = 'private_key_hash';
+ const DB_COLUMN_NODE_MODE = 'node_mode';
+ const DB_COLUMN_ACCEPTED_OBJECTS = 'accepted_object_types';
+ const DB_COLUMN_NODE_LIST = 'node_list';
+ const DB_COLUMN_PUBLICATION_STATUS = 'publication_status';
+
+ // Publication status'
+ const PUBLICATION_STATUS_PENDING = 'PENDING';
// Exception codes
const EXCEPTION_NODE_ALREADY_REGISTERED = 0x800;
return $searchInstance;
}
+ /**
+ * Getter for result instance for unpublished entries
+ *
+ * @return $unpublishedEntriesInstance Result instance
+ */
+ public final function getUnpublishedEntriesInstance () {
+ return $this->unpublishedEntriesInstance;
+ }
+
/**
* Prepares a "local" instance of a StoreableCriteria class with all node
* data for insert/update queries. This data set contains data from *this*
// Run the "UPDATE" query
$this->queryUpdateDataSet($dataSetInstance);
}
+
+ /**
+ * Checks whether there are unpublished entries
+ *
+ * @return $hasUnpublished Whether there are unpublished entries
+ * @todo Add minimum/maximum age limitations
+ */
+ public function hasUnpublishedEntries () {
+ // Get search instance
+ $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
+
+ // Add exclusion key which is the publish status
+ $searchInstance->addExcludeCriteria(NodeDistributedHashTableDatabaseWrapper::DB_COLUMN_PUBLICATION_STATUS, NodeDistributedHashTableDatabaseWrapper::PUBLICATION_STATUS_PENDING);
+
+ // Remember search instance
+ $this->setSearchInstance($searchInstance);
+
+ // Run the query
+ $this->unpublishedEntriesInstance = $this->doSelectByCriteria($searchInstance);
+
+ // Check pending entries
+ $hasUnpublished = $this->unpublishedEntriesInstance->valid();
+
+ // Return it
+ return $hasUnpublished;
+ }
+
+ /**
+ * Initializes publication of DHT entries. This does only prepare
+ * publication. The next step is to pickup such prepared entries and publish
+ * them by uploading to other (recently appeared) DHT members.
+ *
+ * @return void
+ * @todo Add timestamp to dataset instance
+ */
+ public function initEntryPublication () {
+ /*
+ * Make sure that hasUnpublishedEntries() has been called first by
+ * asserting on the "cached" object instance. This "caching" saves some
+ * needless queries as this method shall be called immediately after
+ * hasUnpublishedEntries() returns TRUE.
+ */
+ assert($this->unpublishedEntriesInstance instanceof SearchableResult);
+
+ // Result is still okay?
+ assert($this->unpublishedEntriesInstance->valid());
+
+ // Remove 'publication_status'
+ $this->getSearchInstance()->unsetCriteria(self::DB_COLUMN_PUBLICATION_STATUS);
+
+ // Make sure all entries are marked as pending, first get a dataset instance.
+ $dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));
+
+ // Add search instance
+ $dataSetInstance->setSearchInstance($this->getSearchInstance());
+
+ // Set primary key (node id)
+ $dataSetInstance->setUniqueKey(self::DB_COLUMN_NODE_ID);
+
+ // Add criteria (that should be set)
+ $dataSetInstance->addCriteria(self::DB_COLUMN_PUBLICATION_STATUS, self::PUBLICATION_STATUS_PENDING);
+
+ // Run the "UPDATE" query
+ $this->queryUpdateDataSet($dataSetInstance);
+ }
}
// [EOF]
/**
* Stacker name for "INSERT" node data
*/
- const STACKER_NAME_INSERT_NODE = 'dht_insert_node';
+ const STACKER_NAME_INSERT_NODE = 'dht_insert_node';
+ const STACKER_NAME_PENDING_PUBLISHING = 'dht_pending_publish';
/**
* Protected constructor
// Initialize all stacker
$this->getStackerInstance()->initStacks(array(
self::STACKER_NAME_INSERT_NODE,
+ self::STACKER_NAME_PENDING_PUBLISHING,
));
}
// Insert the data
$this->insertDataIntoDht($nodeData);
}
+
+ /**
+ * Checks whether there are unpublished entries
+ *
+ * @return $hasUnpublished Whether there are unpublished entries
+ * @todo Add minimum/maximum age limitations
+ */
+ public function hasUnpublishedEntries () {
+ // Call method on database wrapper
+ $hasUnpublished = $this->getWrapperInstance()->hasUnpublishedEntries();
+
+ // Return it
+ return $hasUnpublished;
+ }
+
+ /**
+ * Initializes publication of DHT entries. This does only prepare
+ * publication. The next step is to pickup such prepared entries and publish
+ * them by uploading to other (recently appeared) DHT members.
+ *
+ * @return void
+ */
+ public function initEntryPublication () {
+ // Call method on database wrapper
+ $this->getWrapperInstance()->initEntryPublication();
+
+ // Get result instance
+ $resultInstance = $this->getWrapperInstance()->getUnpublishedEntriesInstance();
+
+ // Must still be valid
+ assert($resultInstance->valid());
+
+ // "Walk" through all entries
+ while ($resultInstance->next()) {
+ // Get current entry
+ $current = $resultInstance->current();
+
+ // Make sure only valid entries pass
+ // @TODO Maybe add more small checks?
+ assert(is_array($current));
+
+ // ... and push it to the next stack
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('[' . __METHOD__ . ':' . __LINE__ . '] Pushing entry with ' . count($current) . ' elements to stack ' . self::STACKER_NAME_PENDING_PUBLISHING . ' ...');
+ $this->getStackerInstance()->pushNamed(self::STACKER_NAME_PENDING_PUBLISHING, $current);
+ } // END - while
+ }
}
// [EOF]
// Register it as well
$handlerInstance->registerTask('dht_query', $taskInstance);
+ // Generate DHT publication-check task
+ $taskInstance = ObjectFactory::createObjectByConfiguredName('node_dht_publication_check_task_class');
+
+ // Register it as well
+ $handlerInstance->registerTask('dht_check_publication', $taskInstance);
+
+ // Generate DHT publication task
+ $taskInstance = ObjectFactory::createObjectByConfiguredName('node_dht_publication_task_class');
+
+ // Register it as well
+ $handlerInstance->registerTask('dht_publication', $taskInstance);
+
// Prepare a package-tags initialization task for the listeners
$taskInstance = ObjectFactory::createObjectByConfiguredName('node_package_tags_init_task_class');
--- /dev/null
+<?php
+/**
+ * A DhtPublicationCheck node-task
+ *
+ * @author Roland Haeder <webmaster@shipsimu.org>
+ * @version 0.0.0
+ * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2012 Hub Developer Team
+ * @license GNU GPL 3.0 or any newer version
+ * @link http://www.shipsimu.org
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+class NodeDhtPublicationCheckTask extends BaseTask implements Taskable, Visitable {
+ /**
+ * Protected constructor
+ *
+ * @return void
+ */
+ protected function __construct () {
+ // Call parent constructor
+ parent::__construct(__CLASS__);
+ }
+
+ /**
+ * Creates an instance of this class
+ *
+ * @return $taskInstance An instance of a Visitable class
+ */
+ public final static function createNodeDhtPublicationCheckTask () {
+ // Get new instance
+ $taskInstance = new NodeDhtPublicationCheckTask();
+
+ // Get a DHT instance
+ $dhtInstance = DhtObjectFactory::createDhtObjectInstance('node');
+
+ // Set the DHT instance here
+ $taskInstance->setDhtInstance($dhtInstance);
+
+ // Return the prepared instance
+ return $taskInstance;
+ }
+
+ /**
+ * Accepts the visitor to process the visitor
+ *
+ * @param $visitorInstance An instance of a Visitor class
+ * @return void
+ */
+ public function accept (Visitor $visitorInstance) {
+ // Visit this task
+ $visitorInstance->visitTask($this);
+ }
+
+ /**
+ * Executes the task
+ *
+ * @return void
+ * @todo Add more?
+ */
+ public function executeTask () {
+ // Get DHT instance
+ $dhtInstance = $this->getDhtInstance();
+
+ // Has the DHT some unpublished entries?
+ if ($dhtInstance->hasUnpublishedEntries()) {
+ // Then initiate publishing them
+ $dhtInstance->initEntryPublication();
+ } // END - if
+ }
+}
+
+// [EOF]
+?>
* @todo ~5% done
*/
public function executeTask () {
- // "Cache" the DHT instance
+ // Get DHT instance
$dhtInstance = $this->getDhtInstance();
// Are there "INSERT" node data entries?