* @version 0.0
* @copyright Copyright(c) 2007, 2008 Roland Haeder, this is free software
* @license GNU GPL 3.0 or any newer version
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
class HubPeer extends BaseFrameworkSystem {
	/**
	 * This peer's socket resource
	 */
	private $peerSocket = null;

	/**
	 * Timestamp for connection
	 */
	private $connectTime = 0;

	/**
	 * Timestamp of last received message
	 */
	private $lastReceivedMessageStamp = 0;

	/**
	 * Last received message
	 */
	private $lastReceivedMessage = "";

	/**
	 * Timestamp of last sent message
	 */
	private $lastSentMessageStamp = 0;

	/**
	 * Last sent message
	 */
	private $lastSentMessage = "";

	/**
	 * Number of tries for authorization
	 */
	private $authRetries = 0;

	/**
	 * Whether the peer is authorized (DO NEVER SET THIS TO "true"!)
	 */
	private $isAuthorized = false;

	/**
	 * Whether this peer needs to be requested for the AUTH command
	 */
	private $needAuthRequest = true;

	/**
	 * The peer's IP number. "0.0.0.0" means "not determined yet".
	 *
	 * This must be an instance property: every peer has its own address. A
	 * static property would make all peers share the IP of the first one and
	 * raise bogus IP-spoofing errors for every other peer.
	 */
	private $peerIP = "0.0.0.0";

	/**
	 * Whether the peer has "hello-ed" to the other
	 */
	private $helloSent = false;

	/**
	 * Whether the peer has replied our HELLO
	 */
	private $helloReplied = false;

	/**
	 * Whether we have sent our ELHO
	 */
	private $elhoSent = false;

	/**
	 * An instance of HubCoreLoop
	 */
	private $hubInstance = null;

	/**
	 * When the last ping was
	 */
	private $lastPinged = 0;

	/**
	 * When the last pong was
	 */
	private $lastPonged = 0;

	/**
	 * Number of missing pongs from the other peer
	 */
	private $missingPongs = 0;

	/**
	 * An instance of a HubCommandProcessor class
	 */
	private $commandInstance = null;

	/**
	 * A queue for reading data in non-blocking mode
	 */
	private $queues = "";

	/**
	 * Line ends
	 */
	const LINE_END = "\n";

	//----------------------------------------------------------
	//			Exceptions
	//----------------------------------------------------------
	const EXCEPTION_PEER_SOCKET_INVALID = 0x200;
	const EXCEPTION_PEER_IP_CHANGED     = 0x201;
	const EXCEPTION_PEER_SOCKET_BROKEN  = 0x202;
	//----------------------------------------------------------

	/**
	 * The protected constructor, use createHubPeerBySocket() to get an
	 * instance.
	 */
	protected function __construct () {
		// Call parent constructor
		parent::__construct(__CLASS__);

		// Tidy up a little
		$this->removeSystemArray();
		$this->removeNumberFormaters();

		// Get a command processor
		$this->commandInstance = HubCommandProcessor::createHubCommandProcessor($this);
	}

	/**
	 * Creates an instance of a HubPeer by a provided valid socket resource
	 *
	 * @param	$peerSocket		The socket resource for the peer
	 * @param	$hubInstance	An instance of HubCoreLoop
	 * @return	$peerInstance	An instance of HubPeer
	 * @throws	PeerSocketException	If the given socket is not a resource
	 */
	public final static function createHubPeerBySocket ($peerSocket, HubCoreLoop $hubInstance) {
		// Get a new instance (created first because the exception wants it)
		$peerInstance = new HubPeer();

		// Is the peer socket fine?
		if (!is_resource($peerSocket)) {
			// There is a problem with the socket
			throw new PeerSocketException (
				array(
					'this' => $peerInstance,
					'type' => gettype($peerSocket)
				), self::EXCEPTION_PEER_SOCKET_INVALID
			);
		}

		// Set the socket
		$peerInstance->setPeerSocket($peerSocket);

		// Set connection timestamp
		$peerInstance->setConnectionTimestamp();

		// Set the hub instance
		$peerInstance->setHubInstance($hubInstance);

		// Return the instance
		return $peerInstance;
	}

