<?php
namespace AppBundle\Service\Notification;
use AppBundle\Entity\Notification;
use AppBundle\Entity\NotificationReport;
use AppBundle\Model\Notification\Observer;
use AppBundle\Monolog\ConsoleLogger;
use Carbon\Carbon;
use Doctrine\Common\Collections\ArrayCollection;
use Doctrine\Common\Collections\Collection;
use Doctrine\ORM\EntityManagerInterface;
use Gedmo\Loggable\Entity\LogEntry;
use Lycan\Providers\CoreBundle\Consumer\Base\ConsumerBase;
use PhpAmqpLib\Message\AMQPMessage;
/**
 * Class ObserverNotificationConsumer
 *
 * Collects, for a Notification, the observed entities of an observable that have
 * log entries newer than the notification's last run, stores their ids in a
 * NotificationReport and hands that report to the registered Observer.
 *
 * Small collections are handled synchronously in addNotification(); collections
 * larger than $maxObservedElementsPerChunk are delegated to the
 * "app.rabbit.producer.collect_observer_notification" producer and processed
 * later by execute(), optionally split into partitions of $partitionChunkSize.
 *
 * @package AppBundle\Service\Notification
 */
class ObserverNotificationConsumer extends ConsumerBase implements NotificationQueue
{
    /**
     * Collections with more elements than this are not analysed in one go.
     *
     * @var int
     */
    public static $maxObservedElementsPerChunk = 500;

    /**
     * Number of observed entities handled by a single partition message.
     *
     * @var int
     */
    public static $partitionChunkSize = 250;

    /** @var ObserverRegister */
    private $observerRegister;

    /** @var ObservableRegister */
    private $observableRegister;

    /**
     * ObserverNotificationConsumer constructor.
     *
     * @param ObserverRegister       $observerRegister
     * @param ObservableRegister     $observableRegister
     * @param EntityManagerInterface $om
     * @param ConsoleLogger          $logger
     */
    public function __construct(
        ObserverRegister $observerRegister,
        ObservableRegister $observableRegister,
        EntityManagerInterface $om,
        ConsoleLogger $logger
    ) {
        parent::__construct($logger, $om);
        $this->observerRegister = $observerRegister;
        $this->observableRegister = $observableRegister;
    }

    /**
     * Processes a notification: for every observed field of the observable that
     * yields a Collection, either analyses it immediately or, when it holds more
     * than $maxObservedElementsPerChunk elements, publishes a message so that
     * execute() can process it asynchronously.
     *
     * Fields whose getter is not callable, does not return a Collection, or
     * returns an empty Collection are skipped (the class of the observed
     * entities cannot be determined from an empty collection).
     *
     * @param Notification $notification
     *
     * @return void
     *
     * @throws \RuntimeException when the notification's observer class is not registered
     */
    public function addNotification(Notification $notification)
    {
        $observerClass = $notification->getObserverClass();
        /** @var Observer|null $observer */
        $observer = $this->observerRegister->getObserverByClassname($observerClass);
        if (! $observer) {
            throw new \RuntimeException("{$observerClass} has not been registered as Observer");
        }

        $observableClass = $notification->getObservableClass();
        $observedFields = $this->observableRegister->getObserveFieldsByObservableClassname($observableClass);
        if (empty($observedFields)) {
            // must not take place - observable without observed fields
            return;
        }

        // The observable does not depend on the field, so it is loaded once.
        $observable = $this->em
            ->getRepository($observableClass)
            ->find($notification->getObservableId());
        if (null === $observable) {
            // Nothing can be observed on an entity that no longer exists.
            return;
        }

        /** @var \ReflectionProperty $observedField */
        foreach ($observedFields as $observedField) {
            $getter = 'get' . ucfirst($observedField->getName());
            if (! is_callable([$observable, $getter])) {
                continue;
            }

            $observedEntities = call_user_func([$observable, $getter]);
            if (! $observedEntities instanceof Collection) {
                // another cases are not implemented
                continue;
            }

            // first() instead of get(0): the collection is not guaranteed to be
            // keyed from 0, and first() returns false for an empty collection.
            $firstEntity = $observedEntities->first();
            if (false === $firstEntity) {
                continue;
            }
            $observedEntityClass = get_class($firstEntity);

            if ($observedEntities->count() > self::$maxObservedElementsPerChunk) {
                // send to RabbitMQ
                $msg = [
                    'observableClass' => $observableClass,
                    'observable' => (string) $observable->getId(),
                    'observableCallback' => $getter,
                    'observedEntityClass' => $observedEntityClass,
                    'notificationId' => (string) $notification->getId(),
                ];
                $this->container->get('app.rabbit.producer.collect_observer_notification')->publish(serialize($msg));
                continue;
            }

            $notificationReport = $this->prepareNotificationReport($notification->getId(), $observedEntityClass);
            $this->analyzeObservedEntities($observedEntities, $observedEntityClass, $notificationReport);
            $this->sendNotificationReportToObserver($notificationReport);
        }
    }

