1 |
85ad3d82
|
Assos Assos
|
<?php
|
2 |
|
|
|
3 |
|
|
/**
|
4 |
|
|
* @file
|
5 |
|
|
* Queue functionality.
|
6 |
|
|
*/
|
7 |
|
|
|
8 |
|
|
/**
|
9 |
|
|
* @defgroup queue Queue operations
|
10 |
|
|
* @{
|
11 |
|
|
* Queue items to allow later processing.
|
12 |
|
|
*
|
13 |
|
|
* The queue system allows placing items in a queue and processing them later.
|
14 |
|
|
* The system tries to ensure that only one consumer can process an item.
|
15 |
|
|
*
|
16 |
|
|
* Before a queue can be used it needs to be created by
|
17 |
|
|
* DrupalQueueInterface::createQueue().
|
18 |
|
|
*
|
19 |
|
|
* Items can be added to the queue by passing an arbitrary data object to
|
20 |
|
|
* DrupalQueueInterface::createItem().
|
21 |
|
|
*
|
22 |
|
|
* To process an item, call DrupalQueueInterface::claimItem() and specify how
|
23 |
|
|
* long you want to have a lease for working on that item. When finished
|
24 |
|
|
* processing, the item needs to be deleted by calling
|
25 |
|
|
* DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
|
26 |
|
|
* made available again by the DrupalQueueInterface implementation once the
|
27 |
|
|
* lease expires. Another consumer will then be able to receive it when calling
|
28 |
|
|
* DrupalQueueInterface::claimItem(). Due to this, the processing code should
|
29 |
|
|
* be aware that an item might be handed over for processing more than once.
|
30 |
|
|
*
|
31 |
|
|
* The $item object used by the DrupalQueueInterface can contain arbitrary
|
32 |
|
|
* metadata depending on the implementation. Systems using the interface should
|
33 |
|
|
* only rely on the data property which will contain the information passed to
|
34 |
|
|
* DrupalQueueInterface::createItem(). The full queue item returned by
|
35 |
|
|
* DrupalQueueInterface::claimItem() needs to be passed to
|
36 |
|
|
* DrupalQueueInterface::deleteItem() once processing is completed.
|
37 |
|
|
*
|
38 |
|
|
* There are two kinds of queue backends available: reliable, which preserves
|
39 |
|
|
* the order of messages and guarantees that every item will be executed at
|
40 |
|
|
* least once. The non-reliable kind only does a best effort to preserve order
|
41 |
|
|
* in messages and to execute them at least once but there is a small chance
|
42 |
|
|
* that some items get lost. For example, some distributed back-ends like
|
43 |
|
|
* Amazon SQS will be managing jobs for a large set of producers and consumers
|
44 |
|
|
* where a strict FIFO ordering will likely not be preserved. Another example
|
45 |
|
|
* would be an in-memory queue backend which might lose items if it crashes.
|
46 |
|
|
* However, such a backend would be able to deal with significantly more writes
|
47 |
|
|
* than a reliable queue and for many tasks this is more important. See
|
48 |
|
|
* aggregator_cron() for an example of how to effectively utilize a
|
49 |
|
|
* non-reliable queue. Another example is doing Twitter statistics -- the small
|
50 |
|
|
* possibility of losing a few items is insignificant next to power of the
|
51 |
|
|
* queue being able to keep up with writes. As described in the processing
|
52 |
|
|
* section, regardless of the queue being reliable or not, the processing code
|
53 |
|
|
* should be aware that an item might be handed over for processing more than
|
54 |
|
|
* once (because the processing code might time out before it finishes).
|
55 |
|
|
*/
|
56 |
|
|
|
57 |
|
|
/**
|
58 |
|
|
* Factory class for interacting with queues.
|
59 |
|
|
*/
|
60 |
|
|
class DrupalQueue {
|
61 |
|
|
/**
|
62 |
|
|
* Returns the queue object for a given name.
|
63 |
|
|
*
|
64 |
|
|
* The following variables can be set by variable_set or $conf overrides:
|
65 |
|
|
* - queue_class_$name: the class to be used for the queue $name.
|
66 |
|
|
* - queue_default_class: the class to use when queue_class_$name is not
|
67 |
|
|
* defined. Defaults to SystemQueue, a reliable backend using SQL.
|
68 |
|
|
* - queue_default_reliable_class: the class to use when queue_class_$name is
|
69 |
|
|
* not defined and the queue_default_class is not reliable. Defaults to
|
70 |
|
|
* SystemQueue.
|
71 |
|
|
*
|
72 |
|
|
* @param $name
|
73 |
|
|
* Arbitrary string. The name of the queue to work with.
|
74 |
|
|
* @param $reliable
|
75 |
|
|
* TRUE if the ordering of items and guaranteeing every item executes at
|
76 |
|
|
* least once is important, FALSE if scalability is the main concern.
|
77 |
|
|
*
|
78 |
|
|
* @return
|
79 |
|
|
* The queue object for a given name.
|
80 |
|
|
*/
|
81 |
|
|
public static function get($name, $reliable = FALSE) {
|
82 |
|
|
static $queues;
|
83 |
|
|
if (!isset($queues[$name])) {
|
84 |
|
|
$class = variable_get('queue_class_' . $name, NULL);
|
85 |
|
|
if (!$class) {
|
86 |
|
|
$class = variable_get('queue_default_class', 'SystemQueue');
|
87 |
|
|
}
|
88 |
|
|
$object = new $class($name);
|
89 |
|
|
if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
|
90 |
|
|
$class = variable_get('queue_default_reliable_class', 'SystemQueue');
|
91 |
|
|
$object = new $class($name);
|
92 |
|
|
}
|
93 |
|
|
$queues[$name] = $object;
|
94 |
|
|
}
|
95 |
|
|
return $queues[$name];
|
96 |
|
|
}
|
97 |
|
|
}
|
98 |
|
|
|
99 |
|
|
interface DrupalQueueInterface {
|
100 |
|
|
|
101 |
|
|
/**
|
102 |
|
|
* Add a queue item and store it directly to the queue.
|
103 |
|
|
*
|
104 |
|
|
* @param $data
|
105 |
|
|
* Arbitrary data to be associated with the new task in the queue.
|
106 |
|
|
* @return
|
107 |
|
|
* TRUE if the item was successfully created and was (best effort) added
|
108 |
|
|
* to the queue, otherwise FALSE. We don't guarantee the item was
|
109 |
|
|
* committed to disk etc, but as far as we know, the item is now in the
|
110 |
|
|
* queue.
|
111 |
|
|
*/
|
112 |
|
|
public function createItem($data);
|
113 |
|
|
|
114 |
|
|
/**
|
115 |
|
|
* Retrieve the number of items in the queue.
|
116 |
|
|
*
|
117 |
|
|
* This is intended to provide a "best guess" count of the number of items in
|
118 |
|
|
* the queue. Depending on the implementation and the setup, the accuracy of
|
119 |
|
|
* the results of this function may vary.
|
120 |
|
|
*
|
121 |
|
|
* e.g. On a busy system with a large number of consumers and items, the
|
122 |
|
|
* result might only be valid for a fraction of a second and not provide an
|
123 |
|
|
* accurate representation.
|
124 |
|
|
*
|
125 |
|
|
* @return
|
126 |
|
|
* An integer estimate of the number of items in the queue.
|
127 |
|
|
*/
|
128 |
|
|
public function numberOfItems();
|
129 |
|
|
|
130 |
|
|
/**
|
131 |
|
|
* Claim an item in the queue for processing.
|
132 |
|
|
*
|
133 |
|
|
* @param $lease_time
|
134 |
|
|
* How long the processing is expected to take in seconds, defaults to an
|
135 |
|
|
* hour. After this lease expires, the item will be reset and another
|
136 |
|
|
* consumer can claim the item. For idempotent tasks (which can be run
|
137 |
|
|
* multiple times without side effects), shorter lease times would result
|
138 |
|
|
* in lower latency in case a consumer fails. For tasks that should not be
|
139 |
|
|
* run more than once (non-idempotent), a larger lease time will make it
|
140 |
|
|
* more rare for a given task to run multiple times in cases of failure,
|
141 |
|
|
* at the cost of higher latency.
|
142 |
|
|
* @return
|
143 |
|
|
* On success we return an item object. If the queue is unable to claim an
|
144 |
|
|
* item it returns false. This implies a best effort to retrieve an item
|
145 |
|
|
* and either the queue is empty or there is some other non-recoverable
|
146 |
|
|
* problem.
|
147 |
|
|
*/
|
148 |
|
|
public function claimItem($lease_time = 3600);
|
149 |
|
|
|
150 |
|
|
/**
|
151 |
|
|
* Delete a finished item from the queue.
|
152 |
|
|
*
|
153 |
|
|
* @param $item
|
154 |
|
|
* The item returned by DrupalQueueInterface::claimItem().
|
155 |
|
|
*/
|
156 |
|
|
public function deleteItem($item);
|
157 |
|
|
|
158 |
|
|
/**
|
159 |
|
|
* Release an item that the worker could not process, so another
|
160 |
|
|
* worker can come in and process it before the timeout expires.
|
161 |
|
|
*
|
162 |
|
|
* @param $item
|
163 |
|
|
* @return boolean
|
164 |
|
|
*/
|
165 |
|
|
public function releaseItem($item);
|
166 |
|
|
|
167 |
|
|
/**
|
168 |
|
|
* Create a queue.
|
169 |
|
|
*
|
170 |
|
|
* Called during installation and should be used to perform any necessary
|
171 |
|
|
* initialization operations. This should not be confused with the
|
172 |
|
|
* constructor for these objects, which is called every time an object is
|
173 |
|
|
* instantiated to operate on a queue. This operation is only needed the
|
174 |
|
|
* first time a given queue is going to be initialized (for example, to make
|
175 |
|
|
* a new database table or directory to hold tasks for the queue -- it
|
176 |
|
|
* depends on the queue implementation if this is necessary at all).
|
177 |
|
|
*/
|
178 |
|
|
public function createQueue();
|
179 |
|
|
|
180 |
|
|
/**
|
181 |
|
|
* Delete a queue and every item in the queue.
|
182 |
|
|
*/
|
183 |
|
|
public function deleteQueue();
|
184 |
|
|
}
|
185 |
|
|
|
186 |
|
|
/**
|
187 |
|
|
* Reliable queue interface.
|
188 |
|
|
*
|
189 |
|
|
* Classes implementing this interface preserve the order of messages and
|
190 |
|
|
* guarantee that every item will be executed at least once.
|
191 |
|
|
*/
|
192 |
|
|
interface DrupalReliableQueueInterface extends DrupalQueueInterface {
|
193 |
|
|
}
|
194 |
|
|
|
195 |
|
|
/**
|
196 |
|
|
* Default queue implementation.
|
197 |
|
|
*/
|
198 |
|
|
class SystemQueue implements DrupalReliableQueueInterface {
|
199 |
|
|
/**
|
200 |
|
|
* The name of the queue this instance is working with.
|
201 |
|
|
*
|
202 |
|
|
* @var string
|
203 |
|
|
*/
|
204 |
|
|
protected $name;
|
205 |
|
|
|
206 |
|
|
public function __construct($name) {
|
207 |
|
|
$this->name = $name;
|
208 |
|
|
}
|
209 |
|
|
|
210 |
|
|
public function createItem($data) {
|
211 |
|
|
// During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
|
212 |
|
|
// the queue table yet, so we cannot rely on drupal_write_record().
|
213 |
|
|
$query = db_insert('queue')
|
214 |
|
|
->fields(array(
|
215 |
|
|
'name' => $this->name,
|
216 |
|
|
'data' => serialize($data),
|
217 |
|
|
// We cannot rely on REQUEST_TIME because many items might be created
|
218 |
|
|
// by a single request which takes longer than 1 second.
|
219 |
|
|
'created' => time(),
|
220 |
|
|
));
|
221 |
|
|
return (bool) $query->execute();
|
222 |
|
|
}
|
223 |
|
|
|
224 |
|
|
public function numberOfItems() {
|
225 |
|
|
return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
|
226 |
|
|
}
|
227 |
|
|
|
228 |
|
|
public function claimItem($lease_time = 30) {
|
229 |
|
|
// Claim an item by updating its expire fields. If claim is not successful
|
230 |
|
|
// another thread may have claimed the item in the meantime. Therefore loop
|
231 |
|
|
// until an item is successfully claimed or we are reasonably sure there
|
232 |
|
|
// are no unclaimed items left.
|
233 |
|
|
while (TRUE) {
|
234 |
|
|
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
|
235 |
|
|
if ($item) {
|
236 |
|
|
// Try to update the item. Only one thread can succeed in UPDATEing the
|
237 |
|
|
// same row. We cannot rely on REQUEST_TIME because items might be
|
238 |
|
|
// claimed by a single consumer which runs longer than 1 second. If we
|
239 |
|
|
// continue to use REQUEST_TIME instead of the current time(), we steal
|
240 |
|
|
// time from the lease, and will tend to reset items before the lease
|
241 |
|
|
// should really expire.
|
242 |
|
|
$update = db_update('queue')
|
243 |
|
|
->fields(array(
|
244 |
|
|
'expire' => time() + $lease_time,
|
245 |
|
|
))
|
246 |
|
|
->condition('item_id', $item->item_id)
|
247 |
|
|
->condition('expire', 0);
|
248 |
|
|
// If there are affected rows, this update succeeded.
|
249 |
|
|
if ($update->execute()) {
|
250 |
|
|
$item->data = unserialize($item->data);
|
251 |
|
|
return $item;
|
252 |
|
|
}
|
253 |
|
|
}
|
254 |
|
|
else {
|
255 |
|
|
// No items currently available to claim.
|
256 |
|
|
return FALSE;
|
257 |
|
|
}
|
258 |
|
|
}
|
259 |
|
|
}
|
260 |
|
|
|
261 |
|
|
public function releaseItem($item) {
|
262 |
|
|
$update = db_update('queue')
|
263 |
|
|
->fields(array(
|
264 |
|
|
'expire' => 0,
|
265 |
|
|
))
|
266 |
|
|
->condition('item_id', $item->item_id);
|
267 |
|
|
return $update->execute();
|
268 |
|
|
}
|
269 |
|
|
|
270 |
|
|
public function deleteItem($item) {
|
271 |
|
|
db_delete('queue')
|
272 |
|
|
->condition('item_id', $item->item_id)
|
273 |
|
|
->execute();
|
274 |
|
|
}
|
275 |
|
|
|
276 |
|
|
public function createQueue() {
|
277 |
|
|
// All tasks are stored in a single database table (which is created when
|
278 |
|
|
// Drupal is first installed) so there is nothing we need to do to create
|
279 |
|
|
// a new queue.
|
280 |
|
|
}
|
281 |
|
|
|
282 |
|
|
public function deleteQueue() {
|
283 |
|
|
db_delete('queue')
|
284 |
|
|
->condition('name', $this->name)
|
285 |
|
|
->execute();
|
286 |
|
|
}
|
287 |
|
|
}
|
288 |
|
|
|
289 |
|
|
/**
|
290 |
|
|
* Static queue implementation.
|
291 |
|
|
*
|
292 |
|
|
* This allows "undelayed" variants of processes relying on the Queue
|
293 |
|
|
* interface. The queue data resides in memory. It should only be used for
|
294 |
|
|
* items that will be queued and dequeued within a given page request.
|
295 |
|
|
*/
|
296 |
|
|
class MemoryQueue implements DrupalQueueInterface {
|
297 |
|
|
/**
|
298 |
|
|
* The queue data.
|
299 |
|
|
*
|
300 |
|
|
* @var array
|
301 |
|
|
*/
|
302 |
|
|
protected $queue;
|
303 |
|
|
|
304 |
|
|
/**
|
305 |
|
|
* Counter for item ids.
|
306 |
|
|
*
|
307 |
|
|
* @var int
|
308 |
|
|
*/
|
309 |
|
|
protected $id_sequence;
|
310 |
|
|
|
311 |
|
|
/**
|
312 |
|
|
* Start working with a queue.
|
313 |
|
|
*
|
314 |
|
|
* @param $name
|
315 |
|
|
* Arbitrary string. The name of the queue to work with.
|
316 |
|
|
*/
|
317 |
|
|
public function __construct($name) {
|
318 |
|
|
$this->queue = array();
|
319 |
|
|
$this->id_sequence = 0;
|
320 |
|
|
}
|
321 |
|
|
|
322 |
|
|
public function createItem($data) {
|
323 |
|
|
$item = new stdClass();
|
324 |
|
|
$item->item_id = $this->id_sequence++;
|
325 |
|
|
$item->data = $data;
|
326 |
|
|
$item->created = time();
|
327 |
|
|
$item->expire = 0;
|
328 |
|
|
$this->queue[$item->item_id] = $item;
|
329 |
|
|
}
|
330 |
|
|
|
331 |
|
|
public function numberOfItems() {
|
332 |
|
|
return count($this->queue);
|
333 |
|
|
}
|
334 |
|
|
|
335 |
|
|
public function claimItem($lease_time = 30) {
|
336 |
|
|
foreach ($this->queue as $key => $item) {
|
337 |
|
|
if ($item->expire == 0) {
|
338 |
|
|
$item->expire = time() + $lease_time;
|
339 |
|
|
$this->queue[$key] = $item;
|
340 |
|
|
return $item;
|
341 |
|
|
}
|
342 |
|
|
}
|
343 |
|
|
return FALSE;
|
344 |
|
|
}
|
345 |
|
|
|
346 |
|
|
public function deleteItem($item) {
|
347 |
|
|
unset($this->queue[$item->item_id]);
|
348 |
|
|
}
|
349 |
|
|
|
350 |
|
|
public function releaseItem($item) {
|
351 |
|
|
if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
|
352 |
|
|
$this->queue[$item->item_id]->expire = 0;
|
353 |
|
|
return TRUE;
|
354 |
|
|
}
|
355 |
|
|
return FALSE;
|
356 |
|
|
}
|
357 |
|
|
|
358 |
|
|
public function createQueue() {
|
359 |
|
|
// Nothing needed here.
|
360 |
|
|
}
|
361 |
|
|
|
362 |
|
|
public function deleteQueue() {
|
363 |
|
|
$this->queue = array();
|
364 |
|
|
$this->id_sequence = 0;
|
365 |
|
|
}
|
366 |
|
|
}
|
367 |
|
|
|
368 |
|
|
/**
|
369 |
|
|
* @} End of "defgroup queue".
|
370 |
|
|
*/ |