	//----------------------------------------------------------
	//			Public getter/setter
	//----------------------------------------------------------

	/**
	 * Setter for peer socket. Anything which is not a resource resets the
	 * socket to null.
	 *
	 * @param	$peerSocket		The peer's socket
	 * @return	void
	 */
	public final function setPeerSocket ($peerSocket) {
		if (is_resource($peerSocket)) {
			$this->peerSocket = $peerSocket;
		} else {
			$this->peerSocket = null;
		}
	}

	/**
	 * Getter for peer socket
	 *
	 * @return	$peerSocket		The peer's socket
	 */
	public final function getPeerSocket () {
		return $this->peerSocket;
	}

	/**
	 * Setter for connection timestamp, takes effect only once
	 *
	 * @return	void
	 */
	public final function setConnectionTimestamp () {
		if ($this->connectTime == 0) $this->connectTime = time();
	}

	/**
	 * Getter for a raw IP number, freshly asked from the socket. Returns
	 * "0.0.0.0" if this peer has no valid socket.
	 *
	 * @return	$ip		The peer's IP number
	 * @throws	BrokenPipeException		If a socket has lost its connection to the peer
	 */
	public final function getRawIP () {
		// Without a socket we cannot determine the right IP
		if (!is_resource($this->peerSocket)) {
			return "0.0.0.0";
		}

		// Initialize it so it is always defined, even if the call below fails
		$ip = "0.0.0.0";

		// Get IP from socket. The return value decides about success; a
		// global socket_last_error() may still carry a stale error from a
		// completely different socket.
		if (@socket_getpeername($this->peerSocket, $ip) === false) {
			// Throw an exception with the error code of *this* socket
			throw new BrokenPipeException(
				array(
					'this' => $this,
					'code' => socket_last_error($this->peerSocket)
				), self::EXCEPTION_PEER_SOCKET_BROKEN
			);
		}

		// And return it...
		return $ip;
	}

	/**
	 * Getter for a validated peer IP. The first successful lookup is cached,
	 * every later call compares the socket's current IP with the cached one.
	 * If the socket is invalid or has a pending error, the cached value is
	 * returned unchecked.
	 *
	 * @return	$peerIP		The peer's IP number
	 * @throws	IPSpoofingException		If the peer's IP number has changed
	 * @throws	BrokenPipeException		If a socket has lost its connection to the peer
	 */
	public final function getValidatedIP() {
		// Is the socket valid and has no pending error of its own?
		if ((is_resource($this->peerSocket)) && (socket_last_error($this->peerSocket) == 0)) {
			// Get current raw IP number
			$ip = $this->getRawIP();

			if ($this->peerIP == "0.0.0.0") {
				// IP not set so far, so remember it
				$this->peerIP = $ip;
			} elseif ($ip !== $this->peerIP) {
				// The IP number has changed!
				throw new IPSpoofingException(
					array(
						'this'   => $this,
						'old_ip' => $this->peerIP,
						'new_ip' => $ip
					), self::EXCEPTION_PEER_IP_CHANGED
				);
			}
		}

		return $this->peerIP;
	}

	/**
	 * Getter for number of authorization tries
	 *
	 * @return	$authRetries	Authorization tries
	 */
	public final function getAuthRetries () {
		return $this->authRetries;
	}

	//----------------------------------------------------------
	//			Public methods
	//----------------------------------------------------------

	/**
	 * Setter for HubCoreLoop instances
	 *
	 * @param	$hubInstance	An instance of a HubCoreLoop class
	 * @return	void
	 */
	public final function setHubInstance(HubCoreLoop $hubInstance) {
		$this->hubInstance = $hubInstance;
	}

	/**
	 * Getter for HubCoreLoop instances
	 *
	 * @return	$hubInstance	An instance of a HubCoreLoop class
	 */
	public final function getHubInstance() {
		return $this->hubInstance;
	}

	/**
	 * Getter for last sent message timestamp
	 *
	 * @return	$lastSentMessageStamp	Timestamp of last sent message
	 */
	public final function getLastSentMessageStamp () {
		return $this->lastSentMessageStamp;
	}