    /**
     * Consumes a message published by addNotification() (or by this method
     * itself when it fans a large collection out into partitions).
     *
     * - Empty collection: the report is prepared and sent to the observer.
     * - More than $maxObservedElementsPerChunk elements and no "partitionIndex"
     *   in the message: one message per partition is published and the method
     *   returns without analysing anything.
     * - Otherwise the collection (or the slice selected by "partitionIndex") is
     *   analysed and the report updated.
     *
     * @inheritdoc
     *
     * @return bool false when the observable or its collection cannot be
     *              resolved, true otherwise
     */
    public function execute(AMQPMessage $msg)
    {
        // NOTE(review): unserialize() must only ever see trusted payloads; the
        // body is expected to be the scalar array built in addNotification().
        $message = unserialize($msg->body);
        $this->_preExecute($message);
        //todo: is it needed?
        // $batchLogger = $this->_getBatchLogger($message);

        $observable = $this->em->getRepository($message['observableClass'])->find($message['observable']);
        $callback = [$observable, $message['observableCallback']];
        if (null === $observable || ! is_callable($callback)) {
            return false;
        }

        $observedEntities = call_user_func($callback);
        if (! $observedEntities instanceof Collection) {
            // another cases are not implemented
            return false;
        }

        $total = $observedEntities->count();
        if (0 === $total) {
            $notificationReport = $this->prepareNotificationReport(
                $message['notificationId'],
                $message['observedEntityClass']
            );
            $this->sendNotificationReportToObserver($notificationReport);

            return true;
        }

        $isPartition = isset($message['partitionIndex']);

        if (! $isPartition && $total > self::$maxObservedElementsPerChunk) {
            $chunkSize = max(1, (int) self::$partitionChunkSize);
            $partitionsNeeded = (int) ceil($total / $chunkSize);
            $producer = $this->container->get('app.rabbit.producer.collect_observer_notification');

            // Indexes run from 0 to $partitionsNeeded - 1; every partition must
            // be published before returning.
            for ($partIndex = 0; $partIndex < $partitionsNeeded; $partIndex++) {
                $message['partitionIndex'] = $partIndex;
                $message['partitionChunkSize'] = $chunkSize;
                $producer->publish(serialize($message));
            }

            return true;
        }

        if ($isPartition) {
            // Slice with the chunk size the publisher used, falling back to the
            // current setting for messages that do not carry a usable one.
            $chunkSize = self::$partitionChunkSize;
            if (isset($message['partitionChunkSize']) && (int) $message['partitionChunkSize'] > 0) {
                $chunkSize = (int) $message['partitionChunkSize'];
            }

            // slice() returns a plain array, so it is wrapped back into a Collection.
            $observedEntities = new ArrayCollection(
                $observedEntities->slice($message['partitionIndex'] * $chunkSize, $chunkSize)
            );
        }

        $notificationReport = $this->prepareNotificationReport(
            $message['notificationId'],
            $message['observedEntityClass']
        );
        // NOTE(review): unlike addNotification(), this branch does not send the
        // report to the observer after analysing - confirm that delivery for
        // queued/partitioned reports is triggered elsewhere.
        $this->analyzeObservedEntities(
            $observedEntities,
            $message['observedEntityClass'],
            $notificationReport
        );

        return true;
    }

