Projet

Général

Profil

Paste
Télécharger (37,1 ko) Statistiques
| Branche: | Révision:

root / drupal7 / sites / all / modules / feeds / includes / FeedsSource.inc @ ed9a13f1

1
<?php
2

    
3
/**
4
 * @file
5
 * Definition of FeedsSourceInterface, FeedsState and FeedsSource class.
6
 */
7

    
8
/**
9
 * Distinguish exceptions occurring when handling locks.
10
 */
11
class FeedsLockException extends Exception {}
12

    
13
/**
14
 * Denote a import or clearing stage. Used for multi page processing.
15
 */
16
define('FEEDS_START', 'start_time');
17
define('FEEDS_FETCH', 'fetch');
18
define('FEEDS_PARSE', 'parse');
19
define('FEEDS_PROCESS', 'process');
20
define('FEEDS_PROCESS_CLEAR', 'process_clear');
21
define('FEEDS_PROCESS_EXPIRE', 'process_expire');
22

    
23
/**
24
 * Defines an interface for a feed source.
25
 */
26
interface FeedsSourceInterface {
27

    
28
  /**
29
   * Returns if a plugin handles source specific configuration.
30
   *
31
   * Crutch: for ease of use, we implement FeedsSourceInterface for every
32
   * plugin, but then we need to have a handle which plugin actually implements
33
   * source configuration.
34
   *
35
   * @see FeedsPlugin
36
   *
37
   * @return bool
38
   *   TRUE if a plugin handles source specific configuration, FALSE otherwise.
39
   */
40
  public function hasSourceConfig();
41

    
42
  /**
43
   * Return an associative array of default values.
44
   */
45
  public function sourceDefaults();
46

    
47
  /**
48
   * Returns a Form API form array that defines a form configuring values.
49
   *
50
   * Keys correspond to the keys of the return value of sourceDefaults().
51
   */
52
  public function sourceForm($source_config);
53

    
54
  /**
55
   * Validate user entered values submitted by sourceForm().
56
   */
57
  public function sourceFormValidate(&$source_config);
58

    
59
  /**
60
   * A source is being saved.
61
   */
62
  public function sourceSave(FeedsSource $source);
63

    
64
  /**
65
   * A source is being deleted.
66
   */
67
  public function sourceDelete(FeedsSource $source);
68

    
69
}
70

    
71
/**
72
 * Status of an import or clearing operation on a source.
73
 */
74
class FeedsState {
75

    
76
  /**
77
   * Floating point number denoting the progress made.
78
   *
79
   * 0.0 meaning no progress.
80
   * 1.0 = FEEDS_BATCH_COMPLETE, meaning finished.
81
   *
82
   * @var float
83
   */
84
  public $progress;
85

    
86
  /**
87
   * Used as a pointer to store where left off. Must be serializable.
88
   *
89
   * @var mixed
90
   */
91
  public $pointer;
92

    
93
  /**
94
   * Natural numbers denoting more details about the progress being made.
95
   *
96
   * @var int
97
   */
98
  public $total;
99
  public $created;
100
  public $updated;
101
  public $deleted;
102
  public $unpublished;
103
  public $blocked;
104
  public $skipped;
105
  public $failed;
106

    
107
  /**
108
   * IDs of entities to be removed.
109
   *
110
   * @var array
111
   */
112
  public $removeList;
113

    
114
  /**
115
   * Constructor, initialize variables.
116
   */
117
  public function __construct() {
118
    $this->progress = FEEDS_BATCH_COMPLETE;
119
    $this->total
120
      = $this->created
121
      = $this->updated
122
      = $this->deleted
123
      = $this->unpublished
124
      = $this->blocked
125
      = $this->skipped
126
      = $this->failed
127
      = 0;
128
  }
129

    
130
  /**
131
   * Safely report progress.
132
   *
133
   * When $total == $progress, the state of the task tracked by this state is
134
   * regarded to be complete.
135
   *
136
   * Handles the following cases gracefully:
137
   *
138
   * - $total is 0
139
   * - $progress is larger than $total
140
   * - $progress approximates $total so that $finished rounds to 1.0
141
   *
142
   * @param int $total
143
   *   A natural number that is the total to be worked off.
144
   * @param int $progress
145
   *   A natural number that is the progress made on $total.
146
   */
147
  public function progress($total, $progress) {
148
    if ($progress > $total) {
149
      $this->progress = FEEDS_BATCH_COMPLETE;
150
    }
151
    elseif ($total) {
152
      $this->progress = (float) $progress / $total;
153
      if ($this->progress == FEEDS_BATCH_COMPLETE && $total != $progress) {
154
        $this->progress = 0.99;
155
      }
156
    }
157
    else {
158
      $this->progress = FEEDS_BATCH_COMPLETE;
159
    }
160
  }
161

    
162
}
163

    
164
/**
165
 * Holds the source of a feed to import.
166
 *
167
 * This class encapsulates a source of a feed. It stores where the feed can be
168
 * found and how to import it.
169
 *
170
 * Information on how to import a feed is encapsulated in a FeedsImporter object
171
 * which is identified by the common id of the FeedsSource and the
172
 * FeedsImporter. More than one FeedsSource can use the same FeedsImporter
173
 * therefore a FeedsImporter never holds a pointer to a FeedsSource object, nor
174
 * does it hold any other information for a particular FeedsSource object.
175
 *
176
 * Classes extending FeedsPlugin can implement a sourceForm to expose
177
 * configuration for a FeedsSource object. This is for instance how FeedsFetcher
178
 * exposes a text field for a feed URL or how FeedsCSVParser exposes a select
179
 * field for choosing between colon or semicolon delimiters.
180
 *
181
 * It is important that a FeedsPlugin does not directly hold information about
182
 * a source but leave all storage up to FeedsSource. An instance of a
183
 * FeedsPlugin class only exists once per FeedsImporter configuration, while an
184
 * instance of a FeedsSource class exists once per feed_nid to be imported.
185
 *
186
 * As with FeedsImporter, the idea with FeedsSource is that it can be used
187
 * without actually saving the object to the database.
188
 */