	/**
	 * Getter for last sent message
	 *
	 * @return	$lastSentMessage	Last sent message to this peer
	 */
	public final function getLastSentMessage () {
		return $this->lastSentMessage;
	}

	/**
	 * Determines whether the peer is authorized. A peer only counts as
	 * authorized if the flag is set AND it no longer needs the AUTH request.
	 *
	 * @return	$isAuthorized	Whether the peer is authorized
	 */
	public final function ifPeerIsAuthorized () {
		return (($this->isAuthorized === true) && ($this->ifPeerNeedsAuthRequest() === false));
	}

	/**
	 * Returns whether this peer needs to receive the AUTH command
	 *
	 * @return	$needAuthRequest	Whether the AUTH command still has to be sent
	 */
	public final function ifPeerNeedsAuthRequest () {
		return $this->needAuthRequest;
	}

	/**
	 * Disconnects the peer with a special reason (status). The socket is
	 * always shut down and closed, even if the farewell message could not be
	 * delivered; in that case the exception from sending is re-thrown after
	 * the socket has been closed.
	 *
	 * @param	$reason		The special reason, empty for the default "good bye"
	 * @return	void
	 * @throws	Exception	Whatever sendMessage() has thrown
	 */
	public final function disconnectWithReason ($reason) {
		// Is the message not set? Then use the default "good bye"
		if (empty($reason)) {
			$reason = $this->getConfigInstance()->readConfig("hub_msg_bye");
		}

		// Try to send the message but remember a failure for later, so the
		// socket does not leak (emulates "finally" for older PHP versions)
		$failure = null;
		try {
			$this->sendMessage($reason);
		} catch (Exception $e) {
			$failure = $e;
		}

		// Disconnect the client, if there is still something to close
		if (is_resource($this->peerSocket)) {
			// Shutting down an already broken connection may fail, that's fine
			@socket_shutdown($this->peerSocket, 2);
			socket_close($this->peerSocket);
		}

		// Now the caller may know about the failed delivery
		if ($failure !== null) {
			throw $failure;
		}
	}

	/**
	 * Sends a specified message (plus line end) to the peer's socket and
	 * remembers it as the last sent message. An empty message is replaced
	 * with the configured "good bye" message.
	 *
	 * @param	$message	The special message
	 * @return	void
	 * @throws	BrokenPipeException		If a pipe to a socket peer is lost
	 * @throws	IPSpoofingException		If the peer's IP number has changed
	 */
	public final function sendMessage ($message) {
		if (empty($message)) {
			// Set default message
			$message = $this->getConfigInstance()->readConfig("hub_msg_bye");
		}

		// Debug message
		$this->getDebugInstance()->output(sprintf("[%s] Sending message \"%s\" to peer %s.\n",
			__METHOD__,
			$message,
			$this->getValidatedIP()
		));

		// Is the socket still valid?
		if (!is_resource($this->peerSocket)) {
			// The socket is no longer valid, so only the global error is left
			throw new BrokenPipeException (
				array(
					'this' => $this,
					'code' => socket_last_error()
				), self::EXCEPTION_PEER_SOCKET_BROKEN
			);
		}

		// Send it to the peer
		if (socket_write($this->peerSocket, $message . self::LINE_END) === false) {
			// Message could not be sent
			throw new BrokenPipeException (
				array(
					'this' => $this,
					'code' => socket_last_error($this->peerSocket)
				), self::EXCEPTION_PEER_SOCKET_BROKEN
			);
		}

		// Set the timestamp and message
		$this->lastSentMessage      = $message;
		$this->lastSentMessageStamp = time();
	}

	/**
	 * Asks the connected new peer for the authorization key. The request is
	 * sent only once per peer.
	 *
	 * @return	void
	 */
	public final function askAuthorizationKey () {
		if ($this->ifPeerNeedsAuthRequest()) {
			// This peer needs to receive the AUTH command so send it to him
			$this->sendMessage($this->getConfigInstance()->readConfig("hub_auth_request"));

			// Set it to done
			$this->needAuthRequest = false;
		}
	}

