]> git.mxchange.org Git - friendica.git/commitdiff
Merge pull request #3473 from Quix0r/rewrites/coding-convention-split2-6-2
authorHypolite Petovan <mrpetovan@gmail.com>
Sun, 18 Jun 2017 22:44:34 +0000 (18:44 -0400)
committerGitHub <noreply@github.com>
Sun, 18 Jun 2017 22:44:34 +0000 (18:44 -0400)
Coding convention applied split 2-6-2 (of 2-14-2)

15 files changed:
boot.php
doc/database/db_workerqueue.md
doc/htconfig.md
include/cron.php
include/dba.php
include/dbstructure.php
include/diaspora.php
include/items.php
include/notifier.php
include/poller.php
include/pubsubpublish.php
mod/display.php
src/App.php
src/Util/Lock.php
update.php

index 5417e0fa23646708d77ac1a252f2470fc1593bb6..cfb82f5381b36c2390f4878c64e81c6dc335195f 100644 (file)
--- a/boot.php
+++ b/boot.php
@@ -22,6 +22,7 @@ require_once(__DIR__ . DIRECTORY_SEPARATOR . 'vendor' . DIRECTORY_SEPARATOR . 'a
 
 use Friendica\App;
 use Friendica\Core\Config;
+use Friendica\Util\Lock;
 
 require_once 'include/config.php';
 require_once 'include/network.php';
@@ -41,7 +42,7 @@ define ( 'FRIENDICA_PLATFORM',     'Friendica');
 define ( 'FRIENDICA_CODENAME',     'Asparagus');
 define ( 'FRIENDICA_VERSION',      '3.5.3-dev' );
 define ( 'DFRN_PROTOCOL_VERSION',  '2.23'    );
-define ( 'DB_UPDATE_VERSION',      1229      );
+define ( 'DB_UPDATE_VERSION',      1230      );
 
 /**
  * @brief Constant with a HTML line break.
@@ -1069,6 +1070,7 @@ function proc_run($cmd) {
 
        $priority = PRIORITY_MEDIUM;
        $dont_fork = get_config("system", "worker_dont_fork");
+       $created = datetime_convert();
 
        if (is_int($run_parameter)) {
                $priority = $run_parameter;
@@ -1076,6 +1078,9 @@ function proc_run($cmd) {
                if (isset($run_parameter['priority'])) {
                        $priority = $run_parameter['priority'];
                }
+               if (isset($run_parameter['created'])) {
+                       $created = $run_parameter['created'];
+               }
                if (isset($run_parameter['dont_fork'])) {
                        $dont_fork = $run_parameter['dont_fork'];
                }
@@ -1088,7 +1093,7 @@ function proc_run($cmd) {
        $found = dba::select('workerqueue', array('id'), array('parameter' => $parameters), array('limit' => 1));
 
        if (!dbm::is_result($found)) {
-               dba::insert('workerqueue', array('parameter' => $parameters, 'created' => datetime_convert(), 'priority' => $priority));
+               dba::insert('workerqueue', array('parameter' => $parameters, 'created' => $created, 'priority' => $priority));
        }
 
        // Should we quit and wait for the poller to be called as a cronjob?
@@ -1096,8 +1101,16 @@ function proc_run($cmd) {
                return;
        }
 
+       // If there is a lock then we don't have to check for too much worker
+       if (!Lock::set('poller_worker', 0)) {
+               return;
+       }
+
        // If there are already enough workers running, don't fork another one
-       if (poller_too_much_workers()) {
+       $quit = poller_too_much_workers();
+       Lock::remove('poller_worker');
+
+       if ($quit) {
                return;
        }
 
index 8ab68466be9f02a9e03467a6a20ec95c00cb8b03..182358a4c468a3452174c574f454a7eec2db2425 100644 (file)
@@ -9,5 +9,6 @@ Table workerqueue
 | created   |                  | datetime            | NO   | MUL | 0001-01-01 00:00:00 |                |
 | pid       |                  | int(11)             | NO   |     | 0                   |                |
 | executed  |                  | datetime            | NO   |     | 0001-01-01 00:00:00 |                |
+| done      | set to 1 if done | tinyint(1)          | NO   |     | 0                   |                |
 
 Return to [database documentation](help/database)
index 9c556bb7cd49a647a588f173a508d86c1c3d046f..a452ecfc14f7bc5a70dde7163340f41d699388a4 100644 (file)
@@ -37,7 +37,6 @@ Example: To set the directory value please add this line to your .htconfig.php:
 * **dbclean** (Boolean) - Enable the automatic database cleanup process
 * **dbclean-expire-days** (Integer) - Days after which remote items will be deleted. Own items, and marked or filed items are kept.
 * **default_service_class** -
-* **delivery_batch_count** - Number of deliveries per process. Default value is 1. (Disabled when using the worker)
 * **diaspora_test** (Boolean) - For development only. Disables the message transfer.
 * **directory** - The path to global directory. If not set then "http://dir.friendica.social" is used.
 * **disable_email_validation** (Boolean) - Disables the check if a mail address is in a valid format and can be resolved via DNS.
@@ -68,6 +67,8 @@ Example: To set the directory value please add this line to your .htconfig.php:
 * **ostatus_poll_timeframe** - Defines how old an item can be to try to complete the conversation with it.
 * **paranoia** (Boolean) - Log out users if their IP address changed.
 * **permit_crawling** (Boolean) - Restricts the search for not logged in users to one search per minute.
+* **worker_debug** (Boolean) - If enabled, it prints out the number of running processes split by priority.
+* **worker_fetch_limit** - Number of worker tasks that are fetched in a single query. Default is 5.
 * **profiler** (Boolean) - Enable internal timings to help optimize code. Needed for "rendertime" addon. Default is false.
 * **free_crawls** - Number of "free" searches when "permit_crawling" is activated (Default value is 10)
 * **crawl_permit_period** - Period in seconds between allowed searches when the number of free searches is reached and "permit_crawling" is activated (Default value is 60)
index 76f3e410196185514b9c24a12ef8ff3c6d8e8d62..9c2e6aeaa30d5f68f5b3f972befdb0e4845896c3 100644 (file)
@@ -82,6 +82,9 @@ function cron_run(&$argv, &$argc){
                proc_run(PRIORITY_MEDIUM, 'include/dbclean.php');
 
                proc_run(PRIORITY_LOW, "include/cronjobs.php", "update_photo_albums");
+
+               // Delete all done workerqueue entries
+               dba::delete('workerqueue', array('done' => true));
        }
 
        // Poll contacts
index e4846899dc1da62c3ca32ef892def0ae2d9471cb..1c63fa50540168a958235ec9c327e4d6438c28ee 100644 (file)
@@ -21,6 +21,8 @@ class dba {
        private $driver;
        public  $connected = false;
        public  $error = false;
+       public  $errorno = 0;
+       public  $affected_rows = 0;
        private $_server_info = '';
        private static $in_transaction = false;
        private static $dbo;
@@ -551,6 +553,7 @@ class dba {
 
                self::$dbo->error = '';
                self::$dbo->errorno = 0;
+               self::$dbo->affected_rows = 0;
 
                switch (self::$dbo->driver) {
                        case 'pdo':
@@ -573,6 +576,7 @@ class dba {
                                        $retval = false;
                                } else {
                                        $retval = $stmt;
+                                       self::$dbo->affected_rows = $retval->rowCount();
                                }
                                break;
                        case 'mysqli':
@@ -612,6 +616,7 @@ class dba {
                                } else {
                                        $stmt->store_result();
                                        $retval = $stmt;
+                                       self::$dbo->affected_rows = $retval->affected_rows;
                                }
                                break;
                        case 'mysql':
@@ -620,13 +625,28 @@ class dba {
                                if (mysql_errno(self::$dbo->db)) {
                                        self::$dbo->error = mysql_error(self::$dbo->db);
                                        self::$dbo->errorno = mysql_errno(self::$dbo->db);
+                               } else {
+                                       self::$dbo->affected_rows = mysql_affected_rows($retval);
                                }
                                break;
                }
 
                if (self::$dbo->errorno != 0) {
-                       logger('DB Error '.self::$dbo->errorno.': '.self::$dbo->error."\n".
-                               $a->callstack(8))."\n".self::replace_parameters($sql, $args);
+                       $trace = debug_backtrace(DEBUG_BACKTRACE_IGNORE_ARGS, 1);
+                       $called_from = array_shift($trace);
+
+                       // We are having an own error logging in the function "p"
+                       if ($called_from['function'] != 'p') {
+                               // We have to preserve the error code, somewhere in the logging it get lost
+                               $error = self::$dbo->error;
+                               $errorno = self::$dbo->errorno;
+
+                               logger('DB Error '.self::$dbo->errorno.': '.self::$dbo->error."\n".
+                                       $a->callstack(8))."\n".self::replace_parameters($sql, $args);
+
+                               self::$dbo->error = $error;
+                               self::$dbo->errorno = $errorno;
+                       }
                }
 
                $a->save_timestamp($stamp1, 'database');
@@ -662,17 +682,35 @@ class dba {
 
                $args = func_get_args();
 
-               $stmt = call_user_func_array('self::p', $args);
+               // In a case of a deadlock we are repeating the query 20 times
+               $timeout = 20;
 
-               if (is_bool($stmt)) {
-                       $retval = $stmt;
-               } elseif (is_object($stmt)) {
-                       $retval = true;
-               } else {
-                       $retval = false;
-               }
+               do {
+                       $stmt = call_user_func_array('self::p', $args);
 
-               self::close($stmt);
+                       if (is_bool($stmt)) {
+                               $retval = $stmt;
+                       } elseif (is_object($stmt)) {
+                               $retval = true;
+                       } else {
+                               $retval = false;
+                       }
+
+                       self::close($stmt);
+
+               } while ((self::$dbo->errorno == 1213) && (--$timeout > 0));
+
+               if (self::$dbo->errorno != 0) {
+                       // We have to preserve the error code, somewhere in the logging it get lost
+                       $error = self::$dbo->error;
+                       $errorno = self::$dbo->errorno;
+
+                       logger('DB Error '.self::$dbo->errorno.': '.self::$dbo->error."\n".
+                               $a->callstack(8))."\n".self::replace_parameters($sql, $args);
+
+                       self::$dbo->error = $error;
+                       self::$dbo->errorno = $errorno;
+               }
 
                $a->save_timestamp($stamp, "database_write");
 
@@ -723,6 +761,15 @@ class dba {
                return $retval;
        }
 
+       /**
+        * @brief Returns the number of affected rows of the last statement
+        *
+        * @return int Number of rows
+        */
+       static public function affected_rows() {
+               return self::$dbo->affected_rows;
+       }
+
        /**
         * @brief Returns the number of rows of a statement
         *
index 882d736e1c17114475d382264bf1bafdfa88f432..268bbbb66944060cb1f9b7d16b9ca464ad3f6716 100644 (file)
@@ -1740,6 +1740,7 @@ function db_definition() {
                                        "created" => array("type" => "datetime", "not null" => "1", "default" => NULL_DATE),
                                        "pid" => array("type" => "int(11)", "not null" => "1", "default" => "0"),
                                        "executed" => array("type" => "datetime", "not null" => "1", "default" => NULL_DATE),
+                                       "done" => array("type" => "tinyint(1)", "not null" => "1", "default" => "0"),
                                        ),
                        "indexes" => array(
                                        "PRIMARY" => array("id"),
index 1ef2af3576d2f78c39fd47b070b484b415adc9b4..762018b170c4f1f9ec35ff498b143f319427b051 100644 (file)
@@ -4,8 +4,8 @@
  * @brief The implementation of the diaspora protocol
  *
  * The new protocol is described here: http://diaspora.github.io/diaspora_federation/index.html
- * Currently this implementation here interprets the old and the new protocol and sends the old one.
- * This will change in the future.
+ * This implementation here interprets the old and the new protocol and sends the new one.
+ * In the future we will remove most stuff from "valid_posting" and interpret only the new protocol.
  */
 
 use Friendica\App;
@@ -2507,7 +2507,7 @@ class Diaspora {
                switch ($target_type) {
                        case "Comment":
                        case "Like":
-                       case "Post": // "Post" will be supported in a future version
+                       case "Post":
                        case "Reshare":
                        case "StatusMessage":
                                return self::item_retraction($importer, $contact, $data);
@@ -2667,42 +2667,9 @@ class Diaspora {
                return $nick."@".substr(App::get_baseurl(), strpos(App::get_baseurl(),"://") + 3);
        }
 
-       /**
-        * @brief Creates the envelope for the "fetch" endpoint
-        *
-        * @param string $msg The message that is to be transmitted
-        * @param array $user The record of the sender
-        *
-        * @return string The envelope
-        */
-
-       public static function build_magic_envelope($msg, $user) {
-
-               $b64url_data = base64url_encode($msg);
-               $data = str_replace(array("\n", "\r", " ", "\t"), array("", "", "", ""), $b64url_data);
-
-               $key_id = base64url_encode(self::my_handle($user));
-               $type = "application/xml";
-               $encoding = "base64url";
-               $alg = "RSA-SHA256";
-               $signable_data = $data.".".base64url_encode($type).".".base64url_encode($encoding).".".base64url_encode($alg);
-               $signature = rsa_sign($signable_data, $user["prvkey"]);
-               $sig = base64url_encode($signature);
-
-               $xmldata = array("me:env" => array("me:data" => $data,
-                                                       "@attributes" => array("type" => $type),
-                                                       "me:encoding" => $encoding,
-                                                       "me:alg" => $alg,
-                                                       "me:sig" => $sig,
-                                                       "@attributes2" => array("key_id" => $key_id)));
-
-               $namespaces = array("me" => "http://salmon-protocol.org/ns/magic-env");
-
-               return xml::from_array($xmldata, $xml, false, $namespaces);
-       }
 
        /**
-        * @brief Creates the envelope for a public message
+        * @brief Creates the data for a private message in the new format
         *
         * @param string $msg The message that is to be transmitted
         * @param array $user The record of the sender
@@ -2710,129 +2677,72 @@ class Diaspora {
         * @param string $prvkey The private key of the sender
         * @param string $pubkey The public key of the receiver
         *
-        * @return string The envelope
+        * @return string The encrypted data
         */
-       private static function build_public_message($msg, $user, $contact, $prvkey, $pubkey) {
+       public static function encode_private_data($msg, $user, $contact, $prvkey, $pubkey) {
 
                logger("Message: ".$msg, LOGGER_DATA);
 
-               $handle = self::my_handle($user);
-
-               $b64url_data = base64url_encode($msg);
-
-               $data = str_replace(array("\n", "\r", " ", "\t"), array("", "", "", ""), $b64url_data);
-
-               $type = "application/xml";
-               $encoding = "base64url";
-               $alg = "RSA-SHA256";
+               // without a public key nothing will work
+               if (!$pubkey) {
+                       logger("pubkey missing: contact id: ".$contact["id"]);
+                       return false;
+               }
 
-               $signable_data = $data.".".base64url_encode($type).".".base64url_encode($encoding).".".base64url_encode($alg);
+               $aes_key = openssl_random_pseudo_bytes(32);
+               $b_aes_key = base64_encode($aes_key);
+               $iv = openssl_random_pseudo_bytes(16);
+               $b_iv = base64_encode($iv);
 
-               $signature = rsa_sign($signable_data,$prvkey);
-               $sig = base64url_encode($signature);
+               $ciphertext = self::aes_encrypt($aes_key, $iv, $msg);
 
-               $xmldata = array("diaspora" => array("header" => array("author_id" => $handle),
-                                                       "me:env" => array("me:encoding" => $encoding,
-                                                       "me:alg" => $alg,
-                                                       "me:data" => $data,
-                                                       "@attributes" => array("type" => $type),
-                                                       "me:sig" => $sig)));
+               $json = json_encode(array("iv" => $b_iv, "key" => $b_aes_key));
 
-               $namespaces = array("" => "https://joindiaspora.com/protocol",
-                               "me" => "http://salmon-protocol.org/ns/magic-env");
+               $encrypted_key_bundle = "";
+               openssl_public_encrypt($json, $encrypted_key_bundle, $pubkey);
 
-               $magic_env = xml::from_array($xmldata, $xml, false, $namespaces);
+               $json_object = json_encode(array("aes_key" => base64_encode($encrypted_key_bundle),
+                                               "encrypted_magic_envelope" => base64_encode($ciphertext)));
 
-               logger("magic_env: ".$magic_env, LOGGER_DATA);
-               return $magic_env;
+               return $json_object;
        }
 
        /**
-        * @brief Creates the envelope for a private message
+        * @brief Creates the envelope for the "fetch" endpoint and for the new format
         *
         * @param string $msg The message that is to be transmitted
         * @param array $user The record of the sender
-        * @param array $contact Target of the communication
-        * @param string $prvkey The private key of the sender
-        * @param string $pubkey The public key of the receiver
         *
         * @return string The envelope
         */
-       private static function build_private_message($msg, $user, $contact, $prvkey, $pubkey) {
-
-               logger("Message: ".$msg, LOGGER_DATA);
-
-               // without a public key nothing will work
-
-               if (!$pubkey) {
-                       logger("pubkey missing: contact id: ".$contact["id"]);
-                       return false;
-               }
-
-               $inner_aes_key = openssl_random_pseudo_bytes(32);
-               $b_inner_aes_key = base64_encode($inner_aes_key);
-               $inner_iv = openssl_random_pseudo_bytes(16);
-               $b_inner_iv = base64_encode($inner_iv);
-
-               $outer_aes_key = openssl_random_pseudo_bytes(32);
-               $b_outer_aes_key = base64_encode($outer_aes_key);
-               $outer_iv = openssl_random_pseudo_bytes(16);
-               $b_outer_iv = base64_encode($outer_iv);
-
-               $handle = self::my_handle($user);
-
-               $inner_encrypted = self::aes_encrypt($inner_aes_key, $inner_iv, $msg);
-
-               $b64_data = base64_encode($inner_encrypted);
-
+       public static function build_magic_envelope($msg, $user) {
 
-               $b64url_data = base64url_encode($b64_data);
+               $b64url_data = base64url_encode($msg);
                $data = str_replace(array("\n", "\r", " ", "\t"), array("", "", "", ""), $b64url_data);
 
+               $key_id = base64url_encode(self::my_handle($user));
                $type = "application/xml";
                $encoding = "base64url";
                $alg = "RSA-SHA256";
-
                $signable_data = $data.".".base64url_encode($type).".".base64url_encode($encoding).".".base64url_encode($alg);
 
-               $signature = rsa_sign($signable_data,$prvkey);
-               $sig = base64url_encode($signature);
-
-               $xmldata = array("decrypted_header" => array("iv" => $b_inner_iv,
-                                                       "aes_key" => $b_inner_aes_key,
-                                                       "author_id" => $handle));
-
-               $decrypted_header = xml::from_array($xmldata, $xml, true);
-
-               $ciphertext = self::aes_encrypt($outer_aes_key, $outer_iv, $decrypted_header);
-
-               $outer_json = json_encode(array("iv" => $b_outer_iv, "key" => $b_outer_aes_key));
-
-               $encrypted_outer_key_bundle = "";
-               openssl_public_encrypt($outer_json, $encrypted_outer_key_bundle, $pubkey);
-
-               $b64_encrypted_outer_key_bundle = base64_encode($encrypted_outer_key_bundle);
-
-               logger("outer_bundle: ".$b64_encrypted_outer_key_bundle." key: ".$pubkey, LOGGER_DATA);
-
-               $encrypted_header_json_object = json_encode(array("aes_key" => base64_encode($encrypted_outer_key_bundle),
-                                                               "ciphertext" => base64_encode($ciphertext)));
-               $cipher_json = base64_encode($encrypted_header_json_object);
+               // Fallback if the private key wasn't transmitted in the expected field
+               if ($user['uprvkey'] == "")
+                       $user['uprvkey'] = $user['prvkey'];
 
-               $xmldata = array("diaspora" => array("encrypted_header" => $cipher_json,
-                                               "me:env" => array("me:encoding" => $encoding,
-                                                               "me:alg" => $alg,
-                                                               "me:data" => $data,
-                                                               "@attributes" => array("type" => $type),
-                                                               "me:sig" => $sig)));
+               $signature = rsa_sign($signable_data, $user["uprvkey"]);
+               $sig = base64url_encode($signature);
 
-               $namespaces = array("" => "https://joindiaspora.com/protocol",
-                               "me" => "http://salmon-protocol.org/ns/magic-env");
+               $xmldata = array("me:env" => array("me:data" => $data,
+                                                       "@attributes" => array("type" => $type),
+                                                       "me:encoding" => $encoding,
+                                                       "me:alg" => $alg,
+                                                       "me:sig" => $sig,
+                                                       "@attributes2" => array("key_id" => $key_id)));
 
-               $magic_env = xml::from_array($xmldata, $xml, false, $namespaces);
+               $namespaces = array("me" => "http://salmon-protocol.org/ns/magic-env");
 
-               logger("magic_env: ".$magic_env, LOGGER_DATA);
-               return $magic_env;
+               return xml::from_array($xmldata, $xml, false, $namespaces);
        }
 
        /**
@@ -2849,14 +2759,15 @@ class Diaspora {
         */
        private static function build_message($msg, $user, $contact, $prvkey, $pubkey, $public = false) {
 
-               if ($public)
-                       $magic_env =  self::build_public_message($msg,$user,$contact,$prvkey,$pubkey);
-               else
-                       $magic_env =  self::build_private_message($msg,$user,$contact,$prvkey,$pubkey);
+               // The message is put into an envelope with the sender's signature
+               $envelope = self::build_magic_envelope($msg, $user);
+
+               // Private messages are put into a second envelope, encrypted with the receivers public key
+               if (!$public) {
+                       $envelope = self::encode_private_data($envelope, $user, $contact, $prvkey, $pubkey);
+               }
 
-               // The data that will be transmitted is double encoded via "urlencode", strange ...
-               $slap = "xml=".urlencode(urlencode($magic_env));
-               return $slap;
+               return $envelope;
        }
 
        /**
@@ -2882,14 +2793,14 @@ class Diaspora {
         *
         * @param array $owner the array of the item owner
         * @param array $contact Target of the communication
-        * @param string $slap The message that is to be transmitted
+        * @param string $envelope The message that is to be transmitted
         * @param bool $public_batch Is it a public post?
         * @param bool $queue_run Is the transmission called from the queue?
         * @param string $guid message guid
         *
         * @return int Result of the transmission
         */
-       public static function transmit($owner, $contact, $slap, $public_batch, $queue_run=false, $guid = "") {
+       public static function transmit($owner, $contact, $envelope, $public_batch, $queue_run=false, $guid = "") {
 
                $a = get_app();
 
@@ -2910,7 +2821,9 @@ class Diaspora {
                        $return_code = 0;
                } else {
                        if (!intval(get_config("system", "diaspora_test"))) {
-                               post_url($dest_url."/", $slap);
+                               $content_type = (($public_batch) ? "application/magic-envelope+xml" : "application/json");
+
+                               post_url($dest_url."/", $envelope, array("Content-Type: ".$content_type));
                                $return_code = $a->get_curl_code();
                        } else {
                                logger("test_mode");
@@ -2926,14 +2839,14 @@ class Diaspora {
                        $r = q("SELECT `id` FROM `queue` WHERE `cid` = %d AND `network` = '%s' AND `content` = '%s' AND `batch` = %d LIMIT 1",
                                intval($contact["id"]),
                                dbesc(NETWORK_DIASPORA),
-                               dbesc($slap),
+                               dbesc($envelope),
                                intval($public_batch)
                        );
                        if ($r) {
                                logger("add_to_queue ignored - identical item already in queue");
                        } else {
                                // queue message for redelivery
-                               add_to_queue($contact["id"], NETWORK_DIASPORA, $slap, $public_batch);
+                               add_to_queue($contact["id"], NETWORK_DIASPORA, $envelope, $public_batch);
 
                                // The message could not be delivered. We mark the contact as "dead"
                                mark_for_death($contact);
@@ -2957,7 +2870,8 @@ class Diaspora {
         */
        public static function build_post_xml($type, $message) {
 
-               $data = array("XML" => array("post" => array($type => $message)));
+               $data = array($type => $message);
+
                return xml::from_array($data, $xml);
        }
 
@@ -2985,13 +2899,13 @@ class Diaspora {
                if ($owner['uprvkey'] == "")
                        $owner['uprvkey'] = $owner['prvkey'];
 
-               $slap = self::build_message($msg, $owner, $contact, $owner['uprvkey'], $contact['pubkey'], $public_batch);
+               $envelope = self::build_message($msg, $owner, $contact, $owner['uprvkey'], $contact['pubkey'], $public_batch);
 
                if ($spool) {
-                       add_to_queue($contact['id'], NETWORK_DIASPORA, $slap, $public_batch);
+                       add_to_queue($contact['id'], NETWORK_DIASPORA, $envelope, $public_batch);
                        return true;
                } else
-                       $return_code = self::transmit($owner, $contact, $slap, $public_batch, false, $guid);
+                       $return_code = self::transmit($owner, $contact, $envelope, $public_batch, false, $guid);
 
                logger("guid: ".$item["guid"]." result ".$return_code, LOGGER_DEBUG);
 
@@ -3006,14 +2920,37 @@ class Diaspora {
         *
         * @return int The result of the transmission
         */
-       public static function send_share($owner,$contact) {
+       public static function send_share($owner, $contact) {
 
-               $message = array("sender_handle" => self::my_handle($owner),
-                               "recipient_handle" => $contact["addr"]);
+               /**
+                * @todo support the different possible combinations of "following" and "sharing"
+                * Currently, Diaspora only interprets the "sharing" field
+                *
+                * Before switching this code productive, we have to check all "send_share" calls if "rel" is set correctly
+                */
+
+               /*
+               switch ($contact["rel"]) {
+                       case CONTACT_IS_FRIEND:
+                               $following = true;
+                               $sharing = true;
+                       case CONTACT_IS_SHARING:
+                               $following = false;
+                               $sharing = true;
+                       case CONTACT_IS_FOLLOWER:
+                               $following = true;
+                               $sharing = false;
+               }
+               */
+
+               $message = array("author" => self::my_handle($owner),
+                               "recipient" => $contact["addr"],
+                               "following" => "true",
+                               "sharing" => "true");
 
                logger("Send share ".print_r($message, true), LOGGER_DEBUG);
 
-               return self::build_and_transmit($owner, $contact, "request", $message);
+               return self::build_and_transmit($owner, $contact, "contact", $message);
        }
 
        /**
@@ -3024,15 +2961,16 @@ class Diaspora {
         *
         * @return int The result of the transmission
         */
-       public static function send_unshare($owner,$contact) {
+       public static function send_unshare($owner, $contact) {
 
-               $message = array("post_guid" => $owner["guid"],
-                               "diaspora_handle" => self::my_handle($owner),
-                               "type" => "Person");
+               $message = array("author" => self::my_handle($owner),
+                               "recipient" => $contact["addr"],
+                               "following" => "false",
+                               "sharing" => "false");
 
                logger("Send unshare ".print_r($message, true), LOGGER_DEBUG);
 
-               return self::build_and_transmit($owner, $contact, "retraction", $message);
+               return self::build_and_transmit($owner, $contact, "contact", $message);
        }
 
        /**
@@ -3216,13 +3154,13 @@ class Diaspora {
 
                // Detect a share element and do a reshare
                if (!$item['private'] && ($ret = self::is_reshare($item["body"]))) {
-                       $message = array("root_diaspora_id" => $ret["root_handle"],
-                                       "root_guid" => $ret["root_guid"],
+                       $message = array("author" => $myaddr,
                                        "guid" => $item["guid"],
-                                       "diaspora_handle" => $myaddr,
-                                       "public" => $public,
                                        "created_at" => $created,
-                                       "provider_display_name" => $item["app"]);
+                                       "root_author" => $ret["root_handle"],
+                                       "root_guid" => $ret["root_guid"],
+                                       "provider_display_name" => $item["app"],
+                                       "public" => $public);
 
                        $type = "reshare";
                } else {
@@ -3256,13 +3194,13 @@ class Diaspora {
                                $location["lng"] = $coord[1];
                        }
 
-                       $message = array("raw_message" => $body,
-                                       "location" => $location,
+                       $message = array("author" => $myaddr,
                                        "guid" => $item["guid"],
-                                       "diaspora_handle" => $myaddr,
-                                       "public" => $public,
                                        "created_at" => $created,
-                                       "provider_display_name" => $item["app"]);
+                                       "public" => $public,
+                                       "text" => $body,
+                                       "provider_display_name" => $item["app"],
+                                       "location" => $location);
 
                        // Diaspora rejects messages when they contain a location without "lat" or "lng"
                        if (!isset($location["lat"]) || !isset($location["lng"])) {
@@ -3275,7 +3213,7 @@ class Diaspora {
                                        $message['event'] = $event;
 
                                        /// @todo Once Diaspora supports it, we will remove the body
-                                       // $message['raw_message'] = '';
+                                       // $message['text'] = '';
                                }
                        }
 
@@ -3330,12 +3268,12 @@ class Diaspora {
                        $positive = "false";
                }
 
-               return(array("positive" => $positive,
+               return(array("author" => self::my_handle($owner),
                                "guid" => $item["guid"],
-                               "target_type" => $target_type,
                                "parent_guid" => $parent["guid"],
-                               "author_signature" => "",
-                               "diaspora_handle" => self::my_handle($owner)));
+                               "parent_type" => $target_type,
+                               "positive" => $positive,
+                               "author_signature" => ""));
        }
 
        /**
@@ -3407,12 +3345,12 @@ class Diaspora {
                $text = html_entity_decode(bb2diaspora($item["body"]));
                $created = datetime_convert("UTC", "UTC", $item["created"], 'Y-m-d\TH:i:s\Z');
 
-               $comment = array("guid" => $item["guid"],
+               $comment = array("author" => self::my_handle($owner),
+                               "guid" => $item["guid"],
+                               "created_at" => $created,
                                "parent_guid" => $parent["guid"],
-                               "author_signature" => "",
                                "text" => $text,
-                               /// @todo Currently disabled until Diaspora supports it: "created_at" => $created,
-                               "diaspora_handle" => self::my_handle($owner));
+                               "author_signature" => "");
 
                // Send the thread parent guid only if it is a threaded comment
                if ($item['thr-parent'] != $item['parent-uri']) {
@@ -3469,19 +3407,17 @@ class Diaspora {
                $signed_parts = explode(";", $signature['signed_text']);
 
                if ($item["deleted"])
-                       $message = array("parent_author_signature" => "",
+                       $message = array("author" => $signature['signer'],
                                        "target_guid" => $signed_parts[0],
-                                       "target_type" => $signed_parts[1],
-                                       "sender_handle" => $signature['signer'],
-                                       "target_author_signature" => $signature['signature']);
+                                       "target_type" => $signed_parts[1]);
                elseif ($item['verb'] === ACTIVITY_LIKE)
-                       $message = array("positive" => $signed_parts[0],
+                       $message = array("author" => $signed_parts[4],
                                        "guid" => $signed_parts[1],
-                                       "target_type" => $signed_parts[2],
                                        "parent_guid" => $signed_parts[3],
-                                       "parent_author_signature" => "",
+                                       "parent_type" => $signed_parts[2],
+                                       "positive" => $signed_parts[0],
                                        "author_signature" => $signature['signature'],
-                                       "diaspora_handle" => $signed_parts[4]);
+                                       "parent_author_signature" => "");
                else {
                        // Remove the comment guid
                        $guid = array_shift($signed_parts);
@@ -3495,12 +3431,12 @@ class Diaspora {
                        // Glue the parts together
                        $text = implode(";", $signed_parts);
 
-                       $message = array("guid" => $guid,
+                       $message = array("author" => $handle,
+                                       "guid" => $guid,
                                        "parent_guid" => $parent_guid,
-                                       "parent_author_signature" => "",
-                                       "author_signature" => $signature['signature'],
                                        "text" => implode(";", $signed_parts),
-                                       "diaspora_handle" => $handle);
+                                       "author_signature" => $signature['signature'],
+                                       "parent_author_signature" => "");
                }
                return $message;
        }
@@ -3549,10 +3485,12 @@ class Diaspora {
                        if (is_array($msg)) {
                                foreach ($msg AS $field => $data) {
                                        if (!$item["deleted"]) {
-                                               if ($field == "author")
-                                                       $field = "diaspora_handle";
-                                               if ($field == "parent_type")
-                                                       $field = "target_type";
+                                               if ($field == "diaspora_handle") {
+                                                       $field = "author";
+                                               }
+                                               if ($field == "target_type") {
+                                                       $field = "parent_type";
+                                               }
                                        }
 
                                        $message[$field] = $data;
@@ -3583,26 +3521,12 @@ class Diaspora {
 
                $itemaddr = self::handle_from_contact($item["contact-id"], $item["gcontact-id"]);
 
-               // Check whether the retraction is for a top-level post or whether it's a relayable
-               if ($item["uri"] !== $item["parent-uri"]) {
-                       $msg_type = "relayable_retraction";
-                       $target_type = (($item["verb"] === ACTIVITY_LIKE) ? "Like" : "Comment");
-               } else {
-                       $msg_type = "signed_retraction";
-                       $target_type = "StatusMessage";
-               }
+               $msg_type = "retraction";
+               $target_type = "Post";
 
-               if ($relay && ($item["uri"] !== $item["parent-uri"]))
-                       $signature = "parent_author_signature";
-               else
-                       $signature = "target_author_signature";
-
-               $signed_text = $item["guid"].";".$target_type;
-
-               $message = array("target_guid" => $item['guid'],
-                               "target_type" => $target_type,
-                               "sender_handle" => $itemaddr,
-                               $signature => base64_encode(rsa_sign($signed_text,$owner['uprvkey'],'sha256')));
+               $message = array("author" => $itemaddr,
+                               "target_guid" => $item['guid'],
+                               "target_type" => $target_type);
 
                logger("Got message ".print_r($message, true), LOGGER_DEBUG);
 
@@ -3634,40 +3558,35 @@ class Diaspora {
                $cnv = $r[0];
 
                $conv = array(
+                       "author" => $cnv["creator"],
                        "guid" => $cnv["guid"],
                        "subject" => $cnv["subject"],
                        "created_at" => datetime_convert("UTC", "UTC", $cnv['created'], 'Y-m-d\TH:i:s\Z'),
-                       "diaspora_handle" => $cnv["creator"],
-                       "participant_handles" => $cnv["recips"]
+                       "participants" => $cnv["recips"]
                );
 
                $body = bb2diaspora($item["body"]);
                $created = datetime_convert("UTC", "UTC", $item["created"], 'Y-m-d\TH:i:s\Z');
 
-               $signed_text = $item["guid"].";".$cnv["guid"].";".$body.";".$created.";".$myaddr.";".$cnv['guid'];
-               $sig = base64_encode(rsa_sign($signed_text, $owner["uprvkey"], "sha256"));
-
                $msg = array(
+                       "author" => $myaddr,
                        "guid" => $item["guid"],
-                       "parent_guid" => $cnv["guid"],
-                       "parent_author_signature" => $sig,
-                       "author_signature" => $sig,
+                       "conversation_guid" => $cnv["guid"],
                        "text" => $body,
                        "created_at" => $created,
-                       "diaspora_handle" => $myaddr,
-                       "conversation_guid" => $cnv["guid"]
                );
 
                if ($item["reply"]) {
                        $message = $msg;
                        $type = "message";
                } else {
-                       $message = array("guid" => $cnv["guid"],
+                       $message = array(
+                                       "author" => $cnv["creator"],
+                                       "guid" => $cnv["guid"],
                                        "subject" => $cnv["subject"],
                                        "created_at" => datetime_convert("UTC", "UTC", $cnv['created'], 'Y-m-d\TH:i:s\Z'),
-                                       "message" => $msg,
-                                       "diaspora_handle" => $cnv["creator"],
-                                       "participant_handles" => $cnv["recips"]);
+                                       "participants" => $cnv["recips"],
+                                       "message" => $msg);
 
                        $type = "conversation";
                }
@@ -3742,7 +3661,7 @@ class Diaspora {
                        $tags = trim($tags);
                }
 
-               $message = array("diaspora_handle" => $handle,
+               $message = array("author" => $handle,
                                "first_name" => $first,
                                "last_name" => $last,
                                "image_url" => $large,
@@ -3753,6 +3672,7 @@ class Diaspora {
                                "bio" => $about,
                                "location" => $location,
                                "searchable" => $searchable,
+                               "nsfw" => "false",
                                "tag_string" => $tags);
 
                foreach ($recips as $recip) {
index eb307639098d6148419e70a1ca9e23d89e6af694..8649a1bd1ae74b2fb37054952d2b444475731503 100644 (file)
@@ -1139,7 +1139,7 @@ function item_store($arr, $force_parent = false, $notify = false, $dontcache = f
        check_item_notification($current_post, $uid);
 
        if ($notify) {
-               proc_run(PRIORITY_HIGH, "include/notifier.php", $notify_type, $current_post);
+               proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), "include/notifier.php", $notify_type, $current_post);
        }
 
        return $current_post;
@@ -1430,7 +1430,7 @@ function tag_deliver($uid, $item_id) {
        );
        update_thread($item_id);
 
-       proc_run(PRIORITY_HIGH,'include/notifier.php', 'tgroup', $item_id);
+       proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/notifier.php', 'tgroup', $item_id);
 
 }
 
@@ -2076,7 +2076,7 @@ function item_expire($uid, $days, $network = "", $force = false) {
                drop_item($item['id'], false);
        }
 
-       proc_run(PRIORITY_LOW, "include/notifier.php", "expire", $uid);
+       proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/notifier.php", "expire", $uid);
 
 }
 
@@ -2099,7 +2099,7 @@ function drop_items($items) {
        // multiple threads may have been deleted, send an expire notification
 
        if ($uid) {
-               proc_run(PRIORITY_LOW, "include/notifier.php", "expire", $uid);
+               proc_run(array('priority' => PRIORITY_LOW, 'dont_fork' => true), "include/notifier.php", "expire", $uid);
        }
 }
 
@@ -2295,7 +2295,7 @@ function drop_item($id, $interactive = true) {
                $drop_id = intval($item['id']);
                $priority = ($interactive ? PRIORITY_HIGH : PRIORITY_LOW);
 
-               proc_run($priority, "include/notifier.php", "drop", $drop_id);
+               proc_run(array('priority' => $priority, 'dont_fork' => true), "include/notifier.php", "drop", $drop_id);
 
                if (! $interactive) {
                        return $owner;
index 7f6318182f63f673fd6606e22a8bfe5fd6aaa776..c110984dde0f9021f4d40e48809eec77446fd565 100644 (file)
@@ -55,17 +55,6 @@ function notifier_run(&$argv, &$argc){
                return;
        }
 
-       // Inherit the priority
-       $queue = dba::select('workerqueue', array('priority'), array('pid' => getmypid()), array('limit' => 1));
-       if (dbm::is_result($queue)) {
-               $priority = (int)$queue['priority'];
-               logger('inherited priority: '.$priority);
-       } else {
-               // Normally this shouldn't happen.
-               $priority = PRIORITY_HIGH;
-               logger('no inherited priority! Something is wrong.');
-       }
-
        logger('notifier: invoked: ' . print_r($argv,true), LOGGER_DEBUG);
 
        $cmd = $argv[1];
@@ -359,7 +348,7 @@ function notifier_run(&$argv, &$argc){
                        // a delivery fork. private groups (forum_mode == 2) do not uplink
 
                        if ((intval($parent['forum_mode']) == 1) && (! $top_level) && ($cmd !== 'uplink')) {
-                               proc_run($priority, 'include/notifier.php', 'uplink', $item_id);
+                               proc_run($a->queue['priority'], 'include/notifier.php', 'uplink', $item_id);
                        }
 
                        $conversants = array();
@@ -498,7 +487,8 @@ function notifier_run(&$argv, &$argc){
                        }
                        logger("Deliver ".$target_item["guid"]." to ".$contact['url']." via network ".$contact['network'], LOGGER_DEBUG);
 
-                       proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $contact['id']);
+                       proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
+                                       'include/delivery.php', $cmd, $item_id, (int)$contact['id']);
                }
        }
 
@@ -563,7 +553,8 @@ function notifier_run(&$argv, &$argc){
 
                                if ((! $mail) && (! $fsuggest) && (! $followup)) {
                                        logger('notifier: delivery agent: '.$rr['name'].' '.$rr['id'].' '.$rr['network'].' '.$target_item["guid"]);
-                                       proc_run(array('priority' => $priority, 'dont_fork' => true), 'include/delivery.php', $cmd, $item_id, $rr['id']);
+                                       proc_run(array('priority' => $a->queue['priority'], 'created' => $a->queue['created'], 'dont_fork' => true),
+                                                       'include/delivery.php', $cmd, $item_id, (int)$rr['id']);
                                }
                        }
                }
@@ -603,7 +594,8 @@ function notifier_run(&$argv, &$argc){
                }
 
                // Handling the pubsubhubbub requests
-               proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php');
+               proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true),
+                               'include/pubsubpublish.php');
        }
 
        logger('notifier: calling hooks', LOGGER_DEBUG);
index cc8edce656969ba4e8743adac09d479724d2edee..29a31a96f8b15b6f1139565010790549dd3a4965 100644 (file)
@@ -18,7 +18,9 @@ if (!file_exists("boot.php") && (sizeof($_SERVER["argv"]) != 0)) {
 require_once("boot.php");
 
 function poller_run($argv, $argc){
-       global $a, $db;
+       global $a, $db, $poller_up_start;
+
+       $poller_up_start = microtime(true);
 
        $a = new App(dirname(__DIR__));
 
@@ -47,11 +49,15 @@ function poller_run($argv, $argc){
        // We now start the process. This is done after the load check since this could increase the load.
        $a->start_process();
 
-       // At first we check the number of workers and quit if there are too much of them
-       // This is done at the top to avoid that too much code is executed without a need to do so,
-       // since the poller mostly quits here.
-       if (poller_too_much_workers()) {
+       // Kill stale processes every 5 minutes
+       $last_cleanup = Config::get('system', 'poller_last_cleaned', 0);
+       if (time() > ($last_cleanup + 300)) {
+               Config::set('system', 'poller_last_cleaned', time());
                poller_kill_stale_workers();
+       }
+
+       // Count active workers and compare them with a maximum value that depends on the load
+       if (poller_too_much_workers()) {
                logger('Pre check: Active worker limit reached, quitting.', LOGGER_DEBUG);
                return;
        }
@@ -83,10 +89,15 @@ function poller_run($argv, $argc){
 
        // We fetch the next queue entry that is about to be executed
        while ($r = poller_worker_process()) {
+               foreach ($r AS $entry) {
+                       // Assure that the priority is an integer value
+                       $entry['priority'] = (int)$entry['priority'];
 
-               // If we got that queue entry we claim it for us
-               if (!poller_claim_process($r[0])) {
-                       continue;
+                       // The work will be done
+                       if (!poller_execute($entry)) {
+                               logger('Process execution failed, quitting.', LOGGER_DEBUG);
+                               return;
+                       }
                }
 
                // To avoid the quitting of multiple pollers only one poller at a time will execute the check
@@ -105,14 +116,8 @@ function poller_run($argv, $argc){
                        Lock::remove('poller_worker');
                }
 
-               // finally the work will be done
-               if (!poller_execute($r[0])) {
-                       logger('Process execution failed, quitting.', LOGGER_DEBUG);
-                       return;
-               }
-
-               // Quit the poller once every hour
-               if (time() > ($starttime + 3600)) {
+               // Quit the poller once every 5 minutes
+               if (time() > ($starttime + 300)) {
                        logger('Process lifetime reached, quitting.', LOGGER_DEBUG);
                        return;
                }
@@ -120,6 +125,47 @@ function poller_run($argv, $argc){
        logger("Couldn't select a workerqueue entry, quitting.", LOGGER_DEBUG);
 }
 
+/**
+ * @brief Returns the number of non executed entries in the worker queue
+ *
+ * @return integer Number of non executed entries in the worker queue
+ */
+function poller_total_entries() {
+       $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done`", dbesc(NULL_DATE));
+       if (dbm::is_result($s)) {
+               return $s[0]["total"];
+       } else {
+               return 0;
+       }
+}
+
+/**
+ * @brief Returns the highest priority in the worker queue that isn't executed
+ *
+ * @return integer Number of active poller processes
+ */
+function poller_highest_priority() {
+       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' AND NOT `done` ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
+       if (dbm::is_result($s)) {
+               return $s[0]["priority"];
+       } else {
+               return 0;
+       }
+}
+
+/**
+ * @brief Returns if a process with the given priority is running
+ *
+ * @param integer $priority The priority that should be checked
+ *
+ * @return integer Is there a process running with that priority?
+ */
+function poller_process_with_priority_active($priority) {
+       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' AND NOT `done` LIMIT 1",
+                       intval($priority), dbesc(NULL_DATE));
+       return dbm::is_result($s);
+}
+
 /**
  * @brief Execute a worker entry
  *
@@ -128,6 +174,7 @@ function poller_run($argv, $argc){
  * @return boolean "true" if further processing should be stopped
  */
 function poller_execute($queue) {
+       global $poller_db_duration;
 
        $a = get_app();
 
@@ -168,9 +215,13 @@ function poller_execute($queue) {
 
        if (function_exists($funcname)) {
                poller_exec_function($queue, $funcname, $argv);
-               dba::delete('workerqueue', array('id' => $queue["id"]));
+
+               $stamp = (float)microtime(true);
+               dba::update('workerqueue', array('done' => true), array('id' => $queue["id"]));
+               $poller_db_duration = (microtime(true) - $stamp);
        } else {
                logger("Function ".$funcname." does not exist");
+               dba::delete('workerqueue', array('id' => $queue["id"]));
        }
 
        return true;
@@ -184,6 +235,7 @@ function poller_execute($queue) {
  * @param array $argv Array of values to be passed to the function
  */
 function poller_exec_function($queue, $funcname, $argv) {
+       global $poller_up_start, $poller_db_duration;
 
        $a = get_app();
 
@@ -214,13 +266,26 @@ function poller_exec_function($queue, $funcname, $argv) {
        // But preserve the old one for the worker
        $old_process_id = $a->process_id;
        $a->process_id = uniqid("wrk", true);
+       $a->queue = $queue;
+
+       $up_duration = number_format(microtime(true) - $poller_up_start, 3);
 
        $funcname($argv, $argc);
 
        $a->process_id = $old_process_id;
+       unset($a->queue);
 
        $duration = number_format(microtime(true) - $stamp, 3);
 
+       $poller_up_start = microtime(true);
+
+       /* With these values we can analyze how effective the worker is.
+        * The database and rest time should be low since this is the unproductive time.
+        * The execution time is the productive time.
+        * By changing parameters like the maximum number of workers we can check the effectivness.
+       */
+       logger('DB: '.number_format($poller_db_duration, 2).' - Rest: '.number_format($up_duration - $poller_db_duration, 2).' - Execution: '.number_format($duration, 2), LOGGER_DEBUG);
+
        if ($duration > 3600) {
                logger("Prio ".$queue["priority"].": ".$queue["parameter"]." - longer than 1 hour (".round($duration/60, 3).")", LOGGER_DEBUG);
        } elseif ($duration > 600) {
@@ -373,7 +438,7 @@ function poller_max_connections_reached() {
  *
  */
 function poller_kill_stale_workers() {
-       $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s'", dbesc(NULL_DATE));
+       $r = q("SELECT `pid`, `executed`, `priority`, `parameter` FROM `workerqueue` WHERE `executed` > '%s' AND NOT `done`", dbesc(NULL_DATE));
 
        if (!dbm::is_result($r)) {
                // No processing here needed
@@ -442,38 +507,35 @@ function poller_too_much_workers() {
                $slope = $maxworkers / pow($maxsysload, $exponent);
                $queues = ceil($slope * pow(max(0, $maxsysload - $load), $exponent));
 
-               // Create a list of queue entries grouped by their priority
-               $listitem = array();
+               if (Config::get('system', 'worker_debug')) {
+                       // Create a list of queue entries grouped by their priority
+                       $listitem = array();
 
-               // Adding all processes with no workerqueue entry
-               $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid`)");
-               if ($process = dba::fetch($processes)) {
-                       $listitem[0] = "0:".$process["running"];
-               }
-               dba::close($processes);
-
-               // Now adding all processes with workerqueue entries
-               $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` GROUP BY `priority`");
-               while ($entry = dba::fetch($entries)) {
-                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` WHERE `priority` = ?", $entry["priority"]);
+                       // Adding all processes with no workerqueue entry
+                       $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` WHERE NOT EXISTS (SELECT id FROM `workerqueue` WHERE `workerqueue`.`pid` = `process`.`pid` AND NOT `done`)");
                        if ($process = dba::fetch($processes)) {
-                               $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
+                               $listitem[0] = "0:".$process["running"];
                        }
                        dba::close($processes);
+
+                       // Now adding all processes with workerqueue entries
+                       $entries = dba::p("SELECT COUNT(*) AS `entries`, `priority` FROM `workerqueue` WHERE NOT `done` GROUP BY `priority`");
+                       while ($entry = dba::fetch($entries)) {
+                               $processes = dba::p("SELECT COUNT(*) AS `running` FROM `process` INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done` WHERE `priority` = ?", $entry["priority"]);
+                               if ($process = dba::fetch($processes)) {
+                                       $listitem[$entry["priority"]] = $entry["priority"].":".$process["running"]."/".$entry["entries"];
+                               }
+                               dba::close($processes);
+                       }
+                       dba::close($entries);
+                       $processlist = ' ('.implode(', ', $listitem).')';
                }
-               dba::close($entries);
-               $processlist = ' ('.implode(', ', $listitem).')';
 
-               $s = q("SELECT COUNT(*) AS `total` FROM `workerqueue` WHERE `executed` <= '%s'", dbesc(NULL_DATE));
-               $entries = $s[0]["total"];
+               $entries = poller_total_entries();
 
                if (Config::get("system", "worker_fastlane", false) && ($queues > 0) && ($entries > 0) && ($active >= $queues)) {
-                       $s = q("SELECT `priority` FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority` LIMIT 1", dbesc(NULL_DATE));
-                       $top_priority = $s[0]["priority"];
-
-                       $s = q("SELECT `id` FROM `workerqueue` WHERE `priority` <= %d AND `executed` > '%s' LIMIT 1",
-                               intval($top_priority), dbesc(NULL_DATE));
-                       $high_running = dbm::is_result($s);
+                       $top_priority = poller_highest_priority();
+                       $high_running = poller_process_with_priority_active($top_priority);
 
                        if (!$high_running && ($top_priority > PRIORITY_UNDEFINED) && ($top_priority < PRIORITY_NEGLIGIBLE)) {
                                logger("There are jobs with priority ".$top_priority." waiting but none is executed. Open a fastlane.", LOGGER_DEBUG);
@@ -521,7 +583,7 @@ function poller_passing_slow(&$highest_priority) {
 
        $r = q("SELECT `priority`
                FROM `process`
-               INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid`");
+               INNER JOIN `workerqueue` ON `workerqueue`.`pid` = `process`.`pid` AND NOT `done`");
 
        // No active processes at all? Fine
        if (!dbm::is_result($r)) {
@@ -558,88 +620,67 @@ function poller_passing_slow(&$highest_priority) {
 }
 
 /**
- * @brief Returns the next worker process
+ * @brief Find and claim the next worker process for us
  *
- * @return string SQL statement
+ * @return boolean Have we found something?
  */
-function poller_worker_process() {
-
+function find_worker_processes() {
        // Check if we should pass some low priority process
        $highest_priority = 0;
+       $found = false;
+       $limit = Config::get('system', 'worker_fetch_limit', 5);
 
        if (poller_passing_slow($highest_priority)) {
-               dba::lock('workerqueue');
-
                // Are there waiting processes with a higher priority than the currently highest?
-               $r = q("SELECT * FROM `workerqueue`
-                               WHERE `executed` <= '%s' AND `priority` < %d
-                               ORDER BY `priority`, `created` LIMIT 1",
-                               dbesc(NULL_DATE),
-                               intval($highest_priority));
-               if (dbm::is_result($r)) {
-                       return $r;
+               $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
+                                       WHERE `executed` <= ? AND `priority` < ? AND NOT `done`
+                                       ORDER BY `priority`, `created` LIMIT ".intval($limit),
+                               datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
+               if ($result) {
+                       $found = (dba::affected_rows() > 0);
                }
-               // Give slower processes some processing time
-               $r = q("SELECT * FROM `workerqueue`
-                               WHERE `executed` <= '%s' AND `priority` > %d
-                               ORDER BY `priority`, `created` LIMIT 1",
-                               dbesc(NULL_DATE),
-                               intval($highest_priority));
 
-               if (dbm::is_result($r)) {
-                       return $r;
+               if (!$found) {
+                       // Give slower processes some processing time
+                       $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ?
+                                               WHERE `executed` <= ? AND `priority` > ? AND NOT `done`
+                                               ORDER BY `priority`, `created` LIMIT 1",
+                                       datetime_convert(), getmypid(), NULL_DATE, $highest_priority);
+                       if ($result) {
+                               $found = (dba::affected_rows() > 0);
+                       }
                }
-       } else {
-               dba::lock('workerqueue');
        }
 
        // If there is no result (or we shouldn't pass lower processes) we check without priority limit
-       if (!dbm::is_result($r)) {
-               $r = q("SELECT * FROM `workerqueue` WHERE `executed` <= '%s' ORDER BY `priority`, `created` LIMIT 1", dbesc(NULL_DATE));
-       }
-
-       // We only unlock the tables here, when we got no data
-       if (!dbm::is_result($r)) {
-               dba::unlock();
+       if (!$found) {
+               $result = dba::e("UPDATE `workerqueue` SET `executed` = ?, `pid` = ? WHERE `executed` <= ? AND NOT `done` ORDER BY `priority`, `created` LIMIT ".intval($limit),
+                               datetime_convert(), getmypid(), NULL_DATE);
+               if ($result) {
+                       $found = (dba::affected_rows() > 0);
+               }
        }
-
-       return $r;
+       return $found;
 }
 
 /**
- * @brief Assigns a workerqueue entry to the current process
- *
- * When we are sure that the table locks are working correctly, we can remove the checks from here
- *
- * @param array $queue Workerqueue entry
+ * @brief Returns the next worker process
  *
- * @return boolean "true" if the claiming was successful
+ * @return string SQL statement
  */
-function poller_claim_process($queue) {
-       $mypid = getmypid();
+function poller_worker_process() {
+       global $poller_db_duration;
 
-       $success = dba::update('workerqueue', array('executed' => datetime_convert(), 'pid' => $mypid),
-                       array('id' => $queue["id"], 'pid' => 0));
-       dba::unlock();
+       $stamp = (float)microtime(true);
 
-       if (!$success) {
-               logger("Couldn't update queue entry ".$queue["id"]." - skip this execution", LOGGER_DEBUG);
-               return false;
-       }
+       $found = find_worker_processes();
 
-       // Assure that there are no tasks executed twice
-       $id = q("SELECT `pid`, `executed` FROM `workerqueue` WHERE `id` = %d", intval($queue["id"]));
-       if (!$id) {
-               logger("Queue item ".$queue["id"]." vanished - skip this execution", LOGGER_DEBUG);
-               return false;
-       } elseif ((strtotime($id[0]["executed"]) <= 0) || ($id[0]["pid"] == 0)) {
-               logger("Entry for queue item ".$queue["id"]." wasn't stored - skip this execution", LOGGER_DEBUG);
-               return false;
-       } elseif ($id[0]["pid"] != $mypid) {
-               logger("Queue item ".$queue["id"]." is to be executed by process ".$id[0]["pid"]." and not by me (".$mypid.") - skip this execution", LOGGER_DEBUG);
-               return false;
+       $poller_db_duration += (microtime(true) - $stamp);
+
+       if ($found) {
+               $r = q("SELECT * FROM `workerqueue` WHERE `pid` = %d", intval(getmypid()));
        }
-       return true;
+       return $r;
 }
 
 /**
index 1112969f2705ee3a9f773aceded4c0dfef8558e1..580e3ffce18038f8b86f521f28acdf2d54a80aaf 100644 (file)
@@ -7,6 +7,7 @@ require_once('include/items.php');
 require_once('include/ostatus.php');
 
 function pubsubpublish_run(&$argv, &$argc){
+       global $a;
 
        if ($argc > 1) {
                $pubsubpublish_id = intval($argv[1]);
@@ -17,7 +18,8 @@ function pubsubpublish_run(&$argv, &$argc){
 
                foreach ($r as $rr) {
                        logger("Publish feed to ".$rr["callback_url"], LOGGER_DEBUG);
-                       proc_run(array('priority' => PRIORITY_HIGH, 'dont_fork' => true), 'include/pubsubpublish.php', $rr["id"]);
+                       proc_run(array('priority' => PRIORITY_HIGH, 'created' => $a->queue['created'], 'dont_fork' => true),
+                                       'include/pubsubpublish.php', (int)$rr["id"]);
                }
        }
 
index a9816b06cea4f582da32327406da37bc2352ce21..0d347882e2b16c95120a5fcbd4f16e5c146d96ab 100644 (file)
@@ -17,9 +17,12 @@ function display_init(App $a) {
                if (substr($a->argv[2], -5) == '.atom') {
                        $item_id = substr($a->argv[2], 0, -5);
                        $xml = dfrn::itemFeed($item_id);
+                       if ($xml == '') {
+                               http_status_exit(500);
+                       }
                        header("Content-type: application/atom+xml");
                        echo $xml;
-                       http_status_exit(($xml) ? 200 : 500);
+                       killme();
                }
        }
 
index f6b568ae8ac2c59b5c3e532ef81f8eacce318740..94ca007511297c5b5b73b1e85771dcf290ce738e 100644 (file)
@@ -100,6 +100,7 @@ class App {
         */
        public $template_engine_instance = array();
        public $process_id;
+       public $queue;
        private $ldelim = array(
                'internal' => '',
                'smarty3' => '{{'
index b2c5afc662acf65caeb5fd3d23e2a807b0cf15d2..36f408cf324b4e1603c82e9c6d963579e0ab85cc 100644 (file)
@@ -57,10 +57,11 @@ class Lock {
 
                $memcache = self::connectMemcache();
                if (is_object($memcache)) {
-                       $wait_sec = 0.2;
                        $cachekey = get_app()->get_hostname().";lock:".$fn_name;
 
                        do {
+                               // We only lock to be sure that nothing happens at exactly the same time
+                               dba::lock('locks');
                                $lock = $memcache->get($cachekey);
 
                                if (!is_bool($lock)) {
@@ -76,16 +77,17 @@ class Lock {
                                        $memcache->set($cachekey, getmypid(), MEMCACHE_COMPRESSED, 300);
                                        $got_lock = true;
                                }
+
+                               dba::unlock();
+
                                if (!$got_lock && ($timeout > 0)) {
-                                       usleep($wait_sec * 1000000);
+                                       usleep(rand(10000, 200000));
                                }
                        } while (!$got_lock && ((time() - $start) < $timeout));
 
                        return $got_lock;
                }
 
-               $wait_sec = 2;
-
                do {
                        dba::lock('locks');
                        $lock = dba::select('locks', array('locked', 'pid'), array('name' => $fn_name), array('limit' => 1));
@@ -113,7 +115,7 @@ class Lock {
                        dba::unlock();
 
                        if (!$got_lock && ($timeout > 0)) {
-                               sleep($wait_sec);
+                               usleep(rand(100000, 2000000));
                        }
                } while (!$got_lock && ((time() - $start) < $timeout));
 
index aaa7aa0b9e62161b0cb74f7010d122a60a1d5487..09f11918c5ba026831c8c4928b5d653932a7407e 100644 (file)
@@ -1,6 +1,6 @@
 <?php
 
-define('UPDATE_VERSION' , 1229);
+define('UPDATE_VERSION' , 1230);
 
 /**
  *