+ /**
+ * Send a subscription request to the hub for this feed.
+ * The hub will later send us a confirmation POST to /main/push/callback.
+ *
+ * @return bool true on success, false on failure
+ * @throws ServerException if feed state is not valid
+ */
+ public function subscribe()
+ {
+ $feedsub = FeedSub::ensureFeed($this->feeduri);
+ if ($feedsub->sub_state == 'active' || $feedsub->sub_state == 'subscribe') {
+ return true;
+ } else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive') {
+ return $feedsub->subscribe();
+ } else if ('unsubscribe') {
+ throw new FeedSubException("Unsub is pending, can't subscribe...");
+ }
+ }
+
+ /**
+ * Send a PuSH unsubscription request to the hub for this feed.
+ * The hub will later send us a confirmation POST to /main/push/callback.
+ *
+ * @return bool true on success, false on failure
+ * @throws ServerException if feed state is not valid
+ */
+ public function unsubscribe() {
+ $feedsub = FeedSub::staticGet('uri', $this->feeduri);
+ if ($feedsub->sub_state == 'active') {
+ return $feedsub->unsubscribe();
+ } else if ($feedsub->sub_state == '' || $feedsub->sub_state == 'inactive' || $feedsub->sub_state == 'unsubscribe') {
+ return true;
+ } else if ($feedsub->sub_state == 'subscribe') {
+ throw new FeedSubException("Feed is awaiting subscription, can't unsub...");
+ }
+ }
+
+ /**
+ * Send an Activity Streams notification to the remote Salmon endpoint,
+ * if so configured.
+ *
+ * @param Profile $actor Actor who did the activity
+ * @param string $verb Activity::SUBSCRIBE or Activity::JOIN
+ * @param Object $object object of the action; must define asActivityNoun($tag)
+ */
+ public function notify($actor, $verb, $object=null)
+ {
+ if (!($actor instanceof Profile)) {
+ $type = gettype($actor);
+ if ($type == 'object') {
+ $type = get_class($actor);
+ }
+ throw new ServerException("Invalid actor passed to " . __METHOD__ . ": " . $type);
+ }
+ if ($object == null) {
+ $object = $this;
+ }
+ if ($this->salmonuri) {
+
+ $text = 'update';
+ $id = TagURI::mint('%s:%s:%s',
+ $verb,
+ $actor->getURI(),
+ common_date_iso8601(time()));
+
+ // @fixme consolidate all these NS settings somewhere
+ $attributes = array('xmlns' => Activity::ATOM,
+ 'xmlns:activity' => 'http://activitystrea.ms/spec/1.0/',
+ 'xmlns:thr' => 'http://purl.org/syndication/thread/1.0',
+ 'xmlns:georss' => 'http://www.georss.org/georss',
+ 'xmlns:ostatus' => 'http://ostatus.org/schema/1.0');
+
+ $entry = new XMLStringer();
+ $entry->elementStart('entry', $attributes);
+ $entry->element('id', null, $id);
+ $entry->element('title', null, $text);
+ $entry->element('summary', null, $text);
+ $entry->element('published', null, common_date_w3dtf(common_sql_now()));
+
+ $entry->element('activity:verb', null, $verb);
+ $entry->raw($actor->asAtomAuthor());
+ $entry->raw($actor->asActivityActor());
+ $entry->raw($object->asActivityNoun('object'));
+ $entry->elementEnd('entry');
+
+ $xml = $entry->getString();
+ common_log(LOG_INFO, "Posting to Salmon endpoint $this->salmonuri: $xml");
+
+ $salmon = new Salmon(); // ?
+ $salmon->post($this->salmonuri, $xml);
+ }
+ }
+
+ public function notifyActivity($activity)
+ {
+ if ($this->salmonuri) {
+
+ $xml = $activity->asString(true);
+
+ $salmon = new Salmon(); // ?
+
+ $salmon->post($this->salmonuri, $xml);
+ }
+
+ return;
+ }
+
+ function getBestName()
+ {
+ if ($this->isGroup()) {
+ return $this->localGroup()->getBestName();
+ } else {
+ return $this->localProfile()->getBestName();
+ }
+ }
+
+ function atomFeed($actor)
+ {
+ $feed = new Atom10Feed();
+ // @fixme should these be set up somewhere else?
+ $feed->addNamespace('activity', 'http://activitystrea.ms/spec/1.0/');
+ $feed->addNamespace('thr', 'http://purl.org/syndication/thread/1.0');
+ $feed->addNamespace('georss', 'http://www.georss.org/georss');
+ $feed->addNamespace('ostatus', 'http://ostatus.org/schema/1.0');
+
+ $taguribase = common_config('integration', 'taguri');
+ $feed->setId("tag:{$taguribase}:UserTimeline:{$actor->id}"); // ???
+
+ $feed->setTitle($actor->getBestName() . ' timeline'); // @fixme
+ $feed->setUpdated(time());
+ $feed->setPublished(time());
+
+ $feed->addLink(common_local_url('ApiTimelineUser',
+ array('id' => $actor->id,
+ 'type' => 'atom')),
+ array('rel' => 'self',
+ 'type' => 'application/atom+xml'));
+
+ $feed->addLink(common_local_url('userbyid',
+ array('id' => $actor->id)),
+ array('rel' => 'alternate',
+ 'type' => 'text/html'));
+
+ return $feed;
+ }
+
+ /**
+ * Read and post notices for updates from the feed.
+ * Currently assumes that all items in the feed are new,
+ * coming from a PuSH hub.
+ *
+ * @param DOMDocument $feed
+ */
+ public function processFeed($feed)
+ {
+ $entries = $feed->getElementsByTagNameNS(Activity::ATOM, 'entry');
+ if ($entries->length == 0) {
+ common_log(LOG_ERR, __METHOD__ . ": no entries in feed update, ignoring");
+ return;
+ }
+
+ for ($i = 0; $i < $entries->length; $i++) {
+ $entry = $entries->item($i);
+ $this->processEntry($entry, $feed);
+ }
+ }
+
+ /**
+ * Process a posted entry from this feed source.
+ *
+ * @param DOMElement $entry
+ * @param DOMElement $feed for context
+ */
+ protected function processEntry($entry, $feed)
+ {
+ $activity = new Activity($entry, $feed);
+
+ $debug = var_export($activity, true);
+ common_log(LOG_DEBUG, $debug);
+
+ if ($activity->verb == ActivityVerb::POST) {
+ $this->processPost($activity);
+ } else {
+ common_log(LOG_INFO, "Ignoring activity with unrecognized verb $activity->verb");
+ }
+ }
+
+ /**
+ * Process an incoming post activity from this remote feed.
+ * @param Activity $activity
+ */
+ protected function processPost($activity)
+ {
+ if ($this->isGroup()) {
+ // @fixme validate these profiles in some way!
+ $oprofile = self::ensureActorProfile($activity);
+ } else {
+ $actorUri = self::getActorProfileURI($activity);
+ if ($actorUri == $this->uri) {
+ // @fixme check if profile info has changed and update it