189
class FeedsSource extends FeedsConfigurable {
190

    
191
  /**
192
   * Contains the node id of the feed this source info object is attached to.
193
   *
194
   * Equals 0 if not attached to any node - for example when used on a
195
   * standalone import form within Feeds or by other API users.
196
   *
197
   * @var int
198
   */
199
  protected $feed_nid;
200

    
201
  /**
202
   * The FeedsImporter object that this source is expected to be used with.
203
   *
204
   * @var FeedsImporter
205
   */
206
  protected $importer;
207

    
208
  /**
209
   * Holds the current state of an import, clear or expire task.
210
   *
211
   * Array keys can be:
212
   * - FEEDS_START
213
   *   Timestamp of when a task has started.
214
   * - FEEDS_FETCH
215
   *   A FeedsState object holding the state of the fetch stage, used during
216
   *   imports.
217
   * - FEEDS_PARSE
218
   *   A FeedsState object holding the state of the parse stage, used during
219
   *   imports.
220
   * - FEEDS_PROCESS
221
   *   A FeedsState object holding the state of the process stage, used during
222
   *   imports.
223
   * - FEEDS_PROCESS_CLEAR
224
   *   A FeedsState object holding the state of the clear task.
225
   * - FEEDS_PROCESS_EXPIRE
226
   *   A FeedsState object holding the state of the expire task.
227
   *
228
   * @var FeedsState[]|array|null
229
   */
230
  protected $state;
231

    
232
  /**
233
   * Fetcher result, used to cache fetcher result when batching.
234
   *
235
   * @var FeedsFetcherResult
236
   */
237
  protected $fetcher_result;
238

    
239
  /**
240
   * Timestamp of when this source was imported the last time.
241
   *
242
   * @var int
243
   */
244
  protected $imported;
245

    
246
  /**
247
   * Holds an exception object in case an exception occurs during importing.
248
   *
249
   * @var Exception|null
250
   */
251
  protected $exception;
252

    
253
  /**
254
   * The account switcher.
255
   *
256
   * @var FeedsAccountSwitcherInterface
257
   */
258
  protected $accountSwitcher;
259

    
260
  /**
261
   * Instantiates an unique FeedsSource per class, importer ID and Feed node ID.
262
   *
263
   * Don't use this method directly, use feeds_source() instead.
264
   *
265
   * @param string $importer_id
266
   *   The machine name of the importer.
267
   * @param int $feed_nid
268
   *   The node id of a feed node if the source is attached to a feed node.
269
   * @param FeedsAccountSwitcherInterface $account_switcher
270
   *   The account switcher to use to be able to perform actions as a different
271
   *   user.
272
   */
273
  public static function instance($importer_id, $feed_nid, FeedsAccountSwitcherInterface $account_switcher = NULL) {
274
    $class = variable_get('feeds_source_class', 'FeedsSource');
275

    
276
    $instances = &drupal_static(__METHOD__, array());
277

    
278
    if (!isset($instances[$class][$importer_id][$feed_nid])) {
279
      $instances[$class][$importer_id][$feed_nid] = new $class($importer_id, $feed_nid, $account_switcher);
280
    }
281
    return $instances[$class][$importer_id][$feed_nid];
282
  }
283

    
284
  /**
285
   * Constructor.
286
   *
287
   * @param string $importer_id
288
   *   The machine name of the importer.
289
   * @param int $feed_nid
290
   *   The feed node ID for this Feeds source. This should be '0' if the
291
   *   importer is not attached to a content type.
292
   * @param FeedsAccountSwitcherInterface $account_switcher
293
   *   The account switcher to use to be able to perform actions as a different
294
   *   user.
295
   */
296
  protected function __construct($importer_id, $feed_nid, FeedsAccountSwitcherInterface $account_switcher = NULL) {
297
    $this->feed_nid = $feed_nid;
298
    $this->importer = feeds_importer($importer_id);
299
    if (is_null($account_switcher)) {
300
      $this->accountSwitcher = new FeedsAccountSwitcher();
301
    }
302
    else {
303
      $this->accountSwitcher = $account_switcher;
304
    }
305
    parent::__construct($importer_id);
306
    $this->load();
307
  }
308

    
309
  /**
310
   * Returns the FeedsImporter object for this source.
311
   *
312
   * @return FeedsImporter
313
   *   The importer associated with this Feeds source.
314
   */
315
  public function importer() {
316
    return $this->importer;
317
  }
318

    
319
  /**
320
   * Preview = fetch and parse a feed.
321
   *
322
   * @return FeedsParserResult
323
   *   A FeedsParserResult instance.
324
   *
325
   * @throws Exception
326
   *   If an error occurs when fetching or parsing.
327
   */
328
  public function preview() {
329
    $result = $this->importer->fetcher->fetch($this);
330
    $result = $this->importer->parser->parse($this, $result);
331
    module_invoke_all('feeds_after_parse', $this, $result);
332
    return $result;
333
  }
334

    
335
  /**
336
   * Start importing a source.
337
   *
338
   * This method starts an import job. Depending on the configuration of the
339
   * importer of this source, a Batch API job or a background job with Job
340
   * Scheduler will be created.
341
   *
342
   * @throws Exception
343
   *   If processing in background is enabled, the first batch chunk of the
344
   *   import will be executed on the current page request. This means that this
345
   *   method may throw the same exceptions as FeedsSource::import().
346
   */
347
  public function startImport() {
348
    $config = $this->importer->getConfig();
349
    if ($config['process_in_background']) {
350
      $this->startBackgroundJob('import');
351
    }
352
    else {
353
      $this->startBatchAPIJob(t('Importing'), 'import');
354
    }
355
  }
356

    
357
  /**
358
   * Start deleting all imported items of a source.
359
   *
360
   * This method starts a clear job. Depending on the configuration of the
361
   * importer of this source, a Batch API job or a background job with Job
362
   * Scheduler will be created.
363
   *
364
   * @throws Exception
365
   *   If processing in background is enabled, the first batch chunk of the
366
   *   clear task will be executed on the current page request. This means that
367
   *   this method may throw the same exceptions as FeedsSource::clear().
368
   */
369
  public function startClear() {
370
    $config = $this->importer->getConfig();
371
    if ($config['process_in_background']) {
372
      $this->startBackgroundJob('clear');
373
    }
374
    else {
375
      $this->startBatchAPIJob(t('Deleting'), 'clear');
376
    }
377
  }
378

    
379
  /**
380
   * Schedule all periodic tasks for this source, even when scheduled before.
381
   */
382
  public function schedule() {
383
    $this->scheduleImport();
384
    $this->scheduleExpire();
385
  }
386

    
387
  /**
388
   * Schedule all periodic tasks for this source if not already scheduled.
389
   */
390
  public function ensureSchedule() {
391
    $this->scheduleImport(FALSE);
392
    $this->scheduleExpire(FALSE);
393
  }
394

    
395
  /**
396
   * Schedule periodic or background import tasks.
397
   *
398
   * @param bool $force
399
   *   (optional) If true, forces the scheduling to happen.
400
   *   Defaults to true.
401
   */
402
  public function scheduleImport($force = TRUE) {
403
    // Check whether any fetcher is overriding the import period.
404
    $period = $this->importer->config['import_period'];
405
    $fetcher_period = $this->importer->fetcher->importPeriod($this);
406
    if (is_numeric($fetcher_period)) {
407
      $period = $fetcher_period;
408
    }
409
    $job = array(
410
      'type' => $this->id,
411
      'id' => $this->feed_nid,
412
      'period' => $period,
413
      'periodic' => TRUE,
414
    );
415
    if ($period == FEEDS_SCHEDULE_NEVER && $this->progressImporting() === FEEDS_BATCH_COMPLETE) {
416
      JobScheduler::get('feeds_source_import')->remove($job);
417
    }
418
    elseif ($this->progressImporting() === FEEDS_BATCH_COMPLETE) {
419
      // Check for an existing job first.
420
      $existing = JobScheduler::get('feeds_source_import')->check($job);
421
      if (!$existing || $force) {
422
        // If there is no existing job, schedule a new job.
423
        JobScheduler::get('feeds_source_import')->set($job);
424
      }
425
      elseif ($existing['scheduled']) {
426
        // If the previous job is still marked as 'running', reschedule it.
427
        JobScheduler::get('feeds_source_import')->reschedule($existing);
428
      }
429
    }
430
    elseif (!$this->isQueued()) {
431
      // Feed is not fully imported yet, so we put this job back in the queue
432
      // immediately for further processing.
433
      $queue = DrupalQueue::get('feeds_source_import');
434
      $queue->createItem($job);
435
    }
436
  }
437

    
438
  /**
439
   * Schedule background expire tasks.
440
   *
441
   * @param bool $force
442
   *   (optional) If true, forces the scheduling to happen.
443
   *   Defaults to true.
444
   */
445
  public function scheduleExpire($force = TRUE) {
446
    // Schedule as soon as possible if a batch is active.
447
    $period = $this->progressExpiring() === FEEDS_BATCH_COMPLETE ? 3600 : 0;
448

    
449
    $job = array(
450
      'type' => $this->id,
451
      'id' => $this->feed_nid,
452
      'period' => $period,
453
      'periodic' => TRUE,
454
    );
455
    if ($this->importer->processor->expiryTime() == FEEDS_EXPIRE_NEVER) {
456
      JobScheduler::get('feeds_source_expire')->remove($job);
457
    }
458
    else {
459
      // Check for an existing job first.
460
      $existing = JobScheduler::get('feeds_source_expire')->check($job);
461
      if (!$existing || $force) {
462
        // If there is no existing job, schedule a new job.
463
        JobScheduler::get('feeds_source_expire')->set($job);
464
      }
465
      elseif ($existing['scheduled']) {
466
        // If the previous job is still marked as 'running', reschedule it.
467
        JobScheduler::get('feeds_source_expire')->reschedule($existing);
468
      }
469
    }
470
  }
471

    
472
  /**
473
   * Schedule background clearing tasks.
474
   */
475
  public function scheduleClear() {
476
    $job = array(
477
      'type' => $this->id,
478
      'id' => $this->feed_nid,
479
    );
480

    
481
    if ($this->progressClearing() !== FEEDS_BATCH_COMPLETE) {
482
      // Feed is not fully cleared yet, so we put this job back in the queue
483
      // immediately for further processing.
484
      $queue = DrupalQueue::get('feeds_source_clear');
485
      $queue->createItem($job);
486
    }
487
  }
488

    
489
  /**
490
   * Import a source: execute fetching, parsing and processing stage.
491
   *
492
   * This method only executes the current batch chunk, then returns. If you are
493
   * looking to import an entire source, use FeedsSource::startImport() instead.
494
   *
495
   * @return float
496
   *   FEEDS_BATCH_COMPLETE if the import process finished. A decimal between
497
   *   0.0 and 0.9 periodic if import is still in progress.
498
   *
499
   * @throws Exception
500
   *   In case an error occurs when importing.
501
   */
502
  public function import() {
503
    $this->acquireLock();
504
    try {
505
      // If fetcher result is empty, we are starting a new import, log.
506
      if (empty($this->fetcher_result)) {
507
        module_invoke_all('feeds_before_import', $this);
508
        if (module_exists('rules')) {
509
          rules_invoke_event('feeds_before_import', $this);
510
        }
511
        $this->state[FEEDS_START] = time();
512
      }
513

    
514
      // Fetch.
515
      if (empty($this->fetcher_result) || FEEDS_BATCH_COMPLETE == $this->progressParsing()) {
516
        $this->fetcher_result = $this->importer->fetcher->fetch($this);
517
        // Clean the parser's state, we are parsing an entirely new file.
518
        unset($this->state[FEEDS_PARSE]);
519
      }
520

    
521
      // Parse.
522
      $parser_result = $this->importer->parser->parse($this, $this->fetcher_result);
523
      module_invoke_all('feeds_after_parse', $this, $parser_result);
524

    
525
      // Process.
526
      $this->importer->processor->process($this, $parser_result);
527

    
528
      // Import finished without exceptions, so unset any potentially previously
529
      // recorded exceptions.
530
      unset($this->exception);
531
    }
532
    catch (Exception $e) {
533
      // $e is stored and re-thrown once we've had a chance to log our progress.
534
      // Set the exception so that other modules can check if an exception
535
      // occurred in hook_feeds_after_import().
536
      $this->exception = $e;
537
    }
538

    
539
    // Clean up.
540
    $result = $this->progressImporting();
541
    if ($result == FEEDS_BATCH_COMPLETE || isset($e)) {
542
      $this->finishImport();
543
    }
544

    
545
    $this->save();
546
    $this->releaseLock();
547

    
548
    if (isset($e)) {
549
      throw $e;
550
    }
551

    
552
    return $result;
553
  }
554

    
555
  /**
556
   * Imports a fetcher result all at once in memory.
557
   *
558
   * @param FeedsFetcherResult $fetcher_result
559
   *   The fetcher result to process.
560
   *
561
   * @throws Exception
562
   *   Thrown if an error occurs when importing.
563
   */
564
  public function pushImport(FeedsFetcherResult $fetcher_result) {
565
    // Since locks only work during a request, check if an import is active.
566
    if (!empty($this->fetcher_result) || !empty($this->state)) {
567
      throw new RuntimeException('The feed is currently importing.');
568
    }
569

    
570
    $this->acquireLock();
571
    $this->state[FEEDS_START] = time();
572

    
573
    try {
574
      module_invoke_all('feeds_before_import', $this);
575

    
576
      // Parse.
577
      do {
578
        $parser_result = $this->importer->parser->parse($this, $fetcher_result);
579
        module_invoke_all('feeds_after_parse', $this, $parser_result);
580

    
581
        // Process.
582
        $this->importer->processor->process($this, $parser_result);
583

    
584
      } while ($this->progressParsing() !== FEEDS_BATCH_COMPLETE);
585
    }
586
    catch (Exception $e) {
587
      // $e is stored and re-thrown once we've had a chance to log our progress.
588
      // Set the exception so that other modules can check if an exception
589
      // occurred in hook_feeds_after_import().
590
      $this->exception = $e;
591
    }
592

    
593
    $this->finishImport();
594

    
595
    $this->save();
596
    $this->releaseLock();
597

    
598
    if (isset($e)) {
599
      throw $e;
600
    }
601
  }
602

    
603
  /**
604
   * Cleans up after an import.
605
   */
606
  protected function finishImport() {
607
    $this->imported = time();
608
    $this->log('import', 'Imported in @s seconds.', array('@s' => $this->imported - $this->state[FEEDS_START]), WATCHDOG_INFO);
609

    
610
    // Allow fetcher to react on finishing importing.
611
    $this->importer->fetcher->afterImport($this);
612

    
613
    // Allow other modules to react upon finishing importing.
614
    module_invoke_all('feeds_after_import', $this);
615
    if (module_exists('rules')) {
616
      rules_invoke_event('feeds_after_import', $this);
617
    }
618

    
619
    $this->clearStates();
620
  }
621

    
622
  /**
623
   * Remove all items from a feed.
624
   *
625
   * This method only executes the current batch chunk, then returns. If you are
626
   * looking to delete all items of a source, use FeedsSource::startClear()
627
   * instead.
628
   *
629
   * @return float
630
   *   FEEDS_BATCH_COMPLETE if the clearing process finished. A decimal between
631
   *   0.0 and 0.9 periodic if clearing is still in progress.
632
   *
633
   * @throws Exception
634
   *   In case an error occurs when clearing.
635
   */
636
  public function clear() {
637
    $this->acquireLock();
638
    try {
639
      $this->importer->fetcher->clear($this);
640
      $this->importer->parser->clear($this);
641
      $this->importer->processor->clear($this);
642
    }
643
    catch (Exception $e) {
644
      // $e is stored and re-thrown once we've had a chance to log our progress.
645
    }
646
    $this->releaseLock();
647

    
648
    // Clean up.
649
    $result = $this->progressClearing();
650
    if ($result == FEEDS_BATCH_COMPLETE || isset($e)) {
651
      module_invoke_all('feeds_after_clear', $this);
652
      $this->clearStates();
653
    }
654
    $this->save();
655
    if (isset($e)) {
656
      throw $e;
657
    }
658
    return $result;
659
  }
660

    
661
  /**
662
   * Removes all expired items from a feed.
663
   */
664
  public function expire() {
665
    $this->acquireLock();
666
    try {
667
      $result = $this->importer->processor->expire($this);
668
    }
669
    catch (Exception $e) {
670
      // Will throw after the lock is released.
671
    }
672
    $this->releaseLock();
673

    
674
    if (isset($e)) {
675
      throw $e;
676
    }
677

    
678
    return $result;
679
  }
680

    
681
  /**
682
   * Report progress as float between 0 and 1. 1 = FEEDS_BATCH_COMPLETE.
683
   */
684
  public function progressParsing() {
685
    return $this->state(FEEDS_PARSE)->progress;
686
  }
687

    
688
  /**
689
   * Report progress as float between 0 and 1. 1 = FEEDS_BATCH_COMPLETE.
690
   */
691
  public function progressImporting() {
692
    $fetcher = $this->state(FEEDS_FETCH);
693
    $parser = $this->state(FEEDS_PARSE);
694
    if ($fetcher->progress == FEEDS_BATCH_COMPLETE && $parser->progress == FEEDS_BATCH_COMPLETE) {
695
      return FEEDS_BATCH_COMPLETE;
696
    }
697
    // Fetching envelops parsing.
698
    // @todo: this assumes all fetchers neatly use total. May not be the case.
699
    $fetcher_fraction = $fetcher->total ? 1.0 / $fetcher->total : 1.0;
700
    $parser_progress = $parser->progress * $fetcher_fraction;
701
    $result = $fetcher->progress - $fetcher_fraction + $parser_progress;
702
    if ($result == FEEDS_BATCH_COMPLETE) {
703
      return 0.99;
704
    }
705
    return $result;
706
  }
707

    
708
  /**
709
   * Report progress on clearing.
710
   */
711
  public function progressClearing() {
712
    return $this->state(FEEDS_PROCESS_CLEAR)->progress;
713
  }
714

    
715
  /**
716
   * Report progress on expiry.
717
   */
718
  public function progressExpiring() {
719
    return $this->state(FEEDS_PROCESS_EXPIRE)->progress;
720
  }
721

    
722
  /**
723
   * Return a state object for a given stage. Lazy instantiates new states.
724
   *
725
   * @param string $stage
726
   *   One of FEEDS_FETCH, FEEDS_PARSE, FEEDS_PROCESS or FEEDS_PROCESS_CLEAR.
727
   *
728
   * @return FeedsState|mixed
729
   *   The FeedsState object for the given stage.
730
   *   In theory, this could return something else, if $this->state has been
731
   *   polluted with e.g. integer timestamps.
732
   *
733
   * @see FeedsSource::$state
734
   */
735
  public function state($stage) {
736
    if (!is_array($this->state)) {
737
      $this->state = array();
738
    }
739
    if (!isset($this->state[$stage])) {
740
      $this->state[$stage] = new FeedsState();
741
    }
742
    return $this->state[$stage];
743
  }
744

    
745
  /**
746
   * Clears states.
747
   */
748
  protected function clearStates() {
749
    $this->state = array();
750
    $this->fetcher_result = NULL;
751
  }
752

    
753
  /**
754
   * Count items imported by this source.
755
   */
756
  public function itemCount() {
757
    return $this->importer->processor->itemCount($this);
758
  }
759

    
760
  /**
761
   * Returns the next time that the feed will be imported.
762
   *
763
   * @return int|null
764
   *   The next time the feed will be imported as a UNIX timestamp.
765
   *   NULL if not known.
766
   */
767
  public function getNextImportTime() {
768
    $details = $this->getNextImportTimeDetails();
769
    if (isset($details['time'])) {
770
      return $details['time'];
771
    }
772
  }
773

    
774
  /**
775
   * Returns the next time that the feed will be imported.
776
   *
777
   * @param array $methods
778
   *   (optional) Methods to check.
779
   *
780
   * @return array|null
781
   *   Information about when the next time the feed will be imported:
782
   *   - time: the next time the feed will be imported as a UNIX timestamp.
783
   *   - method: via which scheduler the job will ran.
784
   *   - message: If set, time and method should be ignored.
785
   *   Null if no information is available.
786
   */
787
  public function getNextImportTimeDetails(array $methods = array()) {
788
    if (empty($methods)) {
789
      $methods = array(
790
        'queue',
791
        'feeds_reschedule',
792
        'job_scheduler',
793
      );
794
    }
795

    
796
    if (in_array('queue', $methods)) {
797
      // Check queue.
798
      $serialized_job_type = db_like(strtr('s:4:"type";s:!length:"!type";', array(
799
        '!length' => strlen($this->id),
800
        '!type' => $this->id,
801
      )));
802
      $serialized_job_id_as_string = db_like(strtr('s:2:"id";s:!length:"!id";', array(
803
        '!length' => strlen($this->feed_nid),
804
        '!id' => $this->feed_nid,
805
      )));
806
      $serialized_job_id_as_integer = db_like(strtr('s:2:"id";i:!id;', array(
807
        '!id' => $this->feed_nid,
808
      )));
809

    
810
      $queue_created = db_select('queue')
811
        ->fields('queue', array('created'))
812
        ->condition('name', 'feeds_source_import')
813
        ->condition('data', '%' . $serialized_job_type . '%', 'LIKE')
814
        ->condition(db_or()
815
          ->condition('data', '%' . $serialized_job_id_as_string . '%', 'LIKE')
816
          ->condition('data', '%' . $serialized_job_id_as_integer . '%', 'LIKE')
817
        )
818
        ->condition('expire', 0)
819
        ->execute()
820
        ->fetchField();
821

    
822
      if ($queue_created) {
823
        return array(
824
          'time' => $queue_created,
825
          'method' => t('Queue'),
826
        );
827
      }
828

    
829
      // Special case for PostgreSQL: if using that database type, we cannot
830
      // search in the data column of the queue table, because the Drupal
831
      // database layer adds '::text' to bytea columns, which results into the
832
      // data column becoming unreadable in conditions. So instead, we check for
833
      // the first 10 records in the queue to see if the given importer ID +
834
      // feed NID is amongst them.
835
      if (Database::getConnection()->databaseType() == 'pgsql') {
836
        $items = db_query("SELECT data, created FROM {queue} WHERE name = :name AND expire = 0 LIMIT 10", array(
837
          ':name' => 'feeds_source_import',
838
        ));
839
        foreach ($items as $item) {
840
          if (is_string($item->data)) {
841
            $item->data = unserialize($item->data);
842
          }
843
          if ($item->data['type'] == $this->id && $item->data['id'] == $this->feed_nid) {
844
            return array(
845
              'time' => $item->created,
846
              'method' => t('Queue'),
847
            );
848
          }
849
        }
850

    
851
        // If not found by now, count how many items there are in the
852
        // feeds_source_import queue. We use this number later to indicate that
853
        // the job *could* be in the queue.
854
        $number_of_queue_items = db_query('SELECT COUNT(name) FROM {queue} WHERE name = :name AND expire = 0', array(
855
          ':name' => 'feeds_source_import',
856
        ))->fetchField();
857
      }
858
    }
859

    
860
    if (in_array('feeds_reschedule', $methods)) {
861
      if (!$this->doesExist()) {
862
        if ($this->importer->config['import_period'] == FEEDS_SCHEDULE_NEVER) {
863
          // Just not scheduled.
864
          return NULL;
865
        }
866
        // Scheduling information cannot exist yet.
867
        return array(
868
          'time' => NULL,
869
          'method' => NULL,
870
          'message' => t('not scheduled yet, because there is no source'),
871
        );
872
      }
873

    
874
      // Check if the importer is in the process of being rescheduled.
875
      $importers = feeds_reschedule();
876
      if (isset($importers[$this->id])) {
877
        return array(
878
          'time' => NULL,
879
          'method' => NULL,
880
          'message' => t('to be rescheduled'),
881
        );
882
      }
883
    }
884

    
885
    if (in_array('job_scheduler', $methods)) {
886
      // Check job scheduler.
887
      $job = db_select('job_schedule')
888
        ->fields('job_schedule', array('next', 'scheduled'))
889
        ->condition('name', 'feeds_source_import')
890
        ->condition('type', $this->id)
891
        ->condition('id', $this->feed_nid)
892
        ->execute()
893
        ->fetch();
894

    
895
      if (isset($job->next)) {
896
        $details = array(
897
          'time' => $job->next,
898
          'method' => t('Job scheduler'),
899
        );
900
        if (!empty($job->scheduled)) {
901
          if (isset($number_of_queue_items) && $number_of_queue_items > 10) {
902
            // When using PostgreSQL we were not able to efficiently search the
903
            // queue table, so it could still be in that table.
904
            $details['message'] = t('unknown, could still be in the queue');
905
          }
906
          else {
907
            $details['message'] = t('possibly stuck');
908
          }
909
        }
910
        return $details;
911
      }
912
    }
913
  }
914

    
915
  /**
916
   * Checks if a source is queued for import.
917
   *
918
   * @return bool
919
   *   True if the source is queued to be imported.
920
   *   False otherwise.
921
   */
922
  public function isQueued() {
923
    $details = $this->getNextImportTimeDetails(array('queue'));
924
    if ($details) {
925
      return TRUE;
926
    }
927
    return FALSE;
928
  }
929

    
930
  /**
931
   * Unlocks a feed.
932
   */
933
  public function unlock() {
934
    $this->clearStates();
935
    $this->save();
936
    try {
937
      $this->releaseLock();
938
    }
939
    catch (FeedsAccountSwitcherException $exception) {
940
      // Ignore switch back exceptions.
941
    }
942
  }
943

    
944
  /**
945
   * Save configuration.
946
   */
947
  public function save() {
948
    // Alert implementers of FeedsSourceInterface to the fact that we're saving.
949
    foreach ($this->importer->plugin_types as $type) {
950
      $this->importer->$type->sourceSave($this);
951
    }
952
    $config = $this->getConfig();
953

    
954
    // Store the source property of the fetcher in a separate column so that we
955
    // can do fast lookups on it.
956
    $source = '';
957
    if (isset($config[get_class($this->importer->fetcher)]['source'])) {
958
      $source = $config[get_class($this->importer->fetcher)]['source'];
959
    }
960
    $object = array(
961
      'id' => $this->id,
962
      'feed_nid' => $this->feed_nid,
963
      'imported' => $this->imported,
964
      'config' => $config,
965
      'source' => $source,
966
      'state' => isset($this->state) ? $this->state : FALSE,
967
      'fetcher_result' => isset($this->fetcher_result) ? $this->fetcher_result : FALSE,
968
    );
969
    if (db_query_range("SELECT 1 FROM {feeds_source} WHERE id = :id AND feed_nid = :nid", 0, 1, array(':id' => $this->id, ':nid' => $this->feed_nid))->fetchField()) {
970
      drupal_write_record('feeds_source', $object, array('id', 'feed_nid'));
971
    }
972
    else {
973
      drupal_write_record('feeds_source', $object);
974
    }
975
  }
976

    
977
  /**
978
   * Load configuration and unpack.
979
   *
980
   * @todo Patch CTools to move constants from export.inc to ctools.module.
981
   */
982
  public function load() {
983
    $record = db_query("SELECT imported, config, state, fetcher_result FROM {feeds_source} WHERE id = :id AND feed_nid = :nid", array(
984
      ':id' => $this->id,
985
      ':nid' => $this->feed_nid,
986
    ))->fetchObject();
987
    if ($record) {
988
      // While FeedsSource cannot be exported, we still use CTool's export.inc
989
      // export definitions.
990
      ctools_include('export');
991
      $this->export_type = EXPORT_IN_DATABASE;
992
      $this->imported = $record->imported;
993
      $this->config = unserialize($record->config);
994
      if (!empty($record->state)) {
995
        $this->state = unserialize($record->state);
996
      }
997
      if (!is_array($this->state)) {
998
        $this->state = array();
999
      }
1000
      if (!empty($record->fetcher_result)) {
1001
        $this->fetcher_result = unserialize($record->fetcher_result);
1002
      }
1003
    }
1004
  }
1005

    
1006
  /**
1007
   * Removes the feed source from the database.
1008
   */
1009
  public function delete() {
1010
    // Alert implementers of FeedsSourceInterface to the fact that we're
1011
    // deleting.
1012
    foreach ($this->importer->plugin_types as $type) {
1013
      $this->importer->$type->sourceDelete($this);
1014
    }
1015
    db_delete('feeds_source')
1016
      ->condition('id', $this->id)
1017
      ->condition('feed_nid', $this->feed_nid)
1018
      ->execute();
1019
    // Remove from schedule.
1020
    $job = array(
1021
      'type' => $this->id,
1022
      'id' => $this->feed_nid,
1023
    );
1024
    JobScheduler::get('feeds_source_import')->remove($job);
1025
    JobScheduler::get('feeds_source_expire')->remove($job);
1026
  }
1027

    
1028
  /**
1029
   * Checks whether or not the source configuration is valid.
1030
   *
1031
   * @return bool
1032
   *   True if it is valid.
1033
   *   False otherwise.
1034
   */
1035
  public function hasValidConfiguration() {
1036
    // If there is no feed nid given, there must be no content type specified.
1037
    $standalone = empty($this->feed_nid) && empty($this->importer->config['content_type']);
1038

    
1039
    // If there is a feed nid given, there must be a content type specified.
1040
    $attached = !empty($this->feed_nid) && !empty($this->importer->config['content_type']);
1041

    
1042
    if ($standalone || $attached) {
1043
      return TRUE;
1044
    }
1045
    return FALSE;
1046
  }
1047

    
1048
  /**
1049
   * Overrides FeedsConfigurable::doesExist().
1050
   *
1051
   * Checks the following:
1052
   * - If the importer is persistent (= defined in code or DB).
1053
   * - If the source is persistent (= defined in DB).
1054
   */
1055
  public function doesExist() {
1056
    return $this->importer->doesExist() && parent::doesExist();
1057
  }
1058

    
1059
  /**
1060
   * Only return source if configuration is persistent and valid.
1061
   *
1062
   * @see FeedsConfigurable::existing()
1063
   */
1064
  public function existing() {
1065
    // Ensure that the source configuration is valid.
1066
    if (!$this->hasValidConfiguration()) {
1067
      throw new FeedsNotExistingException(t('Source configuration not valid.'));
1068
    }
1069

    
1070
    // Ensure that the importer is persistent (= defined in code or DB).
1071
    $this->importer->existing();
1072

    
1073
    // Ensure that the source is persistent (= defined in DB).
1074
    return parent::existing();
1075
  }
1076

    
1077
  /**
1078
   * Returns the configuration for a specific client class.
1079
   *
1080
   * @param FeedsSourceInterface $client
1081
   *   An object that is an implementer of FeedsSourceInterface.
1082
   *
1083
   * @return array
1084
   *   An array stored for $client.
1085
   */
1086
  public function getConfigFor(FeedsSourceInterface $client) {
1087
    $class = get_class($client);
1088
    return isset($this->config[$class]) ? $this->config[$class] : $client->sourceDefaults();
1089
  }
1090

    
1091
  /**
1092
   * Sets the configuration for a specific client class.
1093
   *
1094
   * @param FeedsSourceInterface $client
1095
   *   An object that is an implementer of FeedsSourceInterface.
1096
   * @param array $config
1097
   *   The configuration for $client.
1098
   */
1099
  public function setConfigFor(FeedsSourceInterface $client, array $config) {
1100
    $this->config[get_class($client)] = $config;
1101
  }
1102

    
1103
  /**
1104
   * Return defaults for feed configuration.
1105
   *
1106
   * @return array
1107
   *   The default feed configuration, keyed per Feeds plugin.
1108
   */
1109
  public function configDefaults() {
1110
    // Collect information from plugins.
1111
    $defaults = array();
1112
    foreach ($this->importer->plugin_types as $type) {
1113
      if ($this->importer->$type->hasSourceConfig()) {
1114
        $defaults[get_class($this->importer->$type)] = $this->importer->$type->sourceDefaults();
1115
      }
1116
    }
1117
    return $defaults;
1118
  }
1119

    
1120
  /**
1121
   * Override parent::configForm().
1122
   */
1123
  public function configForm(&$form_state) {
1124
    // Collect information from plugins.
1125
    $form = array();
1126
    foreach ($this->importer->plugin_types as $type) {
1127
      if ($this->importer->$type->hasSourceConfig()) {
1128
        $class = get_class($this->importer->$type);
1129
        $config = isset($this->config[$class]) ? $this->config[$class] : array();
1130
        $form[$class] = $this->importer->$type->sourceForm($config);
1131
        $form[$class]['#tree'] = TRUE;
1132
      }
1133
    }
1134
    return $form;
1135
  }
1136

    
1137
  /**
1138
   * Override parent::configFormValidate().
1139
   */
1140
  public function configFormValidate(&$values) {
1141
    foreach ($this->importer->plugin_types as $type) {
1142
      $class = get_class($this->importer->$type);
1143
      if (isset($values[$class]) && $this->importer->$type->hasSourceConfig()) {
1144
        $this->importer->$type->sourceFormValidate($values[$class]);
1145
      }
1146
    }
1147
  }
1148

    
1149
  /**
1150
   * Writes to feeds log.
1151
   */
1152
  public function log($type, $message, $variables = array(), $severity = WATCHDOG_NOTICE) {
1153
    feeds_log($this->id, $this->feed_nid, $type, $message, $variables, $severity);
1154
  }
1155

    
1156
  /**
1157
   * Background job helper. Starts a background job using the Drupal queue.
1158
   *
1159
   * @param string $method
1160
   *   Method to execute on importer; one of 'import' or 'clear'.
1161
   *
1162
   * @see FeedsSource::startImport()
1163
   * @see FeedsSource::startClear()
1164
   */
1165
  protected function startBackgroundJob($method) {
1166
    $job = array(
1167
      'type' => $this->id,
1168
      'id' => $this->feed_nid,
1169
    );
1170
    $queue = DrupalQueue::get('feeds_source_' . $method);
1171
    $queue->createItem($job);
1172

    
1173
    switch ($method) {
1174
      case 'import':
1175
        $state = $this->state(FEEDS_PARSE);
1176
        break;
1177

    
1178
      case 'clear':
1179
        $state = $this->state(FEEDS_PROCESS_CLEAR);
1180
        break;
1181

    
1182
      case 'expire':
1183
        $state = $this->state(FEEDS_PROCESS_EXPIRE);
1184
        break;
1185
    }
1186

    
1187
    if (isset($state)) {
1188
      $state->progress = 0;
1189
      $this->save();
1190
    }
1191
  }
1192

    
1193
  /**
1194
   * Batch API helper. Starts a Batch API job.
1195
   *
1196
   * @param string $title
1197
   *   Title to show to user when executing batch.
1198
   * @param string $method
1199
   *   Method to execute on importer; one of 'import' or 'clear'.
1200
   *
1201
   * @see FeedsSource::startImport()
1202
   * @see FeedsSource::startClear()
1203
   * @see feeds_batch()
1204
   */
1205
  protected function startBatchAPIJob($title, $method) {
1206
    $batch = array(
1207
      'title' => $title,
1208
      'operations' => array(
1209
        array('feeds_batch', array($method, $this->id, $this->feed_nid)),
1210
      ),
1211
      'progress_message' => '',
1212
    );
1213
    batch_set($batch);
1214
  }
1215

    
1216
  /**
1217
   * Acquires a lock for this source.
1218
   *
1219
   * @throws FeedsLockException
1220
   *   If a lock for the requested job could not be acquired.
1221
   */
1222
  protected function acquireLock() {
1223
    if (!lock_acquire("feeds_source_{$this->id}_{$this->feed_nid}", 60.0)) {
1224
      throw new FeedsLockException(t('Cannot acquire lock for source @id / @feed_nid.', array('@id' => $this->id, '@feed_nid' => $this->feed_nid)));
1225
    }
1226

    
1227
    // Switch account.
1228
    $this->switchAccount();
1229
  }
1230

    
1231
  /**
1232
   * Releases a lock for this source.
1233
   */
1234
  protected function releaseLock() {
1235
    lock_release("feeds_source_{$this->id}_{$this->feed_nid}");
1236

    
1237
    // Switch back to original account.
1238
    $this->switchBack();
1239
  }
1240

    
1241
  /**
1242
   * Switches account to the feed owner or user 1.
1243
   *
1244
   * To the feed owner is switched if the importer is attached to a content
1245
   * type. When using the standalone form, there is no feed owner, so then a
1246
   * switch to user 1 happens instead.
1247
   */
1248
  protected function switchAccount() {
1249
    // Use author of feed node.
1250
    if ($this->feed_nid) {
1251
      $node = node_load($this->feed_nid);
1252
      if (!empty($node->uid)) {
1253
        $account = user_load($node->uid);
1254
      }
1255
    }
1256

    
1257
    // If the owner of the feed node is anonymous or if the importer is not
1258
    // attached to a content type, pick user 1 instead.
1259
    if (empty($account)) {
1260
      $account = user_load(1);
1261
    }
1262

    
1263
    $this->accountSwitcher->switchTo($account);
1264
  }
1265

    
1266
  /**
1267
   * Switches back to the original user.
1268
   */
1269
  protected function switchBack() {
1270
    $this->accountSwitcher->switchBack();
1271
  }
1272

    
1273
  /**
1274
   * Implements FeedsConfigurable::dependencies().
1275
   */
1276
  public function dependencies() {
1277
    $dependencies = parent::dependencies();
1278
    return array_merge($dependencies, $this->importer()->dependencies());
1279
  }
1280

    
1281
}