5 * @author Roland Haeder <roland __NOSPAM__ [at] __REMOVE_ME__ mxchange [dot] org>
8 class HubPeer extends BaseFrameworkSystem {
/**
 * This peer's socket resource
 */
private $peerSocket = null;

/**
 * Timestamp for connection
 */
private $connectTime = 0;

/**
 * Timestamp of last received message
 */
private $lastReceivedMessageStamp = 0;

/**
 * Last received message
 */
private $lastReceivedMessage = "";

/**
 * Timestamp of last sent message
 */
private $lastSentMessageStamp = 0;

/**
 * Last sent message
 */
private $lastSentMessage = "";

/**
 * Number of tries for authorization
 */
private $authRetries = 0;

/**
 * Whether the peer is authorized (NEVER SET THIS DEFAULT TO "true"!)
 */
private $isAuthorized = false;

/**
 * Whether this peer needs to be requested for the AUTH command
 */
private $needAuthRequest = true;

/**
 * The peer's IP number
 *
 * NOTE(review): this property is static, so its value is shared by every
 * HubPeer instance in the process - confirm this is intended.
 */
private static $peerIP = "0.0.0.0";

/**
 * Whether the peer has "hello-ed" to the other
 */
private $helloSent = false;

/**
 * Whether the peer has replied our HELLO
 */
private $helloReplied = false;

/**
 * Whether we have sent our ELHO
 */
private $elhoSent = false;

/**
 * An instance of HubCoreLoop
 */
private $hubInstance = null;

/**
 * When the last ping was
 */
private $lastPinged = 0;

/**
 * When the last pong was
 */
private $lastPonged = 0;

/**
 * Number of missing pongs from the other peer
 */
private $missingPongs = 0;

/**
 * An instance of a HubCommandProcessor class
 */
private $commandInstance = null;

/**
 * A queue for reading data in non-blocking mode
 */
private $queues = "";

/**
 * Line terminator
 */
const LINE_END = "\n";

//----------------------------------------------------------
//                     Exception codes
//----------------------------------------------------------
const EXCEPTION_PEER_SOCKET_INVALID = 0x200;
const EXCEPTION_PEER_IP_CHANGED = 0x201;
const EXCEPTION_PEER_SOCKET_BROKEN = 0x202;
//----------------------------------------------------------
/**
 * Private constructor; new peers are created through the static
 * factory createHubPeerBySocket().
 *
 * @return	void
 */
private function __construct () {
	// Hand our class name over to the parent constructor
	parent::constructor(__CLASS__);

	// Describe this part and give it a unique ID
	$this->setPartDescr("Hub-Peer");
	$this->createUniqueID();

	// Remove the system array and the number formatters
	$this->removeSystemArray();
	$this->removeNumberFormaters();

	// Create the command processor that works on this peer
	$processor = HubCommandProcessor::createHubCommandProcessor($this);
	$this->commandInstance = $processor;
}
/**
 * Factory: builds a HubPeer around an already opened socket resource.
 *
 * @param	$peerSocket		The socket resource for the peer
 * @param	$hubInstance	An instance of HubCoreLoop
 * @return	$peerInstance	An instance of HubPeer
 * @throws	PeerSocketException	If $peerSocket is not a resource
 */
public final static function createHubPeerBySocket ($peerSocket, HubCoreLoop $hubInstance) {
	// The instance is created first because the exception below carries it
	$peerInstance = new HubPeer();

	// Refuse anything that is not a resource
	if (is_resource($peerSocket) === false) {
		$details = array(
			'this' => $peerInstance,
			'type' => gettype($peerSocket)
		);
		throw new PeerSocketException($details, self::EXCEPTION_PEER_SOCKET_INVALID);
	}

	// Wire up socket, connection timestamp and hub
	$peerInstance->setPeerSocket($peerSocket);
	$peerInstance->setConnectionTimestamp();
	$peerInstance->setHubInstance($hubInstance);

	return $peerInstance;
}
173 //----------------------------------------------------------
174 // Public getter/setter
175 //----------------------------------------------------------
/**
 * Setter for peer socket. Anything that is not a resource is stored
 * as null.
 *
 * @param	$peerSocket		The peer's socket
 * @return	void
 */
public final function setPeerSocket ($peerSocket) {
	$this->peerSocket = is_resource($peerSocket) ? $peerSocket : null;
}
/**
 * Stores the current time as connection timestamp. Only the first call
 * has an effect; later calls keep the stored value.
 *
 * @return	void
 */
public final function setConnectionTimestamp () {
	// Already set? Then leave it alone
	if ($this->connectTime != 0) {
		return;
	}

	$this->connectTime = time();
}
/**
 * Getter for a raw IP number, freshly asked from the socket.
 *
 * The outcome is judged by the return value of socket_getpeername() and
 * the error code of this very socket, so an old error left behind by
 * another socket can no longer be mistaken for a broken pipe.
 *
 * @return	$ip		The peer's IP number
 * @throws	BrokenPipeException		If a socket has lost its connection to the peer
 */
public final function getRawIP () {
	// Without a socket we cannot determine the right IP
	// NOTE(review): "0.0.0.0" mirrors the default of $peerIP - confirm
	$ip = "0.0.0.0";
	if (!is_resource($this->peerSocket)) {
		return $ip;
	}

	// Get IP from socket
	$resolved = @socket_getpeername($this->peerSocket, $ip);

	// Connection problems?
	if ($resolved === false) {
		// Throw an exception
		throw new BrokenPipeException(
			array(
				'this' => $this,
				'code' => socket_last_error($this->peerSocket)
			), self::EXCEPTION_PEER_SOCKET_BROKEN
		);
	}

	// And return it
	return $ip;
}
/**
 * Getter for a validated peer IP. The first IP seen is remembered; every
 * later call compares the socket's current IP against it.
 *
 * If the socket is no resource or a socket error is pending, the
 * remembered IP is returned without any check.
 *
 * NOTE(review): the remembered IP lives in a static property and is
 * therefore shared by all HubPeer instances - confirm this is intended.
 *
 * @return	$peerIP		The peer's IP number
 * @throws	IPSpoofingException		If the IP differs from the remembered one
 */
public final function getValidatedIP() {
	// Only a valid socket without pending errors can be asked
	$socketUsable = ((is_resource($this->peerSocket)) && (socket_last_error() == 0));

	if ($socketUsable) {
		// Get current raw IP number
		$currentIP = $this->getRawIP();

		if (self::$peerIP == "0.0.0.0") {
			// Nothing remembered so far, take this one
			self::$peerIP = $currentIP;
		} elseif ($currentIP !== self::$peerIP) {
			// The IP number has changed!
			// NOTE(review): key 'new_ip' was not visible in the source - confirm
			throw new IPSpoofingException(
				array(
					'this'   => $this,
					'old_ip' => self::$peerIP,
					'new_ip' => $currentIP
				), self::EXCEPTION_PEER_IP_CHANGED
			);
		}
	}

	return self::$peerIP;
}
/**
 * Getter for the number of authorization tries counted for this peer
 *
 * @return	$authRetries	Authorization tries
 */
public final function getAuthRetries () {
	$authRetries = $this->authRetries;
	return $authRetries;
}
268 //----------------------------------------------------------
270 //----------------------------------------------------------
/**
 * Stores the HubCoreLoop instance this peer belongs to
 *
 * @param	$hubInstance	An instance of a HubCoreLoop class
 * @return	void
 */
public final function setHubInstance(HubCoreLoop $hubInstance) {
	$this->hubInstance = $hubInstance;
}
/**
 * Returns the stored HubCoreLoop instance (null if none was set)
 *
 * @return	$hubInstance	An instance of a HubCoreLoop class
 */
public final function getHubInstance() {
	$hubInstance = $this->hubInstance;
	return $hubInstance;
}
/**
 * Returns the time at which the last message was sent to this peer
 * (0 if nothing has been sent yet)
 *
 * @return	$lastSentMessageStamp	Timestamp of last sent message
 */
public final function getLastSentMessageStamp () {
	$lastSentMessageStamp = $this->lastSentMessageStamp;
	return $lastSentMessageStamp;
}
/**
 * Returns the message sent most recently to this peer (empty string if
 * nothing has been sent yet)
 *
 * @return	$lastSentMessage	Last sent message to this peer
 */
public final function getLastSentMessage () {
	$lastSentMessage = $this->lastSentMessage;
	return $lastSentMessage;
}
/**
 * Determines whether the peer is authorized. This is only the case if
 * the authorized flag is exactly true AND no AUTH request is pending.
 *
 * @return	boolean		Whether the peer is authorized
 */
public final function ifPeerIsAuthorized () {
	// Not flagged as authorized? Then we are done
	if ($this->isAuthorized !== true) {
		return false;
	}

	// Flagged, but the AUTH request must not be outstanding
	return ($this->ifPeerNeedsAuthRequest() === false);
}
/**
 * Returns whether this peer still needs to receive the AUTH command
 *
 * @return	$needAuthRequest	Whether the AUTH command is still to be sent
 */
public final function ifPeerNeedsAuthRequest () {
	$needAuthRequest = $this->needAuthRequest;
	return $needAuthRequest;
}
/**
 * Disconnects the peer with a special reason (status).
 *
 * The reason (or the configured "hub_msg_bye" if it is empty) is sent
 * first, then the socket is shut down and closed. The socket is closed
 * even if sending fails; the exception from sending is re-thrown after
 * the cleanup so callers still see it.
 *
 * @param	$reason		The special reason
 * @return	void
 * @throws	Exception	Whatever sendMessage() has thrown, after the socket was closed
 */
public final function disconnectWithReason ($reason) {
	// Is the message set? If not, use the default "good bye"
	if (empty($reason)) {
		$reason = $this->getConfigInstance()->readConfig("hub_msg_bye");
	}

	// Try to deliver the message but remember a failure for later
	$failure = null;
	try {
		$this->sendMessage($reason);
	} catch (Exception $e) {
		$failure = $e;
	}

	// Disconnect the client (only possible with a real socket)
	if (is_resource($this->peerSocket)) {
		socket_shutdown($this->peerSocket, 2);
		socket_close($this->peerSocket);
	}

	// Now report the delivery problem, if any
	if ($failure !== null) {
		throw $failure;
	}
}
/**
 * Sends a specified message to the peer's socket, terminated with
 * LINE_END. On success the message and the current time are remembered
 * as "last sent".
 *
 * @param	$message	The special message; if empty, the configured
 *						"hub_msg_bye" is sent instead
 * @return	void
 * @throws	BrokenPipeException		If a pipe to a socket peer is lost
 */
public final function sendMessage ($message) {
	if (empty($message)) {
		// Set default message
		$message = $this->getConfigInstance()->readConfig("hub_msg_bye");
	}

	// Debug message
	// NOTE(review): the first sprintf() argument was not visible in the
	// source; __METHOD__ is assumed - confirm
	$this->getDebugInstance()->output(sprintf("[%s] Sending message \"%s\" to peer %s.<br />\n",
		__METHOD__,
		$message,
		$this->getValidatedIP()
	));

	// Send it to the peer, using the class-wide line terminator
	if (socket_write($this->peerSocket, $message . self::LINE_END) === false) {
		// Message could not be sent, report the error of this socket
		throw new BrokenPipeException (
			array(
				'this' => $this,
				'code' => socket_last_error($this->peerSocket)
			), self::EXCEPTION_PEER_SOCKET_BROKEN
		);
	}

	// Set the timestamp and message
	$this->lastSentMessage = $message;
	$this->lastSentMessageStamp = time();
}
/**
 * Asks the connected new peer for the authorization key by sending the
 * configured "hub_auth_request" message. The request is sent at most
 * once; afterwards the peer is marked as "asked".
 *
 * @return	void
 */
public final function askAuthorizationKey () {
	// Already asked? Nothing to do then
	if (!$this->ifPeerNeedsAuthRequest()) {
		return;
	}

	// This peer needs to receive the AUTH command so send it to him
	$request = $this->getConfigInstance()->readConfig("hub_auth_request");
	$this->sendMessage($request);

	// Remember that the request is out
	$this->needAuthRequest = false;
}
/**
 * Determines whether the peer is a local admin (localhost connection).
 * The stored peer IP counts as local if its first three dot-separated
 * parts are "127.0.0".
 *
 * A stored IP with fewer than three parts simply yields false instead
 * of triggering undefined-offset notices.
 *
 * @return	$isLocalAdmin	Whether the peer is a local admin
 */
public function ifPeerIsLocalAdmin () {
	// Get the remote IP and extract only the first three numbers
	$parts = array_slice(explode(".", self::$peerIP), 0, 3);

	// Is the peer a local admin?
	$isLocalAdmin = (implode(".", $parts) === "127.0.0");
	return $isLocalAdmin;
}
/**
 * Authorizes this peer as local admin. Nothing happens unless the peer
 * is local (see ifPeerIsLocalAdmin()) and not yet authorized.
 *
 * @return	void
 */
public function enableLocalAdmin() {
	// Is this IP really local?
	if (!$this->ifPeerIsLocalAdmin()) {
		return;
	}

	// Already authorized peers are left untouched
	if ($this->ifPeerIsAuthorized()) {
		return;
	}

	// Then authorize him, no AUTH request needed any more
	$this->isAuthorized = true;
	$this->needAuthRequest = false;
}
/**
 * Sends the welcome line of the admin console to this peer
 *
 * @return	void
 */
public function sayHi2Admin () {
	// Send a message to him... ;-)
	$greeting = "Local admin console is ready. Enter HELP for instructions.";
	$this->sendMessage($greeting);
}
/**
 * Sets whether the peer is authorized.
 *
 * The given flag is now honoured: passing false revokes/denies the
 * authorization instead of silently granting it. Calling the method
 * without an argument authorizes the peer, as before.
 *
 * @param	$isAuthValid	Whether the authorization went fine
 * @return	void
 */
public function enableIsAuthorized ($isAuthValid = true) {
	// ifPeerIsAuthorized() compares with === true, so store a real boolean
	$this->isAuthorized = (bool) $isAuthValid;
}
/**
 * Checks whether the peer has said HELLO to us. As long as it has not,
 * one read from the socket is made and compared against the configured
 * "hub_peer_hello" message. The result is kept in $helloSent.
 *
 * @return	$helloSent	Whether a HELLO has been received
 */
public function ifHelloReceived () {
	// Already seen? Then no further read is needed
	if ($this->helloSent) {
		return $this->helloSent;
	}

	// Read what the peer has sent (may be empty)
	$read = $this->readFromSocket();

	// Is it the HELLO we are waiting for?
	if ($read == $this->getConfigInstance()->readConfig("hub_peer_hello")) {
		// All right! A HELLO has been received
		$this->helloSent = true;

		// NOTE(review): the first sprintf() argument was not visible in the
		// source; __METHOD__ is assumed - confirm
		$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Peer %s said HELLO to us.",
			__METHOD__,
			$this->getValidatedIP()
		));
	}

	return $this->helloSent;
}
/**
 * Whether the other hub has replied our HELLO request.
 *
 * A read from the socket only happens while no reply is recorded and
 * the last message we sent was the configured "hub_peer_hello". A reply
 * equal to "hub_hello_reply" is accepted only from the configured
 * master IP; replies from other peers are just reported.
 *
 * @return	$helloReplied	Whether this hub has replied our HELLO request
 */
