PuSH subscription flow:
$profile->subscribe()
- generate random verification token
- save to verify_token
sends a sub request to the hub...
main/push/callback
* Higher-level behavior building OStatus stuff on top is handled
* under Ostatus_profile.
*/
-class FeedSub extends Memcached_DataObject
+class FeedSub extends Managed_DataObject
{
public $__table = 'feedsub';
// PuSH subscription data
public $huburi;
public $secret;
- public $verify_token;
- public $sub_state; // subscribe, active, unsubscribe, inactive
+ public $sub_state; // subscribe, active, unsubscribe, inactive, nohub
public $sub_start;
public $sub_end;
public $last_update;
public $created;
public $modified;
- public /*static*/ function staticGet($k, $v=null)
+ public static function schemaDef()
{
- return parent::staticGet(__CLASS__, $k, $v);
+ return array(
+ 'fields' => array(
+ 'id' => array('type' => 'serial', 'not null' => true, 'description' => 'FeedSub local unique id'),
+ 'uri' => array('type' => 'varchar', 'not null' => true, 'length' => 255, 'description' => 'FeedSub uri'),
+ 'huburi' => array('type' => 'text', 'description' => 'FeedSub hub-uri'),
+ 'secret' => array('type' => 'text', 'description' => 'FeedSub stored secret'),
+ 'sub_state' => array('type' => 'enum("subscribe","active","unsubscribe","inactive","nohub")', 'not null' => true, 'description' => 'subscription state'),
+ 'sub_start' => array('type' => 'datetime', 'description' => 'subscription start'),
+ 'sub_end' => array('type' => 'datetime', 'description' => 'subscription end'),
+ 'last_update' => array('type' => 'datetime', 'description' => 'when this record was last updated'),
+ 'created' => array('type' => 'datetime', 'not null' => true, 'description' => 'date this record was created'),
+ 'modified' => array('type' => 'timestamp', 'not null' => true, 'description' => 'date this record was modified'),
+ ),
+ 'primary key' => array('id'),
+ 'unique keys' => array(
+ 'feedsub_uri_key' => array('uri'),
+ ),
+ );
}
/**
- * return table definition for DB_DataObject
- *
- * DB_DataObject needs to know something about the table to manipulate
- * instances. This method provides all the DB_DataObject needs to know.
- *
- * @return array array of column definitions
+ * Get the feed uri (http/https)
*/
- function table()
- {
- return array('id' => DB_DATAOBJECT_INT + DB_DATAOBJECT_NOTNULL,
- 'uri' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
- 'huburi' => DB_DATAOBJECT_STR,
- 'secret' => DB_DATAOBJECT_STR,
- 'verify_token' => DB_DATAOBJECT_STR,
- 'sub_state' => DB_DATAOBJECT_STR + DB_DATAOBJECT_NOTNULL,
- 'sub_start' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
- 'sub_end' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
- 'last_update' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME,
- 'created' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL,
- 'modified' => DB_DATAOBJECT_STR + DB_DATAOBJECT_DATE + DB_DATAOBJECT_TIME + DB_DATAOBJECT_NOTNULL);
- }
-
- static function schemaDef()
+ public function getUri()
{
- return array(new ColumnDef('id', 'integer',
- /*size*/ null,
- /*nullable*/ false,
- /*key*/ 'PRI',
- /*default*/ null,
- /*extra*/ null,
- /*auto_increment*/ true),
- new ColumnDef('uri', 'varchar',
- 255, false, 'UNI'),
- new ColumnDef('huburi', 'text',
- null, true),
- new ColumnDef('verify_token', 'text',
- null, true),
- new ColumnDef('secret', 'text',
- null, true),
- new ColumnDef('sub_state', "enum('subscribe','active','unsubscribe','inactive')",
- null, false),
- new ColumnDef('sub_start', 'datetime',
- null, true),
- new ColumnDef('sub_end', 'datetime',
- null, true),
- new ColumnDef('last_update', 'datetime',
- null, false),
- new ColumnDef('created', 'datetime',
- null, false),
- new ColumnDef('modified', 'datetime',
- null, false));
- }
-
- /**
- * return key definitions for DB_DataObject
- *
- * DB_DataObject needs to know about keys that the table has; this function
- * defines them.
- *
- * @return array key definitions
- */
- function keys()
- {
- return array_keys($this->keyTypes());
+ if (empty($this->uri)) {
+ throw new NoUriException($this);
+ }
+ return $this->uri;
}
/**
- * return key definitions for Memcached_DataObject
- *
- * Our caching system uses the same key definitions, but uses a different
- * method to get them.
+ * Do we have a hub? Then we are a PuSH feed.
+ * https://en.wikipedia.org/wiki/PubSubHubbub
*
- * @return array key definitions
+ * If huburi is empty, then doublecheck that we are not using
+ * a fallback hub. If there is a fallback hub, it is only if the
+ * sub_state is "nohub" that we assume it's not a PuSH feed.
*/
- function keyTypes()
- {
- return array('id' => 'K', 'uri' => 'U');
- }
-
- function sequenceKey()
+ public function isPuSH()
{
- return array('id', true, false);
+ if (empty($this->huburi)
+ && (!common_config('feedsub', 'fallback_hub')
+ || $this->sub_state === 'nohub')) {
+ // Here we have no huburi set. Also, either there is no
+ // fallback hub configured or sub_state is "nohub".
+ return false;
+ }
+ return true;
}
/**
public function localProfile()
{
if ($this->profile_id) {
- return Profile::staticGet('id', $this->profile_id);
+ return Profile::getKV('id', $this->profile_id);
}
return null;
}
public function localGroup()
{
if ($this->group_id) {
- return User_group::staticGet('id', $this->group_id);
+ return User_group::getKV('id', $this->group_id);
}
return null;
}
*/
public static function ensureFeed($feeduri)
{
- $current = self::staticGet('uri', $feeduri);
- if ($current) {
+ $current = self::getKV('uri', $feeduri);
+ if ($current instanceof FeedSub) {
return $current;
}
$discover->discoverFromFeedURL($feeduri);
$huburi = $discover->getHubLink();
- if (!$huburi && !common_config('feedsub', 'fallback_hub')) {
+ if (!$huburi && !common_config('feedsub', 'fallback_hub') && !common_config('feedsub', 'nohub')) {
throw new FeedSubNoHubException();
}
$feedsub->modified = common_sql_now();
$result = $feedsub->insert();
- if (empty($result)) {
+ if ($result === false) {
throw new FeedDBException($feedsub);
}
* Send a subscription request to the hub for this feed.
* The hub will later send us a confirmation POST to /main/push/callback.
*
- * @return bool true on success, false on failure
+ * @return void
* @throws ServerException if feed state is not valid
*/
- public function subscribe($mode='subscribe')
+ public function subscribe()
{
if ($this->sub_state && $this->sub_state != 'inactive') {
- common_log(LOG_WARNING, "Attempting to (re)start PuSH subscription to $this->uri in unexpected state $this->sub_state");
+ common_log(LOG_WARNING, sprintf('Attempting to (re)start PuSH subscription to %s in unexpected state %s', $this->getUri(), $this->sub_state));
}
+
+ if (!Event::handle('FeedSubscribe', array($this))) {
+ // A plugin handled it
+ return;
+ }
+
if (empty($this->huburi)) {
if (common_config('feedsub', 'fallback_hub')) {
// No native hub on this feed?
// Use our fallback hub, which handles polling on our behalf.
} else if (common_config('feedsub', 'nohub')) {
- // Fake it! We're just testing remote feeds w/o hubs.
- // We'll never actually get updates in this mode.
- return true;
+ // For this to actually work, we'll need some polling mechanism.
+ // The FeedPoller plugin should take care of it.
+ return;
} else {
+ // TRANS: Server exception.
throw new ServerException(_m('Attempting to start PuSH subscription for feed with no hub.'));
}
}
- return $this->doSubscribe('subscribe');
+ $this->doSubscribe('subscribe');
}
/**
* the system is using it. Most callers will want garbageCollect() instead,
* which confirms there's no uses left.
*
- * @return bool true on success, false on failure
* @throws ServerException if feed state is not valid
*/
public function unsubscribe() {
if ($this->sub_state != 'active') {
- common_log(LOG_WARNING, "Attempting to (re)end PuSH subscription to $this->uri in unexpected state $this->sub_state");
+ common_log(LOG_WARNING, sprintf('Attempting to (re)end PuSH subscription to %s in unexpected state %s', $this->getUri(), $this->sub_state));
+ }
+
+ if (!Event::handle('FeedUnsubscribe', array($this))) {
+ // A plugin handled it
+ return;
}
+
if (empty($this->huburi)) {
if (common_config('feedsub', 'fallback_hub')) {
// No native hub on this feed?
// Use our fallback hub, which handles polling on our behalf.
} else if (common_config('feedsub', 'nohub')) {
- // Fake it! We're just testing remote feeds w/o hubs.
- // We'll never actually get updates in this mode.
- return true;
+ // We need a feedpolling plugin (like FeedPoller) active so it will
+ // set the 'nohub' state to 'inactive' for us.
+ return;
} else {
+ // TRANS: Server exception.
throw new ServerException(_m('Attempting to end PuSH subscription for feed with no hub.'));
}
}
- return $this->doSubscribe('unsubscribe');
+ $this->doSubscribe('unsubscribe');
}
/**
* make sure it's inactive, unsubscribing if necessary.
*
* @return boolean true if the subscription is now inactive, false if still active.
+ * @throws NoProfileException in FeedSubSubscriberCount for missing Profile entries
+ * @throws Exception if something goes wrong in unsubscribe() method
*/
public function garbageCollect()
{
if ($this->sub_state == '' || $this->sub_state == 'inactive') {
// No active PuSH subscription, we can just leave it be.
return true;
- } else {
- // PuSH subscription is either active or in an indeterminate state.
- // Check if we're out of subscribers, and if so send an unsubscribe.
- $count = 0;
- Event::handle('FeedSubSubscriberCount', array($this, &$count));
-
- if ($count) {
- common_log(LOG_INFO, __METHOD__ . ': ok, ' . $count . ' user(s) left for ' . $this->uri);
- return false;
- } else {
- common_log(LOG_INFO, __METHOD__ . ': unsubscribing, no users left for ' . $this->uri);
- return $this->unsubscribe();
- }
}
+
+ // PuSH subscription is either active or in an indeterminate state.
+ // Check if we're out of subscribers, and if so send an unsubscribe.
+ $count = 0;
+ Event::handle('FeedSubSubscriberCount', array($this, &$count));
+
+ if ($count > 0) {
+ common_log(LOG_INFO, __METHOD__ . ': ok, ' . $count . ' user(s) left for ' . $this->getUri());
+ return false;
+ }
+
+ common_log(LOG_INFO, __METHOD__ . ': unsubscribing, no users left for ' . $this->getUri());
+ // Unsubscribe throws various Exceptions on failure
+ $this->unsubscribe();
+
+ return true;
+ }
+
+ static public function renewalCheck()
+ {
+ $fs = new FeedSub();
+ // the "" empty string check is because we historically haven't saved unsubscribed feeds as NULL
+ $fs->whereAdd('sub_end IS NOT NULL AND sub_end!="" AND sub_end < NOW() - INTERVAL 1 day');
+ if (!$fs->find()) { // find can be both false and 0, depending on why nothing was found
+ throw new NoResultException($fs);
+ }
+ return $fs;
+ }
+
+ public function renew()
+ {
+ $this->subscribe();
}
+ /**
+ * Setting to subscribe means it is _waiting_ to become active. This
+ * cannot be done in a transaction because there is a chance that the
+ * remote script we're calling (as in the case of PuSHpress) performs
+ * the lookup _while_ we're POSTing data, which means the transaction
+ * never completes (PushcallbackAction gets an 'inactive' state).
+ *
+ * @return boolean true when everything is ok (throws Exception on fail)
+ * @throws Exception on failure, can be HTTPClient's or our own.
+ */
protected function doSubscribe($mode)
{
$orig = clone($this);
- $this->verify_token = common_good_rand(16);
if ($mode == 'subscribe') {
- $this->secret = common_good_rand(32);
+ $this->secret = common_random_hexstr(32);
}
$this->sub_state = $mode;
$this->update($orig);
$headers = array('Content-Type: application/x-www-form-urlencoded');
$post = array('hub.mode' => $mode,
'hub.callback' => $callback,
- 'hub.verify' => 'sync',
- 'hub.verify_token' => $this->verify_token,
+ 'hub.verify' => 'async', // TODO: deprecated, remove when noone uses PuSH <0.4 (only 'async' method used there)
+ 'hub.verify_token' => 'Deprecated-since-PuSH-0.4', // TODO: rm!
+
'hub.secret' => $this->secret,
- 'hub.topic' => $this->uri);
+ 'hub.topic' => $this->getUri());
$client = new HTTPClient();
if ($this->huburi) {
$hub = $this->huburi;
$client->setAuth($u, $p);
}
} else {
- throw new FeedSubException('WTF?');
+ throw new FeedSubException('Server could not find a usable PuSH hub.');
}
}
$response = $client->post($hub, $headers, $post);
$status = $response->getStatus();
+ // PuSH specificed response status code
if ($status == 202) {
common_log(LOG_INFO, __METHOD__ . ': sub req ok, awaiting verification callback');
- return true;
- } else if ($status == 204) {
- common_log(LOG_INFO, __METHOD__ . ': sub req ok and verified');
- return true;
+ return;
} else if ($status >= 200 && $status < 300) {
common_log(LOG_ERR, __METHOD__ . ": sub req returned unexpected HTTP $status: " . $response->getBody());
- return false;
} else {
common_log(LOG_ERR, __METHOD__ . ": sub req failed with HTTP $status: " . $response->getBody());
- return false;
}
} catch (Exception $e) {
- // wtf!
- common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub $this->huburi subscribing to $this->uri");
+ common_log(LOG_ERR, __METHOD__ . ": error \"{$e->getMessage()}\" hitting hub {$this->huburi} subscribing to {$this->getUri()}");
+ // Reset the subscription state.
$orig = clone($this);
- $this->verify_token = '';
$this->sub_state = 'inactive';
$this->update($orig);
- unset($orig);
- return false;
+ // Throw the Exception again.
+ throw $e;
}
+ throw new ServerException("{$mode} request failed.");
}
/**
*
* @param int $lease_seconds provided hub.lease_seconds parameter, if given
*/
- public function confirmSubscribe($lease_seconds=0)
+ public function confirmSubscribe($lease_seconds)
{
$original = clone($this);
if ($lease_seconds > 0) {
$this->sub_end = common_sql_date(time() + $lease_seconds);
} else {
- $this->sub_end = null;
+ $this->sub_end = null; // Backwards compatibility to StatusNet (PuSH <0.4 supported permanent subs)
}
$this->modified = common_sql_now();
$original = clone($this);
// @fixme these should all be null, but DB_DataObject doesn't save null values...?????
- $this->verify_token = '';
$this->secret = '';
$this->sub_state = '';
$this->sub_start = '';
*/
public function receive($post, $hmac)
{
- common_log(LOG_INFO, __METHOD__ . ": packet for \"$this->uri\"! $hmac $post");
+ common_log(LOG_INFO, __METHOD__ . ": packet for \"" . $this->getUri() . "\"! $hmac $post");
- if ($this->sub_state != 'active') {
- common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH for inactive feed $this->uri (in state '$this->sub_state')");
+ if (!in_array($this->sub_state, array('active', 'nohub'))) {
+ common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH for inactive feed " . $this->getUri() . " (in state '$this->sub_state')");
return;
}
if ($tempfile) {
file_put_contents($tempfile, $post);
}
- common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac for feed $this->uri on $this->huburi; saved to $tempfile");
+ common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac for feed " . $this->getUri() . " on $this->huburi; saved to $tempfile");
} else {
- common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac for feed $this->uri on $this->huburi");
+ common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bad SHA-1 HMAC: got $their_hmac, expected $our_hmac for feed " . $this->getUri() . " on $this->huburi");
}
} else {
common_log(LOG_ERR, __METHOD__ . ": ignoring PuSH with bogus HMAC '$hmac'");
}
return false;
}
+
+ public function delete($useWhere=false)
+ {
+ try {
+ $oprofile = Ostatus_profile::getKV('feeduri', $this->getUri());
+ if ($oprofile instanceof Ostatus_profile) {
+ // Check if there's a profile. If not, handle the NoProfileException below
+ $profile = $oprofile->localProfile();
+ }
+ } catch (NoProfileException $e) {
+ // If the Ostatus_profile has no local Profile bound to it, let's clean it out at the same time
+ $oprofile->delete();
+ } catch (NoUriException $e) {
+ // FeedSub->getUri() can throw a NoUriException, let's just go ahead and delete it
+ }
+ return parent::delete($useWhere);
+ }
}