9 * This source file is subject to the new BSD license that is bundled
10 * with this package in the file LICENSE.
11 * It is also available through the world-wide-web at this URL:
12 * http://phergie.org/license
16 * @author Phergie Development Team <team@phergie.org>
17 * @copyright 2008-2010 Phergie Development Team (http://phergie.org)
18 * @license http://phergie.org/license New BSD License
19 * @link http://pear.phergie.org/package/Phergie
23 * Connection data processor which polls to handle input in an
24 * asynchronous manner. Will also cause the application tick at
25 * the user-defined wait time.
29 * @author Phergie Development Team <team@phergie.org>
30 * @license http://phergie.org/license New BSD License
31 * @link http://pear.phergie.org/package/Phergie
33 class Phergie_Process_Async extends Phergie_Process_Abstract
36 * Length of time to poll for stream activity (seconds)
43 * Length of time to poll for stream activity (microseconds)
50 * Length of time to wait between ticks.
57 * Records when the application last performed a tick
61 protected $lastTick = 0;
64 * Overrides the parent class to set the poll time.
66 * @param Phergie_Bot $bot Main bot class
67 * @param array $options Processor arguments
71 public function __construct(Phergie_Bot $bot, array $options)
73 if (!$bot->getDriver() instanceof Phergie_Driver_Streams) {
74 throw new Phergie_Process_Exception(
75 'The Async event processor requires the Streams driver'
79 foreach (array('sec', 'usec') as $var) {
80 if (isset($options[$var])) {
81 if (!is_int($options[$var])) {
82 throw new Phergie_Process_Exception(
83 'Processor option "' . $var . '" must be an integer'
86 $this->$var = $options[$var];
90 if (empty($this->sec) && empty($this->usec)) {
91 throw new Phergie_Process_Exception(
92 'One of the processor options "sec" or "usec" must be specified'
96 parent::__construct($bot, $options);
100 * Waits for stream activity and performs event processing on
101 * connections with data to read.
105 protected function handleEventsAsync()
107 $hostmasks = $this->driver->getActiveReadSockets($this->sec, $this->usec);
111 $connections = $this->connections->getConnections($hostmasks);
112 foreach ($connections as $connection) {
113 $this->driver->setConnection($connection);
114 $this->plugins->setConnection($connection);
115 $this->plugins->onTick();
117 if ($event = $this->driver->getEvent()) {
118 $this->ui->onEvent($event, $connection);
119 $this->plugins->setEvent($event);
121 if (!$this->plugins->preEvent()) {
125 $this->plugins->{'on' . ucfirst($event->getType())}();
128 $this->processEvents($connection);
133 * Perform application tick event on all plugins and connections.
137 protected function doTick()
139 foreach ($this->connections as $connection) {
140 $this->plugins->setConnection($connection);
141 $this->plugins->onTick();
142 $this->processEvents($connection);
147 * Obtains and processes incoming events, then sends resulting outgoing
152 public function handleEvents()
155 if ($this->lastTick == 0 || ($this->lastTick + $this->wait <= $time)) {
157 $this->lastTick = $time;
159 $this->handleEventsAsync();