Project

General

Profile

Paste
Download (12.3 KB) Statistics
| Branch: | Revision:

root / drupal7 / modules / system / system.queue.inc @ db2d93dd

1
<?php
2

    
3
/**
4
 * @file
5
 * Queue functionality.
6
 */
7

    
8
/**
9
 * @defgroup queue Queue operations
10
 * @{
11
 * Queue items to allow later processing.
12
 *
13
 * The queue system allows placing items in a queue and processing them later.
14
 * The system tries to ensure that only one consumer can process an item.
15
 *
16
 * Before a queue can be used it needs to be created by
17
 * DrupalQueueInterface::createQueue().
18
 *
19
 * Items can be added to the queue by passing an arbitrary data object to
20
 * DrupalQueueInterface::createItem().
21
 *
22
 * To process an item, call DrupalQueueInterface::claimItem() and specify how
23
 * long you want to have a lease for working on that item. When finished
24
 * processing, the item needs to be deleted by calling
25
 * DrupalQueueInterface::deleteItem(). If the consumer dies, the item will be
26
 * made available again by the DrupalQueueInterface implementation once the
27
 * lease expires. Another consumer will then be able to receive it when calling
28
 * DrupalQueueInterface::claimItem(). Due to this, the processing code should
29
 * be aware that an item might be handed over for processing more than once.
30
 *
31
 * The $item object used by the DrupalQueueInterface can contain arbitrary
32
 * metadata depending on the implementation. Systems using the interface should
33
 * only rely on the data property which will contain the information passed to
34
 * DrupalQueueInterface::createItem(). The full queue item returned by
35
 * DrupalQueueInterface::claimItem() needs to be passed to
36
 * DrupalQueueInterface::deleteItem() once processing is completed.
37
 *
38
 * There are two kinds of queue backends available: reliable, which preserves
39
 * the order of messages and guarantees that every item will be executed at
40
 * least once. The non-reliable kind only does a best effort to preserve order
41
 * in messages and to execute them at least once but there is a small chance
42
 * that some items get lost. For example, some distributed back-ends like
43
 * Amazon SQS will be managing jobs for a large set of producers and consumers
44
 * where a strict FIFO ordering will likely not be preserved. Another example
45
 * would be an in-memory queue backend which might lose items if it crashes.
46
 * However, such a backend would be able to deal with significantly more writes
47
 * than a reliable queue and for many tasks this is more important. See
48
 * aggregator_cron() for an example of how to effectively utilize a
49
 * non-reliable queue. Another example is doing Twitter statistics -- the small
50
 * possibility of losing a few items is insignificant next to power of the
51
 * queue being able to keep up with writes. As described in the processing
52
 * section, regardless of the queue being reliable or not, the processing code
53
 * should be aware that an item might be handed over for processing more than
54
 * once (because the processing code might time out before it finishes).
55
 */
56

    
57
/**
58
 * Factory class for interacting with queues.
59
 */
