]> git.mxchange.org Git - quix0rs-gnu-social.git/blob - lib/dbqueuemanager.php
Merge branch '0.8.x' into queuemanager
[quix0rs-gnu-social.git] / lib / dbqueuemanager.php
1 <?php
2 /**
3  * Laconica, the distributed open-source microblogging tool
4  *
5  * Simple-minded queue manager for storing items in the database
6  *
7  * PHP version 5
8  *
9  * LICENCE: This program is free software: you can redistribute it and/or modify
10  * it under the terms of the GNU Affero General Public License as published by
11  * the Free Software Foundation, either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU Affero General Public License for more details.
18  *
19  * You should have received a copy of the GNU Affero General Public License
20  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
21  *
22  * @category  QueueManager
23  * @package   Laconica
24  * @author    Evan Prodromou <evan@controlyourself.ca>
25  * @author    Sarven Capadisli <csarven@controlyourself.ca>
26  * @copyright 2009 Control Yourself, Inc.
27  * @license   http://www.fsf.org/licensing/licenses/agpl-3.0.html GNU Affero General Public License version 3.0
28  * @link      http://laconi.ca/
29  */
30
31 class DBQueueManager extends QueueManager
32 {
33     var $qis = array();
34
35     function enqueue($object, $queue)
36     {
37         $notice = $object;
38
39         $qi = new Queue_item();
40
41         $qi->notice_id = $notice->id;
42         $qi->transport = $queue;
43         $qi->created   = $notice->created;
44         $result        = $qi->insert();
45
46         if (!$result) {
47             common_log_db_error($qi, 'INSERT', __FILE__);
48             throw new ServerException('DB error inserting queue item');
49         }
50
51         return true;
52     }
53
54     function nextItem($queue, $timeout=null)
55     {
56         $start = time();
57         $result = null;
58
59         do {
60             $qi = Queue_item::top($queue);
61             if (!empty($qi)) {
62                 $notice = Notice::staticGet('id', $qi->notice_id);
63                 if (!empty($notice)) {
64                     $result = $notice;
65                 } else {
66                     $this->_log(LOG_INFO, 'dequeued non-existent notice ' . $notice->id);
67                     $qi->delete();
68                     $qi->free();
69                     $qi = null;
70                 }
71             }
72         } while (empty($result) && (is_null($timeout) || (time() - $start) < $timeout));
73
74         return $result;
75     }
76
77     function done($object, $queue)
78     {
79         // XXX: right now, we only handle notices
80
81         $notice = $object;
82
83         $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
84                                         'transport' => $queue));
85
86         if (empty($qi)) {
87             $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
88         } else {
89             if (empty($qi->claimed)) {
90                 $this->_log(LOG_WARNING, 'Reluctantly releasing unclaimed queue item '.
91                            'for '.$notice->id.', queue '.$queue);
92             }
93             $qi->delete();
94             $qi->free();
95             $qi = null;
96         }
97
98         $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
99
100         $notice->free();
101         $notice = null;
102     }
103
104     function fail($object, $queue)
105     {
106         // XXX: right now, we only handle notices
107
108         $notice = $object;
109
110         $qi = Queue_item::pkeyGet(array('notice_id' => $notice->id,
111                                         'transport' => $queue));
112
113         if (empty($qi)) {
114             $this->_log(LOG_INFO, 'Cannot find queue item for notice '.$notice->id.', queue '.$queue);
115         } else {
116             if (empty($qi->claimed)) {
117                 $this->_log(LOG_WARNING, 'Ignoring failure for unclaimed queue item '.
118                            'for '.$notice->id.', queue '.$queue);
119             } else {
120                 $orig = clone($qi);
121                 $qi->claimed = null;
122                 $qi->update($orig);
123                 $qi = null;
124             }
125         }
126
127         $this->_log(LOG_INFO, 'done with notice ID = ' . $notice->id);
128
129         $notice->free();
130         $notice = null;
131     }
132
133     function _log($level, $msg)
134     {
135         common_log($level, 'DBQueueManager: '.$msg);
136     }
137 }