public function ifHelloReplied () {
	// Reply already recorded?
	if ($this->helloReplied) {
		return $this->helloReplied;
	}

	// Have we sent a HELLO as our last message at all?
	if ($this->lastSentMessage != $this->getConfigInstance()->readConfig("hub_peer_hello")) {
		return $this->helloReplied;
	}

	// Read what the peer has sent (may be empty)
	$read = $this->readFromSocket();

	// Anything else than the expected reply is ignored
	if ($read != $this->getConfigInstance()->readConfig("hub_hello_reply")) {
		return $this->helloReplied;
	}

	// NOTE(review): the first sprintf() argument of both messages was not
	// visible in the source; __METHOD__ is assumed - confirm

	// Is this the master IP?
	if ($this->getValidatedIP() == $this->getConfigInstance()->readConfig("hub_master_ip")) {
		// All right! A HELLO has been replied by the master
		$this->helloReplied = true;
		$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] The master hub at %s:%d replied our %s.",
			__METHOD__,
			$this->getValidatedIP(),
			$this->getConfigInstance()->readConfig("hub_master_port"),
			$this->getConfigInstance()->readConfig("hub_peer_hello")
		));
	} else {
		// ELHOs from non-masters are not valid!
		$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Peer %s replied our %s but is not the master hub!",
			__METHOD__,
			$this->getValidatedIP(),
			$this->getConfigInstance()->readConfig("hub_peer_hello")
		));
	}

	return $this->helloReplied;
}
/**
 * Returns whether an ELHO (the reply to a HELLO) has been sent
 *
 * @return	$elhoSent	Whether an ELHO has been sent to the peer
 */
