ikucheriavenko
8/23/2017 - 8:13 AM

ObserverNotificationConsumer.php


        # Consumer definition for the queue that receives "collect observer notification" jobs.
        # NOTE(review): the parent keys are not part of this snippet; presumably this sits under
        # the RabbitMQ bundle's `consumers:` section - confirm against the full config file.
        collect_observer_notification:
            connection:       default
            # Exchange the queue is attached to (declared as a direct exchange).
            exchange_options: {name: 'app.e.collect_observer_notification', type: 'direct'}
            queue_options:
                name: 'lycan.q.collect_observer_notification'
#                arguments: {'x-message-ttl': ['I', 172800000]}
#                routing_keys:
#                    - lycan.provider.push.listing.#
            # Service id of the callback invoked for every delivered message.
            # NOTE(review): expected to resolve to ObserverNotificationConsumer - verify in the service definitions.
            callback:   app.rabbit.consumer.collect_observer_notification
            # QoS settings: no byte-size limit, a prefetch count of 10, applied per consumer (global: false).
            qos_options:      { prefetch_size: 0, prefetch_count: 10, global: false}
            enable_logger: true

<?php
namespace AppBundle\Service\Notification;
use AppBundle\Entity\Notification;
use AppBundle\Entity\NotificationReport;
use AppBundle\Model\Notification\Observer;
use AppBundle\Monolog\ConsoleLogger;
use Carbon\Carbon;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\EntityManagerInterface;
use Gedmo\Loggable\Entity\LogEntry;
use Lycan\Providers\CoreBundle\Consumer\Base\ConsumerBase;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Class ObserverNotificationConsumer
 * @package AppBundle\Service\Notification
 */
class ObserverNotificationConsumer extends ConsumerBase implements NotificationQueue
{
    /**
     * Collections holding more elements than this are not analyzed inline;
     * they are handed over to RabbitMQ and processed in partitions.
     *
     * @var int
     */
    public static $maxObservedElementsPerChunk = 500;

    /**
     * Number of observed entities covered by a single partition message.
     *
     * @var int
     */
    public static $partitionChunkSize = 250;

    /**
     * @var ObserverRegister
     */
    private $observerRegister;

    /**
     * @var ObservableRegister
     */
    private $observableRegister;

    /**
     * ObserverNotificationConsumer constructor.
     *
     * @param ObserverRegister          $observerRegister
     * @param ObservableRegister        $observableRegister
     * @param EntityManagerInterface    $om
     * @param ConsoleLogger             $logger
     */
    public function __construct(
        ObserverRegister $observerRegister,
        ObservableRegister $observableRegister,
        EntityManagerInterface $om,
        ConsoleLogger $logger
    ) {
        parent::__construct($logger, $om);
        $this->observerRegister = $observerRegister;
        $this->observableRegister = $observableRegister;
    }

    /**
     * Builds and delivers change reports for every observed collection of the
     * notification's observable.
     *
     * Collections within the inline limit are analyzed and reported immediately;
     * larger ones are published to RabbitMQ and handled later by execute().
     *
     * @param Notification $notification
     *
     * @return void
     *
     * @throws \RuntimeException when the notification's observer class is not registered
     */
    public function addNotification(Notification $notification)
    {
        /** @var Observer $observer */
        $observer = $this->observerRegister->getObserverByClassname($notification->getObserverClass());
        if (! $observer) {
            throw new \RuntimeException("{$notification->getObserverClass()} has not been registered as Observer");
        }

        $observableClass = $notification->getObservableClass();
        $observedFields = $this->observableRegister->getObserveFieldsByObservableClassname($observableClass);
        if (empty($observedFields)) {
            // must not take place - observable without observed fields
            return;
        }

        // The observable does not depend on the field being inspected, so load it once.
        $observable = $this->em
            ->getRepository($observableClass)
            ->find($notification->getObservableId());
        if (null === $observable) {
            // The observable is gone; there is nothing left to inspect.
            return;
        }

        /** @var \ReflectionProperty $observedField */
        foreach ($observedFields as $observedField) {
            $fieldName = $observedField->getName();
            $getterName = 'get' . ucfirst($fieldName);
            $getter = [$observable, $getterName];

            $observedEntities = is_callable($getter) ? call_user_func($getter) : null;
            if (! $observedEntities instanceof Collection) {
                // another cases are not implemented
                continue;
            }

            $observedEntityClass = $this->resolveObservedEntityClass($observedEntities, $observableClass, $fieldName);
            if (null === $observedEntityClass) {
                // Empty collection on a field without association metadata: the class is unknown.
                continue;
            }

            if ($observedEntities->count() > self::$maxObservedElementsPerChunk) {
                // Too large to analyze inline - defer to the queue consumer.
                $this->publishToQueue([
                    'observableClass'       => $observableClass,
                    'observable'            => (string) $observable->getId(),
                    'observableCallback'    => $getterName,
                    'observedEntityClass'   => $observedEntityClass,
                    'notificationId'        => (string) $notification->getId(),
                ]);
                continue;
            }

            $notificationReport = $this->prepareNotificationReport($notification->getId(), $observedEntityClass);
            $this->analyzeObservedEntities($observedEntities, $observedEntityClass, $notificationReport);
            $this->sendNotificationReportToObserver($notificationReport);
        }
    }

