Projet

Général

Profil

Paste
Télécharger (36,2 ko) Statistiques
| Branche: | Révision:

root / drupal7 / sites / all / modules / feeds / plugins / FeedsProcessor.inc @ 41cc1b08

1
<?php
2

    
3
/**
4
 * @file
5
 * Contains FeedsProcessor and related classes.
6
 */
7

    
8
// Update mode for existing items.
9
define('FEEDS_SKIP_EXISTING', 0);
10
define('FEEDS_REPLACE_EXISTING', 1);
11
define('FEEDS_UPDATE_EXISTING', 2);
12
// Options for handling content in Drupal but not in source data.
13
define('FEEDS_SKIP_NON_EXISTENT', 'skip');
14
define('FEEDS_DELETE_NON_EXISTENT', 'delete');
15

    
16
// Default limit for creating items on a page load, not respected by all
17
// processors.
18
define('FEEDS_PROCESS_LIMIT', 50);
19

    
20
/**
21
 * Thrown if a validation fails.
22
 */
23
class FeedsValidationException extends Exception {}
24

    
25
/**
26
 * Thrown if a an access check fails.
27
 */
28
class FeedsAccessException extends Exception {}
29

    
30
/**
31
 * Abstract class, defines interface for processors.
32
 */
33
abstract class FeedsProcessor extends FeedsPlugin {
34

    
35
  /**
36
   * Implements FeedsPlugin::pluginType().
37
   */
38
  public function pluginType() {
39
    return 'processor';
40
  }
41

    
42
  /**
43
   * @defgroup entity_api_wrapper Entity API wrapper.
44
   */
45

    
46
  /**
47
   * Entity type this processor operates on.
48
   */
49
  public abstract function entityType();
50

    
51
  /**
52
   * Bundle type this processor operates on.
53
   *
54
   * Defaults to the entity type for entities that do not define bundles.
55
   *
56
   * @return string|NULL
57
   *   The bundle type this processor operates on, or NULL if it is undefined.
58
   */
59
  public function bundle() {
60
    return $this->config['bundle'];
61
  }
62

    
63
  /**
64
   * Provides a list of bundle options for use in select lists.
65
   *
66
   * @return array
67
   *   A keyed array of bundle => label.
68
   */
69
  public function bundleOptions() {
70
    $options = array();
71
    foreach (field_info_bundles($this->entityType()) as $bundle => $info) {
72
      if (!empty($info['label'])) {
73
        $options[$bundle] = $info['label'];
74
      }
75
      else {
76
        $options[$bundle] = $bundle;
77
      }
78
    }
79
    return $options;
80
  }
81

    
82
  /**
83
   * Create a new entity.
84
   *
85
   * @param $source
86
   *   The feeds source that spawns this entity.
87
   *
88
   * @return
89
   *   A new entity object.
90
   */
91
  protected abstract function newEntity(FeedsSource $source);
92

    
93
  /**
94
   * Load an existing entity.
95
   *
96
   * @param $source
97
   *   The feeds source that spawns this entity.
98
   * @param $entity_id
99
   *   The unique id of the entity that should be loaded.
100
   *
101
   * @return
102
   *   A new entity object.
103
   *
104
   * @todo We should be able to batch load these, if we found all of the
105
   *   existing ids first.
106
   */
107
  protected function entityLoad(FeedsSource $source, $entity_id) {
108
    if ($this->config['update_existing'] == FEEDS_UPDATE_EXISTING) {
109
      $entities = entity_load($this->entityType(), array($entity_id));
110
      return reset($entities);
111
    }
112

    
113
    $info = $this->entityInfo();
114

    
115
    $args = array(':entity_id' => $entity_id);
116

    
117
    $table = db_escape_table($info['base table']);
118
    $key = db_escape_field($info['entity keys']['id']);
119

    
120
    return db_query("SELECT * FROM {" . $table . "} WHERE $key = :entity_id", $args)->fetchObject();
121
  }
122

    
123
  /**
124
   * Validate an entity.
125
   *
126
   * @throws FeedsValidationException $e
127
   *   If validation fails.
128
   */
129
  protected function entityValidate($entity) {}
130

    
131
  /**
132
   * Access check for saving an enity.
133
   *
134
   * @param $entity
135
   *   Entity to be saved.
136
   *
137
   * @throws FeedsAccessException $e
138
   *   If the access check fails.
139
   */
140
  protected function entitySaveAccess($entity) {}
141

    
142
  /**
143
   * Save an entity.
144
   *
145
   * @param $entity
146
   *   Entity to be saved.
147
   */
148
  protected abstract function entitySave($entity);
149

    
150
  /**
151
   * Delete a series of entities.
152
   *
153
   * @param $entity_ids
154
   *   Array of unique identity ids to be deleted.
155
   */
156
  protected abstract function entityDeleteMultiple($entity_ids);
157

    
158
  /**
159
   * Wrap entity_get_info() into a method so that extending classes can override
160
   * it and more entity information. Allowed additional keys:
161
   *
162
   * 'label plural' ... the plural label of an entity type.
163
   */
164
  protected function entityInfo() {
165
    return entity_get_info($this->entityType());
166
  }
167

    
168
  /**
169
   * @}
170
   */
171

    
172
  /**
173
   * Process the result of the parsing stage.
174
   *
175
   * @param FeedsSource $source
176
   *   Source information about this import.
177
   * @param FeedsParserResult $parser_result
178
   *   The result of the parsing stage.
179
   */
180
  public function process(FeedsSource $source, FeedsParserResult $parser_result) {
181
    $state = $source->state(FEEDS_PROCESS);
182
    if (!isset($state->removeList) && $parser_result->items) {
183
      $this->initEntitiesToBeRemoved($source, $state);
184
    }
185

    
186
    while ($item = $parser_result->shiftItem()) {
187

    
188
      // Check if this item already exists.
189
      $entity_id = $this->existingEntityId($source, $parser_result);
190
      // If it's included in the feed, it must not be removed on clean.
191
      if ($entity_id) {
192
        unset($state->removeList[$entity_id]);
193
      }
194
      $skip_existing = $this->config['update_existing'] == FEEDS_SKIP_EXISTING;
195

    
196
      module_invoke_all('feeds_before_update', $source, $item, $entity_id);
197

    
198
      // If it exists, and we are not updating, pass onto the next item.
199
      if ($entity_id && $skip_existing) {
200
        continue;
201
      }
202

    
203
      try {
204

    
205
        $hash = $this->hash($item);
206
        $changed = ($hash !== $this->getHash($entity_id));
207
        $force_update = $this->config['skip_hash_check'];
208

    
209
        // Do not proceed if the item exists, has not changed, and we're not
210
        // forcing the update.
211
        if ($entity_id && !$changed && !$force_update) {
212
          continue;
213
        }
214

    
215
        // Load an existing entity.
216
        if ($entity_id) {
217
          $entity = $this->entityLoad($source, $entity_id);
218

    
219
          // The feeds_item table is always updated with the info for the most
220
          // recently processed entity. The only carryover is the entity_id.
221
          $this->newItemInfo($entity, $source->feed_nid, $hash);
222
          $entity->feeds_item->entity_id = $entity_id;
223
          $entity->feeds_item->is_new = FALSE;
224
        }
225

    
226
        // Build a new entity.
227
        else {
228
          $entity = $this->newEntity($source);
229
          $this->newItemInfo($entity, $source->feed_nid, $hash);
230
        }
231

    
232
        // Set property and field values.
233
        $this->map($source, $parser_result, $entity);
234
        $this->entityValidate($entity);
235

    
236
        // Allow modules to alter the entity before saving.
237
        module_invoke_all('feeds_presave', $source, $entity, $item, $entity_id);
238
        if (module_exists('rules')) {
239
          rules_invoke_event('feeds_import_'. $source->importer()->id, $entity);
240
        }
241

    
242
        // Enable modules to skip saving at all.
243
        if (!empty($entity->feeds_item->skip)) {
244
          continue;
245
        }
246

    
247
        // This will throw an exception on failure.
248
        $this->entitySaveAccess($entity);
249
        $this->entitySave($entity);
250

    
251
        // Allow modules to perform operations using the saved entity data.
252
        // $entity contains the updated entity after saving.
253
        module_invoke_all('feeds_after_save', $source, $entity, $item, $entity_id);
254

    
255
        // Track progress.
256
        if (empty($entity_id)) {
257
          $state->created++;
258
        }
259
        else {
260
          $state->updated++;
261
        }
262
      }
263

    
264
      // Something bad happened, log it.
265
      catch (Exception $e) {
266
        $state->failed++;
267
        drupal_set_message($e->getMessage(), 'warning');
268
        list($message, $arguments) = $this->createLogEntry($e, $entity, $item);
269
        $source->log('import', $message, $arguments, WATCHDOG_ERROR);
270
      }
271
    }
272

    
273
    // Set messages if we're done.
274
    if ($source->progressImporting() != FEEDS_BATCH_COMPLETE) {
275
      return;
276
    }
277
    // Remove not included items if needed.
278
    // It depends on the implementation of the clean() method what will happen
279
    // to items that were no longer in the source.
280
    $this->clean($state);
281
    $info = $this->entityInfo();
282
    $tokens = array(
283
      '@entity' => strtolower($info['label']),
284
      '@entities' => strtolower($info['label plural']),
285
    );
286
    $messages = array();
287
    if ($state->created) {
288
      $messages[] = array(
289
       'message' => format_plural(
290
          $state->created,
291
          'Created @number @entity.',
292
          'Created @number @entities.',
293
          array('@number' => $state->created) + $tokens
294
        ),
295
      );
296
    }
297
    if ($state->updated) {
298
      $messages[] = array(
299
       'message' => format_plural(
300
          $state->updated,
301
          'Updated @number @entity.',
302
          'Updated @number @entities.',
303
          array('@number' => $state->updated) + $tokens
304
        ),
305
      );
306
    }
307
    if ($state->unpublished) {
308
      $messages[] = array(
309
        'message' => format_plural(
310
            $state->unpublished,
311
            'Unpublished @number @entity.',
312
            'Unpublished @number @entities.',
313
            array('@number' => $state->unpublished) + $tokens
314
        ),
315
      );
316
    }
317
    if ($state->blocked) {
318
      $messages[] = array(
319
        'message' => format_plural(
320
          $state->blocked,
321
          'Blocked @number @entity.',
322
          'Blocked @number @entities.',
323
          array('@number' => $state->blocked) + $tokens
324
        ),
325
      );
326
    }
327
    if ($state->deleted) {
328
      $messages[] = array(
329
       'message' => format_plural(
330
          $state->deleted,
331
          'Removed @number @entity.',
332
          'Removed @number @entities.',
333
          array('@number' => $state->deleted) + $tokens
334
        ),
335
      );
336
    }
337
    if ($state->failed) {
338
      $messages[] = array(
339
       'message' => format_plural(
340
          $state->failed,
341
          'Failed importing @number @entity.',
342
          'Failed importing @number @entities.',
343
          array('@number' => $state->failed) + $tokens
344
        ),
345
        'level' => WATCHDOG_ERROR,
346
      );
347
    }
348
    if (empty($messages)) {
349
      $messages[] = array(
350
        'message' => t('There are no new @entities.', array('@entities' => strtolower($info['label plural']))),
351
      );
352
    }
353
    foreach ($messages as $message) {
354
      drupal_set_message($message['message']);
355
      $source->log('import', $message['message'], array(), isset($message['level']) ? $message['level'] : WATCHDOG_INFO);
356
    }
357
  }
358

    
359
  /**
360
   * Initialize the array of entities to remove with all existing entities
361
   * previously imported from the source.
362
   *
363
   * @param FeedsSource $source
364
   *   Source information about this import.
365
   * @param FeedsState $state
366
   *   The FeedsState object for the given stage.
367
   */
368
  protected function initEntitiesToBeRemoved(FeedsSource $source, FeedsState $state) {
369
    $state->removeList = array();
370
    // We fill it only if needed.
371
    if (!isset($this->config['update_non_existent']) || $this->config['update_non_existent'] == FEEDS_SKIP_NON_EXISTENT) {
372
      return;
373
    }
374
    // Build base select statement.
375
    $info = $this->entityInfo();
376
    $id_key = db_escape_field($info['entity keys']['id']);
377
    $select = db_select($info['base table'], 'e');
378
    $select->addField('e', $info['entity keys']['id'], 'entity_id');
379
    $select->join(
380
      'feeds_item',
381
      'fi',
382
      'e.' . $id_key . ' = fi.entity_id AND fi.entity_type = :entity_type', array(
383
        ':entity_type' => $this->entityType(),
384
    ));
385

    
386
    $select->condition('fi.id', $this->id);
387
    $select->condition('fi.feed_nid', $source->feed_nid);
388
    // No need to remove item again if same method of removal was already used.
389
    $select->condition('fi.hash', $this->config['update_non_existent'], '<>');
390
    $entities = $select->execute();
391
    // If not found on process, existing entities will be deleted.
392
    foreach ($entities as $entity) {
393
      // Obviously, items which are still included in the source feed will be
394
      // removed from this array when processed.
395
      $state->removeList[$entity->entity_id] = $entity->entity_id;
396
    }
397
  }
398

    
399
  /**
400
   * Deletes entities which were not found during processing.
401
   *
402
   * @todo batch delete?
403
   *
404
   * @param FeedsState $state
405
   *   The FeedsState object for the given stage.
406
   */
407
  protected function clean(FeedsState $state) {
408
    // We clean only if needed.
409
    if (!isset($this->config['update_non_existent']) || $this->config['update_non_existent'] == FEEDS_SKIP_NON_EXISTENT) {
410
      return;
411
    }
412

    
413
    $total = count($state->removeList);
414
    if ($total) {
415
      $this->entityDeleteMultiple($state->removeList);
416
      $state->deleted += $total;
417
    }
418
  }
419

    
420
  /**
421
   * Remove all stored results or stored results up to a certain time for a
422
   * source.
423
   *
424
   * @param FeedsSource $source
425
   *   Source information for this expiry. Implementers should only delete items
426
   *   pertaining to this source. The preferred way of determining whether an
427
   *   item pertains to a certain souce is by using $source->feed_nid. It is the
428
   *   processor's responsibility to store the feed_nid of an imported item in
429
   *   the processing stage.
430
   */
431
  public function clear(FeedsSource $source) {
432
    $state = $source->state(FEEDS_PROCESS_CLEAR);
433

    
434
    // Build base select statement.
435
    $info = $this->entityInfo();
436
    $select = db_select($info['base table'], 'e');
437
    $select->addField('e', $info['entity keys']['id'], 'entity_id');
438
    $select->join(
439
      'feeds_item',
440
      'fi',
441
      "e.{$info['entity keys']['id']} = fi.entity_id AND fi.entity_type = '{$this->entityType()}'");
442
    $select->condition('fi.id', $this->id);
443
    $select->condition('fi.feed_nid', $source->feed_nid);
444

    
445
    // If there is no total, query it.
446
    if (!$state->total) {
447
      $state->total = $select->countQuery()
448
        ->execute()
449
        ->fetchField();
450
    }
451

    
452
    // Delete a batch of entities.
453
    $entities = $select->range(0, $this->getLimit())->execute();
454
    $entity_ids = array();
455
    foreach ($entities as $entity) {
456
      $entity_ids[$entity->entity_id] = $entity->entity_id;
457
    }
458
    $this->entityDeleteMultiple($entity_ids);
459

    
460
    // Report progress, take into account that we may not have deleted as
461
    // many items as we have counted at first.
462
    if (count($entity_ids)) {
463
      $state->deleted += count($entity_ids);
464
      $state->progress($state->total, $state->deleted);
465
    }
466
    else {
467
      $state->progress($state->total, $state->total);
468
    }
469

    
470
    // Report results when done.
471
    if ($source->progressClearing() == FEEDS_BATCH_COMPLETE) {
472
      if ($state->deleted) {
473
        $message = format_plural(
474
          $state->deleted,
475
          'Deleted @number @entity',
476
          'Deleted @number @entities',
477
          array(
478
            '@number' => $state->deleted,
479
            '@entity' => strtolower($info['label']),
480
            '@entities' => strtolower($info['label plural']),
481
          )
482
        );
483
        $source->log('clear', $message, array(), WATCHDOG_INFO);
484
        drupal_set_message($message);
485
      }
486
      else {
487
        drupal_set_message(t('There are no @entities to be deleted.', array('@entities' => $info['label plural'])));
488
      }
489
    }
490
  }
491

    
492
  /*
493
   * Report number of items that can be processed per call.
494
   *
495
   * 0 means 'unlimited'.
496
   *
497
   * If a number other than 0 is given, Feeds parsers that support batching
498
   * will only deliver this limit to the processor.
499
   *
500
   * @see FeedsSource::getLimit()
501
   * @see FeedsCSVParser::parse()
502
   */
503
  public function getLimit() {
504
    return variable_get('feeds_process_limit', FEEDS_PROCESS_LIMIT);
505
  }
506

    
507
  /**
508
   * Deletes feed items older than REQUEST_TIME - $time.
509
   *
510
   * Do not invoke expire on a processor directly, but use
511
   * FeedsSource::expire() instead.
512
   *
513
   * @param FeedsSource $source
514
   *   The source to expire entities for.
515
   *
516
   * @param $time
517
   *   (optional) All items produced by this configuration that are older than
518
   *   REQUEST_TIME - $time should be deleted. If NULL, processor should use
519
   *   internal configuration. Defaults to NULL.
520
   *
521
   * @return float
522
   *   FEEDS_BATCH_COMPLETE if all items have been processed, a float between 0
523
   *   and 0.99* indicating progress otherwise.
524
   *
525
   * @see FeedsSource::expire()
526
   */
527
  public function expire(FeedsSource $source, $time = NULL) {
528
    $state = $source->state(FEEDS_PROCESS_EXPIRE);
529

    
530
    if ($time === NULL) {
531
      $time = $this->expiryTime();
532
    }
533
    if ($time == FEEDS_EXPIRE_NEVER) {
534
      return;
535
    }
536

    
537
    $select = $this->expiryQuery($source, $time);
538

    
539
    // If there is no total, query it.
540
    if (!$state->total) {
541
      $state->total = $select->countQuery()->execute()->fetchField();
542
    }
543

    
544
    // Delete a batch of entities.
545
    $entity_ids = $select->range(0, $this->getLimit())->execute()->fetchCol();
546
    if ($entity_ids) {
547
      $this->entityDeleteMultiple($entity_ids);
548
      $state->deleted += count($entity_ids);
549
      $state->progress($state->total, $state->deleted);
550
    }
551
    else {
552
      $state->progress($state->total, $state->total);
553
    }
554
  }
555

    
556
  /**
557
   * Returns a database query used to select entities to expire.
558
   *
559
   * Processor classes should override this method to set the age portion of the
560
   * query.
561
   *
562
   * @param FeedsSource $source
563
   *   The feed source.
564
   * @param int $time
565
   *   Delete entities older than this.
566
   *
567
   * @return SelectQuery
568
   *   A select query to execute.
569
   *
570
   * @see FeedsNodeProcessor::expiryQuery()
571
   */
572
  protected function expiryQuery(FeedsSource $source, $time) {
573
    // Build base select statement.
574
    $info = $this->entityInfo();
575
    $id_key = db_escape_field($info['entity keys']['id']);
576

    
577
    $select = db_select($info['base table'], 'e');
578
    $select->addField('e', $info['entity keys']['id'], 'entity_id');
579
    $select->join(
580
      'feeds_item',
581
      'fi',
582
      "e.$id_key = fi.entity_id AND fi.entity_type = :entity_type", array(
583
        ':entity_type' => $this->entityType(),
584
    ));
585
    $select->condition('fi.id', $this->id);
586
    $select->condition('fi.feed_nid', $source->feed_nid);
587

    
588
    return $select;
589
  }
590

    
591
  /**
592
   * Counts the number of items imported by this processor.
593
   */
594
  public function itemCount(FeedsSource $source) {
595
    return db_query("SELECT count(*) FROM {feeds_item} WHERE id = :id AND entity_type = :entity_type AND feed_nid = :feed_nid", array(':id' => $this->id, ':entity_type' => $this->entityType(), ':feed_nid' => $source->feed_nid))->fetchField();
596
  }
597

    
598
  /**
599
   * Returns a statically cached version of the target mappings.
600
   *
601
   * @return array
602
   *   The targets for this importer.
603
   */
604
  protected function getCachedTargets() {
605
    $targets = &drupal_static('FeedsProcessor::getCachedTargets', array());
606

    
607
    if (!isset($targets[$this->id])) {
608
      $targets[$this->id] = $this->getMappingTargets();
609
    }
610

    
611
    return $targets[$this->id];
612
  }
613

    
614
  /**
615
   * Returns a statically cached version of the source mappings.
616
   *
617
   * @return array
618
   *   The sources for this importer.
619
   */
620
  protected function getCachedSources() {
621
    $sources = &drupal_static('FeedsProcessor::getCachedSources', array());
622

    
623
    if (!isset($sources[$this->id])) {
624

    
625
      $sources[$this->id] = feeds_importer($this->id)->parser->getMappingSources();
626

    
627
      if (is_array($sources[$this->id])) {
628
        foreach ($sources[$this->id] as $source_key => $source) {
629
          if (empty($source['callback']) || !is_callable($source['callback'])) {
630
            unset($sources[$this->id][$source_key]['callback']);
631
          }
632
        }
633
      }
634
    }
635

    
636
    return $sources[$this->id];
637
  }
638

    
639
  /**
640
   * Execute mapping on an item.
641
   *
642
   * This method encapsulates the central mapping functionality. When an item is
643
   * processed, it is passed through map() where the properties of $source_item
644
   * are mapped onto $target_item following the processor's mapping
645
   * configuration.
646
   *
647
   * For each mapping FeedsParser::getSourceElement() is executed to retrieve
648
   * the source element, then FeedsProcessor::setTargetElement() is invoked
649
   * to populate the target item properly. Alternatively a
650
   * hook_x_targets_alter() may have specified a callback for a mapping target
651
   * in which case the callback is asked to populate the target item instead of
652
   * FeedsProcessor::setTargetElement().
653
   *
654
   * @ingroup mappingapi
655
   *
656
   * @see hook_feeds_parser_sources_alter()
657
   * @see hook_feeds_processor_targets()
658
   * @see hook_feeds_processor_targets_alter()
659
   */
660
  protected function map(FeedsSource $source, FeedsParserResult $result, $target_item = NULL) {
661
    $targets = $this->getCachedTargets();
662

    
663
    if (empty($target_item)) {
664
      $target_item = array();
665
    }
666

    
667
    // Many mappers add to existing fields rather than replacing them. Hence we
668
    // need to clear target elements of each item before mapping in case we are
669
    // mapping on a prepopulated item such as an existing node.
670
    foreach ($this->config['mappings'] as $mapping) {
671
      if (isset($targets[$mapping['target']]['real_target'])) {
672
        $target_item->{$targets[$mapping['target']]['real_target']} = NULL;
673
      }
674
      else {
675
        $target_item->{$mapping['target']} = NULL;
676
      }
677
    }
678

    
679
    // This is where the actual mapping happens: For every mapping we invoke
680
    // the parser's getSourceElement() method to retrieve the value of the
681
    // source element and pass it to the processor's setTargetElement() to stick
682
    // it on the right place of the target item.
683
    foreach ($this->config['mappings'] as $mapping) {
684
      $value = $this->getSourceValue($source, $result, $mapping['source']);
685

    
686
      $this->mapToTarget($source, $mapping['target'], $target_item, $value, $mapping);
687
    }
688

    
689
    return $target_item;
690
  }
691

    
692
  /**
693
   * Returns the values from the parser, or callback.
694
   *
695
   * @param FeedsSource $source
696
   *   The feed source.
697
   * @param FeedsParserResult $result
698
   *   The parser result.
699
   * @param string $source_key
700
   *   The current key being processed.
701
   *
702
   * @return mixed
703
   *   A value, or a list of values.
704
   */
705
  protected function getSourceValue(FeedsSource $source, FeedsParserResult $result, $source_key) {
706
    $sources = $this->getCachedSources();
707

    
708
    if (isset($sources[$source_key]['callback'])) {
709
      return call_user_func($sources[$source_key]['callback'], $source, $result, $source_key);
710
    }
711

    
712
    return feeds_importer($this->id)->parser->getSourceElement($source, $result, $source_key);
713
  }
714

    
715
  /**
716
   * Maps values onto the target item.
717
   *
718
   * @param FeedsSource $source
719
   *   The feed source.
720
   * @param mixed &$target_item
721
   *   The target item to apply values into.
722
   * @param mixed $value
723
   *   A value, or a list of values.
724
   * @param array $mapping
725
   *   The mapping configuration.
726
   */
727
  protected function mapToTarget(FeedsSource $source, $target, &$target_item, $value, array $mapping) {
728
    $targets = $this->getCachedTargets();
729

    
730
    if (isset($targets[$target]['preprocess_callbacks'])) {
731
      foreach ($targets[$target]['preprocess_callbacks'] as $callback) {
732
        call_user_func_array($callback, array($source, $target_item, $target, &$mapping));
733
      }
734
    }
735

    
736
    // Map the source element's value to the target.
737
    // If the mapping specifies a callback method, use the callback instead of
738
    // setTargetElement().
739
    if (isset($targets[$target]['callback'])) {
740

    
741
      // All target callbacks expect an array.
742
      if (!is_array($value)) {
743
        $value = array($value);
744
      }
745

    
746
      call_user_func($targets[$target]['callback'], $source, $target_item, $target, $value, $mapping);
747
    }
748

    
749
    else {
750
      $this->setTargetElement($source, $target_item, $target, $value, $mapping);
751
    }
752
  }
753

    
754
  /**
755
   * Per default, don't support expiry. If processor supports expiry of imported
756
   * items, return the time after which items should be removed.
757
   */
758
  public function expiryTime() {
759
    return FEEDS_EXPIRE_NEVER;
760
  }
761

    
762
  /**
763
   * Declare default configuration.
764
   */
765
  public function configDefaults() {
766
    $info = $this->entityInfo();
767
    $bundle = NULL;
768
    if (empty($info['entity keys']['bundle'])) {
769
      $bundle = $this->entityType();
770
    }
771
    return array(
772
      'mappings' => array(),
773
      'update_existing' => FEEDS_SKIP_EXISTING,
774
      'update_non_existent' => FEEDS_SKIP_NON_EXISTENT,
775
      'input_format' => NULL,
776
      'skip_hash_check' => FALSE,
777
      'bundle' => $bundle,
778
    );
779
  }
780

    
781
  /**
782
   * Overrides parent::configForm().
783
   */
784
  public function configForm(&$form_state) {
785
    $info = $this->entityInfo();
786
    $form = array();
787

    
788
    if (!empty($info['entity keys']['bundle'])) {
789
      $form['bundle'] = array(
790
        '#type' => 'select',
791
        '#options' => $this->bundleOptions(),
792
        '#title' => !empty($info['bundle name']) ? $info['bundle name'] : t('Bundle'),
793
        '#required' => TRUE,
794
        '#default_value' => $this->bundle(),
795
      );
796
    }
797
    else {
798
      $form['bundle'] = array(
799
        '#type' => 'value',
800
        '#value' => $this->entityType(),
801
      );
802
    }
803

    
804
    $tokens = array('@entities' => strtolower($info['label plural']));
805

    
806
    $form['update_existing'] = array(
807
      '#type' => 'radios',
808
      '#title' => t('Update existing @entities', $tokens),
809
      '#description' =>
810
        t('Existing @entities will be determined using mappings that are a "unique target".', $tokens),
811
      '#options' => array(
812
        FEEDS_SKIP_EXISTING => t('Do not update existing @entities', $tokens),
813
        FEEDS_REPLACE_EXISTING => t('Replace existing @entities', $tokens),
814
        FEEDS_UPDATE_EXISTING => t('Update existing @entities', $tokens),
815
      ),
816
      '#default_value' => $this->config['update_existing'],
817
    );
818
    global $user;
819
    $formats = filter_formats($user);
820
    foreach ($formats as $format) {
821
      $format_options[$format->format] = $format->name;
822
    }
823
    $form['skip_hash_check'] = array(
824
      '#type' => 'checkbox',
825
      '#title' => t('Skip hash check'),
826
      '#description' => t('Force update of items even if item source data did not change.'),
827
      '#default_value' => $this->config['skip_hash_check'],
828
    );
829
    $form['input_format'] = array(
830
      '#type' => 'select',
831
      '#title' => t('Text format'),
832
      '#description' => t('Select the default input format for the text fields of the nodes to be created.'),
833
      '#options' => $format_options,
834
      '#default_value' => isset($this->config['input_format']) ? $this->config['input_format'] : 'plain_text',
835
      '#required' => TRUE,
836
    );
837

    
838
    $form['update_non_existent'] = array(
839
      '#type' => 'radios',
840
      '#title' => t('Action to take when previously imported @entities are missing in the feed', $tokens),
841
      '#description' => t('Select how @entities previously imported and now missing in the feed should be updated.', $tokens),
842
      '#options' => array(
843
        FEEDS_SKIP_NON_EXISTENT => t('Skip non-existent @entities', $tokens),
844
        FEEDS_DELETE_NON_EXISTENT => t('Delete non-existent @entities', $tokens),
845
      ),
846
      '#default_value' => $this->config['update_non_existent'],
847
    );
848

    
849
    return $form;
850
  }
851

    
852
  /**
853
   * Get mappings.
854
   */
855
  public function getMappings() {
856
    return isset($this->config['mappings']) ? $this->config['mappings'] : array();
857
  }
858

    
859
  /**
860
   * Declare possible mapping targets that this processor exposes.
861
   *
862
   * @ingroup mappingapi
863
   *
864
   * @return
865
   *   An array of mapping targets. Keys are paths to targets
866
   *   separated by ->, values are TRUE if target can be unique,
867
   *   FALSE otherwise.
868
   */
869
  public function getMappingTargets() {
870

    
871
    // The bundle has not been selected.
872
    if (!$this->bundle()) {
873
      $info = $this->entityInfo();
874
      $bundle_name = !empty($info['bundle name']) ? drupal_strtolower($info['bundle name']) : t('bundle');
875
      $plugin_key = feeds_importer($this->id)->config['processor']['plugin_key'];
876
      $url = url('admin/structure/feeds/' . $this->id . '/settings/' . $plugin_key);
877
      drupal_set_message(t('Please <a href="@url">select a @bundle_name</a>.', array('@url' => $url, '@bundle_name' => $bundle_name)), 'warning', FALSE);
878
    }
879

    
880
    return array(
881
      'url' => array(
882
        'name' => t('URL'),
883
        'description' => t('The external URL of the item. E. g. the feed item URL in the case of a syndication feed. May be unique.'),
884
        'optional_unique' => TRUE,
885
      ),
886
      'guid' => array(
887
        'name' => t('GUID'),
888
        'description' => t('The globally unique identifier of the item. E. g. the feed item GUID in the case of a syndication feed. May be unique.'),
889
        'optional_unique' => TRUE,
890
      ),
891
    );
892
  }
893

    
894
  /**
895
   * Allows other modules to expose targets.
896
   *
897
   * @param array &$targets
898
   *   The existing target array.
899
   */
900
  protected function getHookTargets(array &$targets) {
901
    self::loadMappers();
902

    
903
    $entity_type = $this->entityType();
904
    $bundle = $this->bundle();
905
    $targets += module_invoke_all('feeds_processor_targets', $entity_type, $bundle);
906

    
907
    drupal_alter('feeds_processor_targets', $targets, $entity_type, $bundle);
908
  }
909

    
910
  /**
911
   * Set a concrete target element. Invoked from FeedsProcessor::map().
912
   *
913
   * @ingroup mappingapi
914
   */
915
  public function setTargetElement(FeedsSource $source, $target_item, $target_element, $value) {
916
    switch ($target_element) {
917
      case 'url':
918
      case 'guid':
919
        $target_item->feeds_item->$target_element = $value;
920
        break;
921

    
922
      default:
923
        $target_item->$target_element = $value;
924
        break;
925
    }
926
  }
927

    
928
  /**
929
   * Retrieve the target entity's existing id if available. Otherwise return 0.
930
   *
931
   * @ingroup mappingapi
932
   *
933
   * @param FeedsSource $source
934
   *   The source information about this import.
935
   * @param FeedsParserResult $result
936
   *   A FeedsParserResult object.
937
   *
938
   * @return int
939
   *   The serial id of an entity if found, 0 otherwise.
940
   */
941
  protected function existingEntityId(FeedsSource $source, FeedsParserResult $result) {
942
    $targets = $this->getCachedTargets();
943

    
944
    $entity_id = 0;
945

    
946
    // Iterate through all unique targets and test whether they already exist in
947
    // the database.
948
    foreach ($this->uniqueTargets($source, $result) as $target => $value) {
949
      if ($target === 'guid' || $target === 'url') {
950
        $entity_id = db_select('feeds_item')
951
          ->fields('feeds_item', array('entity_id'))
952
          ->condition('feed_nid', $source->feed_nid)
953
          ->condition('entity_type', $this->entityType())
954
          ->condition('id', $source->id)
955
          ->condition($target, $value)
956
          ->execute()
957
          ->fetchField();
958
      }
959

    
960
      if (!$entity_id && !empty($targets[$target]['unique_callbacks'])) {
961
        if (!is_array($value)) {
962
          $value = array($value);
963
        }
964

    
965
        foreach ($targets[$target]['unique_callbacks'] as $callback) {
966
          if ($entity_id = call_user_func($callback, $source, $this->entityType(), $this->bundle(), $target, $value)) {
967
            // Stop at the first unique ID returned by a callback.
968
            break;
969
          }
970
        }
971
      }
972

    
973
      // Return with the content id found.
974
      if ($entity_id) {
975
        return $entity_id;
976
      }
977
    }
978

    
979
    return $entity_id;
980
  }
981

    
982

    
983
  /**
984
   * Utility function that iterates over a target array and retrieves all
985
   * sources that are unique.
986
   *
987
   * @param $batch
988
   *   A FeedsImportBatch.
989
   *
990
   * @return
991
   *   An array where the keys are target field names and the values are the
992
   *   elements from the source item mapped to these targets.
993
   */
994
  protected function uniqueTargets(FeedsSource $source, FeedsParserResult $result) {
995
    $parser = feeds_importer($this->id)->parser;
996
    $targets = array();
997
    foreach ($this->config['mappings'] as $mapping) {
998
      if (!empty($mapping['unique'])) {
999
        // Invoke the parser's getSourceElement to retrieve the value for this
1000
        // mapping's source.
1001
        $targets[$mapping['target']] = $parser->getSourceElement($source, $result, $mapping['source']);
1002
      }
1003
    }
1004
    return $targets;
1005
  }
1006

    
1007
  /**
1008
   * Adds Feeds specific information on $entity->feeds_item.
1009
   *
1010
   * @param $entity
1011
   *   The entity object to be populated with new item info.
1012
   * @param $feed_nid
1013
   *   The feed nid of the source that produces this entity.
1014
   * @param $hash
1015
   *   The fingerprint of the source item.
1016
   */
1017
  protected function newItemInfo($entity, $feed_nid, $hash = '') {
1018
    $entity->feeds_item = new stdClass();
1019
    $entity->feeds_item->is_new = TRUE;
1020
    $entity->feeds_item->entity_id = 0;
1021
    $entity->feeds_item->entity_type = $this->entityType();
1022
    $entity->feeds_item->id = $this->id;
1023
    $entity->feeds_item->feed_nid = $feed_nid;
1024
    $entity->feeds_item->imported = REQUEST_TIME;
1025
    $entity->feeds_item->hash = $hash;
1026
    $entity->feeds_item->url = '';
1027
    $entity->feeds_item->guid = '';
1028
  }
1029

    
1030
  /**
1031
   * Loads existing entity information and places it on $entity->feeds_item.
1032
   *
1033
   * @param $entity
1034
   *   The entity object to load item info for. Id key must be present.
1035
   *
1036
   * @return
1037
   *   TRUE if item info could be loaded, false if not.
1038
   */
1039
  protected function loadItemInfo($entity) {
1040
    $entity_info = entity_get_info($this->entityType());
1041
    $key = $entity_info['entity keys']['id'];
1042
    if ($item_info = feeds_item_info_load($this->entityType(), $entity->$key)) {
1043
      $entity->feeds_item = $item_info;
1044
      return TRUE;
1045
    }
1046
    return FALSE;
1047
  }
1048

    
1049
  /**
1050
   * Create MD5 hash of item and mappings array.
1051
   *
1052
   * Include mappings as a change in mappings may have an affect on the item
1053
   * produced.
1054
   *
1055
   * @return Always returns a hash, even with empty, NULL, FALSE:
1056
   *  Empty arrays return 40cd750bba9870f18aada2478b24840a
1057
   *  Empty/NULL/FALSE strings return d41d8cd98f00b204e9800998ecf8427e
1058
   */
1059
  protected function hash($item) {
1060
    return hash('md5', serialize($item) . serialize($this->config['mappings']));
1061
  }
1062

    
1063
  /**
1064
   * Retrieves the MD5 hash of $entity_id from the database.
1065
   *
1066
   * @return string
1067
   *   Empty string if no item is found, hash otherwise.
1068
   */
1069
  protected function getHash($entity_id) {
1070

    
1071
    if ($hash = db_query("SELECT hash FROM {feeds_item} WHERE entity_type = :type AND entity_id = :id", array(':type' => $this->entityType(), ':id' => $entity_id))->fetchField()) {
1072
      // Return with the hash.
1073
      return $hash;
1074
    }
1075
    return '';
1076
  }
1077

    
1078
  /**
1079
   * DEPRECATED: Creates a log message for exceptions during import.
1080
   *
1081
   * Don't use this method as it concatenates user variables into the log
1082
   * message, which will pollute the locales_source table when the log message
1083
   * is translated. Use ::createLogEntry instead.
1084
   *
1085
   * @param Exception $e
1086
   *   The exception that was throwned during processing the item.
1087
   * @param $entity
1088
   *   The entity object.
1089
   * @param $item
1090
   *   The parser result for this entity.
1091
   *
1092
   * @return string
1093
   *   The message to log.
1094
   *
1095
   * @deprecated
1096
   *   Use ::createLogEntry instead.
1097
   */
1098
  protected function createLogMessage(Exception $e, $entity, $item) {
1099
    $message = $e->getMessage();
1100
    $message .= '<h3>Original item</h3>';
1101
    // $this->exportObjectVars() already runs check_plain() for us, so we can
1102
    // concatenate here as is.
1103
    $message .= '<pre>' . $this->exportObjectVars($item) . '</pre>';
1104
    $message .= '<h3>Entity</h3>';
1105
    $message .= '<pre>' . $this->exportObjectVars($entity) . '</pre>';
1106

    
1107
    return $message;
1108
  }
1109

    
1110
  /**
1111
   * Creates a log entry for when an exception occured during import.
1112
   *
1113
   * @param Exception $e
1114
   *   The exception that was throwned during processing the item.
1115
   * @param object $entity
1116
   *   The entity object.
1117
   * @param array $item
1118
   *   The parser result for this entity.
1119
   *
1120
   * @return array
1121
   *   The message and arguments to log.
1122
   */
1123
  protected function createLogEntry(Exception $e, $entity, $item) {
1124
    $message = '@exception';
1125
    $message .= '<h3>Original item</h3>';
1126
    $message .= '<pre>!item</pre>';
1127
    $message .= '<h3>Entity</h3>';
1128
    $message .= '<pre>!entity</pre>';
1129
    $arguments = array(
1130
      '@exception' => $e->getMessage(),
1131
      // $this->exportObjectVars() already runs check_plain() for us, so we can
1132
      // use the "!" placeholder.
1133
      '!item' => $this->exportObjectVars($item),
1134
      '!entity' => $this->exportObjectVars($entity),
1135
    );
1136

    
1137
    return array($message, $arguments);
1138
  }
1139

    
1140
  /**
1141
   * Returns a string representation of an object or array for log messages.
1142
   *
1143
   * @param object|array $object
1144
   *   The object to convert.
1145
   *
1146
   * @return string
1147
   *   The sanitized string representation of the object.
1148
   */
1149
  protected function exportObjectVars($object) {
1150
    include_once DRUPAL_ROOT . '/includes/utility.inc';
1151

    
1152
    $out = is_array($object) ? $object : get_object_vars($object);
1153
    $out = array_filter($out, 'is_scalar');
1154

    
1155
    foreach ($out as $key => $value) {
1156
      if (is_string($value)) {
1157
        $out[$key] = truncate_utf8($value, 100, FALSE, TRUE);
1158
      }
1159
    }
1160

    
1161
    if (is_array($object)) {
1162
      return check_plain(drupal_var_export($out));
1163
    }
1164

    
1165
    return check_plain(drupal_var_export((object) $out));
1166
  }
1167

    
1168
  /**
1169
   * Overrides FeedsPlugin::dependencies().
1170
   */
1171
  public function dependencies() {
1172
    $dependencies = parent::dependencies();
1173

    
1174
    // Find out which module defined the entity type.
1175
    $info = $this->entityInfo();
1176
    if (isset($info['module'])) {
1177
      $dependencies[$info['module']] = $info['module'];
1178
    }
1179

    
1180
    return $dependencies;
1181
  }
1182

    
1183
}
1184

    
1185
class FeedsProcessorBundleNotDefined extends Exception {}