]> git.mxchange.org Git - friendica.git/blob - src/Protocol/ActivityPub/Queue.php
Optimizing
[friendica.git] / src / Protocol / ActivityPub / Queue.php
1 <?php
2 /**
3  * @copyright Copyright (C) 2010-2022, the Friendica project
4  *
5  * @license GNU AGPL version 3 or any later version
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU Affero General Public License as
9  * published by the Free Software Foundation, either version 3 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Affero General Public License for more details.
16  *
17  * You should have received a copy of the GNU Affero General Public License
18  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19  *
20  */
21
22 namespace Friendica\Protocol\ActivityPub;
23
24 use Friendica\Core\Logger;
25 use Friendica\Database\Database;
26 use Friendica\Database\DBA;
27 use Friendica\DI;
28 use Friendica\Util\DateTimeFormat;
29
30 /**
31  * This class handles the processing of incoming posts
32  */
class Queue
{
	/**
	 * Add activity to the queue
	 *
	 * Stores the activity in the "inbox-entry" table (an already existing row
	 * is kept, because the insert ignores duplicates) and registers the given
	 * user as a receiver of that entry in "inbox-entry-receiver".
	 *
	 * @param array   $activity    Activity data; "id", "object_id" and "object_type" are read unconditionally
	 * @param string  $type        Activity type that is stored with the entry
	 * @param integer $uid         User id that is registered as receiver of the entry
	 * @param string  $http_signer Signer of the HTTP request; only stored when not empty
	 * @param boolean $push        Stored in the "push" field and handed back to the router in process()
	 * @return array The activity, extended by "entry-id" when the queue entry could be found
	 */
	public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
	{
		$fields = [
			'activity-id' => $activity['id'],
			'object-id'   => $activity['object_id'],
			'type'        => $type,
			'object-type' => $activity['object_type'],
			'activity'    => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
			'received'    => DateTimeFormat::utcNow(),
			'push'        => $push,
		];

		if (!empty($activity['reply-to-id'])) {
			$fields['in-reply-to-id'] = $activity['reply-to-id'];
		}

		// "context" takes precedence over "conversation" when both are present
		if (!empty($activity['context'])) {
			$fields['conversation'] = $activity['context'];
		} elseif (!empty($activity['conversation'])) {
			$fields['conversation'] = $activity['conversation'];
		}

		if (!empty($activity['object_object_type'])) {
			$fields['object-object-type'] = $activity['object_object_type'];
		}

		if (!empty($http_signer)) {
			$fields['signer'] = $http_signer;
		}

		DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);