    /**
     * Handles a queued message produced by addNotification() or by a previous
     * partitioning pass of this method.
     *
     * An unpartitioned message whose collection exceeds the inline limit is split
     * into one message per partition; a partition message analyzes only its slice.
     *
     * @inheritdoc
     */
    public function execute(AMQPMessage $msg)
    {
        // NOTE(review): unserialize() is only safe while every publisher to this queue is
        // trusted; consider a data-only format (e.g. JSON) for the message body.
        $message = unserialize($msg->body);
        $this->_preExecute($message);
        //todo: is it needed?
//        $batchLogger = $this->_getBatchLogger($message);

        $observable = $this->em->getRepository($message['observableClass'])->find($message['observable']);
        $getter = [$observable, $message['observableCallback']];
        $observedEntities = is_callable($getter) ? call_user_func($getter) : null;
        if (! $observedEntities instanceof Collection) {
            // another cases are not implemented
            return false;
        }

        $total = $observedEntities->count();
        if (0 === $total) {
            // Nothing to analyze: deliver the (empty) report right away.
            $notificationReport = $this->prepareNotificationReport(
                $message['notificationId'],
                $message['observedEntityClass']
            );
            $this->sendNotificationReportToObserver($notificationReport);

            return true;
        }

        $isPartition = isset($message['partitionIndex']);
        if (! $isPartition && $total > self::$maxObservedElementsPerChunk) {
            // Publish one message for EVERY partition (indexes 0 .. n-1) and only then acknowledge.
            $partitionsNeeded = (int) ceil($total / self::$partitionChunkSize);
            for ($partIndex = 0; $partIndex < $partitionsNeeded; $partIndex++) {
                $message['partitionIndex'] = $partIndex;
                $message['partitionChunkSize'] = self::$partitionChunkSize;
                $this->publishToQueue($message);
            }

            return true;
        }

        if ($isPartition) {
            // Slice with the chunk size the publisher used, so partitions line up.
            $chunkSize = isset($message['partitionChunkSize'])
                ? (int) $message['partitionChunkSize']
                : self::$partitionChunkSize;
            // slice() returns a plain array, hence the ArrayCollection wrapper.
            $observedEntities = new ArrayCollection(
                $observedEntities->slice($message['partitionIndex'] * $chunkSize, $chunkSize)
            );
        }

        $notificationReport = $this->prepareNotificationReport(
            $message['notificationId'],
            $message['observedEntityClass']
        );
        $this->analyzeObservedEntities($observedEntities, $message['observedEntityClass'], $notificationReport);
        // NOTE(review): unlike addNotification(), the report is not sent to the observer here.
        // Presumably delivery of queued reports happens elsewhere - confirm.

        return true;
    }

    /**
     * Determines the class of the entities held by an observed collection.
     *
     * Uses the first element when there is one; for an empty collection falls back
     * to the association target declared in the observable's Doctrine metadata.
     *
     * @param Collection $observedEntities
     * @param string     $observableClass
     * @param string     $fieldName
     *
     * @return string|null null when the class cannot be determined
     */
    private function resolveObservedEntityClass(Collection $observedEntities, $observableClass, $fieldName)
    {
        if (! $observedEntities->isEmpty()) {
            // first() works regardless of how the collection is indexed.
            $firstEntity = $observedEntities->first();

            return is_object($firstEntity) ? get_class($firstEntity) : null;
        }

        $metadata = $this->em->getClassMetadata($observableClass);
        if ($metadata->hasAssociation($fieldName)) {
            return $metadata->getAssociationTargetClass($fieldName);
        }

        return null;
    }

