It'd continue trying xmpp transports forever, for example...
* @param mixed $transports name of a single queue or array of queues to pull from
* If not specified, checks all queues in the system.
*/
- static function top($transports=null) {
+ static function top($transports=null, array $ignored_transports=array()) {
$qi = new Queue_item();
if ($transports) {
$qi->transport = $transports;
}
}
+ if (!empty($ignored_transports)) {
+ // @fixme use safer escaping
+ $list = implode("','", array_map(array($qi, 'escape'), $ignored_transports));
+ $qi->whereAdd("transport NOT IN ('$list')");
+ }
$qi->orderBy('created');
$qi->whereAdd('claimed is null');
public function poll()
{
//$this->_log(LOG_DEBUG, 'Checking for notices...');
- $qi = Queue_item::top($this->activeQueues());
+ $qi = Queue_item::top($this->activeQueues(), $this->getIgnoredTransports());
if (!$qi instanceof Queue_item) {
//$this->_log(LOG_DEBUG, 'No notices waiting; idling.');
return false;
protected $handlers = array();
protected $groups = array();
protected $activeGroups = array();
+ protected $ignoredTransports = array();
/**
* Factory function to pull the appropriate QueueManager object
return array_keys($queues);
}
+ function getIgnoredTransports()
+ {
+ return array_keys($this->ignoredTransports);
+ }
+
+ function ignoreTransport($transport)
+ {
+ // key is used for uniqueness, value doesn't mean anything
+ $this->ignoredTransports[$transport] = true;
+ }
+
/**
* Initialize the list of queue handlers for the current site.
*
// OpportunisticQM shouldn't discard items it can't handle, we're
// only here to take care of what we _can_ handle!
protected function noHandlerFound(Queue_item $qi, $rep=null) {
- $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
+ $this->_log(LOG_WARNING, "[{$qi->transport}:item {$qi->id}] Releasing claim for queue item without a handler");
$this->_fail($qi, true); // true here means "releaseOnly", so no error statistics since it's not an _error_
}
+ protected function _fail(Queue_item $qi, $releaseOnly=false)
+ {
+ parent::_fail($qi, $releaseOnly);
+ $this->_log(LOG_DEBUG, "[{$qi->transport}:item {$qi->id}] Ignoring this transport for the rest of this execution");
+ $this->ignoreTransport($qi->transport);
+ }
+
/**
* Takes care of running through the queue items, returning when
* the limits setup in __construct are met.