3 * A database wrapper for distributed hash tables
5 * @author Roland Haeder <webmaster@shipsimu.org>
7 * @copyright Copyright (c) 2007, 2008 Roland Haeder, 2009 - 2012 Hub Developer Team
8 * @license GNU GPL 3.0 or any newer version
9 * @link http://www.shipsimu.org
11 * This program is free software: you can redistribute it and/or modify
12 * it under the terms of the GNU General Public License as published by
13 * the Free Software Foundation, either version 3 of the License, or
14 * (at your option) any later version.
16 * This program is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with this program. If not, see <http://www.gnu.org/licenses/>.
24 class NodeDistributedHashTableDatabaseWrapper extends BaseDatabaseWrapper implements NodeDhtWrapper, Registerable {
26 * "Cached" results for dabase for looking for unpublished entries
28 private $unpublishedEntriesInstance = NULL;
30 // Constants for database table names
31 const DB_TABLE_NODE_DHT = 'node_dht';
33 // Constants for database column names
34 const DB_COLUMN_NODE_ID = 'node_id';
35 const DB_COLUMN_SESSION_ID = 'session_id';
36 const DB_COLUMN_EXTERNAL_IP = 'external_ip';
37 const DB_COLUMN_LISTEN_PORT = 'listen_port';
38 const DB_COLUMN_PRIVATE_KEY_HASH = 'private_key_hash';
39 const DB_COLUMN_NODE_MODE = 'node_mode';
40 const DB_COLUMN_ACCEPTED_OBJECTS = 'accepted_object_types';
41 const DB_COLUMN_NODE_LIST = 'node_list';
42 const DB_COLUMN_PUBLICATION_STATUS = 'publication_status';
43 const DB_COLUMN_ANSWER_STATUS = 'answer_status';
45 // Publication status'
46 const PUBLICATION_STATUS_PENDING = 'PENDING';
49 const EXCEPTION_NODE_ALREADY_REGISTERED = 0x800;
50 const EXCEPTION_NODE_NOT_REGISTERED = 0x801;
53 * Protected constructor
57 protected function __construct () {
58 // Call parent constructor
59 parent::__construct(__CLASS__);
63 * Creates an instance of this database wrapper by a provided user class
65 * @return $wrapperInstance An instance of the created wrapper class
67 public static final function createNodeDistributedHashTableDatabaseWrapper () {
69 $wrapperInstance = new NodeDistributedHashTableDatabaseWrapper();
71 // Set (primary!) table name
72 $wrapperInstance->setTableName(self::DB_TABLE_NODE_DHT);
74 // Return the instance
75 return $wrapperInstance;
79 * Static getter for an array of all DHT database entries
81 * @return $elements All elements for the DHT dabase
83 public static final function getAllElements () {
84 // Create array and ...
86 self::DB_COLUMN_NODE_ID,
87 self::DB_COLUMN_SESSION_ID,
88 self::DB_COLUMN_EXTERNAL_IP,
89 self::DB_COLUMN_LISTEN_PORT,
90 self::DB_COLUMN_PRIVATE_KEY_HASH,
91 self::DB_COLUMN_NODE_MODE,
92 self::DB_COLUMN_ACCEPTED_OBJECTS,
93 self::DB_COLUMN_NODE_LIST
101 * Prepares a search instance for given node data
103 * @param $nodeData An array with valid node data
104 * @return $searchInstance An instance of a SearchCriteria class
106 private function prepareSearchInstance (array $nodeData) {
107 // Assert on array elements
108 assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));
111 $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
113 // Search for node id and limit it to one entry
114 $searchInstance->addCriteria(self::DB_COLUMN_NODE_ID, $nodeData[self::DB_COLUMN_NODE_ID]);
115 $searchInstance->setLimit(1);
118 return $searchInstance;
122 * Getter for result instance for unpublished entries
124 * @return $unpublishedEntriesInstance Result instance
126 public final function getUnpublishedEntriesInstance () {
127 return $this->unpublishedEntriesInstance;
131 * Prepares a "local" instance of a StoreableCriteria class with all node
132 * data for insert/update queries. This data set contains data from *this*
135 * @return $dataSetInstance An instance of a StoreableCriteria class
137 private function prepareLocalDataSetInstance () {
138 // Get node/request instances
139 $nodeInstance = Registry::getRegistry()->getInstance('node');
140 $requestInstance = ApplicationHelper::getSelfInstance()->getRequestInstance();
142 // Get a dataset instance
143 $dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));
145 // Set the primary key
146 $dataSetInstance->setUniqueKey(self::DB_COLUMN_NODE_ID);
148 // Get ip:port combination and "explode" it
149 $ipPort = $nodeInstance->getAddressPortArray();
151 // Make sure both is valid
152 assert(($ipPort[0] !== 'invalid') && ($ipPort[1] !== 'invalid'));
154 // Get an array of all accepted object types
155 $objectList = $nodeInstance->getListFromAcceptedObjectTypes();
157 // Make sure this is an array
158 assert(is_array($objectList));
160 // Add public node data
161 $dataSetInstance->addCriteria(self::DB_COLUMN_NODE_MODE , $requestInstance->getRequestElement('mode'));
162 $dataSetInstance->addCriteria(self::DB_COLUMN_EXTERNAL_IP , $ipPort[0]);
163 $dataSetInstance->addCriteria(self::DB_COLUMN_LISTEN_PORT , $ipPort[1]);
164 $dataSetInstance->addCriteria(self::DB_COLUMN_NODE_ID , $nodeInstance->getNodeId());
165 $dataSetInstance->addCriteria(self::DB_COLUMN_SESSION_ID , $nodeInstance->getSessionId());
166 $dataSetInstance->addCriteria(self::DB_COLUMN_PRIVATE_KEY_HASH, $nodeInstance->getPrivateKeyHash());
167 $dataSetInstance->addCriteria(self::DB_COLUMN_ACCEPTED_OBJECTS, implode(BaseHubNode::OBJECT_LIST_SEPARATOR, $objectList));
170 return $dataSetInstance;
174 * Checks whether the local (*this*) node is registered in the DHT by
175 * checking if the external ip/port is found.
177 * @return $isRegistered Whether *this* node is registered in the DHT
179 public function isLocalNodeRegistered () {
181 if (!isset($GLOBALS[__METHOD__])) {
182 // Get a search criteria instance
183 $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
186 $nodeInstance = Registry::getRegistry()->getInstance('node');
188 // Get ip:port combination and "explode" it
189 $ipPort = $nodeInstance->getAddressPortArray();
192 * Make sure both is not 'invalid' which means that the resolver
195 assert(($ipPort[0] !== 'invalid') && ($ipPort[1] !== 'invalid'));
197 // Add ip:port/node id as criteria
198 $searchInstance->addCriteria(self::DB_COLUMN_EXTERNAL_IP, $ipPort[0]);
199 $searchInstance->addCriteria(self::DB_COLUMN_LISTEN_PORT, $ipPort[1]);
200 $searchInstance->addCriteria(self::DB_COLUMN_NODE_ID , $nodeInstance->getNodeId());
201 $searchInstance->setLimit(1);
203 // Query database and get a result instance back
204 $resultInstance = $this->doSelectByCriteria($searchInstance);
206 // Cache result of if there is an entry, valid() will tell us if an entry is there
207 $GLOBALS[__METHOD__] = $resultInstance->valid();
211 return $GLOBALS[__METHOD__];
215 * Registeres the local (*this*) node with its data in the DHT.
219 public function registerLocalNode () {
220 // Assert to make sure this method is called with no record in DB (the actual backend of the DHT)
221 assert(!$this->isLocalNodeRegistered());
223 // Get prepared data set instance
224 $dataSetInstance = $this->prepareLocalDataSetInstance();
226 // "Insert" this dataset instance completely into the database
227 $this->queryInsertDataSet($dataSetInstance);
231 * Updates local (*this*) node data in DHT, this is but not limited to the
232 * session id, ip number (and/or hostname) and port number.
236 public function updateLocalNode () {
237 // Assert to make sure this method is called with one record in DB (the actual backend of the DHT)
238 assert($this->isLocalNodeRegistered());
241 $nodeInstance = Registry::getRegistry()->getInstance('node');
243 // Get search criteria
244 $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
246 // Search for node id and limit it to one entry
247 $searchInstance->addCriteria(self::DB_COLUMN_NODE_ID, $nodeInstance->getNodeId());
248 $searchInstance->setLimit(1);
250 // Get a prepared dataset instance
251 $dataSetInstance = $this->prepareLocalDataSetInstance();
253 // Set search instance
254 $dataSetInstance->setSearchInstance($searchInstance);
256 // Update DHT database record
257 $this->queryUpdateDataSet($dataSetInstance);
261 * Finds a node locally by given session id
263 * @param $sessionId Session id to lookup
264 * @return $nodeData Node data array
266 public function findNodeLocalBySessionId ($sessionId) {
267 // Get search criteria
268 $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
270 // Search for session id and limit it to one entry
271 $searchInstance->addCriteria(self::DB_COLUMN_SESSION_ID, $sessionId);
272 $searchInstance->setLimit(1);
274 // Query database and get a result instance back
275 $resultInstance = $this->doSelectByCriteria($searchInstance);
277 // Return result instance
278 return $resultInstance;
282 * Registeres a node by given message data.
284 * @param $messageData An array of all message data
285 * @param $handlerInstance An instance of a Handleable class
288 public function registerNodeByMessageData (array $messageData, Handleable $handlerInstance) {
289 // Get a data set instance
290 $dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));
292 // Set primary key (session id)
293 $dataSetInstance->setUniqueKey(self::DB_COLUMN_SESSION_ID);
295 // Add all array elements
296 $handlerInstance->addArrayToDataSet($dataSetInstance, $messageData);
298 // Remove 'node_list'
299 $dataSetInstance->unsetCriteria(self::DB_COLUMN_NODE_LIST);
301 // Run the "INSERT" query
302 $this->queryInsertDataSet($dataSetInstance);
306 * Updates an existing entry in node list
308 * @param $messageData An array of all message data
309 * @param $handlerInstance An instance of a Handleable class
310 * @param $searchInstance An instance of LocalSearchCriteria class
313 public function updateNodeByMessageData (array $messageData, Handleable $handlerInstance, LocalSearchCriteria $searchInstance) {
314 // Get a data set instance
315 $dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));
317 // Add search instance
318 $dataSetInstance->setSearchInstance($searchInstance);
320 // Set primary key (session id)
321 $dataSetInstance->setUniqueKey(self::DB_COLUMN_SESSION_ID);
323 // Add all array elements
324 $handlerInstance->addArrayToDataSet($dataSetInstance, $messageData);
326 // Remove 'node_list'
327 $dataSetInstance->unsetCriteria(self::DB_COLUMN_NODE_LIST);
329 // Run the "UPDATE" query
330 $this->queryUpdateDataSet($dataSetInstance);
334 * Determines whether the given node data is already inserted in the DHT
336 * @param $nodeData An array with valid node data
337 * @return $isRegistered Whether the given node data is already inserted
339 public function isNodeRegistered (array $nodeData) {
340 // Assert on array elements
341 assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));
343 // Get search criteria
344 $searchInstance = $this->prepareSearchInstance($nodeData);
346 // Query database and get a result instance back
347 $resultInstance = $this->doSelectByCriteria(
350 // Only look for these array elements ("keys")
352 self::DB_COLUMN_NODE_ID => TRUE,
353 self::DB_COLUMN_EXTERNAL_IP => TRUE,
354 self::DB_COLUMN_LISTEN_PORT => TRUE,
358 // Check if there is an entry
359 $isRegistered = $resultInstance->valid();
361 // Return registration status
362 return $isRegistered;
366 * Registers a node with given data in the DHT. If the node is already
367 * registered this method shall throw an exception.
369 * @param $nodeData An array with valid node data
371 * @throws NodeAlreadyRegisteredException If the node is already registered
373 public function registerNode (array $nodeData) {
374 // Assert on array elements
375 assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));
377 // Is the node registered?
378 if ($this->isNodeRegistered($nodeData)) {
379 // Throw an exception
380 throw new NodeAlreadyRegisteredException(array($this, $nodeData), self::EXCEPTION_NODE_ALREADY_REGISTERED);
383 // @TODO Unimplemented part
384 $this->partialStub('nodeData=' . print_r($nodeData, TRUE));
388 * Updates a node's entry in the DHT with given data. This will enrich or
389 * just update already exsiting data. If the node is not found this method
390 * shall throw an exception.
392 * @param $nodeData An array with valid node data
394 * @throws NodeDataMissingException If the node's data is missing
396 public function updateNode (array $nodeData) {
397 // Assert on array elements
398 assert(isset($nodeData[self::DB_COLUMN_NODE_ID]));
400 // Is the node registered?
401 if (!$this->isNodeRegistered($nodeData)) {
402 // No, then throw an exception
403 throw new NodeDataMissingException(array($this, $nodeData), self::EXCEPTION_NODE_NOT_REGISTERED);
406 // Get a search instance
407 $searchInstance = $this->prepareSearchInstance($nodeData);
409 // Get a data set instance
410 $dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));
412 // Add search instance
413 $dataSetInstance->setSearchInstance($searchInstance);
415 // Set primary key (session id)
416 $dataSetInstance->setUniqueKey(self::DB_COLUMN_SESSION_ID);
419 $nodeInstance = Registry::getRegistry()->getInstance('node');
421 // Add all array elements
422 $nodeInstance->addArrayToDataSet($dataSetInstance, $nodeData);
424 // Remove 'node_list'
425 $dataSetInstance->unsetCriteria(self::DB_COLUMN_NODE_LIST);
427 // Run the "UPDATE" query
428 $this->queryUpdateDataSet($dataSetInstance);
432 * Checks whether there are unpublished entries
434 * @return $hasUnpublished Whether there are unpublished entries
435 * @todo Add minimum/maximum age limitations
437 public function hasUnpublishedEntries () {
438 // Get search instance
439 $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
441 // Add exclusion key which is the publish status
442 $searchInstance->addExcludeCriteria(NodeDistributedHashTableDatabaseWrapper::DB_COLUMN_PUBLICATION_STATUS, NodeDistributedHashTableDatabaseWrapper::PUBLICATION_STATUS_PENDING);
444 // Remember search instance
445 $this->setSearchInstance($searchInstance);
448 $this->unpublishedEntriesInstance = $this->doSelectByCriteria($searchInstance);
450 // Check pending entries
451 $hasUnpublished = $this->unpublishedEntriesInstance->valid();
454 return $hasUnpublished;
458 * Initializes publication of DHT entries. This does only prepare
459 * publication. The next step is to pickup such prepared entries and publish
460 * them by uploading to other (recently appeared) DHT members.
463 * @todo Add timestamp to dataset instance
465 public function initEntryPublication () {
467 * Make sure that hasUnpublishedEntries() has been called first by
468 * asserting on the "cached" object instance. This "caching" saves some
469 * needless queries as this method shall be called immediately after
470 * hasUnpublishedEntries() returns TRUE.
472 assert($this->unpublishedEntriesInstance instanceof SearchableResult);
474 // Result is still okay?
475 assert($this->unpublishedEntriesInstance->valid());
477 // Remove 'publication_status'
478 $this->getSearchInstance()->unsetCriteria(self::DB_COLUMN_PUBLICATION_STATUS);
480 // Make sure all entries are marked as pending, first get a dataset instance.
481 $dataSetInstance = ObjectFactory::createObjectByConfiguredName('dataset_criteria_class', array(self::DB_TABLE_NODE_DHT));
483 // Add search instance
484 $dataSetInstance->setSearchInstance($this->getSearchInstance());
486 // Set primary key (node id)
487 $dataSetInstance->setUniqueKey(self::DB_COLUMN_NODE_ID);
489 // Add criteria (that should be set)
490 $dataSetInstance->addCriteria(self::DB_COLUMN_PUBLICATION_STATUS, self::PUBLICATION_STATUS_PENDING);
492 // Run the "UPDATE" query
493 $this->queryUpdateDataSet($dataSetInstance);
497 * Removes non-public data from given array.
499 * @param $data An array with possible non-public data that needs to be removed.
500 * @return $data A cleaned up array with only public data.
502 public function removeNonPublicDataFromArray(array $data) {
503 // Currently call only inner method
504 //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . $this->__toString() . ']: Calling parent::removeNonPublicDataFromArray(data) ...');
505 $data = parent::removeNonPublicDataFromArray($data);
506 //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('DHT-WRAPPER[' . $this->__toString() . ']: data[]=' . gettype($data));
508 // Return cleaned data
513 * Find recipients for given package data and exclude the sender
515 * @param $packageData An array of valid package data
516 * @return $recipients An indexed array with DHT recipients
518 public function getResultFromExcludedSender (array $packageData) {
519 // Assert on required array field
520 assert(isset($packageData[NetworkPackage::PACKAGE_DATA_SENDER]));
522 // First creata a search instance
523 $searchInstance = ObjectFactory::createObjectByConfiguredName('search_criteria_class');
525 // Then exclude 'sender' field as the sender is the current (*this*) node
526 $searchInstance->addExcludeCriteria(NodeDistributedHashTableDatabaseWrapper::DB_COLUMN_SESSION_ID, $packageData[NetworkPackage::PACKAGE_DATA_SENDER]);
528 // Get a result instance back from DHT database wrapper.
529 $resultInstance = $this->doSelectByCriteria($searchInstance);
531 // Return result instance
532 return $resultInstance;