*/
class Queue
{
+ /**
+ * Add activity to the queue
+ *
+ * @param array $activity
+ * @param string $type
+ * @param integer $uid
+ * @param string $http_signer
+ * @param boolean $push
+ * @return array
+ */
public static function add(array $activity, string $type, int $uid, string $http_signer, bool $push): array
{
$fields = [
return $activity;
}
+ /**
+ * Remove activity from the queue
+ *
+ * @param array $activity
+ * @return void
+ */
public static function remove(array $activity = [])
{
if (empty($activity['entry-id'])) {
return;
}
DBA::delete('inbox-entry', ['id' => $activity['entry-id']]);
- //echo "Delete ".$activity['entry-id']."\n";
-
}
+ /**
+ * Process the activity with the given id
+ *
+ * @param integer $id
+ * @return void
+ */
public static function process(int $id)
{
$entry = DBA::selectFirst('inbox-entry', [], ['id' => $id]);
if (empty($entry)) {
return;
}
-
+
Logger::debug('Processing queue entry', ['id' => $entry['id'], 'type' => $entry['type'], 'object-type' => $entry['object-type'], 'uri' => $entry['object-id'], 'in-reply-to' => $entry['in-reply-to-id']]);
$activity = json_decode($entry['activity'], true);
}
}
+ /**
+ * Process all activities
+ *
+ * @return void
+ */
public static function processAll()
{
$entries = DBA::select('inbox-entry', ['id', 'type', 'object-type'], [], ['order' => ['id' => true]]);
while ($entry = DBA::fetch($entries)) {
- echo $entry['id'] . "\t" . $entry['type'] . "\t" . $entry['object-type'] . "\n";
self::process($entry['id']);
}
+
+ DBA::delete('inbox-entry', ["`received` < ?", DateTimeFormat::utc('now - 1 days')]);
}
+ /**
+ * Process all activities that are children of a given post url
+ *
+ * @param string $uri
+ * @return void
+ */
public static function processReplyByUri(string $uri)
{
$entries = DBA::select('inbox-entry', ['id'], ['in-reply-to-id' => $uri], ['order' => ['id' => true]]);
use Friendica\Database\DBA;
use Friendica\DI;
use Friendica\Model\Tag;
+use Friendica\Protocol\ActivityPub\Queue;
use Friendica\Protocol\Relay;
class Cron
Tag::setLocalTrendingHashtags(24, 20);
Tag::setGlobalTrendingHashtags(24, 20);
+ // Process pending posts in the queue
+ Queue::processAll();
+
// Search for new contacts in the directory
if (DI::config()->get('system', 'synchronize_directory')) {
Worker::add(PRIORITY_LOW, 'PullDirectory');