60
class DrupalQueue {
61
  /**
62
   * Returns the queue object for a given name.
63
   *
64
   * The following variables can be set by variable_set or $conf overrides:
65
   * - queue_class_$name: the class to be used for the queue $name.
66
   * - queue_default_class: the class to use when queue_class_$name is not
67
   *   defined. Defaults to SystemQueue, a reliable backend using SQL.
68
   * - queue_default_reliable_class: the class to use when queue_class_$name is
69
   *   not defined and the queue_default_class is not reliable. Defaults to
70
   *   SystemQueue.
71
   *
72
   * @param $name
73
   *   Arbitrary string. The name of the queue to work with.
74
   * @param $reliable
75
   *   TRUE if the ordering of items and guaranteeing every item executes at
76
   *   least once is important, FALSE if scalability is the main concern.
77
   *
78
   * @return
79
   *   The queue object for a given name.
80
   */
81
  public static function get($name, $reliable = FALSE) {
82
    static $queues;
83
    if (!isset($queues[$name])) {
84
      $class = variable_get('queue_class_' . $name, NULL);
85
      if (!$class) {
86
        $class = variable_get('queue_default_class', 'SystemQueue');
87
      }
88
      $object = new $class($name);
89
      if ($reliable && !$object instanceof DrupalReliableQueueInterface) {
90
        $class = variable_get('queue_default_reliable_class', 'SystemQueue');
91
        $object = new $class($name);
92
      }
93
      $queues[$name] = $object;
94
    }
95
    return $queues[$name];
96
  }
97
}
98

    
99
interface DrupalQueueInterface {
100

    
101
  /**
102
   * Add a queue item and store it directly to the queue.
103
   *
104
   * @param $data
105
   *   Arbitrary data to be associated with the new task in the queue.
106
   * @return
107
   *   TRUE if the item was successfully created and was (best effort) added
108
   *   to the queue, otherwise FALSE. We don't guarantee the item was
109
   *   committed to disk etc, but as far as we know, the item is now in the
110
   *   queue.
111
   */
112
  public function createItem($data);
113

    
114
  /**
115
   * Retrieve the number of items in the queue.
116
   *
117
   * This is intended to provide a "best guess" count of the number of items in
118
   * the queue. Depending on the implementation and the setup, the accuracy of
119
   * the results of this function may vary.
120
   *
121
   * e.g. On a busy system with a large number of consumers and items, the
122
   * result might only be valid for a fraction of a second and not provide an
123
   * accurate representation.
124
   *
125
   * @return
126
   *   An integer estimate of the number of items in the queue.
127
   */
128
  public function numberOfItems();
129

    
130
  /**
131
   * Claim an item in the queue for processing.
132
   *
133
   * @param $lease_time
134
   *   How long the processing is expected to take in seconds, defaults to an
135
   *   hour. After this lease expires, the item will be reset and another
136
   *   consumer can claim the item. For idempotent tasks (which can be run
137
   *   multiple times without side effects), shorter lease times would result
138
   *   in lower latency in case a consumer fails. For tasks that should not be
139
   *   run more than once (non-idempotent), a larger lease time will make it
140
   *   more rare for a given task to run multiple times in cases of failure,
141
   *   at the cost of higher latency.
142
   * @return
143
   *   On success we return an item object. If the queue is unable to claim an
144
   *   item it returns false. This implies a best effort to retrieve an item
145
   *   and either the queue is empty or there is some other non-recoverable
146
   *   problem.
147
   */
148
  public function claimItem($lease_time = 3600);
149

    
150
  /**
151
   * Delete a finished item from the queue.
152
   *
153
   * @param $item
154
   *   The item returned by DrupalQueueInterface::claimItem().
155
   */
156
  public function deleteItem($item);
157

    
158
  /**
159
   * Release an item that the worker could not process, so another
160
   * worker can come in and process it before the timeout expires.
161
   *
162
   * @param $item
163
   * @return boolean
164
   */
165
  public function releaseItem($item);
166

    
167
  /**
168
   * Create a queue.
169
   *
170
   * Called during installation and should be used to perform any necessary
171
   * initialization operations. This should not be confused with the
172
   * constructor for these objects, which is called every time an object is
173
   * instantiated to operate on a queue. This operation is only needed the
174
   * first time a given queue is going to be initialized (for example, to make
175
   * a new database table or directory to hold tasks for the queue -- it
176
   * depends on the queue implementation if this is necessary at all).
177
   */
178
  public function createQueue();
179

    
180
  /**
181
   * Delete a queue and every item in the queue.
182
   */
183
  public function deleteQueue();
184
}
185

    
186
/**
187
 * Reliable queue interface.
188
 *
189
 * Classes implementing this interface preserve the order of messages and
190
 * guarantee that every item will be executed at least once.
191
 */
192
interface DrupalReliableQueueInterface extends DrupalQueueInterface {
193
}
194

    
195
/**
196
 * Default queue implementation.
197
 */
