]> git.mxchange.org Git - friendica-addons.git/commitdiff
Use workerqueue for mailstream jobs instead of custom table
authorMatthew Exon <git.mexon@spamgourmet.com>
Sat, 8 May 2021 19:27:13 +0000 (21:27 +0200)
committerMatthew Exon <git.mexon@spamgourmet.com>
Thu, 10 Jun 2021 16:11:26 +0000 (18:11 +0200)
mailstream/mailstream.php

index c22516b3e342de0f9b1fb3461f29e5c2a9a0251a..d877f1eb86d60d0607ff72baf71ddde4133ee5ec 100644 (file)
@@ -2,7 +2,7 @@
 /**
  * Name: Mail Stream
  * Description: Mail all items coming into your network feed to an email address
- * Version: 1.1
+ * Version: 2.0
  * Author: Matthew Exon <http://mat.exon.name>
  */
 
@@ -14,7 +14,9 @@ use Friendica\Database\DBA;
 use Friendica\DI;
 use Friendica\Model\Item;
 use Friendica\Model\Post;
+use Friendica\Model\User;
 use Friendica\Protocol\Activity;
+use Friendica\Util\DateTimeFormat;
 
 /**
  * Sets up the addon hooks and the database table
@@ -26,43 +28,32 @@ function mailstream_install()
        Hook::register('post_local_end', 'addon/mailstream/mailstream.php', 'mailstream_post_hook');
        Hook::register('post_remote_end', 'addon/mailstream/mailstream.php', 'mailstream_post_hook');
        Hook::register('cron', 'addon/mailstream/mailstream.php', 'mailstream_cron');
+       Hook::register('mailstream_send_hook', 'addon/mailstream/mailstream.php', 'mailstream_send_hook');
 
-       if (DI::config()->get('mailstream', 'dbversion') == '0.1') {
-               q('ALTER TABLE `mailstream_item` DROP INDEX `uid`');
-               q('ALTER TABLE `mailstream_item` DROP INDEX `contact-id`');
-               q('ALTER TABLE `mailstream_item` DROP INDEX `plink`');
-               q('ALTER TABLE `mailstream_item` CHANGE `plink` `uri` char(255) NOT NULL');
-               DI::config()->set('mailstream', 'dbversion', '0.2');
-       }
-       if (DI::config()->get('mailstream', 'dbversion') == '0.2') {
-               q('DELETE FROM `pconfig` WHERE `cat` = "mailstream" AND `k` = "delay"');
-               DI::config()->set('mailstream', 'dbversion', '0.3');
-       }
-       if (DI::config()->get('mailstream', 'dbversion') == '0.3') {
-               q('ALTER TABLE `mailstream_item` CHANGE `created` `created` timestamp NOT NULL DEFAULT now()');
-               q('ALTER TABLE `mailstream_item` CHANGE `completed` `completed` timestamp NULL DEFAULT NULL');
-               DI::config()->set('mailstream', 'dbversion', '0.4');
-       }
-       if (DI::config()->get('mailstream', 'dbversion') == '0.4') {
-               q('ALTER TABLE `mailstream_item` CONVERT TO CHARACTER SET utf8 COLLATE utf8_bin');
-               DI::config()->set('mailstream', 'dbversion', '0.5');
-       }
-       if (DI::config()->get('mailstream', 'dbversion') == '0.5') {
-               DI::config()->set('mailstream', 'dbversion', '1.0');
-       }
+       Logger::info("mailstream: installed");
+}
 
-       if (DI::config()->get('retriever', 'dbversion') != '1.0') {
-               $schema = file_get_contents(dirname(__file__).'/database.sql');
-               $arr = explode(';', $schema);
-               foreach ($arr as $a) {
-                       $r = q($a);
-               }
-               DI::config()->set('mailstream', 'dbversion', '1.0');
+/**
+ * Enforces that mailstream_install has set up the current version
+ */
+function mailstream_check_version()
+{
+       if (!is_null(DI::config()->get('mailstream', 'dbversion'))) {
+               DI::config()->delete('mailstream', 'dbversion');
+               Logger::info("mailstream_check_version: old version detected, reinstalling");
+               mailstream_install();
+               Hook::loadHooks();
+               Hook::add(
+                       'mailstream_convert_table_entries',
+                       'addon/mailstream/mailstream.php',
+                       'mailstream_convert_table_entries'
+               );
+               Hook::fork(PRIORITY_LOW, 'mailstream_convert_table_entries');
        }
 }
 
 /**
- * This funciton indicates a module that can be wrapped in the LegacyModule class
+ * This function indicates a module that can be wrapped in the LegacyModule class
  */
 function mailstream_module()
 {
@@ -114,16 +105,40 @@ function mailstream_generate_id($uri)
        return $message_id;
 }
 