public final function ifELHOsent () {
	$elhoSent = $this->elhoSent;
	return $elhoSent;
}
/**
 * Replies a HELLO message with the configured "hub_hello_reply" (ELHO)
 * message and records that the ELHO has been sent
 *
 * @return	void
 */
public function replyHelloMessage () {
	$reply = $this->getConfigInstance()->readConfig("hub_hello_reply");
	$this->sendMessage($reply);

	// Only reached if sending did not throw
	$this->elhoSent = true;
}
/**
 * Handles pinging this peer.
 *
 * When more than "hub_ping_timeout" seconds have passed since the last
 * ping and the peer is not the master hub, a PING is sent. A missing
 * PONG for the previous ping is counted; after "hub_ping_maxdrops"
 * misses the hub is asked to disconnect the peer. If a PONG is awaited
 * by the command processor, the counters are reset and the hub is told
 * the ping-pong time.
 *
 * @return	boolean|null	false if the peer was dropped, otherwise nothing
 */
public function handlePingPeer () {
	// Becomes true as soon as the peer had to be dropped.
	// NOTE(review): the assignments to $lost were not visible in the
	// source; they are placed where the visible check needs them - confirm
	$lost = false;

	// Do we need to ping?
	if ((time() - $this->lastPinged) > $this->getConfigInstance()->readConfig("hub_ping_timeout")) {
		// Don't ping any masters! ;-)
		if ($this->getValidatedIP() != $this->getConfigInstance()->readConfig("hub_master_ip")) {
			// Debug message
			// NOTE(review): first sprintf() argument assumed to be __METHOD__ - confirm
			$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Sending ping to peer %s...",
				__METHOD__,
				$this->getValidatedIP()
			));

			// Send out a PING and await a PONG
			$this->commandInstance->simpleExecute(
				$this->getConfigInstance()->readConfig("hub_peer_ping"),
				$this->getConfigInstance()->readConfig("hub_ping_reply")
			);

			// PONG received within last ping?
			if (($this->lastPonged < time()) && (($this->lastPonged - $this->lastPinged) < 0)) {
				// Not replied so far
				$this->missingPongs++;

				// Debug message
				$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Ping not replied! Try: %d",
					__METHOD__,
					$this->missingPongs
				));

				// Too many missing pongs?
				if ($this->missingPongs == $this->getConfigInstance()->readConfig("hub_ping_maxdrops")) {
					// Let the hub drop this peer
					$this->getHubInstance()->disconnectPeerWithReason("hub_peer_miss_pong");
					$lost = true;
				}
			}

			// Last time we pinged is now.
			$this->lastPinged = time();
		}
	}

	// Connection is lost?
	if ($lost === true) return false;

	// Awaiting PONG here
	if ($this->commandInstance->ifAwaitsCommand($this->getConfigInstance()->readConfig("hub_ping_reply"))) {
		// PONG received! :-) So reset all counters...
		$this->lastPonged = time();
		$this->missingPongs = 0;

		// Notify the loop about the ping-pong-time
		$this->getHubInstance()->updatePeerEntry($this, (time() - $this->lastPinged));
	}
}
/**
 * Handles any incoming commands from the master hub. Currently a
 * received command is only pulled from the command processor; it is
 * not processed any further yet.
 *
 * @return	void
 */
public function handleMasterRequests () {
	// Nothing arrived on the raw socket? Then there is nothing to handle
	if (!$this->commandInstance->awaitAnyCommand()) {
		return;
	}

	// A command has been received from the master hub
	$command = $this->commandInstance->pull();

	// TODO Handle a command from the master here...
}
605 * Reads raw data from the socket and trims leading/trailing spaces away.
606 * Returns an empty string if no data has been received.
608 * @return $data Raw data from the underlaying socket
609 * @throws BrokenPipeException If a socket has lost its connection to the peer
611 public function readFromSocket () {
613 $read = array($this->peerSocket);
616 $num = socket_select($read, $write, $except, 0);
618 // Something has changed on a socket
619 foreach ($read as $socket) {
620 if (is_resource($socket)) {
621 $data = trim(@socket_read($socket, 1024, PHP_NORMAL_READ));
622 if (socket_last_error() > 0) {
623 // Throw an exception
624 throw new BrokenPipeException(
627 'code' => socket_last_error()
628 ), self::EXCEPTION_PEER_SOCKET_BROKEN