]> git.mxchange.org Git - friendica.git/blob - src/Protocol/ActivityPub/Queue.php
Avoid processing the same activity
[friendica.git] / src / Protocol / ActivityPub / Queue.php
1 <?php
2 /**
3  * @copyright Copyright (C) 2010-2022, the Friendica project
4  *
5  * @license GNU AGPL version 3 or any later version
6  *
7  * This program is free software: you can redistribute it and/or modify
8  * it under the terms of the GNU Affero General Public License as
9  * published by the Free Software Foundation, either version 3 of the
10  * License, or (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU Affero General Public License for more details.
16  *
17  * You should have received a copy of the GNU Affero General Public License
18  * along with this program.  If not, see <https://www.gnu.org/licenses/>.
19  *
20  */
21
22 namespace Friendica\Protocol\ActivityPub;
23
24 use Friendica\Core\Logger;
25 use Friendica\Core\System;
26 use Friendica\Database\Database;
27 use Friendica\Database\DBA;
28 use Friendica\DI;
29 use Friendica\Model\Post;
30 use Friendica\Util\DateTimeFormat;
31 use Friendica\Util\JsonLD;
32
/**
 * This class handles the processing of incoming posts.
 *
 * Incoming activities are stored in the `inbox-entry` table, their receiving
 * users in `inbox-entry-receiver`. The methods below add, look up, process
 * and remove those rows.
 */
