// CFG: NODE-REQUEST-NODE-LIST-ENTRY-STACKER-CLASS
$cfg->setConfigEntry('node_request_node_list_entry_stacker_class', 'FiLoStacker');
+// CFG: DHT-STACKER-CLASS
+$cfg->setConfigEntry('dht_stacker_class', 'FiLoStacker');
+
// CFG: NODE-ANNOUNCEMENT-ANSWER-TEMPLATE-TYPE
$cfg->setConfigEntry('node_announcement_answer_template_type', 'xml/answer/announcement');
// CFG: STACKER-DHT-BOOTSTRAP-MAX-SIZE
$cfg->setConfigEntry('stacker_dht_bootstrap_max_size', 10);
+// CFG: STACKER-DHT-INSERT-NODE-MAX-SIZE
+$cfg->setConfigEntry('stacker_dht_insert_node_max_size', 100);
+
// CFG: NEWS-MAIN-LIMIT
$cfg->setConfigEntry('news_main_limit', 5);
* @return void
*/
function updateDhtData ();
+
+ /**
+ * Checks whether there are entries in "INSERT" node data stack
+ *
+ * @return $isPending Whether there are pending entries
+ */
+ function ifInsertNodeDataPending ();
+
+ /**
+ * Inserts a single entry of node data into the DHT
+ *
+ * @return void
+ */
+ function insertSingleNodeData ();
}
// [EOF]
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
class BaseDht extends BaseHubSystem {
+ /**
+ * Stacker name for "INSERT" node data
+ */
+ const STACKER_NAME_INSERT_NODE = 'dht_insert_node';
+
/**
* Protected constructor
*
// Call parent constructor
parent::__construct($className);
+ // Get a stacker instance for this DHT
+ $stackerInstance = ObjectFactory::createObjectByConfiguredName('dht_stacker_class');
+
+ // Set it in this class
+ $this->setStackerInstance($stackerInstance);
+
+ // Init all stackers
+ $this->initStackers();
+
/*
* Get the state factory and create the initial state, we don't need
* the state instance here
DhtStateFactory::createDhtStateInstanceByName('init', $this);
}
+ /**
+ * Initializes all stackers
+ *
+ * @return void
+ */
+ private function initStackers () {
+ // "Walk" through all (more will be added as needed
+ foreach (
+ array(
+ self::STACKER_NAME_INSERT_NODE,
+ ) as $stackerName) {
+ // Init this stack
+ $this->getStackerInstance()->initStacker($stackerName);
+ } // END - foreach
+ }
+
/**
* Updates/refreshes DHT data (e.g. status).
*
// Set some dummy configuration entries, e.g. dht_status
$this->getConfigInstance()->setConfigEntry('dht_status', $this->getStateInstance()->getStateName());
}
+
+ /**
+ * Checks whether there are entries in "INSERT" node data stack
+ *
+ * @return $isPending Whether there are pending entries
+ */
+ public function ifInsertNodeDataPending () {
+ // Determine it if it is not empty
+ $isPending = ($this->getStackerInstance()->isStackEmpty(self::STACKER_NAME_INSERT_NODE) === FALSE);
+
+ // Return status
+ return $isPending;
+ }
+
+ /**
+ * Inserts a single entry of node data into the DHT
+ *
+ * @return void
+ */
+ public function insertSingleNodeData () {
+ // Get next node data from stack
+ $nodeData = $this->getStackerInstance()->popNamed(self::STACKER_NAME_INSERT_NODE);
+
+ die(__METHOD__ . ':nodeData=' . print_r($nodeData, TRUE));
+ }
}
// [EOF]
*
* @param $nodeList An array from an earlier database result instance
* @return void
- * @todo ~10% done
*/
public function insertNodeList (array $nodeList) {
// If no node is in the list (array), skip the rest of this method
return;
} // END - if
- // @TODO Not finish yet
- $this->partialStub('DHT: Needs implementing to insert ' . count($nodeList) . ' entry(-ies) into DHT.');
+ // Put them all into a stack
+ foreach ($nodeList as $nodeData) {
+ // Insert all entries
+ $this->getStackerInstance()->pushNamed(self::STACKER_NAME_INSERT_NODE, $nodeData);
+ } // END - foreach
}
}
$this->getDhtInstance()->registerNodeByMessageData($messageData, $this, TRUE);
// Prepare next message ("hello" message to all returned nodes)
- $this->prepareNextMessage($messageData, $packageInstance);
+ //$this->prepareNextMessage($messageData, $packageInstance);
}
/**
*/
protected function initMessageConfigurationData (array $messageData) {
// Debug message
- //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput(' REQUEST-HANDLER[' . __LINE__ . ']: messageData=' . print_r($messageData, true));
+ //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('REQUEST-HANDLER[' . __LINE__ . ']: messageData=' . print_r($messageData, true));
// "Walk" throught the config-copy array
foreach ($this->configCopy as $targetKey => $sourceKey) {
$this->getConfigInstance()->setConfigEntry($targetKey, $this->getConfigInstance()->getConfigEntry($sourceKey));
} // END - foreach
+ // Debug message
+ /* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('REQUEST-HANDLER[' . __LINE__ . ']: Got a node list of ' . count($nodeList) . ' entry/-ies back.');
+
// Query local DHT for nodes except given session id
- $nodeList = $this->getDhtInstance()->queryLocalNodeListExceptByMessageData($messageData, $this, XmlRequestNodeListTemplateEngine::REQUEST_DATA_SESSION_ID, XmlRequestNodeListTemplateEngine::REQUEST_DATA_ACCEPTED_OBJECT_TYPES, BaseHubNode::OBJECT_LIST_SEPARATOR);
+ $nodeList = $this->getDhtInstance()->queryLocalNodeListExceptByMessageData(
+ $messageData,
+ $this,
+ XmlRequestNodeListTemplateEngine::REQUEST_DATA_SESSION_ID,
+ XmlRequestNodeListTemplateEngine::REQUEST_DATA_ACCEPTED_OBJECT_TYPES,
+ BaseHubNode::OBJECT_LIST_SEPARATOR
+ );
// Set it serialized in configuration (temporarily)
$this->getConfigInstance()->setConfigEntry('node_list', base64_encode(serialize($nodeList)));
$currentTask['task_instance']->accept($this->getVisitorInstance());
// Remember this task
- $tasks[] = $currentTask;
+ array_push($tasks, $currentTask);
// Advance to next one
$this->getListInstance()->getIterator()->next();
*/
public function accept (Visitor $visitorInstance) {
// Debug message
- //* DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput(strtoupper($this->getProtocol()) . '-LISTENER: ' . $visitorInstance->__toString() . ' has visited ' . $this->__toString() . ' - START');
+ //* DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput(strtoupper($this->getProtocol()) . '-LISTENER[' . __LINE__ . ']: ' . $visitorInstance->__toString() . ' has visited ' . $this->__toString() . ' - START');
// Visit this listener
$visitorInstance->visitListener($this);
} // END - if
// Debug message
- //* DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput(strtoupper($this->getProtocol()) . '-LISTENER: ' . $visitorInstance->__toString() . ' has visited ' . $this->__toString() . ' - FINISHED');
+ //* DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput(strtoupper($this->getProtocol()) . '-LISTENER[' . __LINE__ . ']: ' . $visitorInstance->__toString() . ' has visited ' . $this->__toString() . ' - FINISHED');
}
/**
* that all connections on this port are now our resposibility to
* send/recv data, disconnect, etc..
*/
- self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort());
+ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER[' . __LINE__ . ']: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort());
if (!socket_bind($mainSocket, $this->getListenAddress(), $this->getListenPort())) {
// Handle this socket error with a faked recipientData array
$this->handleSocketError(__METHOD__, __LINE__, $mainSocket, array('0.0.0.0', '0'));
} // END - if
// Start listen for connections
- self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER: Listening for connections.');
+ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER[' . __LINE__ . ']: Listening for connections.');
if (!socket_listen($mainSocket)) {
// Handle this socket error with a faked recipientData array
$this->handleSocketError(__METHOD__, __LINE__, $mainSocket, array('0.0.0.0', '0'));
} // END - if
// Now, we want non-blocking mode
- self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER: Setting non-blocking mode.');
+ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER[' . __LINE__ . ']: Setting non-blocking mode.');
if (!socket_set_nonblock($mainSocket)) {
// Handle this socket error with a faked recipientData array
$this->handleSocketError(__METHOD__, __LINE__, $mainSocket, array('0.0.0.0', '0'));
$this->setHandlerInstance($handlerInstance);
// Output message
- self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER: TCP listener now ready on IP ' . $this->getListenAddress() . ', port ' . $this->getListenPort() . ' for service.');
+ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER[' . __LINE__ . ']: TCP listener now ready on IP ' . $this->getListenAddress() . ', port ' . $this->getListenPort() . ' for service.');
}
/**
// Some new peers found?
if ($left < 1) {
// Debug message
- //* EXTREME-NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER: left=' . $left . ',serverSocket=' . $this->getSocketResource() . ',readers=' . print_r($readers, true));
+ //* EXTREME-NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER[' . __LINE__ . ']: left=' . $left . ',serverSocket=' . $this->getSocketResource() . ',readers=' . print_r($readers, true));
// Nothing new found
return;
} // END - if
// Debug message
- //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER: serverSocket=' . $this->getSocketResource() . ',readers=' . print_r($readers, true));
+ //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER[' . __LINE__ . ']: serverSocket=' . $this->getSocketResource() . ',readers=' . print_r($readers, true));
// Do we have changed peers?
if (in_array($this->getSocketResource(), $readers)) {
$currentSocket = $this->getIteratorInstance()->current();
// Handle it here, if not main server socket
- //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER: currentSocket=' . $currentSocket[BasePool::SOCKET_ARRAY_RESOURCE] . ',type=' . $currentSocket[BasePool::SOCKET_ARRAY_CONN_TYPE] . ',serverSocket=' . $this->getSocketResource());
+ //* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('TCP-LISTENER[' . __LINE__ . ']: currentSocket=' . $currentSocket[BasePool::SOCKET_ARRAY_RESOURCE] . ',type=' . $currentSocket[BasePool::SOCKET_ARRAY_CONN_TYPE] . ',serverSocket=' . $this->getSocketResource());
if (($currentSocket[BasePool::SOCKET_ARRAY_CONN_TYPE] != BaseConnectionHelper::CONNECTION_TYPE_SERVER) && ($currentSocket[BasePool::SOCKET_ARRAY_RESOURCE] != $this->getSocketResource())) {
// ... or else it will raise warnings like 'Transport endpoint is not connected'
$this->getHandlerInstance()->processRawDataFromResource($currentSocket);
* that all connections on this port are now our resposibility to
* send/recv data, disconnect, etc..
*/
- self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort());
+ self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER[' . __LINE__ . ']: Binding to address ' . $this->getListenAddress() . ':' . $this->getListenPort());
if (!socket_bind($mainSocket, $this->getListenAddress(), $this->getListenPort())) {
// Handle the socket error with a faked recipientData array
$this->handleSocketError(__METHOD__, __LINE__, $mainSocket, array('0.0.0.0', '0'));
} // END - if
// Now, we want non-blocking mode
- self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER: Setting non-blocking mode.');
+ self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER[' . __LINE__ . ']: Setting non-blocking mode.');
if (!socket_set_nonblock($mainSocket)) {
// Handle the socket error with a faked recipientData array
$this->handleSocketError(__METHOD__, __LINE__, $mainSocket, array('0.0.0.0', '0'));
} // END - if
// Set the option to reuse the port
- self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER: Setting re-use address option.');
+ self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER[' . __LINE__ . ']: Setting re-use address option.');
if (!socket_set_option($mainSocket, SOL_SOCKET, SO_REUSEADDR, 1)) {
// Handle the socket error with a faked recipientData array
$this->handleSocketError(__METHOD__, __LINE__, $mainSocket, array('0.0.0.0', '0'));
$this->setHandlerInstance($handlerInstance);
// Output message
- self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER: UDP listener now ready on IP ' . $this->getListenAddress() . ', port ' . $this->getListenPort() . ' for service.');
+ self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER[' . __LINE__ . ']: UDP listener now ready on IP ' . $this->getListenAddress() . ', port ' . $this->getListenPort() . ' for service.');
}
/**
return;
} elseif ($lastError > 0) {
// Other error detected
- self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER: Error detected: ' . socket_strerror($lastError));
+ self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER[' . __LINE__ . ']: Error detected: ' . socket_strerror($lastError));
// Skip further processing
return;
} // END - if
// Debug only
- self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER: Handling UDP package with size ' . strlen($rawData) . ' from peer ' . $peer . ':' . $port);
+ self::createDebugInstance(__CLASS__)->debugOutput('UDP-LISTENER[' . __LINE__ . ']: Handling UDP package with size ' . strlen($rawData) . ' from peer ' . $peer . ':' . $port);
}
/**
$this->listGroups[$groupName]->addEntry($subGroup, $hash);
// Add the hash to the index
- $this->listIndex[] = $hash;
+ array_push($this->listIndex, $hash);
// Add the instance itself to the list
$this->listEntries[$hash] = $instance;
// Is the list entry set?
if ($this->isHashValid($hash)) {
// Add it
- $array[] = $this->listEntries[$hash];
+ array_push($array, $this->listEntries[$hash]);
//* DEBUG: */ print __METHOD__.": ADDED!\n";
} // END - if
} // END - foreach
//* DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput('[' . __METHOD__ . ':' . __LINE__ . ']: groupName=' . $groupName . ', entry=' . $entry . ', hash=' . $hash);
// Add the hash to the index
- $this->listIndex[] = $hash;
+ array_push($this->listIndex, $hash);
//* DEBUG: */ print $groupName.'/'.$this->count().chr(10);
// Now add the entry to the list
// Debug message
//* NOISY-DEBUG: */ self::createDebugInstance(__CLASS__)->debugOutput(__METHOD__ . ': Appending ' . strlen($rawData) . ' bytes of a chunk, finalHash=' . $finalHash . ' ...');
$this->chunks[$finalHash][] = $rawData;
- $this->chunkHashes[$finalHash][] = $rawDataHash;
+ array_push($this->chunkHashes[$finalHash], $rawDataHash);
}
}
// "Walk" through all socket arrays
foreach ($socketArrays as $socketArray) {
// Add the socket
- $sockets[] = $socketArray[self::SOCKET_ARRAY_RESOURCE];
+ array_push($sockets, $socketArray[self::SOCKET_ARRAY_RESOURCE]);
} // END - foreach
// Return it
// Does it match?
if ($socketArray[self::SOCKET_ARRAY_CONN_TYPE] === $connectionType) {
// Add the socket
- $sockets[] = $socketArray[self::SOCKET_ARRAY_RESOURCE];
+ array_push($sockets, $socketArray[self::SOCKET_ARRAY_RESOURCE]);
} // END - if
} // END - foreach
* @return void
*/
public function executeTask () {
+ // "Cache" handler instance
+ $handlerInstance = $this->getHandlerInstance();
+
// Are there chunks to handle or a final array to assemble?
- if ($this->getHandlerInstance()->ifUnassembledChunksAvailable()) {
+ if ($handlerInstance->ifUnassembledChunksAvailable()) {
/*
* Then do the final steps:
*
* 4) If the package is assembled back together, hash it again for
* the very final verification.
*/
- $this->getHandlerInstance()->assembleChunksFromFinalArray();
- } elseif ($this->getHandlerInstance()->ifUnhandledChunksWithFinalAvailable()) {
+ $handlerInstance->assembleChunksFromFinalArray();
+ } elseif ($handlerInstance->ifUnhandledChunksWithFinalAvailable()) {
/*
* Then handle them (not all!). This should push all chunks into a
* 'final array' for last verification.
*/
- $this->getHandlerInstance()->handleAvailableChunksWithFinal();
- } elseif ($this->getHandlerInstance()->ifRawPackageDataIsAvailable()) {
+ $handlerInstance->handleAvailableChunksWithFinal();
+ } elseif ($handlerInstance->ifRawPackageDataIsAvailable()) {
/*
* The final raw package data is back together again. So feed it
* into the next stack for further decoding/processing
*/
- $this->getHandlerInstance()->handledAssembledRawPackageData();
+ $handlerInstance->handledAssembledRawPackageData();
}
}
}
* @return void
*/
public function executeTask () {
+ // "Cache" decoder instance
+ $decoderInstance = $this->getDecoderInstance();
+
// Check if the stacker has some entries left
- if ($this->getDecoderInstance()->ifUnhandledRawPackageDataLeft()) {
+ if ($decoderInstance->ifUnhandledRawPackageDataLeft()) {
// Then handle it
- $this->getDecoderInstance()->handleRawPackageData();
- } elseif ($this->getDecoderInstance()->ifDeocedPackagesLeft()) {
+ $decoderInstance->handleRawPackageData();
+ } elseif ($decoderInstance->ifDeocedPackagesLeft()) {
// Some decoded packages have arrived (for this peer)
- $this->getDecoderInstance()->handleDecodedPackage();
+ $decoderInstance->handleDecodedPackage();
}
}
}
public function executeTask () {
// "Cache" the DHT instance
$dhtInstance = $this->getDhtInstance();
+
+ // Are there "INSERT" node data entries?
+ if ($dhtInstance->ifInsertNodeDataPending()) {
+ // Then insert a single entry
+ $dhtInstance->insertSingleNodeData();
+ } // END - if
}
}