]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Work in progress on site streams-aware TwitterDaemon
authorBrion Vibber <brion@pobox.com>
Fri, 29 Oct 2010 20:18:03 +0000 (13:18 -0700)
committerBrion Vibber <brion@pobox.com>
Fri, 29 Oct 2010 20:18:03 +0000 (13:18 -0700)
plugins/TwitterBridge/daemons/twitterdaemon.php

index f97f3179b8dd17d7daff671b9c67a050c9754fbf..851d191dd2296fe5a02f2ae3e84f77288975fd80 100644 (file)
@@ -114,7 +114,7 @@ class TwitterManager extends IoManager
      * @fixme check their last-id and check whether we'll need to do a manual pull.
      * @fixme abstract out the fetching so we can work over multiple sites.
      */
-    function initStreams()
+    protected function initStreams()
     {
         // Pull Twitter user IDs for all users we want to pull data for
         $flink = new Foreign_link();
@@ -146,7 +146,7 @@ class TwitterManager extends IoManager
      *
      * @param $users array of Twitter-side user IDs
      */
-    function spawnStream($users)
+    protected function spawnStream($users)
     {
         $stream = $this->initSiteStream();
         $stream->followUsers($userIds);
@@ -168,7 +168,7 @@ class TwitterManager extends IoManager
      *
      * @return TwitterStreamReader
      */
-    function initSiteStream()
+    protected function initSiteStream()
     {
         $auth = $this->siteStreamAuth();
         $stream = new TwitterSiteStream($auth);
@@ -190,7 +190,7 @@ class TwitterManager extends IoManager
      *
      * @return TwitterOAuthClient
      */
-    function siteStreamAuth()
+    protected function siteStreamAuth()
     {
         $token = common_config('twitter', 'stream_token');
         $secret = common_config('twitter', 'stream_secret');
@@ -205,7 +205,7 @@ class TwitterManager extends IoManager
      *
      * @return array of resources
      */
-    function getSockets()
+    public function getSockets()
     {
         $sockets = array();
         foreach ($this->streams as $stream) {
@@ -223,7 +223,7 @@ class TwitterManager extends IoManager
      * @param resource $socket
      * @return boolean success
      */
-    function handleInput($socket)
+    public function handleInput($socket)
     {
         foreach ($this->streams as $stream) {
             foreach ($stream->getSockets() as $aSocket) {
@@ -236,11 +236,12 @@ class TwitterManager extends IoManager
     }
 
     /**
-     * Start the system up!
+     * Start the i/o system up! Prepare our connections and start opening them.
+     *
      * @fixme do some rate-limiting on the stream setup
      * @fixme do some sensible backoff on failure etc
      */
-    function start()
+    public function start()
     {
         $this->initStreams();
         foreach ($this->streams as $stream) {
@@ -249,7 +250,10 @@ class TwitterManager extends IoManager
         return true;
     }
 
-    function finish()
+    /**
+     * Close down our connections when the daemon wraps up for business.
+     */
+    public function finish()
     {
         foreach ($this->streams as $index => $stream) {
             $stream->close();
@@ -280,33 +284,21 @@ class TwitterManager extends IoManager
 
     /**
      * Event callback notifying that a user has a new message in their home timeline.
+     * We store the incoming message into the queues for processing, keeping our own
+     * daemon running as shiny-fast as possible.
      *
-     * @param object $data JSON data: Twitter status update
-     */
-    protected function onTwitterStatus($data, $context)
-    {
-        $importer = new TwitterImport();
-        $notice = $importer->importStatus($data);
-        if ($notice) {
-            $user = $this->getTwitterUser($context);
-            Inbox::insertNotice($user->id, $notice->id);
-        }
-    }
-
-    /**
-     * @fixme what about handling multiple sites?
+     * @param object $status JSON data: Twitter status update
+     * @fixme in all-sites mode we may need to route queue items into another site's
+     *        destination queues, or multiple sites.
      */
-    function getTwitterUser($context)
+    protected function onTwitterStatus($status, $context)
     {
-        if ($context->source != 'sitestream') {
-            throw new ServerException("Unexpected stream source");
-        }
-        $flink = Foreign_link::getByForeignID(TWITTER_SERVICE, $context->for_user);
-        if ($flink) {
-            return $flink->getUser();
-        } else {
-            throw new ServerException("No local user for this Twitter ID");
-        }
+        $data = array(
+            'status' => $status,
+            'for_user' => $context->for_user,
+        );
+        $qm = QueueManager::get();
+        $qm->enqueue($data, 'tweetin');
     }
 }