class Queue
{
	/**
	 * Add activity to the queue
	 *
	 * The row is inserted with Database::INSERT_IGNORE and afterwards looked up
	 * by its activity id. When a row is found, its id is stored in the returned
	 * activity under the key "entry-id" and the user is registered as receiver.
	 *
	 * @param array   $activity     Prepared activity; "id", "object_id" and "object_type" are read unconditionally
	 * @param string  $type         Activity type
	 * @param integer $uid          User id that is stored as receiver of the entry
	 * @param string  $http_signer  Signer of the request; only stored when not empty
	 * @param boolean $push         Stored in the "push" field of the entry
	 * @param boolean $trust_source Stored in the "trust" field of the entry
	 * @return array The activity, extended by "entry-id" when the queue row could be found
	 */
	public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push, bool $trust_source): array
	{
		$fields = [
			'activity-id' => $activity['id'],
			'object-id'   => $activity['object_id'],
			'type'        => $type,
			'object-type' => $activity['object_type'],
			'activity'    => json_encode($activity, JSON_UNESCAPED_SLASHES | JSON_UNESCAPED_UNICODE | JSON_PRETTY_PRINT),
			'received'    => DateTimeFormat::utcNow(),
			'push'        => $push,
			'trust'       => $trust_source,
		];

		if (!empty($activity['reply-to-id'])) {
			$fields['in-reply-to-id'] = $activity['reply-to-id'];
		}

		// "context" takes precedence over "conversation" when both are present.
		if (!empty($activity['context'])) {
			$fields['conversation'] = $activity['context'];
		} elseif (!empty($activity['conversation'])) {
			$fields['conversation'] = $activity['conversation'];
		}

		if (!empty($activity['object_object_type'])) {
			$fields['object-object-type'] = $activity['object_object_type'];
		}

		if (!empty($http_signer)) {
			$fields['signer'] = $http_signer;
		}

		DBA::insert('inbox-entry', $fields, Database::INSERT_IGNORE);

		// Look the row up again by activity id, so that the id is also known
		// when the insert did not create a new row.
		$queue = DBA::selectFirst('inbox-entry', ['id'], ['activity-id' => $activity['id']]);
		if (!empty($queue['id'])) {
			$activity['entry-id'] = $queue['id'];
			DBA::insert('inbox-entry-receiver', ['queue-id' => $queue['id'], 'uid' => $uid], Database::INSERT_IGNORE);
		}

		return $activity;
	}

	/**
	 * Checks if an entry for a given url and type already exists
	 *
	 * @param string $url  Compared with the "object-id" field
	 * @param string $type Compared with the "type" field
	 * @return boolean
	 */
	public static function exists(string $url, string $type): bool
	{
		return DBA::exists('inbox-entry', ['type' => $type, 'object-id' => $url]);
	}

	/**
	 * Remove activity from the queue
	 *
	 * Nothing happens when the activity carries no "entry-id".
	 *
	 * @param array $activity Activity as returned by add() or built by process()
	 * @return void
	 */
	public static function remove(array $activity = [])
	{
		if (empty($activity['entry-id'])) {
			return;
		}

		DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
	}

	/**
	 * Delete all entries that depend on the given worker id
	 *
	 * @param integer $wid
	 * @return void
	 */
	public static function deleteByWorkerId(int $wid)
	{
		$entries = DBA::select('inbox-entry', ['id'], ['wid' => $wid]);
		while ($entry = DBA::fetch($entries)) {
			self::deleteById((int)$entry['id']);
		}
		DBA::close($entries);
	}

	/**
	 * Delete the queue entry with the given id
	 *
	 * Only the row with this id is deleted here; nothing is done when it does
	 * not exist.
	 *
	 * @param integer $id
	 * @return void
	 */
	public static function deleteById(int $id)
	{
		if (!DBA::exists('inbox-entry', ['id' => $id])) {
			return;
		}

		DBA::delete('inbox-entry', ['id' => $id]);
	}

	/**
	 * Set the worker id for the queue entry
	 *
	 * Nothing is updated when one of the ids is empty (0).
	 *
	 * @param int $entry_id
	 * @param int $wid
	 * @return void
	 */
	public static function setWorkerId(int $entry_id, int $wid)
	{
		if (empty($entry_id) || empty($wid)) {
			return;
		}

		DBA::update('inbox-entry', ['wid' => $wid], ['id' => $entry_id]);
	}

	/**
	 * Check if there is an assigned worker task
	 *
	 * @param int $wid
	 *
	 * @return bool true when a "workerqueue" row with this id exists that is not done
	 */
	public static function hasWorker(int $wid): bool
	{
		if (empty($wid)) {
			return false;
		}

		return DBA::exists('workerqueue', ['id' => $wid, 'done' => false]);
	}

	/**
	 * Process the activity with the given id
	 *
	 * @param integer $id
	 * @param bool    $fetch_parents Passed on to Receiver::routeActivities
	 *
	 * @return bool false when the entry is missing, is handled by another live
	 *              process or holds an undecodable activity; true otherwise
	 */
	public static function process(int $id, bool $fetch_parents = true): bool
	{
		$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
		if (empty($entry)) {
			return false;
		}

		if (!empty($entry['wid'])) {
			$worker = DI::app()->getQueue();
			$wid = $worker['id'] ?? 0;
			if ($entry['wid'] != $wid) {
				// The entry is assigned to a different worker task. Signal 0 sends
				// nothing, it only tests whether that task's process still exists.
				$workerqueue = DBA::selectFirst('workerqueue', ['pid'], ['id' => $entry['wid'], 'done' => false]);
				if (!empty($workerqueue['pid']) && posix_kill($workerqueue['pid'], 0)) {
					Logger::notice('Entry is already processed via another process.', ['current' => $wid, 'processor' => $entry['wid']]);
					return false;
				}
			}
		}

		Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id'], 'callstack' => System::callstack(20)]);

		$activity = json_decode($entry['activity'], true);
		if (!is_array($activity)) {
			// The stored payload cannot be decoded, so it can never be routed.
			// Drop it instead of passing garbage to the receiver.
			Logger::notice('Queue entry contains no valid activity', ['id' => $entry['id']]);
			self::deleteById((int)$entry['id']);
			return false;
		}

		$type = $entry['type'];
		$push = $entry['push'];

		$activity['entry-id']        = $entry['id'];
		$activity['worker-id']       = $entry['wid'];
		$activity['recursion-depth'] = 0;

		// in_array() needs an array; the payload may lack a usable receiver list.
		if (empty($activity['receiver']) || !is_array($activity['receiver'])) {
			$activity['receiver'] = [];
		}

		$receivers = DBA::select('inbox-entry-receiver', ['uid'], ["`queue-id` = ? AND `uid` != ?", $entry['id'], 0]);
		while ($receiver = DBA::fetch($receivers)) {
			// Loose comparison on purpose: the database value and the decoded
			// JSON value may differ in type (string vs. integer).
			if (!in_array($receiver['uid'], $activity['receiver'])) {
				$activity['receiver'][] = $receiver['uid'];
			}
		}
		DBA::close($receivers);

		if (!Receiver::routeActivities($activity, $type, $push, $fetch_parents)) {
			self::remove($activity);
		}

		return true;
	}

	/**
	 * Process all activities
	 *
	 * Only trusted entries without an assigned worker id are taken into account.
	 *
	 * @return void
	 */
	public static function processAll()
	{
		$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type', 'object-id', 'in-reply-to-id'], ["`trust` AND `wid` IS NULL"], ['order' => ['id' => true]]);
		while ($entry = DBA::fetch($entries)) {
			// Don't process entries of items that are answer to non existing posts
			if (!empty($entry['in-reply-to-id']) && !Post::exists(['uri' => $entry['in-reply-to-id']])) {
				continue;
			}
			// We don't need to process entries that depend on already existing entries.
			if (!empty($entry['in-reply-to-id']) && DBA::exists('inbox-entry', ["`id` != ? AND `object-id` = ?", $entry['id'], $entry['in-reply-to-id']])) {
				continue;
			}
			Logger::debug('Process leftover entry', $entry);
			self::process((int)$entry['id'], false);
		}
		DBA::close($entries);
	}

	/**
	 * Clear old activities
	 *
	 * @return void
	 */
	public static function clear()
	{
		// We delete all entries that aren't associated with a worker entry after seven days.
		// The other entries are deleted when the worker deferred for too long.
		DBA::delete('inbox-entry', ["`wid` IS NULL AND `received` < ?", DateTimeFormat::utc('now - 7 days')]);

		// Optimizing this table only takes seconds
		if (DI::config()->get('system', 'optimize_tables')) {
			Logger::info('Optimize start');
			DBA::e("OPTIMIZE TABLE `inbox-entry`");
			Logger::info('Optimize end');
		}
	}

	/**
	 * Process all activities that are children of a given post url
	 *
	 * Entries whose own "object-id" equals the uri are skipped.
	 *
	 * @param string $uri
	 * @return int Number of entries that were handed to process()
	 */
	public static function processReplyByUri(string $uri): int
	{
		$count = 0;
		$entries = DBA::select('inbox-entry', ['id'], ["`in-reply-to-id` = ? AND `object-id` != ?", $uri, $uri]);
		while ($entry = DBA::fetch($entries)) {
			$count++;
			self::process((int)$entry['id'], false);
		}
		DBA::close($entries);

		return $count;
	}

	/**
	 * Checks if there are children of the given uri
	 *
	 * @param string $uri
	 *
	 * @return bool
	 */
	public static function hasChildren(string $uri): bool
	{
		return DBA::exists('inbox-entry', ["`in-reply-to-id` = ? AND `object-id` != ?", $uri, $uri]);
	}

	/**
	 * Prepare the queue entry.
	 * This is a test function that is used solely for development.
	 *
	 * @param integer $id
	 * @return array Empty when the entry does not exist or its stored activity
	 *               cannot be decoded, otherwise an array with the keys
	 *               "data" and "trust"
	 */
	public static function reprepareActivityById(int $id): array
	{
		$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
		if (empty($entry)) {
			return [];
		}

		$receiver = DBA::selectFirst('inbox-entry-receiver', ['uid'], ['queue-id' => $id]);
		$uid      = !empty($receiver) ? $receiver['uid'] : 0;

		$trust_source = $entry['trust'];

		// The stored activity holds the original JSON document under "raw".
		$data = json_decode($entry['activity'], true);
		if (!is_array($data) || empty($data['raw']) || !is_string($data['raw'])) {
			return [];
		}

		$activity = json_decode($data['raw'], true);
		if (!is_array($activity)) {
			return [];
		}

		$ldactivity = JsonLD::compact($activity);
		return [
			'data'  => Receiver::prepareObjectData($ldactivity, $uid, $entry['push'], $trust_source),
			'trust' => $trust_source
		];
	}

	/**
	 * Set the trust for all untrusted entries.
	 * This is a test function that is used solely for development.
	 *
	 * @return void
	 */
	public static function reprepareAll()
	{
		$entries = DBA::select('inbox-entry', ['id'], ["NOT `trust` AND `wid` IS NULL"], ['order' => ['id' => true]]);
		while ($entry = DBA::fetch($entries)) {
			$data = self::reprepareActivityById((int)$entry['id']);
			// The result is empty when the entry vanished or could not be decoded.
			if (!empty($data['trust'])) {
				DBA::update('inbox-entry', ['trust' => true], ['id' => $entry['id']]);
			}
		}
		DBA::close($entries);
	}
}