vendor/pimcore/portal-engine/src/EventSubscriber/BatchTaskSubscriber.php line 99

Open in your IDE?
  1. <?php
  2. /**
  3.  * Pimcore
  4.  *
  5.  * This source file is available under following license:
  6.  * - Pimcore Commercial License (PCL)
  7.  *
  8.  *  @copyright  Copyright (c) Pimcore GmbH (http://www.pimcore.org)
  9.  *  @license    http://www.pimcore.org/license     PCL
  10.  */
  11. namespace Pimcore\Bundle\PortalEngineBundle\EventSubscriber;
  12. use Pimcore\Bundle\PortalEngineBundle\Entity\BatchTask;
  13. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\BatchTaskMessageInterface;
  14. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\SequentialBatchTaskMessageInterface;
  15. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\SplittedBatchTaskMessageInterface;
  16. use Pimcore\Bundle\PortalEngineBundle\Message\BatchTask\Interfaces\TriggerFinishedMessageBatchTaskMessageInterface;
  17. use Pimcore\Bundle\PortalEngineBundle\Service\BatchTask\BatchTaskService;
  18. use Symfony\Component\EventDispatcher\EventSubscriberInterface;
  19. use Symfony\Component\HttpKernel\Event\TerminateEvent;
  20. use Symfony\Component\Messenger\Event\WorkerMessageFailedEvent;
  21. use Symfony\Component\Messenger\Event\WorkerMessageHandledEvent;
  22. use Symfony\Component\Messenger\MessageBusInterface;
  23. /**
  24.  * Class IndexUpdateListener
  25.  *
  26.  * @package Pimcore\Bundle\PortalEngineBundle\EventListener
  27.  */
  28. class BatchTaskSubscriber implements EventSubscriberInterface
  29. {
  30.     /**
  31.      * @var BatchTaskService
  32.      */
  33.     protected $batchTaskService;
  34.     /**
  35.      * @var MessageBusInterface
  36.      */
  37.     protected $messageBus;
  38.     /**
  39.      * @var BatchTask
  40.      */
  41.     protected $terminateBatchTask;
  42.     /**
  43.      * @param BatchTaskService $batchTaskService
  44.      * @param MessageBusInterface $messengerBusPortalEngine
  45.      */
  46.     public function __construct(BatchTaskService $batchTaskServiceMessageBusInterface $messengerBusPortalEngine)
  47.     {
  48.         $this->batchTaskService $batchTaskService;
  49.         $this->messageBus $messengerBusPortalEngine;
  50.     }
  51.     public static function getSubscribedEvents()
  52.     {
  53.         return [
  54.             WorkerMessageFailedEvent::class => 'onBatchTaskMessageFailed',
  55.             WorkerMessageHandledEvent::class => 'onWorkerMessageHandled',
  56.             TerminateEvent::class => 'onTerminate',
  57.         ];
  58.     }
  59.     /**
  60.      * Mark batch tasks with failed items as finished as otherwise they will run forever.
  61.      *
  62.      * @param WorkerMessageFailedEvent $event
  63.      *
  64.      * @throws \Doctrine\DBAL\DBALException
  65.      */
  66.     public function onBatchTaskMessageFailed(WorkerMessageFailedEvent $event)
  67.     {
  68.         $message $event->getEnvelope()->getMessage();
  69.         if (!$message instanceof BatchTaskMessageInterface) {
  70.             return;
  71.         }
  72.         if ($event->willRetry()) {
  73.             return;
  74.         }
  75.         if (!$batchTask $this->batchTaskService->getTaskById($message->getTaskId())) {
  76.             return;
  77.         }
  78.         foreach (array_keys($message->getItems()) as $itemIndex) {
  79.             if (!$this->batchTaskService->isItemIndexProcessed($batchTask$itemIndex)) {
  80.                 $this->batchTaskService->markItemIndexAsProcessed($batchTask$itemIndex);
  81.             }
  82.         }
  83.         $this->checkBatchTaskFinished($batchTask$message);
  84.     }
  85.     public function onWorkerMessageHandled(WorkerMessageHandledEvent $event)
  86.     {
  87.         $message $event->getEnvelope()->getMessage();
  88.         if (!$message instanceof BatchTaskMessageInterface) {
  89.             return;
  90.         }
  91.         if ($message instanceof SplittedBatchTaskMessageInterface) {
  92.             return;
  93.         }
  94.         if (!$batchTask $this->batchTaskService->getTaskById($message->getTaskId())) {
  95.             return;
  96.         }
  97.         if ($message instanceof SequentialBatchTaskMessageInterface && $message->hasRemainingItems()) {
  98.             $remainingMessage $message->createRemainingMessage($this->batchTaskService);
  99.             $this->messageBus->dispatch($remainingMessage);
  100.             return;
  101.         }
  102.         $this->checkBatchTaskFinished($batchTask$message);
  103.     }
  104.     public function onTerminate(TerminateEvent $event)
  105.     {
  106.         $this->batchTaskService->terminateBatchTask();
  107.     }
  108.     public function setTerminateBatchTask(BatchTask $batchTask)
  109.     {
  110.         $this->terminateBatchTask $batchTask;
  111.     }
  112.     protected function checkBatchTaskFinished(BatchTask $batchTaskBatchTaskMessageInterface $message)
  113.     {
  114.         $this->batchTaskService->checkBatchTaskFinished($batchTask);
  115.         if ($message instanceof TriggerFinishedMessageBatchTaskMessageInterface) {
  116.             $finishedMessage $message->createFinishedMessage();
  117.             $this->messageBus->dispatch($finishedMessage);
  118.         }
  119.     }
  120. }