]> git.mxchange.org Git - quix0rs-gnu-social.git/commitdiff
Merge branch '0.8.x' into queuemanager
authorEvan Prodromou <evan@controlyourself.ca>
Thu, 2 Jul 2009 12:51:10 +0000 (08:51 -0400)
committerEvan Prodromou <evan@controlyourself.ca>
Thu, 2 Jul 2009 12:51:10 +0000 (08:51 -0400)
1  2 
lib/queuehandler.php
lib/util.php

diff --combined lib/queuehandler.php
index 045432ae52b6f61c970a6f8632687c834d02de43,c1c4f3309a24011b60e72de070a5fcef0267ad56..ddb47a28e947478a97a3ca988ffa81583459d787
   * along with this program.  If not, see <http://www.gnu.org/licenses/>.
   */
  
 -define('CLAIM_TIMEOUT', 1200);
 -
  if (!defined('LACONICA')) { exit(1); }
  
  require_once(INSTALLDIR.'/lib/daemon.php');
  require_once(INSTALLDIR.'/classes/Queue_item.php');
  require_once(INSTALLDIR.'/classes/Notice.php');
  
 +define('CLAIM_TIMEOUT', 1200);
 +define('QUEUE_HANDLER_MISS_IDLE', 10);
 +define('QUEUE_HANDLER_HIT_IDLE', 10);
 +
  class QueueHandler extends Daemon
  {
      var $_id = 'generic';
  
-     function QueueHandler($id=null)
+     function __construct($id=null, $daemonize=true)
      {
+         parent::__construct($daemonize);
          if ($id) {
              $this->set_id($id);
          }
      }
  
 +    function timeout()
 +    {
 +        return null;
 +    }
 +
      function class_name()
      {
          return ucfirst($this->transport()) . 'Handler';
          return true;
      }
  
 -    function db_dispatch() {
 -        do {
 -            $qi = Queue_item::top($this->transport());
 -            if ($qi) {
 -                $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($qi->created));
 -                $notice = Notice::staticGet($qi->notice_id);
 -                if ($notice) {
 -                    $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
 -                    # XXX: what to do if broadcast fails?
 -                    $result = $this->handle_notice($notice);
 -                    if (!$result) {
 -                        $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
 -                        $orig = $qi;
 -                        $qi->claimed = null;
 -                        $qi->update($orig);
 -                        $this->log(LOG_WARNING, 'Abandoned claim for notice ID = ' . $notice->id);
 -                        continue;
 -                    }
 -                    $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
 -                    $notice->free();
 -                    unset($notice);
 -                    $notice = null;
 -                } else {
 -                    $this->log(LOG_WARNING, 'queue item for notice that does not exist');
 -                }
 -                $qi->delete();
 -                $qi->free();
 -                unset($qi);
 -                $this->idle(0);
 -            } else {
 -                $this->clear_old_claims();
 -                $this->idle(5);
 -            }
 -        } while (true);
 -    }
 -
 -    function stomp_dispatch() {
 -
 -        // use an external message queue system via STOMP
 -        require_once("Stomp.php");
 +    function run()
 +    {
 +        if (!$this->start()) {
 +            return false;
 +        }
  
 -        $server = common_config('queue','stomp_server');
 -        $username = common_config('queue', 'stomp_username');
 -        $password = common_config('queue', 'stomp_password');
 +        $this->log(LOG_INFO, 'checking for queued notices');
  
 -        $con = new Stomp($server);
 +        $queue   = $this->transport();
 +        $timeout = $this->timeout();
  
 -        if (!$con->connect($username, $password)) {
 -            $this->log(LOG_ERR, 'Failed to connect to queue server');
 -            return false;
 -        }
 +        $qm = QueueManager::get();
  
 -        $queue_basename = common_config('queue','queue_basename');
 -        // subscribe to the relevant queue (format: basename-transport)
 -        $con->subscribe('/queue/'.$queue_basename.'-'.$this->transport());
 -
 -        do {
 -            $frame = $con->readFrame();
 -            if ($frame) {
 -                $this->log(LOG_INFO, 'Got item enqueued '.common_exact_date($frame->headers['created']));
 -
 -                // XXX: Now the queue handler receives only the ID of the
 -                // notice, and it has to get it from the DB
 -                // A massive improvement would be avoid DB query by transmitting
 -                // all the notice details via queue server...
 -                $notice = Notice::staticGet($frame->body);
 -
 -                if ($notice) {
 -                    $this->log(LOG_INFO, 'broadcasting notice ID = ' . $notice->id);
 -                    $result = $this->handle_notice($notice);
 -                    if ($result) {
 -                        // if the msg has been handled positively, ack it
 -                        // and the queue server will remove it from the queue
 -                        $con->ack($frame);
 -                        $this->log(LOG_INFO, 'finished broadcasting notice ID = ' . $notice->id);
 -                    }
 -                    else {
 -                        // no ack
 -                        $this->log(LOG_WARNING, 'Failed broadcast for notice ID = ' . $notice->id);
 -                    }
 -                    $notice->free();
 -                    unset($notice);
 -                    $notice = null;
 +        while (true) {
 +            $this->log(LOG_DEBUG, 'Checking for notices...');
 +            $notice = $qm->nextItem($queue, $timeout);
 +            if (empty($notice)) {
 +                $this->log(LOG_DEBUG, 'No notices waiting; idling.');
 +                // Nothing in the queue. Do you
 +                // have other tasks, like servicing your
 +                // XMPP connection, to do?
 +                $this->idle(QUEUE_HANDLER_MISS_IDLE);
 +            } else {
 +                $this->log(LOG_INFO, 'Got notice '. $notice->id);
 +                // Yay! Got one!
 +                if ($this->handle_notice($notice)) {
 +                    $this->log(LOG_INFO, 'Successfully handled notice '. $notice->id);
 +                    $qm->done($notice, $queue);
                  } else {
 -                    $this->log(LOG_WARNING, 'queue item for notice that does not exist');
 +                    $this->log(LOG_INFO, 'Failed to handle notice '. $notice->id);
 +                    $qm->fail($notice, $queue);
                  }
 +                // Chance to e.g. service your XMPP connection
 +                $this->log(LOG_DEBUG, 'Idling after success.');
 +                $this->idle(QUEUE_HANDLER_HIT_IDLE);
              }
 -        } while (true);
 -
 -        $con->disconnect();
 -    }
 -
 -    function run()
 -    {
 -        if (!$this->start()) {
 -            return false;
 -        }
 -        $this->log(LOG_INFO, 'checking for queued notices');
 -        if (common_config('queue','subsystem') == 'stomp') {
 -            $this->stomp_dispatch();
 -        }
 -        else {
 -            $this->db_dispatch();
 +            // XXX: when do we give up?
          }
 +
          if (!$this->finish()) {
              return false;
          }
  
      function idle($timeout=0)
      {
 -        if ($timeout>0) {
 +        if ($timeout > 0) {
              sleep($timeout);
          }
      }
  
 -    function clear_old_claims()
 -    {
 -        $qi = new Queue_item();
 -        $qi->transport = $this->transport();
 -        $qi->whereAdd('now() - claimed > '.CLAIM_TIMEOUT);
 -        $qi->update(DB_DATAOBJECT_WHEREADD_ONLY);
 -        $qi->free();
 -        unset($qi);
 -    }
 -
      function log($level, $msg)
      {
          common_log($level, $this->class_name() . ' ('. $this->get_id() .'): '.$msg);
diff --combined lib/util.php
index 656374516182ba15d5faacec5a5860b19fad78f1,461ca15c15dbb0cf20c4c67ecbc5021520e90462..86a0316ea6c8a5def9d74652b1a5b34fd803fb13
@@@ -114,7 -114,7 +114,7 @@@ function common_check_user($nickname, $
          return false;
      }
      $user = User::staticGet('nickname', $nickname);
-     if (is_null($user)) {
+     if (is_null($user) || $user === false) {
          return false;
      } else {
          if (0 == strcmp(common_munge_password($password, $user->id),
@@@ -145,7 -145,6 +145,6 @@@ function common_ensure_session(
      }
      if (!common_have_session()) {
          if (common_config('sessions', 'handle')) {
-             common_log(LOG_INFO, "Using our own session handler");
              Session::setSaveHandler();
          }
          @session_start();
@@@ -500,17 -499,19 +499,19 @@@ function common_linkify($url) 
      // It comes in special'd, so we unspecial it before passing to the stringifying
      // functions
      $url = htmlspecialchars_decode($url);
-     $display = File_redirection::_canonUrl($url);
+     $canon = File_redirection::_canonUrl($url);
      $longurl_data = File_redirection::where($url);
      if (is_array($longurl_data)) {
          $longurl = $longurl_data['url'];
      } elseif (is_string($longurl_data)) {
          $longurl = $longurl_data;
      } else {
-         die('impossible to linkify');
+         throw new ServerException("Can't linkify url '$url'");
      }
  
-     $attrs = array('href' => $longurl, 'rel' => 'external');
+     $attrs = array('href' => $canon, 'rel' => 'external');
  
      $is_attachment = false;
      $attachment_id = null;
          }
      }
  
- // if this URL is an attachment, then we set class='attachment' and id='attahcment-ID'
- // where ID is the id of the attachment for the given URL.
- //
- // we need a better test telling what can be shown as an attachment
- // we're currently picking up oembeds only.
- // I think the best option is another file_view table in the db
- // and associated dbobject.
    // if this URL is an attachment, then we set class='attachment' and id='attahcment-ID'
    // where ID is the id of the attachment for the given URL.
    //
    // we need a better test telling what can be shown as an attachment
    // we're currently picking up oembeds only.
    // I think the best option is another file_view table in the db
    // and associated dbobject.
  
      $query = "select file_oembed.file_id as file_id from file join file_oembed on file.id = file_oembed.file_id where file.url='$longurl'";
      $file = new File;
          $attrs['id'] = "attachment-{$attachment_id}";
      }
  
-     return XMLStringer::estring('a', $attrs, $display);
+     return XMLStringer::estring('a', $attrs, $url);
  }
  
  function common_shorten_links($text)
@@@ -861,45 -862,165 +862,45 @@@ function common_redirect($url, $code=30
  
  function common_broadcast_notice($notice, $remote=false)
  {
 -    if (common_config('queue', 'enabled')) {
 -        // Do it later!
 -        return common_enqueue_notice($notice);
 -    } else {
 -        return common_real_broadcast($notice, $remote);
 -    }
 +    return common_enqueue_notice($notice);
  }
  
  // Stick the notice on the queue
  
  function common_enqueue_notice($notice)
  {
 -    $transports = array('omb', 'sms', 'public', 'twitter', 'facebook', 'ping');
 -
 -    if (common_config('xmpp', 'enabled'))
 -    {
 -        $transports[] = 'jabber';
 -    }
 -
 -    if (common_config('queue','subsystem') == 'stomp') {
 -        common_enqueue_notice_stomp($notice, $transports);
 -    }
 -    else {
 -        common_enqueue_notice_db($notice, $transports);
 -    }
 -    return $result;
 -}
 -
 -function common_enqueue_notice_stomp($notice, $transports)
 -{
 -    // use an external message queue system via STOMP
 -    require_once("Stomp.php");
 +    static $localTransports = array('omb',
 +                                    'twitter',
 +                                    'facebook',
 +                                    'ping');
 +    static $allTransports = array('sms');
  
 -    $server = common_config('queue','stomp_server');
 -    $username = common_config('queue', 'stomp_username');
 -    $password = common_config('queue', 'stomp_password');
 +    $transports = $allTransports;
  
 -    $con = new Stomp($server);
 +    $xmpp = common_config('xmpp', 'enabled');
  
 -    if (!$con->connect($username, $password)) {
 -        common_log(LOG_ERR, 'Failed to connect to queue server');
 -        return false;
 +    if ($xmpp) {
 +        $transports[] = 'jabber';
      }
  
 -    $queue_basename = common_config('queue','queue_basename');
 -
 -    foreach ($transports as $transport) {
 -        $result = $con->send('/queue/'.$queue_basename.'-'.$transport, // QUEUE
 -                             $notice->id,             // BODY of the message
 -                             array ('created' => $notice->created));
 -        if (!$result) {
 -            common_log(LOG_ERR, 'Error sending to '.$transport.' queue');
 -            return false;
 +    if ($notice->is_local == NOTICE_LOCAL_PUBLIC ||
 +        $notice->is_local == NOTICE_LOCAL_NONPUBLIC) {
 +        $transports = array_merge($transports, $localTransports);
 +        if ($xmpp) {
 +            $transports[] = 'public';
          }
 -        common_log(LOG_DEBUG, 'complete remote queueing notice ID = ' . $notice->id . ' for ' . $transport);
      }
  
 -    //send tags as headers, so they can be used as JMS selectors
 -    common_log(LOG_DEBUG, 'searching for tags ' . $notice->id);
 -    $tags = array();
 -    $tag = new Notice_tag();
 -    $tag->notice_id = $notice->id;
 -    if ($tag->find()) {
 -        while ($tag->fetch()) {
 -            common_log(LOG_DEBUG, 'tag found = ' . $tag->tag);
 -            array_push($tags,$tag->tag);
 -        }
 -    }
 -    $tag->free();
 -
 -    $con->send('/topic/laconica.'.$notice->profile_id,
 -               $notice->content,
 -               array(
 -                     'profile_id' => $notice->profile_id,
 -                     'created' => $notice->created,
 -                     'tags' => implode($tags,' - ')
 -                     )
 -               );
 -    common_log(LOG_DEBUG, 'sent to personal topic ' . $notice->id);
 -    $con->send('/topic/laconica.allusers',
 -               $notice->content,
 -               array(
 -                     'profile_id' => $notice->profile_id,
 -                     'created' => $notice->created,
 -                     'tags' => implode($tags,' - ')
 -                     )
 -               );
 -    common_log(LOG_DEBUG, 'sent to catch-all topic ' . $notice->id);
 -    $result = true;
 -}
 +    $qm = QueueManager::get();
  
 -function common_enqueue_notice_db($notice, $transports)
 -{
 -    // in any other case, 'internal'
 -    foreach ($transports as $transport) {
 -        common_enqueue_notice_transport($notice, $transport);
 +    foreach ($transports as $transport)
 +    {
 +        $qm->enqueue($notice, $transport);
      }
 -}
  
 -function common_enqueue_notice_transport($notice, $transport)
 -{
 -    $qi = new Queue_item();
 -    $qi->notice_id = $notice->id;
 -    $qi->transport = $transport;
 -    $qi->created = $notice->created;
 -    $result = $qi->insert();
 -    if (!$result) {
 -        $last_error = &PEAR::getStaticProperty('DB_DataObject','lastError');
 -        common_log(LOG_ERR, 'DB error inserting queue item: ' . $last_error->message);
 -        throw new ServerException('DB error inserting queue item: ' . $last_error->message);
 -    }
 -    common_log(LOG_DEBUG, 'complete queueing notice ID = ' . $notice->id . ' for ' . $transport);
      return true;
  }
  
 -function common_real_broadcast($notice, $remote=false)
 -{
 -    $success = true;
 -    if (!$remote) {
 -        // Make sure we have the OMB stuff
 -        require_once(INSTALLDIR.'/lib/omb.php');
 -        $success = omb_broadcast_remote_subscribers($notice);
 -        if (!$success) {
 -            common_log(LOG_ERR, 'Error in OMB broadcast for notice ' . $notice->id);
 -        }
 -    }
 -    if ($success) {
 -        require_once(INSTALLDIR.'/lib/jabber.php');
 -        $success = jabber_broadcast_notice($notice);
 -        if (!$success) {
 -            common_log(LOG_ERR, 'Error in jabber broadcast for notice ' . $notice->id);
 -        }
 -    }
 -    if ($success) {
 -        require_once(INSTALLDIR.'/lib/mail.php');
 -        $success = mail_broadcast_notice_sms($notice);
 -        if (!$success) {
 -            common_log(LOG_ERR, 'Error in sms broadcast for notice ' . $notice->id);
 -        }
 -    }
 -    if ($success) {
 -        $success = jabber_public_notice($notice);
 -        if (!$success) {
 -            common_log(LOG_ERR, 'Error in public broadcast for notice ' . $notice->id);
 -        }
 -    }
 -    if ($success) {
 -        $success = broadcast_twitter($notice);
 -        if (!$success) {
 -            common_log(LOG_ERR, 'Error in Twitter broadcast for notice ' . $notice->id);
 -        }
 -    }
 -
 -    // XXX: Do a real-time FB broadcast here?
 -
 -    // XXX: broadcast notices to other IM
 -    return $success;
 -}
 -
  function common_broadcast_profile($profile)
  {
      // XXX: optionally use a queue system like http://code.google.com/p/microapps/wiki/NQDQ
@@@ -982,15 -1103,20 +983,20 @@@ function common_ensure_syslog(
      }
  }
  
+ function common_log_line($priority, $msg)
+ {
+     static $syslog_priorities = array('LOG_EMERG', 'LOG_ALERT', 'LOG_CRIT', 'LOG_ERR',
+                                       'LOG_WARNING', 'LOG_NOTICE', 'LOG_INFO', 'LOG_DEBUG');
+     return date('Y-m-d H:i:s') . ' ' . $syslog_priorities[$priority] . ': ' . $msg . "\n";
+ }
  function common_log($priority, $msg, $filename=null)
  {
      $logfile = common_config('site', 'logfile');
      if ($logfile) {
          $log = fopen($logfile, "a");
          if ($log) {
-             static $syslog_priorities = array('LOG_EMERG', 'LOG_ALERT', 'LOG_CRIT', 'LOG_ERR',
-                                               'LOG_WARNING', 'LOG_NOTICE', 'LOG_INFO', 'LOG_DEBUG');
-             $output = date('Y-m-d H:i:s') . ' ' . $syslog_priorities[$priority] . ': ' . $msg . "\n";
+             $output = common_log_line($priority, $msg);
              fwrite($log, $output);
              fclose($log);
          }
@@@ -1221,18 -1347,39 +1227,39 @@@ function common_canonical_sms($sms
  function common_error_handler($errno, $errstr, $errfile, $errline, $errcontext)
  {
      switch ($errno) {
+      case E_ERROR:
+      case E_COMPILE_ERROR:
+      case E_CORE_ERROR:
       case E_USER_ERROR:
-         common_log(LOG_ERR, "[$errno] $errstr ($errfile:$errline)");
-         exit(1);
+      case E_PARSE:
+      case E_RECOVERABLE_ERROR:
+         common_log(LOG_ERR, "[$errno] $errstr ($errfile:$errline) [ABORT]");
+         die();
          break;
  
+      case E_WARNING:
+      case E_COMPILE_WARNING:
+      case E_CORE_WARNING:
       case E_USER_WARNING:
          common_log(LOG_WARNING, "[$errno] $errstr ($errfile:$errline)");
          break;
  
+      case E_NOTICE:
       case E_USER_NOTICE:
          common_log(LOG_NOTICE, "[$errno] $errstr ($errfile:$errline)");
          break;
+      case E_STRICT:
+      case E_DEPRECATED:
+      case E_USER_DEPRECATED:
+         // XXX: config variable to log this stuff, too
+         break;
+      default:
+         common_log(LOG_ERR, "[$errno] $errstr ($errfile:$errline) [UNKNOWN LEVEL, die()'ing]");
+         die();
+         break;
      }
  
      // FIXME: show error page if we're on the Web