+function mailstream_send_hook(&$a, $data)
+{
+       $criteria = array('uid' => $data['uid'], 'contact-id' => $data['contact-id'], 'uri' => $data['uri']);
+       $item = Post::selectFirst([], $criteria);
+       if (empty($item)) {
+               Logger::error('mailstream_send_hook could not find item');
+               return;
+       }
+
+       $user = User::getById($item['uid']);
+       if (empty($user)) {
+                       Logger::error('mailstream_send_hook could not fund user', ['uid' => $item['uid']]);
+               return;
+       }
+
+       if (!mailstream_send($data['message_id'], $item, $user)) {
+               $delayed = date(DateTimeFormat::utc('now + 1 hour'));
+               $data['tries'] += 1;
+               Hook::fork(['priority' => PRIORITY_LOW, 'delayed' => $delayed], 'mailstream_send_hook', $data);
+       }
+}
+
 /**
- * Called when either a local or remote post is created.  Creates a
- * record in the mailstream_item table to track this email, and then
- * immediately attempts to send it
+ * Called when either a local or remote post is created.  If
+ * mailstream is enabled and the necessary data is available, forks a
+ * workerqueue item to send the email.
  *
  * @param Friendica\App $a    App object (unused)
  * @param array         $item content of the item (may or may not already be stored in the item table)
  */
 function mailstream_post_hook(&$a, &$item)
 {
+       mailstream_check_version();
+
        if (!DI::pConfig()->get($item['uid'], 'mailstream', 'enabled')) {
                Logger::debug('mailstream: not enabled for item ' . $item['id']);
                return;
@@ -152,50 +167,13 @@ function mailstream_post_hook(&$a, &$item)
        }
 
        $message_id = mailstream_generate_id($item['uri']);
-       q(
-               "INSERT INTO `mailstream_item` (`uid`, `contact-id`, `uri`, `message-id`) " .
-               "VALUES (%d, '%s', '%s', '%s')",
-               intval($item['uid']),
-               intval($item['contact-id']),
-               DBA::escape($item['uri']),
-               DBA::escape($message_id)
-       );
-       $r = q(
-               'SELECT * FROM `mailstream_item` WHERE `uid` = %d AND `contact-id` = %d AND `uri` = "%s"',
-               intval($item['uid']),
-               intval($item['contact-id']),
-               DBA::escape($item['uri'])
-       );
-       if (count($r) != 1) {
-               Logger::info('mailstream_post_remote_hook: Unexpected number of items returned from mailstream_item');
-               return;
-       }
-       $ms_item = $r[0];
-       Logger::debug('mailstream_post_remote_hook: created mailstream_item ' . $ms_item['id'] .
-                                         ' for item ' . $item['uri'] . ' ' . $item['uid'] . ' ' . $item['contact-id']);
-       $user = mailstream_get_user($item['uid']);
-       if (!$user) {
-               Logger::info('mailstream_post_remote_hook: no user ' . $item['uid']);
-               return;
-       }
-       mailstream_send($ms_item['message-id'], $item, $user);
-}
 
