$server = common_config('site', 'server');
}
- // XXX: protocol
+ $ssl = common_config('attachments', 'ssl');
- return 'http://'.$server.$path.$filename;
+ if (is_null($ssl)) { // null -> guess
+ if (common_config('site', 'ssl') == 'always' &&
+ !common_config('attachments', 'server')) {
+ $ssl = true;
+ } else {
+ $ssl = false;
+ }
+ }
+
+ $protocol = ($ssl) ? 'https' : 'http';
+
+ return $protocol.'://'.$server.$path.$filename;
}
}
if($oembed->modified) $enclosure->modified=$oembed->modified;
unset($oembed->size);
}
+ } else {
+ return false;
}
}
}
$twitter_status['attachments'] = array();
foreach ($attachments as $attachment) {
- if ($attachment->isEnclosure()) {
+ $enclosure_o=$attachment->getEnclosure();
+ if ($enclosure_o) {
$enclosure = array();
- $enclosure['url'] = $attachment->url;
- $enclosure['mimetype'] = $attachment->mimetype;
- $enclosure['size'] = $attachment->size;
+ $enclosure['url'] = $enclosure_o->url;
+ $enclosure['mimetype'] = $enclosure_o->mimetype;
+ $enclosure['size'] = $enclosure_o->size;
$twitter_status['attachments'][] = $enclosure;
}
}
$this->elementStart('feed', array('xmlns' => 'http://www.w3.org/2005/Atom',
'xml:lang' => 'en-US',
'xmlns:thr' => 'http://purl.org/syndication/thread/1.0'));
- Event::handle('StartApiAtom', array($this));
}
function endTwitterAtom()
'subsystem' => 'db', # default to database, or 'stomp'
'stomp_server' => null,
'queue_basename' => '/queue/statusnet/',
- 'control_channel' => '/topic/statusnet-control', // broadcasts to all queue daemons
+ 'control_channel' => '/topic/statusnet/control', // broadcasts to all queue daemons
'stomp_username' => null,
'stomp_password' => null,
'stomp_persistent' => true, // keep items across queue server restart, if persistence is enabled
'spawndelay' => 1, // Wait at least N seconds between (re)spawns of child processes to avoid slamming the queue server with subscription startup
'debug_memory' => false, // true to spit memory usage to log
'inboxes' => true, // true to do inbox distribution & output queueing from in background via 'distrib' queue
+ 'breakout' => array('*' => 'shared'), // set global or per-handler queue breakout
+ // 'shared': use a shared queue for all sites
+ // 'handler': share each/this handler over multiple sites
+ // 'site': break out for each/this handler on this site
+ 'max_retries' => 10, // drop messages after N failed attempts to process (Stomp)
+ 'dead_letter_dir' => false, // set to directory to save dropped messages into (Stomp)
),
'license' =>
array('type' => 'cc', # can be 'cc', 'allrightsreserved', 'private'
'avatar' =>
array('server' => null,
'dir' => INSTALLDIR . '/avatar/',
- 'path' => $_path . '/avatar/'),
+ 'path' => $_path . '/avatar/',
+ 'ssl' => null),
'background' =>
array('server' => null,
'dir' => INSTALLDIR . '/background/',
- 'path' => $_path . '/background/'),
+ 'path' => $_path . '/background/',
+ 'ssl' => null),
'public' =>
array('localonly' => true,
'blacklist' => array(),
'theme' =>
array('server' => null,
'dir' => null,
- 'path'=> null),
+ 'path'=> null,
+ 'ssl' => null),
'javascript' =>
array('server' => null,
- 'path'=> null),
+ 'path'=> null,
+ 'ssl' => null),
'throttle' =>
array('enabled' => false, // whether to throttle edits; false by default
'count' => 20, // number of allowed messages in timespan
array('server' => null,
'dir' => INSTALLDIR . '/file/',
'path' => $_path . '/file/',
+ 'ssl' => null,
'supported' => array('image/png',
'image/jpeg',
'image/gif',
'Mapstraction' => null,
'Linkback' => null,
'WikiHashtags' => null,
+ 'PubSubHubBub' => null,
+ 'RSSCloud' => null,
'OpenID' => null),
),
'admin' =>
$this->multiSite = $multiSite;
}
if ($this->multiSite) {
- $this->sites = $this->findAllSites();
+ $this->sites = StatusNet::findAllSites();
} else {
- $this->sites = array(common_config('site', 'server'));
+ $this->sites = array(StatusNet::currentSite());
}
if (empty($this->sites)) {
}
foreach ($this->sites as $site) {
- if ($site != common_config('site', 'server')) {
- StatusNet::init($site);
- }
+ StatusNet::switchSite($site);
$this->initManagers();
}
}
*/
abstract function initManagers();
- /**
- * Pull all local sites from status_network table.
- * @return array of hostnames
- */
- protected function findAllSites()
- {
- $hosts = array();
- $sn = new Status_network();
- $sn->find();
- while ($sn->fetch()) {
- $hosts[] = $sn->getServerName();
- }
- return $hosts;
- }
-
/**
* Instantiate an i/o manager class for the current site.
* If a multi-site capable handler is already present,
* we don't need to build a new one.
*
- * @param string $class
+ * @param mixed $manager class name (to run $class::get()) or object
*/
- protected function instantiate($class)
+ protected function instantiate($manager)
{
- if (is_string($class) && isset($this->singletons[$class])) {
- // Already instantiated a multi-site-capable handler.
- // Just let it know it should listen to this site too!
- $this->singletons[$class]->addSite(common_config('site', 'server'));
- return;
+ if (is_string($manager)) {
+ $manager = call_user_func(array($class, 'get'));
}
- $manager = $this->getManager($class);
-
- if ($this->multiSite) {
- $caps = $manager->multiSite();
- if ($caps == IoManager::SINGLE_ONLY) {
+ $caps = $manager->multiSite();
+ if ($caps == IoManager::SINGLE_ONLY) {
+ if ($this->multiSite) {
throw new Exception("$class can't run with --all; aborting.");
}
- if ($caps == IoManager::INSTANCE_PER_PROCESS) {
- // Save this guy for later!
- // We'll only need the one to cover multiple sites.
- $this->singletons[$class] = $manager;
- $manager->addSite(common_config('site', 'server'));
- }
+ } else if ($caps == IoManager::INSTANCE_PER_PROCESS) {
+ $manager->addSite();
}
- $this->managers[] = $manager;
- }
-
- protected function getManager($class)
- {
- if(is_object($class)){
- return $class;
- } else {
- return call_user_func(array($class, 'get'));
+ if (!in_array($manager, $this->managers, true)) {
+ // Only need to save singletons once
+ $this->managers[] = $manager;
}
}
{
$this->logState('init');
$this->start();
+ $this->checkMemory(false);
while (!$this->shutdown) {
$timeouts = array_values($this->pollTimeouts);
/**
* Check runtime memory usage, possibly triggering a graceful shutdown
* and thread respawn if we've crossed the soft limit.
+ *
+ * @param boolean $respawn if false we'll shut down instead of respawning
*/
- protected function checkMemory()
+ protected function checkMemory($respawn=true)
{
$memoryLimit = $this->softMemoryLimit();
if ($memoryLimit > 0) {
$usage = memory_get_usage();
if ($usage > $memoryLimit) {
common_log(LOG_INFO, "Queue thread hit soft memory limit ($usage > $memoryLimit); gracefully restarting.");
- $this->requestRestart();
+ if ($respawn) {
+ $this->requestRestart();
+ } else {
+ $this->requestShutdown();
+ }
} else if (common_config('queue', 'debug_memory')) {
- common_log(LOG_DEBUG, "Memory usage $usage");
+ $fmt = number_format($usage);
+ common_log(LOG_DEBUG, "Memory usage $fmt");
}
}
}
* for per-queue and per-site records.
*
* @param string $key counter name
- * @param array $owners list of owner keys like 'queue:jabber' or 'site:stat01'
+ * @param array $owners list of owner keys like 'queue:xmpp' or 'site:stat01'
*/
public function stats($key, $owners=array())
{
{
static $qm = null;
- public $master = null;
- public $handlers = array();
- public $groups = array();
+ protected $master = null;
+ protected $handlers = array();
+ protected $groups = array();
+ protected $activeGroups = array();
/**
* Factory function to pull the appropriate QueueManager object
{
if (isset($this->handlers[$queue])) {
$class = $this->handlers[$queue];
- if (class_exists($class)) {
+ if(is_object($class)) {
+ return $class;
+ } else if (class_exists($class)) {
return new $class();
} else {
- common_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
+ $this->_log(LOG_ERR, "Nonexistent handler class '$class' for queue '$queue'");
}
} else {
- common_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
+ $this->_log(LOG_ERR, "Requested handler for unkown queue '$queue'");
}
return null;
}
/**
* Get a list of registered queue transport names to be used
- * for this daemon.
+ * for listening in this daemon.
*
* @return array of strings
*/
- function getQueues()
+ function activeQueues()
{
- $group = $this->activeGroup();
- return array_keys($this->groups[$group]);
+ $queues = array();
+ foreach ($this->activeGroups as $group) {
+ if (isset($this->groups[$group])) {
+ $queues = array_merge($queues, $this->groups[$group]);
+ }
+ }
+
+ return array_keys($queues);
}
/**
- * Initialize the list of queue handlers
+ * Initialize the list of queue handlers for the current site.
*
* @event StartInitializeQueueManager
* @event EndInitializeQueueManager
*/
function initialize()
{
- // @fixme we'll want to be able to listen to particular queues...
+ $this->handlers = array();
+ $this->groups = array();
+ $this->groupsByTransport = array();
+
if (Event::handle('StartInitializeQueueManager', array($this))) {
- $this->connect('plugin', 'PluginQueueHandler');
+ $this->connect('distrib', 'DistribQueueHandler');
$this->connect('omb', 'OmbQueueHandler');
$this->connect('ping', 'PingQueueHandler');
- $this->connect('distrib', 'DistribQueueHandler');
if (common_config('sms', 'enabled')) {
$this->connect('sms', 'SmsQueueHandler');
}
// XMPP output handlers...
- $this->connect('jabber', 'JabberQueueHandler');
- $this->connect('public', 'PublicQueueHandler');
- // @fixme this should get an actual queue
- //$this->connect('confirm', 'XmppConfirmHandler');
+ if (common_config('xmpp', 'enabled')) {
+ // Delivery prep, read by queuedaemon.php:
+ $this->connect('jabber', 'JabberQueueHandler');
+ $this->connect('public', 'PublicQueueHandler');
+
+ // Raw output, read by xmppdaemon.php:
+ $this->connect('xmppout', 'XmppOutQueueHandler', 'xmpp');
+ }
// For compat with old plugins not registering their own handlers.
$this->connect('plugin', 'PluginQueueHandler');
-
- $this->connect('xmppout', 'XmppOutQueueHandler', 'xmppdaemon');
-
}
Event::handle('EndInitializeQueueManager', array($this));
}
* Only registered transports will be reliably picked up!
*
* @param string $transport
- * @param string $class
+ * @param string $class class name or object instance
* @param string $group
*/
- public function connect($transport, $class, $group='queuedaemon')
+ public function connect($transport, $class, $group='main')
{
$this->handlers[$transport] = $class;
$this->groups[$group][$transport] = $class;
+ $this->groupsByTransport[$transport] = $group;
}
/**
- * @return string queue group to use for this request
+ * Set the active group which will be used for listening.
+ * @param string $group
*/
- function activeGroup()
+ function setActiveGroup($group)
{
- $group = 'queuedaemon';
- if ($this->master) {
- // hack hack
- if ($this->master instanceof XmppMaster) {
- return 'xmppdaemon';
- }
+ $this->activeGroups = array($group);
+ }
+
+ /**
+ * Set the active group(s) which will be used for listening.
+ * @param array $groups
+ */
+ function setActiveGroups($groups)
+ {
+ $this->activeGroups = $groups;
+ }
+
+ /**
+ * @return string queue group for this queue
+ */
+ function queueGroup($queue)
+ {
+ if (isset($this->groupsByTransport[$queue])) {
+ return $this->groupsByTransport[$queue];
+ } else {
+ throw new Exception("Requested group for unregistered transport $queue");
}
- return $group;
}
/**
$monitor->stats($key, $owners);
}
}
+
+ protected function _log($level, $msg)
+ {
+ $class = get_class($this);
+ if ($this->activeGroups) {
+ $groups = ' (' . implode(',', $this->activeGroups) . ')';
+ } else {
+ $groups = '';
+ }
+ common_log($level, "$class$groups: $msg");
+ }
}
}
if (!empty($f)) {
- if ($f->isEnclosure()) {
+ if ($f->getEnclosure()) {
$is_attachment = true;
$attachment_id = $f->id;
- } else {
- $foe = File_oembed::staticGet('file_id', $f->id);
- if (!empty($foe)) {
- // if it has OEmbed info, it's an attachment, too
- $is_attachment = true;
- $attachment_id = $f->id;
-
- $thumb = File_thumbnail::staticGet('file_id', $f->id);
- if (!empty($thumb)) {
- $has_thumb = true;
- }
+
+ $thumb = File_thumbnail::staticGet('file_id', $f->id);
+ if (!empty($thumb)) {
+ $has_thumb = true;
}
}
}
if (Event::hasHandler('HandleQueuedNotice')) {
$transports[] = 'plugin';
}
-
$xmpp = common_config('xmpp', 'enabled');
return array($proxy, $ip);
}
+
+ function common_url_to_nickname($url)
+ {
+ static $bad = array('query', 'user', 'password', 'port', 'fragment');
+
+ $parts = parse_url($url);
+
+ # If any of these parts exist, this won't work
+
+ foreach ($bad as $badpart) {
+ if (array_key_exists($badpart, $parts)) {
+ return null;
+ }
+ }
+
+ # We just have host and/or path
+
+ # If it's just a host...
+ if (array_key_exists('host', $parts) &&
+ (!array_key_exists('path', $parts) || strcmp($parts['path'], '/') == 0))
+ {
+ $hostparts = explode('.', $parts['host']);
+
+ # Try to catch common idiom of nickname.service.tld
+
+ if ((count($hostparts) > 2) &&
+ (strlen($hostparts[count($hostparts) - 2]) > 3) && # try to skip .co.uk, .com.au
+ (strcmp($hostparts[0], 'www') != 0))
+ {
+ return common_nicknamize($hostparts[0]);
+ } else {
+ # Do the whole hostname
+ return common_nicknamize($parts['host']);
+ }
+ } else {
+ if (array_key_exists('path', $parts)) {
+ # Strip starting, ending slashes
+ $path = preg_replace('@/$@', '', $parts['path']);
+ $path = preg_replace('@^/@', '', $path);
+ if (strpos($path, '/') === false) {
+ return common_nicknamize($path);
+ }
+ }
+ }
+
+ return null;
+ }
+
+ function common_nicknamize($str)
+ {
+ $str = preg_replace('/\W/', '', $str);
+ return strtolower($str);
+ }