198
class SystemQueue implements DrupalReliableQueueInterface {
199
  /**
200
   * The name of the queue this instance is working with.
201
   *
202
   * @var string
203
   */
204
  protected $name;
205

    
206
  public function __construct($name) {
207
    $this->name = $name;
208
  }
209

    
210
  public function createItem($data) {
211
    // During a Drupal 6.x to 7.x update, drupal_get_schema() does not contain
212
    // the queue table yet, so we cannot rely on drupal_write_record().
213
    $query = db_insert('queue')
214
      ->fields(array(
215
        'name' => $this->name,
216
        'data' => serialize($data),
217
        // We cannot rely on REQUEST_TIME because many items might be created
218
        // by a single request which takes longer than 1 second.
219
        'created' => time(),
220
      ));
221
    return (bool) $query->execute();
222
  }
223

    
224
  public function numberOfItems() {
225
    return db_query('SELECT COUNT(item_id) FROM {queue} WHERE name = :name', array(':name' => $this->name))->fetchField();
226
  }
227

    
228
  public function claimItem($lease_time = 30) {
229
    // Claim an item by updating its expire fields. If claim is not successful
230
    // another thread may have claimed the item in the meantime. Therefore loop
231
    // until an item is successfully claimed or we are reasonably sure there
232
    // are no unclaimed items left.
233
    while (TRUE) {
234
      $item = db_query_range('SELECT data, item_id FROM {queue} q WHERE expire = 0 AND name = :name ORDER BY created, item_id ASC', 0, 1, array(':name' => $this->name))->fetchObject();
235
      if ($item) {
236
        // Try to update the item. Only one thread can succeed in UPDATEing the
237
        // same row. We cannot rely on REQUEST_TIME because items might be
238
        // claimed by a single consumer which runs longer than 1 second. If we
239
        // continue to use REQUEST_TIME instead of the current time(), we steal
240
        // time from the lease, and will tend to reset items before the lease
241
        // should really expire.
242
        $update = db_update('queue')
243
          ->fields(array(
244
            'expire' => time() + $lease_time,
245
          ))
246
          ->condition('item_id', $item->item_id)
247
          ->condition('expire', 0);
248
        // If there are affected rows, this update succeeded.
249
        if ($update->execute()) {
250
          $item->data = unserialize($item->data);
251
          return $item;
252
        }
253
      }
254
      else {
255
        // No items currently available to claim.
256
        return FALSE;
257
      }
258
    }
259
  }
260

    
261
  public function releaseItem($item) {
262
    $update = db_update('queue')
263
      ->fields(array(
264
        'expire' => 0,
265
      ))
266
      ->condition('item_id', $item->item_id);
267
      return $update->execute();
268
  }
269

    
270
  public function deleteItem($item) {
271
    db_delete('queue')
272
      ->condition('item_id', $item->item_id)
273
      ->execute();
274
  }
275

    
276
  public function createQueue() {
277
    // All tasks are stored in a single database table (which is created when
278
    // Drupal is first installed) so there is nothing we need to do to create
279
    // a new queue.
280
  }
281

    
282
  public function deleteQueue() {
283
    db_delete('queue')
284
      ->condition('name', $this->name)
285
      ->execute();
286
  }
287
}
288

    
289
/**
290
 * Static queue implementation.
291
 *
292
 * This allows "undelayed" variants of processes relying on the Queue
293
 * interface. The queue data resides in memory. It should only be used for
294
 * items that will be queued and dequeued within a given page request.
295
 */
296
class MemoryQueue implements DrupalQueueInterface {
297
  /**
298
   * The queue data.
299
   *
300
   * @var array
301
   */
302
  protected $queue;
303

    
304
  /**
305
   * Counter for item ids.
306
   *
307
   * @var int
308
   */
309
  protected $id_sequence;
310

    
311
  /**
312
   * Start working with a queue.
313
   *
314
   * @param $name
315
   *   Arbitrary string. The name of the queue to work with.
316
   */
317
  public function __construct($name) {
318
    $this->queue = array();
319
    $this->id_sequence = 0;
320
  }
321

    
322
  public function createItem($data) {
323
    $item = new stdClass();
324
    $item->item_id = $this->id_sequence++;
325
    $item->data = $data;
326
    $item->created = time();
327
    $item->expire = 0;
328
    $this->queue[$item->item_id] = $item;
329
  }
330

    
331
  public function numberOfItems() {
332
    return count($this->queue);
333
  }
334

    
335
  public function claimItem($lease_time = 30) {
336
    foreach ($this->queue as $key => $item) {
337
      if ($item->expire == 0) {
338
        $item->expire = time() + $lease_time;
339
        $this->queue[$key] = $item;
340
        return $item;
341
      }
342
    }
343
    return FALSE;
344
  }
345

    
346
  public function deleteItem($item) {
347
    unset($this->queue[$item->item_id]);
348
  }
349

    
350
  public function releaseItem($item) {
351
    if (isset($this->queue[$item->item_id]) && $this->queue[$item->item_id]->expire != 0) {
352
      $this->queue[$item->item_id]->expire = 0;
353
      return TRUE;
354
    }
355
    return FALSE;
356
  }
357

    
358
  public function createQueue() {
359
    // Nothing needed here.
360
  }
361

    
362
  public function deleteQueue() {
363
    $this->queue = array();
364
    $this->id_sequence = 0;
365
  }
366
}
367

    
368
/**
369
 * @} End of "defgroup queue".
370
 */