-/**
- * Converts a user ID into a full user record from the corresponding database table
- *
- * @param int $uid ID of the user to query
- *
- * @return array results from the user table
- */
-function mailstream_get_user($uid)
-{
-       $r = q('SELECT * FROM `user` WHERE `uid` = %d', intval($uid));
-       if (count($r) != 1) {
-               Logger::info('mailstream_post_remote_hook: Unexpected number of users returned');
-               return;
-       }
-       return $r[0];
+       $send_hook_data = array('uid' => $item['uid'],
+                                                               'contact-id' => $item['contact-id'],
+                                                               'uri' => $item['uri'],
+                                                               'message_id' => $message_id,
+                                                               'tries' => 0);
+       Hook::fork(PRIORITY_LOW, 'mailstream_send_hook', $send_hook_data);
 }
 
 /**
@@ -326,10 +304,13 @@ function mailstream_subject($item)
                intval($item['uid'])
        );
        if (!DBA::isResult($r)) {
-               Logger::error(
-                       'mailstream_subject no contact for item',
-                       ['item id' => $item['id'], 'plink' => $item['plink'], 'contact id' => $item['contact-id'], 'uid' => $item['uid']]
-               );
+                       Logger::error(
+                               'mailstream_subject no contact for item',
+                               ['id' => $item['id'],
+                                 'plink' => $item['plink'],
+                                 'contact id' => $item['contact-id'],
+                               'uid' => $item['uid']]
+                       );
                return DI::l10n()->t("Friendica post");
        }
        $contact = $r[0];
@@ -361,14 +342,24 @@ function mailstream_subject($item)
  * @param string $message_id ID of the message (RFC 1036)
  * @param array  $item       content of the item
  * @param array  $user       results from the user table
+ *
+ * @return bool True if this message has been completed.  False if it should be retried.
  */
 function mailstream_send($message_id, $item, $user)
 {
-       if (!$item['visible']) {
+       if (!is_array($item)) {
+               Logger::error('mailstream_send item is empty', ['message_id' => $message_id]);
                return;
        }
+
+       if (!$item['visible']) {
+               Logger::debug('mailstream_send item not yet visible', ['item uri' => $item['uri']]);
+               return false;
+       }
        if (!$message_id) {
-               return;
+               Logger::error('mailstream_send no message ID supplied', ['item uri' => $item['uri'],
+                               'user email' => $user['email']]);
+               return true;
        }
        require_once(dirname(__file__).'/phpmailer/class.phpmailer.php');
 
@@ -418,16 +409,16 @@ function mailstream_send($message_id, $item, $user)
                if (!$mail->Send()) {
                        throw new Exception($mail->ErrorInfo);
                }
-               Logger::debug('mailstream_send sent message ' . $mail->MessageID . ' ' . $mail->Subject);
+               Logger::debug('mailstream_send sent message', ['message ID' => $mail->MessageID,
+                               'subject' => $mail->Subject,
+                               'address' => $address]);
        } catch (phpmailerException $e) {
                Logger::debug('mailstream_send PHPMailer exception sending message ' . $message_id . ': ' . $e->errorMessage());
        } catch (Exception $e) {
                Logger::debug('mailstream_send exception sending message ' . $message_id . ': ' . $e->getMessage());
        }
-       // In case of failure, still set the item to completed.  Otherwise
-       // we'll just try to send it over and over again and it'll fail
-       // every time.
-       q('UPDATE `mailstream_item` SET `completed` = now() WHERE `message-id` = "%s"', DBA::escape($message_id));
+
+       return true;
 }
 
 /**
@@ -447,60 +438,41 @@ function mailstream_html_wrap(&$text)
 }
 
 /**
- * Cron job for the mailstream plugin.  Sends delayed messages and cleans up old successful entries from the table.
+ * Convert v1 mailstream table entries to v2 workerqueue items
  */
