* @version 0.0 * @copyright Copyright(c) 2007, 2008 Roland Haeder, this is free software * @license GNU GPL 3.0 or any newer version * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU General Public License as published by * the Free Software Foundation, either version 3 of the License, or * (at your option) any later version. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU General Public License for more details. * * You should have received a copy of the GNU General Public License * along with this program. If not, see . */ class HubCoreLoop extends BaseFrameworkSystem { /** * Wether the hub is active and running */ private $hubActivated = false; /** * Wether the hub is shutting down */ private $hubShutsDown = false; /** * A list of all connected peers (sockets) */ private $connectedPeers = null; /** * A console output handler */ private $outputInstance = null; /** * Reading peers */ private $readPeers = array(); /** * Writing peers */ private $writePeers = array(); /** * The main socket (listening) for this hub */ private $main_socket = null; /** * A list of authenticated peers */ private $authPeers = null; /** * Last peer instance */ private $lastPeerInstance = null; /** * Wether this hub is a master hub */ private $hubIsMaster = false; /** * Maximum auth retries (cached) */ private $authRetries = 0; /** * Auth request message */ private $authRequest = ""; /** * Cached timeout for auth requests */ private $authRequestTimeout = 0; /** * An instance to the HubConnector class for master hub connection */ private $masterConnector = null; // Exception codes const EXCEPTION_SOCKET_PROBLEM = 0xb00; const EXCEPTION_HUB_PEER_TIMEOUT = 0xb01; const EXCEPTION_HUB_PEER_FAILED_AUTH = 0xb02; const EXCEPTION_HELLO_TIMED_OUT = 0xb03; /** * The private constructor * * @return void */ protected function __construct () { // Call parent constructor parent::__construct(__CLASS__); // Set description $this->setObjectDescription("Hub-Core Loop"); // Set unique ID $this->generateUniqueId(); // Tidy up a little $this->removeSystemArray(); $this->removeNumberFormaters(); // Init the peer list $this->initPeerList(); } /** * Factory for main loop * * @return $hubInstance An instance of this class */ public final static function createHubCoreLoop () { // Get an instance $hubInstance = new HubCoreLoop(); // Try to setup the socket $hubInstance->setupHub(); // Get the configuration variable $outEngine = $hubInstance->getConfigInstance()->readConfig("tpl_engine"); // Setup the console output handler $eval = sprintf("\$hubInstance->setOutputInstance(%s::create%s(\"%s\"));", $outEngine, $outEngine, $hubInstance->getConfigInstance()->readConfig("web_content_type") ); // Debug message if ((defined('DEBUG_EVAL')) || (defined('DEBUG_ALL'))) $hubInstance->getDebugInstance()->output(sprintf("[%s:] Konstruierte PHP-Anweisung: %s", $hubInstance->__toString(), htmlentities($eval) )); @eval($eval); // Return the prepared instance return $hubInstance; } /** * Initializes the peer list * * @return void */ private final function initPeerList () { $this->connectedPeers = new FrameworkArrayObject("FakedConnectedPeers"); $this->authPeers = new FrameworkArrayObject("FakeAuthPeers"); } /** * Get total number of connected peers * * @return $num The total number of connected peers */ private final function getTotalConnectedPeers () { $read = array($this->connectedPeers->getIterator()->current()); $write = array(); $except = array(); // Get socket number $num = socket_select( $read, $write, $except, 0 ); // Transfer readers and writers $this->readPeers = $read; $this->writePeers = $write; // Return the number return $num; } /** * Handle newly connected peers * * @return void */ private final function handleNewPeers () { // Output message $this->getOutputInstance()->output(sprintf("[%s] Validating peer...", __METHOD__)); // Is the main socket in the array? if (in_array($this->main_socket, $this->readPeers)) { // Accept the connection and add him to the list $peer_socket = socket_accept($this->main_socket); // Get a new peer instance $this->setCurrPeerInstance(HubPeer::createHubPeerBySocket($peer_socket, $this)); // Register the new peer $this->getOutputInstance()->output(sprintf("[%s] Registering new peer with IP %s.", __METHOD__, $this->getCurrPeerInstance()->getValidatedIP() )); $this->connectedPeers->append($this->lastPeerInstance); // A new peer has connected $this->getOutputInstance()->output(sprintf("[%s] New peer with IP %s has been registered.", __METHOD__, $this->getCurrPeerInstance()->getValidatedIP() )); // Remove him from the list $key = array_search($this->main_socket, $this->readPeers); unset($this->readPeers[$key]); } // END - if } /** * Handles unauthenticated peers * * @return void * @throws HubPeerTimeoutException If the peer times out to answer a request * @throws HubPeerAuthorizationException If the peer fails to authorize himself (to many retries) */ private final function handleUnauthPeers () { // Are there some peers? if ($this->connectedPeers->count() > 1) { // Iterate through all connected peers for ($idx = $this->connectedPeers->getIterator(); $idx->valid(); $idx->next()) { // Get current peer $this->lastPeerInstance = $idx->current(); // Ignore own socket and invalid entries if (($this->getCurrPeerInstance() !== $this->main_socket) && (is_object($this->getCurrPeerInstance())) && ($this->getCurrPeerInstance() instanceof HubPeer)) { // Is this peer already authorized or is this the master hub? // If this is the master hub then there is no auth required if ((!$this->getCurrPeerInstance()->ifPeerIsAuthorized()) && (!$this->getCurrPeerInstance()->ifPeerIsLocalAdmin()) && (!$this->hubIsMaster)) { // This peer waits for authorization, so does he have some tries left? if ($this->getCurrPeerInstance()->getAuthRetries() <= $this->authRetries) { // This peer is still allowed to try his authorization if ($this->getCurrPeerInstance()->getLastSentMessage() == $this->authRequest) { // Already asked so maybe timed out? if ((time() - $this->getCurrPeerInstance()->getLastSentMessageStamp()) >= $this->authRequestTimeout) { // Timed out so disconnect the peer throw new HubPeerTimeoutException ( array( 'this' => $this, 'peer' => $this->getCurrPeerInstance() ), self::EXCEPTION_HUB_PEER_TIMEOUT ); } // END - if } elseif ($this->getCurrPeerInstance()->ifHelloReceived()) { // HELLO received so we need to sent the AUTH request $this->getCurrPeerInstance()->askAuthorizationKey(); } } else { // This peer needs disconnecting! throw new HubPeerAuthorizationException ( array( 'this' => $this, 'peer' => $this->getCurrPeerInstance(), 'max' => $this->authRetries ), self::EXCEPTION_HUB_PEER_FAILED_AUTH ); } // END - else } elseif ((!$this->getCurrPeerInstance()->ifPeerIsAuthorized()) && ($this->getCurrPeerInstance()->ifPeerIsLocalAdmin())) { // This peer is a local admin so he is always authorized! $this->getCurrPeerInstance()->enableLocalAdmin(); // Output debug message $this->getOutputInstance()->output(sprintf("[%s] A local admin has connected.", $this->__toString() )); // Say "hi" to the admin $this->getCurrPeerInstance()->sayHi2Admin(); } elseif (($this->hubIsMaster) && ($this->getCurrPeerInstance()->ifHelloReceived()) && (!$this->getCurrPeerInstance()->ifELHOsent())) { // Is the master hub so authorize the peer here... $this->getCurrPeerInstance()->enableIsAuthorized(); $this->authPeers->append($this->getCurrPeerInstance()); // Reply the HELLO request $this->getCurrPeerInstance()->replyHelloMessage(); } } // END - if } // END - for } // END - if } /** * Handles only authorized peers * * @return void */ public function handleAuthPeers () { // Are there some peers? if (($this->connectedPeers->count() > 1) && ($this->authPeers->count() > 0)) { // Iterate through all connected peers for ($idx = $this->authPeers->getIterator(); $idx->valid(); $idx->next()) { // Get current peer $this->lastPeerInstance = $idx->current(); // Do the ping and update our authPeer list (LATER!) $this->lastPeerInstance->handlePingPeer(); // Avoids a notice... if (is_null($this->getCurrPeerInstance())) break; } // END - for } // END - for } /** * Setup the hub: Create a socket for listening on incoming requests, * try to bind to a port, etc. * * @return void * @throws SocketCreationException If a socket cannot be created * @throws SocketSetupException If a socket cannot be setuped * @throws SocketBindException If a socket cannot be bind to * an address and port * @throws SocketListeningException If listening to a socket fails */ private final function setupHub () { // Create a new TCP socket $main_socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP); // Was it a success? if (!is_resource($main_socket)) { // This fails! :( throw new SocketCreationException( array( 'this' => $this, 'code' => socket_last_error() ), self::EXCEPTION_SOCKET_PROBLEM ); } // Set socket options if (!socket_set_option($main_socket, SOL_SOCKET, SO_REUSEADDR, 1)) { // Close the socket $this->closeSocket($main_socket); // Problems setting socket options throw new SocketSetupException ( array( 'this' => $this, 'code' => socket_last_error() ), self::EXCEPTION_SOCKET_PROBLEM ); } // Set to non-blocking socket_set_nonblock($main_socket); // Bind the socket to an address if (!socket_bind($main_socket, $this->getConfigInstance()->readConfig("hub_listen_addr"), $this->getConfigInstance()->readConfig("hub_listen_port"))) { // Bind failed throw new SocketBindException ( array( 'this' => $this, 'host' => $this->getConfigInstance()->readConfig("hub_listen_addr"), 'port' => $this->getConfigInstance()->readConfig("hub_listen_port"), 'code' => socket_last_error() ), self::EXCEPTION_SOCKET_PROBLEM ); } // Listen to ... the socket ;-) if (!socket_listen($main_socket)) { // Opps, that didn't work. Next time better listen to your heart... Roxette throw new SocketListeningException( array( 'this' => $this, 'code' => socket_last_error() ), self::EXCEPTION_SOCKET_PROBLEM ); } // Ignore user abort and do not time out @set_time_limit(0); @ignore_user_abort(true); // Cache more configuration stuff $this->authRetries = $this->getConfigInstance()->readConfig("hub_max_auth_tries"); $this->authRequest = $this->getConfigInstance()->readConfig("hub_auth_request"); $this->authRequestTimeout = $this->getConfigInstance()->readConfig("hub_auth_request_timeout"); // The hub is running now! $this->hubActivated = true; // Append the main hub $this->main_socket = $main_socket; $this->connectedPeers->append($main_socket); } /** * Removes the current peer from the list * * @return void */ public final function removePeerFromConnectList () { // Iterate through all connected peers for ($idx = $this->connectedPeers->getIterator(); $idx->valid(); $idx->next()) { // Get current peer from list $peer = $idx->current(); // Is it a peer instance? if (($peer !== $this->main_socket) && (is_object($peer)) && ($peer instanceof HubPeer)) { // Okay, we have a peer instance so is it the same? if ($this->getCurrPeerInstance()->equals($peer)) { // Remove him! $idx->offsetUnset($idx->key()); // Remove the last instance as well $this->lastPeerInstance = null; // Remove the socket as well $this->removeSocket($peer); // Stop searching here break; } } } } /** * Getter for console output handler * * @return $outputInstance An instance of the output handler */ public final function getOutputInstance () { return $this->outputInstance; } /** * Setter for console output handler * * @param $outputInstance An instance of the output handler */ public final function setOutputInstance ($outputInstance) { $this->outputInstance = $outputInstance; } /** * Getter for current peer instance to HubPeer class * * @return $lastPeerInstance Last peer instance */ public final function getCurrPeerInstance () { return $this->lastPeerInstance; } /** * Setter for current peer instance to HubPeer class * * @param $lastPeerInstance Last peer instance * @return void */ public final function setCurrPeerInstance (HubPeer $lastPeerInstance) { $this->lastPeerInstance = $lastPeerInstance; } /** * Checks wether the hub is running and not in shutdown phase * * @return $isRunning If the hub is running and not in shutdown phase */ public function ifHubIsRunning () { return ((($this->hubActivated) || (!$this->hubShutsDown)) && ($this->connectedPeers->count() > 0)); } /** * Output the intro text * * @return void */ public final function outputIntro () { if ($this->getConfigInstance()->readConfig("hub_intro_enabled") == "Y") { // Output intro text $this->getOutputInstance()->output(sprintf("%s v%s Copyright (c) 2007, 2008 by Roland Häder", ApplicationHelper::getInstance()->getAppName(), ApplicationHelper::getInstance()->getAppVersion() )); $this->getOutputInstance()->output(""); $this->getOutputInstance()->output("This software is free software licensed under the GNU GPL. In telnet session enter "/license" to read the license."); $this->getOutputInstance()->output("This software uses the MXChange Framework, Copyright (c) 2007, 2008 by Roland Häder which is licensed under the GNU GPL."); $this->getOutputInstance()->output("Enter "/framework" in telnet session to read that license."); $this->getOutputInstance()->output(""); $this->getOutputInstance()->output("All core systems are initialized. Input on *this* console will currently be ignored!"); $this->getOutputInstance()->output(""); $this->getOutputInstance()->output(sprintf("[%s] Listening on: %s:%d", $this->__toString(), $this->getConfigInstance()->readConfig("hub_listen_addr"), $this->getConfigInstance()->readConfig("hub_listen_port") )); $this->getOutputInstance()->output("----------------------------------------------------------------------------------------------------------------------------"); } } /** * Do the main loop * * @return void * @throws SocketCreationException If the main socket is not a resource */ public function coreLoop () { // Is the main socket vailid? if (!is_resource($this->main_socket)) { // Is not valid! throw new SocketCreationException( array( 'this' => $this, 'code' => socket_last_error() ), self::EXCEPTION_SOCKET_PROBLEM ); } // END - if // We are ready to serve requests $this->getOutputInstance()->output(sprintf("[%s] Ready to serve requests.", $this->__toString() )); // Wait until the hub is shutting down while ($this->ifHubIsRunning()) { // Get number of total connected peers $num = $this->getTotalConnectedPeers(); try { // Handle the master hub connection if ($this->masterConnector instanceof HubConnector) { $this->masterConnector->handleMasterRequests(); } // Check for unauthorized peers $this->handleUnauthPeers(); // Handle authorized peers $this->handleAuthPeers(); } catch (HubPeerAuthorizationException $e) { // Authorization has failed $this->disconnectPeerWithReason("hub_msg_auth_tries"); // Get new total connected peers $num = $this->getTotalConnectedPeers(); } catch (HubPeerTimeoutException $e) { // Disconnect and remove the peer $this->disconnectPeerWithReason("hub_msg_auth_reply_timeout"); // Get new total connected peers $num = $this->getTotalConnectedPeers(); } catch (HubMasterDisconnectedException $e) { // The master hub has disconnected us... :( $this->masterConnector = null; $this->getOutputInstance()->output(sprintf("[%s] The master has disconnected us. Reason given: %s", $this->__toString(), $e->getMessage() )); // Get new total connected peers $num = $this->getTotalConnectedPeers(); } catch (BrokenPipeException $e) { // Broken pipes are bad for us... :( $this->removePeerFromConnectList(); $this->getOutputInstance()->output(sprintf("[%s] A peer has closed the connection unexpected: %s", $this->__toString(), $e->getMessage() )); // Get new total connected peers $num = $this->getTotalConnectedPeers(); } catch (FrameworkException $e) { // Catch all other exceptions and output them hub_exception_handler($e); // Get new total connected peers $num = $this->getTotalConnectedPeers(); } // Are there some peers? if ($num < 1) { // Wait for more peers continue; } // END - if // A new peer has connected $this->getOutputInstance()->output(sprintf("[%s] A new peer is connecting.", $this->__toString() )); try { // Check for new peers $this->handleNewPeers(); } catch (IPSpoofingException $e) { // Output message $this->getOutputInstance()->output(sprintf("[%s] The peer's IP number has changed!", $this->__toString() )); // Output debug message $this->getDebugInstance()->output(sprintf("[%s] Peer spoofes IP number: %s", $this->__toString(), $e->getMessage() )); // Disconnect the peer first $this->disconnectPeerWithReason("hub_msg_spoofing"); // Get new total connected peers $num = $this->getTotalConnectedPeers(); } catch (FrameworkException $e) { // Catch all exceptions and output them to avoid aborting the program unexpectly hub_exception_handler($e); // Get new total connected peers $num = $this->getTotalConnectedPeers(); } } // END - while } /** * Tries to contact the master server or simply reports that we are the master server * * @return void */ public function contactMasterHub () { // Checks wether we are the master hub if ($_SERVER['SERVER_ADDR'] == $this->getConfigInstance()->readConfig("hub_master_ip")) { // We are master! $this->hubIsMaster = true; $this->getOutputInstance()->output(sprintf("[%s] Our IP %s matches the master IP. Becoming master hub...", $this->__toString(), $_SERVER['SERVER_ADDR'] )); } else { // A regular hub or ultra hub so let's contact the master $this->getOutputInstance()->output(sprintf("[%s] Contacting the master hub at %s:%d...", $this->__toString(), $this->getConfigInstance()->readConfig("hub_master_ip"), $this->getConfigInstance()->readConfig("hub_master_port") )); // Try to aquire a connection to the master... try { // Announce us to the master hub $this->announceToMasterHub(); } catch (FrameworkException $e) { // Catch all exceptions and output them hub_exception_handler($e); } } // END - else } /** * Disconnects the current with a configurable reason * * @param $reasonEntry The entry with the disconnection reason aka. message to the peer * @return void */ public function disconnectPeerWithReason ($reasonEntry) { // Default is that we cannot read the peer's IP number $ip = "0.0.0.0"; // Try to disconnect here try { // First get the raw IP number $ip = $this->getCurrPeerInstance()->getValidatedIP(); // Disconnect the peer... $this->getCurrPeerInstance()->disconnectWithReason($this->getConfigInstance()->readConfig($reasonEntry)); } catch (FrameworkException $e) { // Catch all exceptions and output them hub_exception_handler($e); } // Remove him from the list anyway $this->removePeerFromConnectList(); // Output the message $this->getOutputInstance()->output(sprintf("[%s] Peer %s has been disconnected.", $this->__toString(), $ip )); } /** * Announces this hub to the master hub. This is being done by connecting to it and asking for auth request * * @return void */ public function announceToMasterHub () { try { // Create a new instance for communicating to the master hub $this->masterConnector = HubConnector::createHubConnectorByAddressPort( $this->getConfigInstance()->readConfig("hub_master_ip"), $this->getConfigInstance()->readConfig("hub_master_port"), $this ); // Send all our accepted objects to the master hub $this->masterConnector->sendAllAcceptedObjects(); } catch (SocketConnectException $e) { // The master hub is down! ApplicationEntryPoint::app_die($e->getMessage()); } } /** * Removes a given peer instance (socket) from all lists * * @param $peerInstance An instance of a HubPeer class * @return void */ private function removeSocket (HubPeer $peerInstance) { // Get socket from peer $socket = $peerInstance->getPeerSocket(); // Search for readers $key = array_search($socket, $this->readPeers, true); if ($key !== false) { // Remove from reader list unset($this->readPeers[$key]); } // Search for writers $key = array_search($socket, $this->writePeers, true); if ($key !== false) { // Remove from writer list unset($this->writePeers[$key]); } // Remove from auth peers as well for ($idx = $this->authPeers->getIterator(); $idx->valid(); $idx->next()) { // Get current entry $current = $idx->current(); // Is it the same? if ($current->equals($peerInstance)) { // Remove from auth (-awaiting) list $idx->offsetUnset($idx->key()); break; } } } } // END - class // [EOF] ?>