	/**
	 * Determines whether the peer is a local admin (localhost connection),
	 * based on the cached peer IP (see getValidatedIP()).
	 *
	 * @return	$isLocalAdmin	Whether the peer is a local admin
	 */
	public function ifPeerIsLocalAdmin () {
		// Get the remote IP and extract only the first three numbers
		$remoteArray = explode(".", $this->peerIP);

		// Anything which is no dotted address cannot be 127.0.0.x
		if (count($remoteArray) < 3) {
			return false;
		}

		// Is the peer a local admin?
		return (($remoteArray[0].".".$remoteArray[1].".".$remoteArray[2]) == "127.0.0");
	}

	/**
	 * Enables the local admin and authorizes this peer
	 *
	 * @return	void
	 */
	public function enableLocalAdmin() {
		// Is this IP really local?
		if (($this->ifPeerIsLocalAdmin()) && (!$this->ifPeerIsAuthorized())) {
			// Then authorize him
			$this->isAuthorized    = true;
			$this->needAuthRequest = false;
		} // END - if
	}

	/**
	 * Says "hi" to the local admin
	 *
	 * @return	void
	 */
	public function sayHi2Admin () {
		// Send a message to him... ;-)
		$this->sendMessage("Local admin console is ready. Enter HELP for instructions.");
	}

	/**
	 * Sets whether the peer is authorized. Only a real boolean "true"
	 * authorizes the peer, everything else revokes the authorization.
	 *
	 * @param	$isAuthValid	Whether the authorization went fine
	 * @return	void
	 */
	public function enableIsAuthorized ($isAuthValid = true) {
		$this->isAuthorized = ($isAuthValid === true);
	}

