* @package Hub
* @author Brion Vibber <brion@status.net>
*/
-class HubSub extends Memcached_DataObject
+class HubSub extends Managed_DataObject
{
public $__table = 'hubsub';
public $hashkey; // sha1(topic . '|' . $callback); (topic, callback) key is too long for myisam in utf8
- public $topic;
- public $callback;
+ public $topic; // varchar(191) not 255 because utf8mb4 takes more space
+ public $callback; // varchar(191) not 255 because utf8mb4 takes more space
public $secret;
public $lease;
public $sub_start;
public $created;
public $modified;
- public /*static*/ function staticGet($topic, $callback)
- {
- return parent::staticGet(__CLASS__, 'hashkey', self::hashkey($topic, $callback));
- }
-
protected static function hashkey($topic, $callback)
{
return sha1($topic . '|' . $callback);
}
- /**
- * return table definition for DB_DataObject
- *
- * DB_DataObject needs to know something about the table to manipulate
- * instances. This method provides all the DB_DataObject needs to know.
- *
- * @return array array of column definitions
- */
- function table()
+ public static function getByHashkey($topic, $callback)
{
- return array('hashkey' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
- 'topic' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
- 'callback' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
- 'secret' => DB_DATAOBJECT_STR,
- 'lease' => DB_DATAOBJECT_INT,
- 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
- 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
- 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL,
- 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
+ return self::getKV('hashkey', self::hashkey($topic, $callback));
}
- static function schemaDef()
+ public static function schemaDef()
{
- return array(new ColumnDef('hashkey', 'char',
- /*size*/40,
- /*nullable*/false,
- /*key*/'PRI'),
- new ColumnDef('topic', 'varchar',
- /*size*/255,
- /*nullable*/false,
- /*key*/'MUL'),
- new ColumnDef('callback', 'varchar',
- 255, false),
- new ColumnDef('secret', 'text',
- null, true),
- new ColumnDef('lease', 'int',
- null, true),
- new ColumnDef('sub_start', 'datetime',
- null, true),
- new ColumnDef('sub_end', 'datetime',
- null, true),
- new ColumnDef('created', 'datetime',
- null, false),
- new ColumnDef('modified', 'datetime',
- null, false));
- }
-
- function keys()
- {
- return array_keys($this->keyTypes());
- }
-
- function sequenceKey()
- {
- return array(false, false, false);
- }
-
- /**
- * return key definitions for DB_DataObject
- *
- * DB_DataObject needs to know about keys that the table has; this function
- * defines them.
- *
- * @return array key definitions
- */
- function keyTypes()
- {
- return array('hashkey' => 'K');
+ return array(
+ 'fields' => array(
+ 'hashkey' => array('type' => 'char', 'not null' => true, 'length' => 40, 'description' => 'HubSub hashkey'),
+ 'topic' => array('type' => 'varchar', 'not null' => true, 'length' => 191, 'description' => 'HubSub topic'),
+ 'callback' => array('type' => 'varchar', 'not null' => true, 'length' => 191, 'description' => 'HubSub callback'),
+ 'secret' => array('type' => 'text', 'description' => 'HubSub stored secret'),
+ 'lease' => array('type' => 'int', 'description' => 'HubSub leasetime'),
+ 'sub_start' => array('type' => 'datetime', 'description' => 'subscription start'),
+ 'sub_end' => array('type' => 'datetime', 'description' => 'subscription end'),
+ 'created' => array('type' => 'datetime', 'not null' => true, 'description' => 'date this record was created'),
+ 'modified' => array('type' => 'timestamp', 'not null' => true, 'description' => 'date this record was modified'),
+ ),
+ 'primary key' => array('hashkey'),
+ 'indexes' => array(
+ 'hubsub_topic_idx' => array('topic'),
+ ),
+ );
}
/**
}
$data = array('sub' => clone($this),
'mode' => $mode,
- 'token' => $token,
+ 'token' => $token, // let's put it in there if remote uses PuSH <0.4
'retries' => $retries);
$qm = QueueManager::get();
$qm->enqueue($data, 'hubconf');
{
assert($mode == 'subscribe' || $mode == 'unsubscribe');
- $challenge = common_good_rand(32);
+ $challenge = common_random_hexstr(32);
$params = array('hub.mode' => $mode,
'hub.topic' => $this->topic,
'hub.challenge' => $challenge);
if ($mode == 'subscribe') {
$params['hub.lease_seconds'] = $this->lease;
}
- if ($token !== null) {
- $params['hub.verify_token'] = $token;
+ if ($token !== null) { // TODO: deprecated in PuSH 0.4
+ $params['hub.verify_token'] = $token; // let's put it in there if remote uses PuSH <0.4
}
// Any existing query string parameters must be preserved
$status = $response->getStatus();
if ($status >= 200 && $status < 300) {
- common_log(LOG_INFO, "Verified $mode of $this->callback:$this->topic");
+ common_log(LOG_INFO, "Verified {$mode} of {$this->callback}:{$this->topic}");
} else {
// TRANS: Client exception. %s is a HTTP status code.
throw new ClientException(sprintf(_m('Hub subscriber verification returned HTTP %s.'),$status));
}
- $old = HubSub::staticGet($this->topic, $this->callback);
+ $old = HubSub::getByHashkey($this->topic, $this->callback);
if ($mode == 'subscribe') {
- if ($old) {
+ if ($old instanceof HubSub) {
$this->update($old);
} else {
$ok = $this->insert();
}
} else if ($mode == 'unsubscribe') {
- if ($old) {
+ if ($old instanceof HubSub) {
$old->delete();
} else {
// That's ok, we're already unsubscribed.
return parent::insert();
}
- /**
- * Update wrapper; transparently update modified column.
- * @return boolean success
- */
- function update($old=null)
- {
- $this->modified = common_sql_now();
- return parent::update($old);
- }
-
/**
* Schedule delivery of a 'fat ping' to the subscriber's callback
* endpoint. If queues are disabled, this will run immediately.
$retries = intval(common_config('ostatus', 'hub_retries'));
}
- if (common_config('ostatus', 'local_push_bypass')) {
- // If target is a local site, bypass the web server and drop the
- // item directly into the target's input queue.
- $url = parse_url($this->callback);
- $wildcard = common_config('ostatus', 'local_wildcard');
- $site = Status_network::getFromHostname($url['host'], $wildcard);
-
- if ($site) {
- if ($this->secret) {
- $hmac = 'sha1=' . hash_hmac('sha1', $atom, $this->secret);
- } else {
- $hmac = '';
- }
-
- // Hack: at the moment we stick the subscription ID in the callback
- // URL so we don't have to look inside the Atom to route the subscription.
- // For now this means we need to extract that from the target URL
- // so we can include it in the data.
- $parts = explode('/', $url['path']);
- $subId = intval(array_pop($parts));
-
- $data = array('feedsub_id' => $subId,
- 'post' => $atom,
- 'hmac' => $hmac);
- common_log(LOG_DEBUG, "Cross-site PuSH bypass enqueueing straight to $site->nickname feed $subId");
- $qm = QueueManager::get();
- $qm->enqueue($data, 'pushin', $site->nickname);
- return;
- }
- }
-
// We dare not clone() as when the clone is discarded it'll
// destroy the result data for the parent query.
// @fixme use clone() again when it's safe to copy an
// individual item from a multi-item query again.
- $sub = HubSub::staticGet($this->topic, $this->callback);
+ $sub = HubSub::getByHashkey($this->topic, $this->callback);
$data = array('sub' => $sub,
'atom' => $atom,
'retries' => $retries);
* @param string $atom well-formed Atom feed
* @param array $pushCallbacks list of callback URLs
*/
- function bulkDistribute($atom, $pushCallbacks)
+ function bulkDistribute($atom, array $pushCallbacks)
{
+ if (empty($pushCallbacks)) {
+ common_log(LOG_ERR, 'Callback list empty for bulkDistribute.');
+ return false;
+ }
$data = array('atom' => $atom,
'topic' => $this->topic,
'pushCallbacks' => $pushCallbacks);
count($pushCallbacks) . " sites");
$qm = QueueManager::get();
$qm->enqueue($data, 'hubprep');
+ return true;
}
/**