    /**
     * Returns the report for the given notification, its current
     * "changes from" date (last run, or creation date when it never ran) and
     * observed entity class, creating and flushing a new one when none exists.
     *
     * @param mixed  $notificationId
     * @param string $observedEntityClass
     *
     * @return NotificationReport
     *
     * @throws \RuntimeException when no notification exists for the given id
     */
    private function prepareNotificationReport($notificationId, $observedEntityClass)
    {
        $notification = $this->em->getRepository(Notification::class)->find($notificationId);
        if (null === $notification) {
            throw new \RuntimeException(sprintf('Notification "%s" has not been found', $notificationId));
        }

        $lastNotificationRun = $notification->getLastRun() ?: $notification->getCreatedAt();

        $notificationReport = $this->em->getRepository(NotificationReport::class)->findOneBy([
            'notification' => $notification,
            'includesChangesFrom' => $lastNotificationRun,
            'observedEntityClass' => $observedEntityClass,
        ]);
        if (null !== $notificationReport) {
            return $notificationReport;
        }

        $notificationReport = new NotificationReport();
        $notificationReport
            ->setNotification($notification)
            ->setIncludesChangesFrom($lastNotificationRun)
            ->setApplication($notification->getApplication())
            ->setObservedEntityClass($observedEntityClass);
        $this->em->persist($notificationReport);
        $this->em->flush($notificationReport);

        return $notificationReport;
    }

    /**
     * Adds to the report the id of every observed entity that has at least one
     * log entry after the report's "changes from" date, and flushes the report
     * when anything was added.
     *
     * @param Collection         $observedEntities
     * @param string             $observedClass
     * @param NotificationReport $notificationReport
     *
     * @return void
     */
    private function analyzeObservedEntities(
        Collection $observedEntities,
        $observedClass,
        NotificationReport $notificationReport
    ) {
        $logRepository = $this->em->getRepository(LogEntry::class);
        $changesFrom = $notificationReport->getIncludesChangesFrom();
        $isReportChanged = false;

        foreach ($observedEntities as $observedEntity) {
            $entityId = $observedEntity->getId();
            // The last argument is passed as 1, as before; presumably a result
            // limit, since only the existence of a newer entry matters here.
            $logs = $logRepository->getLogEntriesAfterDate($entityId, $observedClass, $changesFrom, 1);
            if (empty($logs)) {
                continue;
            }

            $notificationReport->addChangedObservedEntity($entityId);
            $isReportChanged = true;
        }

        if ($isReportChanged) {
            $this->em->persist($notificationReport);
            $this->em->flush($notificationReport);
        }
    }

    /**
     * Marks the notification as run now and passes the report to its observer.
     *
     * The observer is resolved before the run time is stored, so an
     * unregistered observer does not leave the notification marked as sent.
     *
     * @param NotificationReport $notificationReport
     *
     * @return void
     *
     * @throws \RuntimeException when the notification's observer class is not registered
     */
    private function sendNotificationReportToObserver(NotificationReport $notificationReport)
    {
        $notification = $notificationReport->getNotification();
        $observerClass = $notification->getObserverClass();

        /** @var Observer|null $observer */
        $observer = $this->observerRegister->getObserverByClassname($observerClass);
        if (! $observer) {
            throw new \RuntimeException("{$observerClass} has not been registered as Observer");
        }

        $this->markSendTime($notification);
        $observer->onNotification($notificationReport);
    }

    /**
     * Stores the current time as the notification's last run and flushes.
     *
     * @param Notification $notification
     *
     * @return void
     */
    private function markSendTime(Notification $notification)
    {
        $notification->setLastRun(Carbon::now());
        $this->em->persist($notification);
        // Flushes the whole unit of work, not only the notification.
        $this->em->flush();
    }
}