		// The entry is fetched by its activity id, so that an already existing entry is found as well
		$queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]);
		if (!empty($queue['id'])) {
			$activity['entry-id'] = $queue['id'];
			DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
		}
		return $activity;
	}

	/**
	 * Remove activity from the queue
	 *
	 * Does nothing when the activity carries no "entry-id".
	 *
	 * @param array $activity Activity data as returned by add() or built by process()
	 * @return void
	 */
	public static function remove(array $activity = [])
	{
		if (empty($activity['entry-id'])) {
			return;
		}
		DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
	}

	/**
	 * Delete all entries that depend on the given worker id
	 *
	 * Every matching entry is removed via deleteById(), so the entries that
	 * reply to it are removed as well.
	 *
	 * @param integer $wid Worker id as stored in the "wid" field
	 * @return void
	 */
	public static function deleteByWorkerId(int $wid)
	{
		$entries = DBA::select('inbox-entry', ['id'], ['wid' => $wid]);
		while ($entry = DBA::fetch($entries)) {
			self::deleteById($entry['id']);
		}
		DBA::close($entries);
	}

	/**
	 * Delete recursively an entry and all their children
	 *
	 * Children are the entries whose "in-reply-to-id" equals the "object-id"
	 * of the given entry.
	 *
	 * @param integer $id Id of the queue entry
	 * @return void
	 */
	public static function deleteById(int $id)
	{
		$entry = DBA::selectFirst('inbox-entry', ['id', 'object-id'], ['id' => $id]);
		if (empty($entry)) {
			return;
		}

		// The entry is deleted before its children are visited. Otherwise an entry that
		// replies to itself (or a circle of replies) would be selected as its own
		// descendant again and again, leading to an endless recursion.
		DBA::delete('inbox-entry', ['id' => $entry['id']]);

		$children = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $entry['object-id']]);
		while ($child = DBA::fetch($children)) {
			self::deleteById($child['id']);
		}
		DBA::close($children);
	}

	/**
	 * Set the worker id for the queue entry
	 *
	 * Does nothing when the activity carries no "entry-id" or the worker id is empty.
	 *
	 * @param array $activity Activity data containing the "entry-id"
	 * @param int   $wid      Worker id to store in the "wid" field
	 * @return void
	 */
	public static function setWorkerId(array $activity, int $wid)
	{
		if (empty($activity['entry-id']) || empty($wid)) {
			return;
		}
		DBA::update('inbox-entry', ['wid' => $wid], ['id' => $activity['entry-id']]);
	}

	/**
	 * Check if there is an assigned worker task
	 *
	 * @param array $activity Activity data; the "worker-id" field is evaluated
	 * @return bool true when the "workerqueue" table has an undone row with that id
	 */
	public static function hasWorker(array $activity = []): bool
	{
		if (empty($activity['worker-id'])) {
			return false;
		}
		return DBA::exists('workerqueue', ['id' => $activity['worker-id'], 'done' => false]);
	}

	/**
	 * Process the activity with the given id
	 *
	 * Restores the stored activity, adds all receivers that had been registered
	 * for the entry and hands it over to Receiver::routeActivities(). The entry
	 * is removed from the queue when that call returns false.
	 *
	 * @param integer $id Id of the queue entry
	 * @return void
	 */
	public static function process(int $id)
	{
		$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
		if (empty($entry)) {
			return;
		}

		Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);

		$activity = json_decode($entry['activity'], true);
		if (!is_array($activity)) {
			// A payload that can't be decoded will never become processable, so the
			// entry is dropped instead of being routed as an empty activity.
			Logger::info('Queue entry contains no valid activity', ['id' => $entry['id']]);
			DBA::delete('inbox-entry', ['id' => $entry['id']]);
			return;
		}

		$type = $entry['type'];
		$push = $entry['push'];

		$activity['entry-id']        = $entry['id'];
		$activity['worker-id']       = $entry['wid'];
		$activity['recursion-depth'] = 0;

		// in_array() below needs an array, a missing or malformed receiver list would raise a TypeError on PHP 8
		if (!isset($activity['receiver']) || !is_array($activity['receiver'])) {
			$activity['receiver'] = [];
		}

		$receivers = DBA::select('inbox-entry-receiver', ['uid'], ['queue-id' => $entry['id']]);
		while ($receiver = DBA::fetch($receivers)) {
			if (!in_array($receiver['uid'], $activity['receiver'])) {
				$activity['receiver'][] = $receiver['uid'];
			}
		}
		DBA::close($receivers);

		if (!Receiver::routeActivities($activity, $type, $push)) {
			self::remove($activity);
		}
	}

	/**
	 * Process all activities
	 *
	 * Only entries without an assigned worker id are processed, in the order of their id.
	 *
	 * @return void
	 */
	public static function processAll()
	{
		$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`wid` IS NULL"], ['order' => ['id' => true]]);
		while ($entry = DBA::fetch($entries)) {
			// We don't need to process entries that depend on already existing entries.
			if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ["`id` != ? AND `object-id` = ?", $entry['id'], $entry['in-reply-to-id']])) {
				continue;
			}
			Logger::debug('Process leftover entry', $entry);
			self::process($entry['id']);
		}
		DBA::close($entries);
	}

	/**
	 * Clear old activities
	 *
	 * @return void
	 */
	public static function clear()
	{
		// We delete all entries that aren't associated with a worker entry after seven days.
		// The other entries are deleted when the worker deferred for too long.
		DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);

		// Optimizing this table only lasts seconds
		if (DI::config()->get('system', 'optimize_tables')) {
			Logger::info('Optimize start');
			DBA::e("OPTIMIZE TABLE `inbox-entry`");
			Logger::info('Optimize end');
		}
	}

	/**
	 * Process all activities that are children of a given post url
	 *
	 * Entries whose own "object-id" equals the given uri are skipped, so a
	 * post that replies to itself isn't processed here.
	 *
	 * @param string $uri Uri of the parent post
	 * @return void
	 */
	public static function processReplyByUri(string $uri)
	{
		$entries = DBA::select('inbox-entry', ['id'], ["`in-reply-to-id` = ? AND `object-id` != ?", $uri, $uri]);
		while ($entry = DBA::fetch($entries)) {
			self::process($entry['id']);
		}
		DBA::close($entries);
	}
}