1
|
<?php
|
2
|
|
3
|
/**
|
4
|
* @file
|
5
|
* Queue functionality.
|
6
|
*/
|
7
|
|
8
|
/**
|
9
|
* @defgroup queue Queue operations
|
10
|
* @{
|
11
|
* Queue items to allow later processing.
|
12
|
*
|
13
|
* The queue system allows placing items in a queue and processing them later.
|
14
|
* The system tries to ensure that only one consumer can process an item.
|
15
|
*
|
16
|
* Before a queue can be used it needs to be created by
|
17
|
* DrupalQueueInterface::createQueue().
|
18
|
*
|
19
|
* Items can be added to the queue by passing an arbitrary data object to
|
20
|
* DrupalQueueInterface::createItem().
|
21
|
*
|
22
|
* To process an item, call DrupalQueueInterface::claimItem() and specify how
|
23
|
* long you want to have a lease for working on that item. When finished
|
24
|
* processing, the item needs to be deleted by calling
|
25
|
* DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
|
26
|
* made available again by the DrupalQueueInterface implementation once the
|
27
|
* lease expires. Another consumer will then be able to receive it when calling
|
28
|
* DrupalQueueInterface::claimItem(). Due to this, the processing code should
|
29
|
* be aware that an item might be handed over for processing more than once.
|
30
|
*
|
31
|
* The $item object used by the DrupalQueueInterface can contain arbitrary
|
32
|
* metadata depending on the implementation. Systems using the interface should
|
33
|
* only rely on the data property which will contain the information passed to
|
34
|
* DrupalQueueInterface::createItem(). The full queue item returned by
|
35
|
* DrupalQueueInterface::claimItem() needs to be passed to
|
36
|
* DrupalQueueInterface::deleteItem() once processing is completed.
|
37
|
*
|
38
|
* There are two kinds of queue backends available: reliable, which preserves
|
39
|
* the order of messages and guarantees that every item will be executed at
|
40
|
* least once. The non-reliable kind only does a best effort to preserve order
|
41
|
* in messages and to execute them at least once but there is a small chance
|
42
|
* that some items get lost. For example, some distributed back-ends like
|
43
|
* Amazon SQS will be managing jobs for a large set of producers and consumers
|
44
|
* where a strict FIFO ordering will likely not be preserved. Another example
|
45
|
* would be an in-memory queue backend which might lose items if it crashes.
|
46
|
* However, such a backend would be able to deal with significantly more writes
|
47
|
* than a reliable queue and for many tasks this is more important. See
|
48
|
* aggregator_cron() for an example of how to effectively utilize a
|
49
|
* non-reliable queue. Another example is doing Twitter statistics -- the small
|
50
|
* possibility of losing a few items is insignificant next to power of the
|
51
|
* queue being able to keep up with writes. As described in the processing
|
52
|
* section, regardless of the queue being reliable or not, the processing code
|
53
|
* should be aware that an item might be handed over for processing more than
|
54
|
* once (because the processing code might time out before it finishes).
|
55
|
*/
|
56
|
|
57
|
/**
|
58
|
* Factory class for interacting with queues.
|
59
|
*/
|
60
|
class DrupalQueue {
|
61
|
/**
|
62
|
* Returns the queue object for a given name.
|
63
|
*
|
64
|
* The following variables can be set by variable_set or $conf overrides:
|
65
|
* - queue_class_$name: the class to be used for the queue $name.
|
66
|
* - queue_default_class: the class to use when queue_class_$name is not
|
67
|
* defined. Defaults to SystemQueue, a reliable backend using SQL.
|
68
|
* - queue_default_reliable_class: the class to use when queue_class_$name is
|
69
|
* not defined and the queue_default_class is not reliable. Defaults to
|
70
|
* SystemQueue.
|
71
|
*
|
72
|
* @param $name
|
73
|
* Arbitrary string. The name of the queue to work with.
|
74
|
* @param $reliable
|
75
|
* TRUE if the ordering of items and guaranteeing every item executes at
|
76
|
* least once is important, FALSE if scalability is the main concern.
|
77
|
*
|
78
|
* @return
|
79
|
* The queue object for a given name.
|
80
|
*/
|
81
|
public static function get($name, $reliable = FALSE) {
|
82
|
static $queues;
|
83
|
if (!isset($queues[$name])) {
|
84
|
$class = variable_get('queue_class_' . $name, NULL);
|
85
|
if (!$class) {
|
86
|
$class = variable_get('queue_default_class', 'SystemQueue');
|
87
|
}
|
88
|
$object = new $class($name);
|
89
|
if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
|
90
|
$class = variable_get('queue_default_reliable_class', 'SystemQueue');
|
91
|
$object = new $class($name);
|
92
|
}
|
93
|
$queues[$name] = $object;
|
94
|
}
|
95
|
return $queues[$name];
|
96
|
}
|
97
|
}
|
98
|
|
99
|
interface DrupalQueueInterface {
|
100
|
|
101
|
/**
|
102
|
* Add a queue item and store it directly to the queue.
|
103
|
*
|
104
|
* @param $data
|
105
|
* Arbitrary data to be associated with the new task in the queue.
|
106
|
* @return
|
107
|
* TRUE if the item was successfully created and was (best effort) added
|
108
|
* to the queue, otherwise FALSE. We don't guarantee the item was
|
109
|
* committed to disk etc, but as far as we know, the item is now in the
|
110
|
* queue.
|
111
|
*/
|
112
|
public function createItem($data);
|
113
|
|
114
|
/**
|
115
|
* Retrieve the number of items in the queue.
|
116
|
*
|
117
|
* This is intended to provide a "best guess" count of the number of items in
|
118
|
* the queue. Depending on the implementation and the setup, the accuracy of
|
119
|
* the results of this function may vary.
|
120
|
*
|
121
|
* e.g. On a busy system with a large number of consumers and items, the
|
122
|
* result might only be valid for a fraction of a second and not provide an
|
123
|
* accurate representation.
|
124
|
*
|
125
|
* @return
|
126
|
* An integer estimate of the number of items in the queue.
|
127
|
*/
|
128
|
public function numberOfItems();
|
129
|
|
130
|
/**
|
131
|
* Claim an item in the queue for processing.
|
132
|
*
|
133
|
* @param $lease_time
|
134
|
* How long the processing is expected to take in seconds, defaults to an
|
135
|
* hour. After this lease expires, the item will be reset and another
|
136
|
* consumer can claim the item. For idempotent tasks (which can be run
|
137
|
* multiple times without side effects), shorter lease times would result
|
138
|
* in lower latency in case a consumer fails. For tasks that should not be
|
139
|
* run more than once (non-idempotent), a larger lease time will make it
|
140
|
* more rare for a given task to run multiple times in cases of failure,
|
141
|
* at the cost of higher latency.
|
142
|
* @return
|
143
|
* On success we return an item object. If the queue is unable to claim an
|
144
|
* item it returns false. This implies a best effort to retrieve an item
|
145
|
* and either the queue is empty or there is some other non-recoverable
|
146
|
* problem.
|
147
|
*/
|
148
|
public function claimItem($lease_time = 3600);
|
149
|
|
150
|
/**
|
151
|
* Delete a finished item from the queue.
|
152
|
*
|
153
|
* @param $item
|
154
|
* The item returned by DrupalQueueInterface::claimItem().
|
155
|
*/
|
156
|
public function deleteItem($item);
|
157
|
|
158
|
/**
|
159
|
* Release an item that the worker could not process, so another
|
160
|
* worker can come in and process it before the timeout expires.
|
161
|
*
|
162
|
* @param $item
|
163
|
* @return boolean
|
164
|
*/
|
165
|
public function releaseItem($item);
|
166
|
|
167
|
/**
|
168
|
* Create a queue.
|
169
|
*
|
170
|
* Called during installation and should be used to perform any necessary
|
171
|
* initialization operations. This should not be confused with the
|
172
|
* constructor for these objects, which is called every time an object is
|
173
|
* instantiated to operate on a queue. This operation is only needed the
|
174
|
* first time a given queue is going to be initialized (for example, to make
|
175
|
* a new database table or directory to hold tasks for the queue -- it
|
176
|
* depends on the queue implementation if this is necessary at all).
|
177
|
*/
|
178
|
public function createQueue();
|
179
|
|
180
|
/**
|
181
|
* Delete a queue and every item in the queue.
|
182
|
*/
|
183
|
public function deleteQueue();
|
184
|
}
|
185
|
|
186
|
/**
|
187
|
* Reliable queue interface.
|
188
|
*
|
189
|
* Classes implementing this interface preserve the order of messages and
|
190
|
* guarantee that every item will be executed at least once.
|
191
|
*/
|
192
|
interface DrupalReliableQueueInterface extends DrupalQueueInterface {
|
193
|
}
|
194
|
|
195
|
/**
|
196
|
* Default queue implementation.
|
197
|
*/
|
198
|
class SystemQueue implements DrupalReliableQueueInterface {
|
199
|
/**
|
200
|
* The name of the queue this instance is working with.
|
201
|
*
|
202
|
* @var string
|
203
|
*/
|
204
|
protected $name;
|
205
|
|
206
|
public function __construct($name) {
|
207
|
$this->name = $name;
|
208
|
}
|
209
|
|
210
|
public function createItem($data) {
|
211
|
// During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
|
212
|
// the queue table yet, so we cannot rely on drupal_write_record().
|
213
|
$query = db_insert('queue')
|
214
|
->fields(array(
|
215
|
'name' => $this->name,
|
216
|
'data' => serialize($data),
|
217
|
// We cannot rely on REQUEST_TIME because many items might be created
|
218
|
// by a single request which takes longer than 1 second.
|
219
|
'created' => time(),
|
220
|
));
|
221
|
return (bool) $query->execute();
|
222
|
}
|
223
|
|
224
|
public function numberOfItems() {
|
225
|
return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
|
226
|
}
|
227
|
|
228
|
public function claimItem($lease_time = 30) {
|
229
|
// Claim an item by updating its expire fields. If claim is not successful
|
230
|
// another thread may have claimed the item in the meantime. Therefore loop
|
231
|
// until an item is successfully claimed or we are reasonably sure there
|
232
|
// are no unclaimed items left.
|
233
|
while (TRUE) {
|
234
|
$item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created ASC', 0, 1, array(':name' => $this->name))->fetchObject();
|
235
|
if ($item) {
|
236
|
// Try to update the item. Only one thread can succeed in UPDATEing the
|
237
|
// same row. We cannot rely on REQUEST_TIME because items might be
|
238
|
// claimed by a single consumer which runs longer than 1 second. If we
|
239
|
// continue to use REQUEST_TIME instead of the current time(), we steal
|
240
|
// time from the lease, and will tend to reset items before the lease
|
241
|
// should really expire.
|
242
|
$update = db_update('queue')
|
243
|
->fields(array(
|
244
|
'expire' => time() + $lease_time,
|
245
|
))
|
246
|
->condition('item_id', $item->item_id)
|
247
|
->condition('expire', 0);
|
248
|
// If there are affected rows, this update succeeded.
|
249
|
if ($update->execute()) {
|
250
|
$item->data = unserialize($item->data);
|
251
|
return $item;
|
252
|
}
|
253
|
}
|
254
|
else {
|
255
|
// No items currently available to claim.
|
256
|
return FALSE;
|
257
|
}
|
258
|
}
|
259
|
}
|
260
|
|
261
|
public function releaseItem($item) {
|
262
|
$update = db_update('queue')
|
263
|
->fields(array(
|
264
|
'expire' => 0,
|
265
|
))
|
266
|
->condition('item_id', $item->item_id);
|
267
|
return $update->execute();
|
268
|
}
|
269
|
|
270
|
public function deleteItem($item) {
|
271
|
db_delete('queue')
|
272
|
->condition('item_id', $item->item_id)
|
273
|
->execute();
|
274
|
}
|
275
|
|
276
|
public function createQueue() {
|
277
|
// All tasks are stored in a single database table (which is created when
|
278
|
// Drupal is first installed) so there is nothing we need to do to create
|
279
|
// a new queue.
|
280
|
}
|
281
|
|
282
|
public function deleteQueue() {
|
283
|
db_delete('queue')
|
284
|
->condition('name', $this->name)
|
285
|
->execute();
|
286
|
}
|
287
|
}
|
288
|
|
289
|
/**
|
290
|
* Static queue implementation.
|
291
|
*
|
292
|
* This allows "undelayed" variants of processes relying on the Queue
|
293
|
* interface. The queue data resides in memory. It should only be used for
|
294
|
* items that will be queued and dequeued within a given page request.
|
295
|
*/
|
296
|
class MemoryQueue implements DrupalQueueInterface {
|
297
|
/**
|
298
|
* The queue data.
|
299
|
*
|
300
|
* @var array
|
301
|
*/
|
302
|
protected $queue;
|
303
|
|
304
|
/**
|
305
|
* Counter for item ids.
|
306
|
*
|
307
|
* @var int
|
308
|
*/
|
309
|
protected $id_sequence;
|
310
|
|
311
|
/**
|
312
|
* Start working with a queue.
|
313
|
*
|
314
|
* @param $name
|
315
|
* Arbitrary string. The name of the queue to work with.
|
316
|
*/
|
317
|
public function __construct($name) {
|
318
|
$this->queue = array();
|
319
|
$this->id_sequence = 0;
|
320
|
}
|
321
|
|
322
|
public function createItem($data) {
|
323
|
$item = new stdClass();
|
324
|
$item->item_id = $this->id_sequence++;
|
325
|
$item->data = $data;
|
326
|
$item->created = time();
|
327
|
$item->expire = 0;
|
328
|
$this->queue[$item->item_id] = $item;
|
329
|
}
|
330
|
|
331
|
public function numberOfItems() {
|
332
|
return count($this->queue);
|
333
|
}
|
334
|
|
335
|
public function claimItem($lease_time = 30) {
|
336
|
foreach ($this->queue as $key => $item) {
|
337
|
if ($item->expire == 0) {
|
338
|
$item->expire = time() + $lease_time;
|
339
|
$this->queue[$key] = $item;
|
340
|
return $item;
|
341
|
}
|
342
|
}
|
343
|
return FALSE;
|
344
|
}
|
345
|
|
346
|
public function deleteItem($item) {
|
347
|
unset($this->queue[$item->item_id]);
|
348
|
}
|
349
|
|
350
|
public function releaseItem($item) {
|
351
|
if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
|
352
|
$this->queue[$item->item_id]->expire = 0;
|
353
|
return TRUE;
|
354
|
}
|
355
|
return FALSE;
|
356
|
}
|
357
|
|
358
|
public function createQueue() {
|
359
|
// Nothing needed here.
|
360
|
}
|
361
|
|
362
|
public function deleteQueue() {
|
363
|
$this->queue = array();
|
364
|
$this->id_sequence = 0;
|
365
|
}
|
366
|
}
|
367
|
|
368
|
/**
|
369
|
* @} End of "defgroup queue".
|
370
|
*/
|