-function mailstream_cron()
+function mailstream_convert_table_entries()
 {
-       // Only process items older than an hour in cron.  This is because
-       // we want to give mailstream_post_remote_hook a fair chance to
-       // send the email itself before cron jumps in.  Only if
-       // mailstream_post_remote_hook fails for some reason will this get
-       // used, and in that case it's worth holding off a bit anyway.
        $query = <<< EOT
 SELECT
-  `mailstream_item`.`message-id`,
-  `mailstream_item`.`uri`,
-  `post-user-view`.`id`
+  `message-id`,
+  `uri`,
+  `uid`,
+  `contact-id`
 FROM
    `mailstream_item`
-  JOIN
-   `post-user-view`
-  ON (
-    `mailstream_item`.`uid` = `post-user-view`.`uid` AND
-    `mailstream_item`.`uri` = `post-user-view`.`uri` AND
-    `mailstream_item`.`contact-id` = `post-user-view`.`contact-id`
-  )
 WHERE
-  `mailstream_item`.`completed` IS NULL AND
-  `mailstream_item`.`created` < DATE_SUB(NOW(), INTERVAL 1 HOUR) AND
-  `post-user-view`.`visible` = 1
-ORDER BY `mailstream_item`.`created`
-LIMIT 100
+  `mailstream_item`.`completed` IS NULL
 
 EOT;
        $ms_item_ids = q($query);
        if (DBA::isResult($ms_item_ids)) {
-               Logger::debug('mailstream_cron processing ' . count($ms_item_ids) . ' items');
+               Logger::debug('mailstream_convert_table_entries processing ' . count($ms_item_ids) . ' items');
                foreach ($ms_item_ids as $ms_item_id) {
+                       $send_hook_data = array('uid' => $ms_item_id['uid'],
+                                               'contact-id' => $ms_item_id['contact-id'],
+                                               'uri' => $ms_item_id['uri'],
+                                               'message_id' => $ms_item_id['message-id'],
+                                               'tries' => 0);
                        if (!$ms_item_id['message-id'] || !strlen($ms_item_id['message-id'])) {
-                               Logger::info('mailstream_cron: Item ' . $ms_item_id['id'] .
-                                                                                        ' URI ' . $ms_item_id['uri'] . ' has no message-id');
-                       }
-                       $item = Post::selectFirst([], ['id' => $ms_item_id['id']]);
-                       $users = q("SELECT * FROM `user` WHERE `uid` = %d", intval($item['uid']));
-                       $user = $users[0];
-                       if ($user && $item) {
-                               mailstream_send($ms_item_id['message-id'], $item, $user);
-                       } else {
-                               Logger::info('mailstream_cron: Unable to find item ' . $ms_item_id['id']);
-                               q(
-                                       "UPDATE `mailstream_item` SET `completed` = now() WHERE `message-id` = %d",
-                                       intval($ms_item_id['message-id'])
-                               );
+                               Logger::info('mailstream_cron: Item ' .
+                                                               $ms_item_id['id'] . ' URI ' . $ms_item_id['uri'] . ' has no message-id');
+                                                               continue;
                        }
+                       Logger::info('mailstream_convert_table_entries: convert item to workerqueue', $send_hook_data);
+                       Hook::fork(PRIORITY_LOW, 'mailstream_send_hook', $send_hook_data);
                }
        }
-       mailstream_tidy();
+       q('DROP TABLE `mailstream_item`');
 }
 
 /**
@@ -567,25 +539,3 @@ function mailstream_addon_settings_post()
                DI::pConfig()->delete(local_user(), 'mailstream', 'attachimg');
        }
 }
-
-/**
- * Deletes records from the mailstream_item table older than one year
- */
-function mailstream_tidy()
-{
-       $query = <<< EOT
-SELECT
-  id
-FROM
-  mailstream_item
-WHERE
-  completed IS NOT NULL AND
-  completed < DATE_SUB(NOW(), INTERVAL 1 YEAR)
-
-EOT;
-       $r = q($query);
-       foreach ($r as $rr) {
-               q('DELETE FROM mailstream_item WHERE id = %d', intval($rr['id']));
-       }
-       Logger::debug('mailstream_tidy: deleted ' . count($r) . ' old items');
-}