3 * The hub's main loop. The hub will wait and listen for incoming requests in
6 * @author Roland Haeder <roland __NOSPAM__ [at] __REMOVE_ME__ mxchange [dot] org>
9 class HubCoreLoop extends BaseFrameworkSystem {
11 * Whether the hub is active and running
13 private $hubActivated = false;
16 * Whether the hub is shutting down
18 private $hubShutsDown = false;
21 * A list of all connected peers (sockets)
23 private $connectedPeers = null;
26 * A console output handler
28 private $outputInstance = null;
33 private $readPeers = array();
38 private $writePeers = array();
41 * The main socket (listening) for this hub
43 private $main_socket = null;
46 * A list of authenticated peers
48 private $authPeers = null;
53 private $lastPeerInstance = null;
56 * Whether this hub is a master hub
58 private $hubIsMaster = false;
61 * Maximum auth retries (cached)
63 private $authRetries = 0;
66 * Auth request message
68 private $authRequest = "";
71 * Cached timeout for auth requests
73 private $authRequestTimeout = 0;
76 * An instance to the HubConnector class for master hub connection
78 private $masterConnector = null;
81 const EXCEPTION_SOCKET_PROBLEM = 0xb00;
82 const EXCEPTION_HUB_PEER_TIMEOUT = 0xb01;
83 const EXCEPTION_HUB_PEER_FAILED_AUTH = 0xb02;
84 const EXCEPTION_HELLO_TIMED_OUT = 0xb03;
/**
 * Private constructor; instances are obtained through the static factory
 * createHubCoreLoop().
 *
 * @return	void
 */
private function __construct () {
	// Hand this class' name to the parent constructor
	parent::constructor(__CLASS__);

	// Describe this part and give it a unique id
	$this->setPartDescr("Hub-Core Loop");
	$this->createUniqueID();

	// Remove the system array and the number formatters
	$this->removeSystemArray();
	$this->removeNumberFormaters();

	// Start with fresh (empty) peer lists
	$this->initPeerList();
}
/**
 * Factory for the main loop. Creates the instance, sets up the listening
 * socket and attaches the console output handler.
 *
 * The output handler class name is read from the "tpl_engine" configuration
 * entry; its static create<Engine>() factory is called with the
 * "web_content_type" configuration entry.
 *
 * @return	$hubInstance	An instance of this class
 */
public final static function createHubCoreLoop () {
	// Get an instance
	$hubInstance = new HubCoreLoop();

	// Try to setup the socket
	$hubInstance->setupHub();

	// Get the configured output engine and content type
	$outEngine   = $hubInstance->getConfigInstance()->readConfig("tpl_engine");
	$contentType = $hubInstance->getConfigInstance()->readConfig("web_content_type");

	// Textual form of the call; it is only used for the debug message below
	$eval = sprintf("\$hubInstance->setOutputInstance(%s::create%s(\"%s\"));",
		$outEngine,
		$outEngine,
		$contentType
	);

	// Debug message
	if ((defined('DEBUG_EVAL')) || (defined('DEBUG_ALL'))) $hubInstance->getDebugInstance()->output(sprintf("[%s:] Konstruierte PHP-Anweisung: %s",
		$hubInstance->__toString(),
		$eval
	));

	// Call the engine's static factory directly. Running the string above
	// through eval() would execute whatever the configuration values contain
	// and breaks as soon as the content type holds a quote character.
	$hubInstance->setOutputInstance(
		call_user_func(array($outEngine, 'create' . $outEngine), $contentType)
	);

	// Return the prepared instance
	return $hubInstance;
}
/**
 * Initializes the peer lists: one for authenticated peers and one for all
 * connected peers. Both start out as empty FrameworkArrayObject instances.
 *
 * @return	void
 */
private final function initPeerList () {
	$this->authPeers      = new FrameworkArrayObject();
	$this->connectedPeers = new FrameworkArrayObject();
}
153 * Get total number of connected peers
155 * @return $num The total number of connected peers
157 private final function getTotalConnectedPeers () {
158 $read = array($this->connectedPeers->getIterator()->current());
163 $num = socket_select(
170 // Transfer readers and writers
171 $this->readPeers = $read;
172 $this->writePeers = $write;
/**
 * Handle newly connected peers. When the listening socket is among the
 * readable sockets, the pending connection is accepted, wrapped into a
 * HubPeer instance and appended to the list of connected peers. The
 * listening socket is then removed from the readable list.
 *
 * @return	void
 */
private final function handleNewPeers () {
	// Output message
	$this->getOutputInstance()->output(sprintf("[%s] Validating peer...", __METHOD__));

	// Is the main socket in the array? The comparison is strict because a
	// loose comparison treats any two socket objects as equal.
	$key = array_search($this->main_socket, $this->readPeers, true);
	if ($key === false) {
		// Nothing to accept
		return;
	}

	// Accept the connection
	$peer_socket = socket_accept($this->main_socket);

	// socket_accept() returns false when no connection could be accepted,
	// so only register a peer when a socket was really returned
	if ($peer_socket !== false) {
		// Get a new peer instance
		$this->setCurrPeerInstance(HubPeer::createHubPeerBySocket($peer_socket, $this));

		// Register the new peer
		$this->getOutputInstance()->output(sprintf("[%s] Registering new peer with IP %s.",
			__METHOD__,
			$this->getCurrPeerInstance()->getValidatedIP()
		));
		$this->connectedPeers->append($this->lastPeerInstance);

		// A new peer has connected
		$this->getOutputInstance()->output(sprintf("[%s] New peer with IP %s has been registered.",
			__METHOD__,
			$this->getCurrPeerInstance()->getValidatedIP()
		));
	}

	// Remove the main socket from the list of readable sockets
	unset($this->readPeers[$key]);
}
/**
 * Handles unauthenticated peers. Walks over all connected peers and, per
 * peer, either sends the auth request, throws because the peer timed out or
 * used up its retries, enables a local admin, or (on a master hub)
 * authorizes the peer and replies to its HELLO.
 *
 * Each visited entry is stored as the current peer instance, so the caller
 * can still reach the offending peer after an exception was thrown.
 *
 * @return	void
 * @throws	HubPeerTimeoutException			If the peer times out to answer a request
 * @throws	HubPeerAuthorizationException	If the peer fails to authorize himself (too many retries)
 */
private final function handleUnauthPeers () {
	// With one entry or none there are no peers to handle
	if (!($this->connectedPeers->count() > 1)) {
		return;
	}

	// Walk through all connected peers
	$iterator = $this->connectedPeers->getIterator();
	while ($iterator->valid()) {
		// Remember the entry as the current peer
		$this->lastPeerInstance = $iterator->current();

		// Ignore own socket and invalid entries
		if (($this->lastPeerInstance !== $this->main_socket) && (is_object($this->lastPeerInstance)) && ($this->lastPeerInstance instanceof HubPeer)) {
			if ((!$this->lastPeerInstance->ifPeerIsAuthorized()) && (!$this->lastPeerInstance->ifPeerIsLocalAdmin()) && (!$this->hubIsMaster)) {
				// The peer waits for authorization: no tries left means disconnect
				if (!($this->lastPeerInstance->getAuthRetries() <= $this->authRetries)) {
					throw new HubPeerAuthorizationException (
						array(
							'this' => $this,
							'peer' => $this->lastPeerInstance,
							'max'  => $this->authRetries
						), self::EXCEPTION_HUB_PEER_FAILED_AUTH
					);
				}

				if ($this->lastPeerInstance->getLastSentMessage() == $this->authRequest) {
					// The auth request was already sent, so check for a timeout
					$elapsed = time() - $this->lastPeerInstance->getLastSentMessageStamp();
					if ($elapsed >= $this->authRequestTimeout) {
						throw new HubPeerTimeoutException (
							array(
								'this' => $this,
								'peer' => $this->lastPeerInstance
							), self::EXCEPTION_HUB_PEER_TIMEOUT
						);
					}
				} elseif ($this->lastPeerInstance->ifHelloReceived()) {
					// HELLO received, so send the AUTH request now
					$this->lastPeerInstance->askAuthorizationKey();
				}
			} elseif ((!$this->lastPeerInstance->ifPeerIsAuthorized()) && ($this->lastPeerInstance->ifPeerIsLocalAdmin())) {
				// A local admin is always authorized
				$this->lastPeerInstance->enableLocalAdmin();

				// Output debug message
				$this->getOutputInstance()->output(sprintf("[%s] A local admin has connected.",
					__METHOD__
				));

				// Say "hi" to the admin
				$this->lastPeerInstance->sayHi2Admin();
			} elseif (($this->hubIsMaster) && ($this->lastPeerInstance->ifHelloReceived()) && (!$this->lastPeerInstance->ifELHOsent())) {
				// This is the master hub, so authorize the peer right here
				$this->lastPeerInstance->enableIsAuthorized();
				$this->authPeers->append($this->lastPeerInstance);

				// Reply the HELLO request
				$this->lastPeerInstance->replyHelloMessage();
			}
		}

		$iterator->next();
	}
}
/**
 * Handles only authorized peers: every entry of the authenticated list
 * becomes the current peer and is asked to handle its ping.
 *
 * @return	void
 */
public function handleAuthPeers () {
	// Nothing to do without connected peers or without authorized peers
	if (($this->connectedPeers->count() <= 1) || ($this->authPeers->count() <= 0)) {
		return;
	}

	// Walk through all authorized peers
	$iterator = $this->authPeers->getIterator();
	while ($iterator->valid()) {
		// Remember the entry as the current peer
		$this->lastPeerInstance = $iterator->current();

		// Do the ping and update our authPeer list (LATER!)
		$this->lastPeerInstance->handlePingPeer();

		$iterator->next();
	}
}
/**
 * Setup the hub: create a TCP socket for listening on incoming requests,
 * set its options, bind it to the configured address and port and start
 * listening. On success the socket becomes the main socket and the first
 * entry of the connected-peers list, and the hub is marked as activated.
 *
 * If any step after the creation fails, the socket is closed again before
 * the exception is thrown.
 *
 * @return	void
 * @throws	SocketCreationException		If a socket cannot be created
 * @throws	SocketSetupException		If a socket cannot be set up
 * @throws	SocketBindException			If a socket cannot be bound to
 *										an address and port
 * @throws	SocketListeningException	If listening to a socket fails
 */
private final function setupHub () {
	// Create a new TCP socket
	$main_socket = socket_create(AF_INET, SOCK_STREAM, SOL_TCP);

	// socket_create() returns false on failure. Do not test with
	// is_resource(): newer PHP versions return a Socket object on success.
	if ($main_socket === false) {
		// Something bad happened
		throw new SocketCreationException(
			array(
				'this' => $this,
				'code' => socket_last_error()
			), self::EXCEPTION_SOCKET_PROBLEM
		);
	}

	// Set socket options
	if (!socket_set_option($main_socket, SOL_SOCKET, SO_REUSEADDR, 1)) {
		// Read the error code before closing, then close the socket
		$code = socket_last_error();
		$this->closeSocket($main_socket);

		// Problems setting socket options
		throw new SocketSetupException (
			array(
				'this' => $this,
				'code' => $code
			), self::EXCEPTION_SOCKET_PROBLEM
		);
	}

	// Set to non-blocking
	socket_set_nonblock($main_socket);

	// Bind the socket to an address
	$listenAddr = $this->getConfigInstance()->readConfig("hub_listen_addr");
	$listenPort = $this->getConfigInstance()->readConfig("hub_listen_port");
	if (!socket_bind($main_socket, $listenAddr, $listenPort)) {
		// Do not leak the socket when binding fails
		$code = socket_last_error();
		$this->closeSocket($main_socket);

		throw new SocketBindException (
			array(
				'this' => $this,
				'host' => $listenAddr,
				'port' => $listenPort,
				'code' => $code
			), self::EXCEPTION_SOCKET_PROBLEM
		);
	}

	// Listen to ... the socket ;-)
	if (!socket_listen($main_socket)) {
		// Do not leak the socket when listening fails
		$code = socket_last_error();
		$this->closeSocket($main_socket);

		// Oops, that didn't work. Next time better listen to your heart... Roxette
		throw new SocketListeningException(
			array(
				'this' => $this,
				'code' => $code
			), self::EXCEPTION_SOCKET_PROBLEM
		);
	}

	// Ignore user abort and do not time out
	@set_time_limit(0);
	@ignore_user_abort(true);

	// Cache more configuration stuff
	$this->authRetries        = $this->getConfigInstance()->readConfig("hub_max_auth_tries");
	$this->authRequest        = $this->getConfigInstance()->readConfig("hub_auth_request");
	$this->authRequestTimeout = $this->getConfigInstance()->readConfig("hub_auth_request_timeout");

	// The hub is running now!
	$this->hubActivated = true;

	// Append the main hub
	$this->main_socket = $main_socket;
	$this->connectedPeers->append($main_socket);
}
/**
 * Removes the current peer from the list of connected peers and clears the
 * current peer instance. Does nothing when no HubPeer is currently selected
 * (e.g. the current entry is null or the listening socket).
 *
 * @return	void
 */
public final function removePeerFromConnectList () {
	// Without a selected peer there is nothing to compare against
	$currentPeer = $this->getCurrPeerInstance();
	if (!($currentPeer instanceof HubPeer)) {
		return;
	}

	// Iterate through all connected peers
	for ($idx = $this->connectedPeers->getIterator(); $idx->valid(); $idx->next()) {
		// Get current peer from list
		$peer = $idx->current();

		// Only peer instances can match; this skips the main socket, too
		if (($peer instanceof HubPeer) && ($currentPeer->equals($peer))) {
			// Remove it from the list and forget it as current peer
			$idx->offsetUnset($idx->key());
			$this->lastPeerInstance = null;

			// The peer is gone, so stop searching
			break;
		}
	}
}
/**
 * Returns the console output handler that was set through
 * setOutputInstance(), or null if none was set yet.
 *
 * @return	$outputInstance	An instance of the output handler
 */
public final function getOutputInstance () {
	return $this->outputInstance;
}
/**
 * Stores the console output handler used for all messages of this hub.
 *
 * @param	$outputInstance	An instance of the output handler
 * @return	void
 */
public final function setOutputInstance ($outputInstance) {
	$this->outputInstance = $outputInstance;
}
/**
 * Returns the entry that was most recently selected as current peer, or
 * null when none is selected.
 *
 * @return	$lastPeerInstance	Last peer instance
 */
public final function getCurrPeerInstance () {
	return $this->lastPeerInstance;
}
/**
 * Selects the given HubPeer instance as the current peer.
 *
 * @param	$lastPeerInstance	Last peer instance
 * @return	void
 */
public final function setCurrPeerInstance (HubPeer $lastPeerInstance) {
	$this->lastPeerInstance = $lastPeerInstance;
}
/**
 * Checks whether the hub is running and not in shutdown phase. The hub
 * counts as running only while it is activated, no shutdown was requested
 * and the list of connected peers is not empty.
 *
 * @return	$isRunning	If the hub is running and not in shutdown phase
 */
public function ifHubIsRunning () {
	// All conditions must hold. With "||" between the first two, the shutdown
	// flag was ignored for as long as the hub was activated.
	return (($this->hubActivated) && (!$this->hubShutsDown) && ($this->connectedPeers->count() > 0));
}
/**
 * Output the intro text (application name and version, license notes and
 * the listening address) through the console output handler. Nothing is
 * written unless the configuration entry "hub_intro_enabled" is "Y".
 *
 * @return	void
 */
public final function outputIntro () {
	if ($this->getConfigInstance()->readConfig("hub_intro_enabled") == "Y") {
		// Output intro text
		$this->getOutputInstance()->output(sprintf("%s v%s Copyright (c) 2007, 2008 by Roland Häder",
			ApplicationHelper::getInstance()->getAppName(),
			ApplicationHelper::getInstance()->getAppVersion()
		));
		$this->getOutputInstance()->output("");
		// The quotes around the telnet commands must be escaped; unescaped
		// they end the string literal and the file no longer parses
		$this->getOutputInstance()->output("This software is free software licensed under the GNU LGPL. In telnet session enter \"/license\" to read the license.");
		$this->getOutputInstance()->output("This software uses the MXChange Framework, Copyright (c) 2007, 2008 by Roland Häder which is licensed under the GNU LGPL.");
		$this->getOutputInstance()->output("Enter \"/framework\" in telnet session to read that license.");
		$this->getOutputInstance()->output("");
		$this->getOutputInstance()->output("All core systems are initialized. Input on *this* console will currently be ignored!");
		$this->getOutputInstance()->output("");
		$this->getOutputInstance()->output(sprintf("[%s] Listening on: %s:%d",
			__METHOD__,
			$this->getConfigInstance()->readConfig("hub_listen_addr"),
			$this->getConfigInstance()->readConfig("hub_listen_port")
		));
		$this->getOutputInstance()->output("----------------------------------------------------------------------------------------------------------------------------");
	}
}
491 * @throws SocketCreationException If the main socket is not a resource
493 public function coreLoop () {
494 // Is the main socket valid?
495 if (!is_resource($this->main_socket)) {
497 throw new SocketCreationException(
500 'code' => socket_last_error()
501 ), self::EXCEPTION_SOCKET_PROBLEM
505 // We are ready to serve requests
506 $this->getOutputInstance()->output(sprintf("[%s] Ready to serve requests.",
510 // Wait until the hub is shutting down
511 while ($this->ifHubIsRunning()) {
512 // Get number of total connected peers
513 $num = $this->getTotalConnectedPeers();
516 // Handle the master hub connection
517 if ($this->masterConnector instanceof HubConnector) {
518 $this->masterConnector->handleMasterRequests();
521 // Check for unauthorized peers
522 $this->handleUnauthPeers();
524 // Handle authorized peers
525 $this->handleAuthPeers();
526 } catch (HubPeerAuthorizationException $e) {
527 // Authorization has failed
528 $this->disconnectPeerWithReason("hub_msg_auth_tries");
530 // Get new total connected peers
531 $num = $this->getTotalConnectedPeers();
532 } catch (HubPeerTimeoutException $e) {
533 // Disconnect and remove the peer
534 $this->disconnectPeerWithReason("hub_msg_auth_reply_timeout");
536 // Get new total connected peers
537 $num = $this->getTotalConnectedPeers();
538 } catch (HubMasterDisconnectedException $e) {
539 // The master hub has disconnected us... :(
540 $this->masterConnector = null;
541 $this->getOutputInstance()->output(sprintf("[%s] The master has disconnected us. Reason given: %s",
545 } catch (BrokenPipeException $e) {
546 // Broken pipes are bad for us... :(
547 $this->removePeerFromConnectList();
548 $this->getOutputInstance()->output(sprintf("[%s] A peer has closed the connection unexpected: %s",
553 // Get new total connected peers
554 $num = $this->getTotalConnectedPeers();
555 } catch (FrameworkException $e) {
556 // Catch all other exceptions and output them
557 echo "CATCH".__LINE__.":".$e->__toString()."\n";
558 hub_exception_handler($e);
560 // Get new total connected peers
561 $num = $this->getTotalConnectedPeers();
564 // Are there some peers?
566 // Wait for more peers
570 // A new peer has connected
571 $this->getOutputInstance()->output(sprintf("[%s] A new peer is connecting.",
576 // Check for new peers
577 $this->handleNewPeers();
578 } catch (IPSpoofingException $e) {
580 $this->getOutputInstance()->output(sprintf("[%s] The peer's IP number has changed!",
584 // Output debug message
585 $this->getDebugInstance()->output(sprintf("[%s] Peer spoofes IP number: %s",
590 // Disconnect the peer first
591 $this->disconnectPeerWithReason("hub_msg_spoofing");
593 // Get new total connected peers
594 $num = $this->getTotalConnectedPeers();
595 } catch (FrameworkException $e) {
596 // Catch all exceptions and output them to avoid aborting the program unexpectedly
597 echo "CATCH".__LINE__.":".$e->__toString()."\n";
598 hub_exception_handler($e);
600 // Get new total connected peers
601 $num = $this->getTotalConnectedPeers();
/**
 * Tries to contact the master server or simply reports that we are the
 * master server. This hub becomes the master hub when its own address
 * ($_SERVER['SERVER_ADDR']) equals the configured "hub_master_ip";
 * otherwise it announces itself to the master hub. Framework exceptions
 * raised while announcing are printed and passed to
 * hub_exception_handler().
 *
 * @return	void
 */
public function contactMasterHub () {
	// SERVER_ADDR is not set in every environment (e.g. on the console), so
	// do not read it unguarded
	$ownIp    = isset($_SERVER['SERVER_ADDR']) ? $_SERVER['SERVER_ADDR'] : null;
	$masterIp = $this->getConfigInstance()->readConfig("hub_master_ip");

	// Checks whether we are the master hub
	if ($ownIp == $masterIp) {
		// We are the master hub
		$this->hubIsMaster = true;
		$this->getOutputInstance()->output(sprintf("[%s] Our IP %s matches the master IP. Becoming master hub...",
			__METHOD__,
			$ownIp
		));
	} else {
		// A regular hub or ultra hub so let's contact the master
		$this->getOutputInstance()->output(sprintf("[%s] Contacting the master hub at %s:%d...",
			__METHOD__,
			$masterIp,
			$this->getConfigInstance()->readConfig("hub_master_port")
		));

		// Try to acquire a connection to the master...
		try {
			// Announce us to the master hub
			$this->announceToMasterHub();
		} catch (FrameworkException $e) {
			// Catch all exceptions and output them
			echo "CATCH".__LINE__.":".$e->__toString()."\n";
			hub_exception_handler($e);
		}
	}
}
642 * Disconnects the current peer with a configurable reason
644 * @param $reasonEntry The entry with the disconnection reason aka. message to the peer
647 public function disconnectPeerWithReason ($reasonEntry) {
650 // Try to disconnect here
652 // First get the raw IP number
653 $ip = $this->getCurrPeerInstance()->getValidatedIP();
655 // Disconnect the peer...
656 $this->getCurrPeerInstance()->disconnectWithReason($this->getConfigInstance()->readConfig($reasonEntry));
657 } catch (FrameworkException $e) {
658 // Catch all exceptions and output them
659 echo "CATCH".__LINE__.":".$e->__toString()."\n";
660 hub_exception_handler($e);
663 // Remove him from the list anyway
664 $this->removePeerFromConnectList();
666 // Output the message
667 $this->getOutputInstance()->output(sprintf("[%s] Peer %s has been disconnected.",
674 * Announces this hub to the master hub. This is being done by connecting to it and asking for auth request
678 public function announceToMasterHub () {
680 // Create a new instance for communicating to the master hub
681 $this->masterConnector = HubConnector::createHubConnectorByAddressPort(
682 $this->getConfigInstance()->readConfig("hub_master_ip"),
683 $this->getConfigInstance()->readConfig("hub_master_port"),
687 // Send all our accepted objects to the master hub
688 $this->masterConnector->sendAllAcceptedObjects();
689 } catch (SocketConnectException $e) {
690 // The master hub is down!
691 ApplicationEntryPoint::app_die($e->getMessage());