    /**
     * Publishes a serialized message to the collect-observer-notification producer.
     *
     * @param array $message
     *
     * @return void
     */
    private function publishToQueue(array $message)
    {
        $this->container
            ->get('app.rabbit.producer.collect_observer_notification')
            ->publish(serialize($message));
    }

    /**
     * Finds the report for the notification's current period and entity class,
     * creating and flushing a new one when none exists yet.
     *
     * The period starts at the notification's last run, or at its creation date
     * when it has never run.
     *
     * @param $notificationId
     * @param $observedEntityClass
     *
     * @return NotificationReport
     *
     * @throws \RuntimeException when the notification cannot be found
     */
    private function prepareNotificationReport($notificationId, $observedEntityClass)
    {
        $notification = $this->em->getRepository(Notification::class)->find($notificationId);
        if (null === $notification) {
            throw new \RuntimeException("Notification {$notificationId} not found");
        }

        $lastNotificationRun = $notification->getLastRun() ?: $notification->getCreatedAt();
        $notificationReport = $this->em->getRepository(NotificationReport::class)->findOneBy([
            'notification'          => $notification,
            'includesChangesFrom'   => $lastNotificationRun,
            'observedEntityClass'   => $observedEntityClass,
        ]);

        if (null === $notificationReport) {
            $notificationReport = new NotificationReport();
            $notificationReport
                ->setNotification($notification)
                ->setIncludesChangesFrom($lastNotificationRun)
                ->setApplication($notification->getApplication())
                ->setObservedEntityClass($observedEntityClass);
            $this->em->persist($notificationReport);
            $this->em->flush($notificationReport);
        }

        return $notificationReport;
    }

    /**
     * Records on the report every observed entity that has at least one log
     * entry after the report's start date; flushes only if something was added.
     *
     * @param Collection            $observedEntities
     * @param string                $observedClass
     * @param NotificationReport    $notificationReport
     *
     * @return void
     */
    private function analyzeObservedEntities(
        Collection $observedEntities,
        $observedClass,
        NotificationReport $notificationReport
    ) {
        $logRepository = $this->em->getRepository(LogEntry::class);
        $changesFrom = $notificationReport->getIncludesChangesFrom();
        $isReportChanged = false;

        foreach ($observedEntities as $observedEntity) {
            // A single log entry is enough to know the entity changed (limit 1).
            $logs = $logRepository->getLogEntriesAfterDate(
                $observedEntity->getId(),
                $observedClass,
                $changesFrom,
                1
            );
            if (empty($logs)) {
                continue;
            }

            $notificationReport->addChangedObservedEntity($observedEntity->getId());
            $isReportChanged = true;
        }

        if ($isReportChanged) {
            $this->em->persist($notificationReport);
            $this->em->flush($notificationReport);
        }
    }

    /**
     * Marks the notification as run and passes the report to its observer.
     *
     * @param NotificationReport $notificationReport
     *
     * @return void
     *
     * @throws \RuntimeException when the notification's observer class is not registered
     */
    private function sendNotificationReportToObserver(NotificationReport $notificationReport)
    {
        $notification = $notificationReport->getNotification();
        /** @var Observer $observer */
        $observer = $this->observerRegister->getObserverByClassname($notification->getObserverClass());
        if (! $observer) {
            // Fail before touching lastRun so an undeliverable report is not marked as sent.
            throw new \RuntimeException("{$notification->getObserverClass()} has not been registered as Observer");
        }

        $this->markSendTime($notification);
        $observer->onNotification($notificationReport);
    }

    /**
     * Stores the current time as the notification's last run.
     *
     * @param Notification $notification
     *
     * @return void
     */
    private function markSendTime(Notification $notification)
    {
        $notification->setLastRun(Carbon::now());
        $this->em->persist($notification);
        $this->em->flush();
    }
}