	/**
	 * Checks whether a HELLO has been received from the peer. As long as none
	 * has been seen, one read attempt is done on the socket per call.
	 *
	 * @return	$helloSent	Whether the peer has sent a HELLO
	 */
	public function ifHelloReceived () {
		// HELLO has been sent?
		if (!$this->helloSent) {
			// Read some data
			$read = $this->readFromSocket();

			// Is this a HELLO?
			if ($read == $this->getConfigInstance()->readConfig("hub_peer_hello")) {
				// All right! A HELLO has been received
				$this->helloSent = true;
				$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Peer %s said HELLO to us.",
					__METHOD__,
					$this->getValidatedIP()
				));
			} // END - if
		} // END - if

		// Return status
		return $this->helloSent;
	}

	/**
	 * Whether this hub has replied our HELLO request. A reply is only
	 * awaited if our last sent message was a HELLO and it is only accepted
	 * from the configured master IP.
	 *
	 * @return	$helloReplied	Whether this hub has replied our HELLO request
	 */
	public function ifHelloReplied () {
		if ((!$this->helloReplied) && ($this->lastSentMessage == $this->getConfigInstance()->readConfig("hub_peer_hello"))) {
			// Read some data
			$read = $this->readFromSocket();

			// Is this a HELLO reply?
			if ($read == $this->getConfigInstance()->readConfig("hub_hello_reply")) {
				// Is this the master IP?
				if ($this->getValidatedIP() == $this->getConfigInstance()->readConfig("hub_master_ip")) {
					// All right! A HELLO reply has been received
					$this->helloReplied = true;
					$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] The master hub at %s:%d replied our %s.",
						__METHOD__,
						$this->getValidatedIP(),
						$this->getConfigInstance()->readConfig("hub_master_port"),
						$this->getConfigInstance()->readConfig("hub_peer_hello")
					));
				} else {
					// ELHOs from non-masters are not valid!
					$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Peer %s replied our %s but is not the master hub!",
						__METHOD__,
						$this->getValidatedIP(),
						$this->getConfigInstance()->readConfig("hub_peer_hello")
					));
				}
			} // END - if
		} // END - if

		// Return status
		return $this->helloReplied;
	}

	/**
	 * Returns whether a ELHO (HELLO reply) has been sent
	 *
	 * @return	$elhoSent	Whether a ELHO has been sent to the peer
	 */
	public final function ifELHOsent () {
		return $this->elhoSent;
	}

	/**
	 * Replies a HELLO message with a ELHO message
	 *
	 * @return	void
	 */
	public function replyHelloMessage () {
		$this->sendMessage($this->getConfigInstance()->readConfig("hub_hello_reply"));
		$this->elhoSent = true;
	}

	/**
	 * Handles pinging this peer: sends a PING when the ping timeout has
	 * passed (never to the master hub), counts unanswered pings and asks the
	 * hub to disconnect when the configured limit is reached.
	 *
	 * @return	$result		false if the peer has been given up as lost,
	 *						otherwise nothing (null) is returned
	 */
	public function handlePingPeer () {
		$lost = false;

		// Do we need to ping?
		if ((time() - $this->lastPinged) > $this->getConfigInstance()->readConfig("hub_ping_timeout")) {
			// Don't ping any masters! ;-)
			if ($this->getValidatedIP() != $this->getConfigInstance()->readConfig("hub_master_ip")) {
				// Debug message
				$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Sending ping to peer %s...",
					$this->__toString(),
					$this->getValidatedIP()
				));

				// Send out a PING and await a PONG
				$this->commandInstance->simpleExecute(
					$this->getConfigInstance()->readConfig("hub_peer_ping"),
					$this->getConfigInstance()->readConfig("hub_ping_reply")
				);

				// Has the previous ping been left unanswered, i.e. is the
				// last PONG older than the last PING?
				if (($this->lastPonged < time()) && (($this->lastPonged - $this->lastPinged) < 0)) {
					// Not replied so far
					$this->missingPongs++;

					// Debug message
					$this->getHubInstance()->getOutputInstance()->output(sprintf("[%s] Ping not replied! Try: %d",
						$this->__toString(),
						$this->missingPongs
					));

					// Limit reached?
					if ($this->missingPongs == $this->getConfigInstance()->readConfig("hub_ping_maxdrops")) {
						// This peer is lost
						$this->getHubInstance()->disconnectPeerWithReason("hub_peer_miss_pong");
						$lost = true;
					} // END - if
				} // END - if

				// Last time we pinged is now.
				$this->lastPinged = time();
			} // END - if
		} // END - if

		// Connection is lost?
		if ($lost === true) return false;

		// Awaiting PONG here
		if ($this->commandInstance->ifAwaitsCommand($this->getConfigInstance()->readConfig("hub_ping_reply"))) {
			// PONG received! :-) So reset all counters...
			$this->lastPonged   = time();
			$this->missingPongs = 0;

			// Notify the loop about the ping-pong-time
			$this->getHubInstance()->updatePeerEntry($this, (time() - $this->lastPinged));
		} // END - if
	}

	/**
	 * Handles any incoming commands from the master hub
	 *
	 * @return	void
	 */
	public function handleMasterRequests () {
		// Read the raw socket for data packages
		if ($this->commandInstance->awaitAnyCommand()) {
			// A command has been received from the master hub
			$command = $this->commandInstance->pull();

			// TODO Handle a command from the master here...
		} // END - if
	}

	/**
	 * Reads raw data from the socket without blocking and trims
	 * leading/trailing spaces away. Returns an empty string if no data has
	 * been received or if this peer has no valid socket.
	 *
	 * @return	$data	Raw data from the underlaying socket
	 * @throws	BrokenPipeException		If a socket has lost its connection to the peer
	 */
	public function readFromSocket () {
		$data = "";

		// Without a valid socket there is nothing to read
		if (!is_resource($this->peerSocket)) {
			return $data;
		}

		// Poll the socket (timeout 0 returns immediately)
		$read   = array($this->peerSocket);
		$write  = null;
		$except = null;
		$num = socket_select($read, $write, $except, 0);

		// Something has changed on our socket?
		if ($num > 0) {
			// Read a single line. The return value decides about success; a
			// global socket_last_error() may still carry a stale error from
			// a completely different socket.
			$raw = @socket_read($this->peerSocket, 1024, PHP_NORMAL_READ);

			if ($raw === false) {
				// Throw an exception with the error code of *this* socket
				throw new BrokenPipeException(
					array(
						'this' => $this,
						'code' => socket_last_error($this->peerSocket)
					), self::EXCEPTION_PEER_SOCKET_BROKEN
				);
			}

			$data = trim($raw);
		} // END - if

		return $data;
	}
} // END - class
// [EOF]
?>