diff --git a/DependencyInjection/Configuration.php b/DependencyInjection/Configuration.php index 64e3230..e6099f8 100644 --- a/DependencyInjection/Configuration.php +++ b/DependencyInjection/Configuration.php @@ -10,10 +10,16 @@ class Configuration implements ConfigurationInterface /** * {@inheritdoc} */ - public function getConfigTreeBuilder() + public function getConfigTreeBuilder(): TreeBuilder { - $tb = new TreeBuilder(); - $rootNode = $tb->root('enqueue_elastica'); + + $tb = new TreeBuilder('enqueue_elastica'); + if (method_exists($tb, 'getRootNode')) { + $rootNode = $tb->getRootNode(); + } else { + // BC layer for symfony/config 4.1 and older + $rootNode = $tb->root('enqueue_elastica'); + } $rootNode ->children() ->booleanNode('enabled')->defaultValue(true)->end() @@ -32,7 +38,6 @@ public function getConfigTreeBuilder() ->booleanNode('remove')->defaultTrue()->end() ->scalarNode('connection')->defaultValue('default')->cannotBeEmpty()->end() ->scalarNode('index_name')->isRequired()->cannotBeEmpty()->end() - ->scalarNode('type_name')->isRequired()->cannotBeEmpty()->end() ->scalarNode('model_class')->isRequired()->cannotBeEmpty()->end() ->scalarNode('model_id')->defaultValue('id')->cannotBeEmpty()->end() ->scalarNode('repository_method')->defaultValue('find')->cannotBeEmpty()->end() diff --git a/DependencyInjection/EnqueueElasticaExtension.php b/DependencyInjection/EnqueueElasticaExtension.php index d717cdf..d354fc0 100644 --- a/DependencyInjection/EnqueueElasticaExtension.php +++ b/DependencyInjection/EnqueueElasticaExtension.php @@ -34,6 +34,7 @@ public function load(array $configs, ContainerBuilder $container) $container->register('enqueue_elastica.populate_processor', PopulateProcessor::class) ->addArgument(new Reference('fos_elastica.pager_provider_registry')) ->addArgument(new Reference('fos_elastica.pager_persister_registry')) + ->addArgument(new Reference('fos_elastica.index_manager')) ->addTag('enqueue.command_subscriber', ['client' => $transport]) ->addTag('enqueue.transport.processor', ['transport' => $transport]) @@ -45,10 +46,11 @@ public function load(array $configs, ContainerBuilder $container) ->addTag('kernel.event_subscriber') ; - $container->register('enqueue_elastica.queue_pager_perister', QueuePagerPersister::class) + $container->register('enqueue_elastica.queue_pager_persister', QueuePagerPersister::class) ->addArgument(new Reference('enqueue_elastica.context')) ->addArgument(new Reference('fos_elastica.persister_registry')) ->addArgument(new Reference('event_dispatcher')) + ->addArgument(new Reference('fos_elastica.index_manager')) ->addTag('fos_elastica.pager_persister', ['persisterName' => 'queue']) ->setPublic(true) @@ -67,9 +69,8 @@ public function load(array $configs, ContainerBuilder $container) foreach ($config['doctrine']['queue_listeners'] as $listenerConfig) { $listenerId = sprintf( - 'enqueue_elastica.doctrine_queue_listener.%s.%s', - $listenerConfig['index_name'], - $listenerConfig['type_name'] + 'enqueue_elastica.doctrine_queue_listener.%s', + $listenerConfig['index_name'] ); $container->register($listenerId, SyncIndexWithObjectChangeListener::class) diff --git a/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php b/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php index 1f818ca..ec6024e 100644 --- a/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php +++ b/Doctrine/Queue/SyncIndexWithObjectChangeProcessor.php @@ -10,7 +10,7 @@ use Interop\Queue\Context; use Interop\Queue\Message; use Interop\Queue\Processor; -use Doctrine\Common\Persistence\ManagerRegistry; +use Doctrine\Persistence\ManagerRegistry; final class SyncIndexWithObjectChangeProcessor implements Processor, CommandSubscriberInterface, QueueSubscriberInterface { @@ -49,9 +49,6 @@ public function process(Message $message, Context $context): Result if (false == isset($data['index_name'])) { return Result::reject('The message data misses index_name'); } - if (false == isset($data['type_name'])) { - return Result::reject('The message data misses type_name'); - } if (false == isset($data['repository_method'])) { return Result::reject('The message data misses repository_method'); } @@ -60,11 +57,10 @@ public function process(Message $message, Context $context): Result $modelClass = $data['model_class']; $id = $data['id']; $index = $data['index_name']; - $type = $data['type_name']; $repositoryMethod = $data['repository_method']; $repository = $this->doctrine->getManagerForClass($modelClass)->getRepository($modelClass); - $persister = $this->persisterRegistry->getPersister($index, $type); + $persister = $this->persisterRegistry->getPersister($index); switch ($action) { case self::UPDATE_ACTION: @@ -75,7 +71,7 @@ public function process(Message $message, Context $context): Result } if ($persister->handlesObject($object)) { - if ($this->indexable->isObjectIndexable($index, $type, $object)) { + if ($this->indexable->isObjectIndexable($index, $object)) { $persister->replaceOne($object); } else { $persister->deleteOne($object); @@ -90,7 +86,7 @@ public function process(Message $message, Context $context): Result return Result::ack(sprintf('The object "%s" with id "%s" could not be found.', $modelClass, $id)); } - if ($persister->handlesObject($object) && $this->indexable->isObjectIndexable($index, $type, $object)) { + if ($persister->handlesObject($object) && $this->indexable->isObjectIndexable($index, $object)) { $persister->insertOne($object); } diff --git a/Doctrine/SyncIndexWithObjectChangeListener.php b/Doctrine/SyncIndexWithObjectChangeListener.php index d286a94..1966f2b 100644 --- a/Doctrine/SyncIndexWithObjectChangeListener.php +++ b/Doctrine/SyncIndexWithObjectChangeListener.php @@ -1,7 +1,8 @@ getObject() instanceof $this->modelClass) { - $this->sendUpdateIndexMessage(SyncProcessor::UPDATE_ACTION, $args); + $this->scheduledForUpdateIndex[] = [ + 'action' => SyncProcessor::UPDATE_ACTION, + 'id' => $this->extractId($args->getObject()) + ]; } } public function postPersist(LifecycleEventArgs $args) { if ($args->getObject() instanceof $this->modelClass) { - $this->sendUpdateIndexMessage(SyncProcessor::INSERT_ACTION, $args); + $this->scheduledForUpdateIndex[] = [ + 'action' => SyncProcessor::INSERT_ACTION, + 'id' => $this->extractId($args->getObject()) + ]; } } public function preRemove(LifecycleEventArgs $args) { if ($args->getObject() instanceof $this->modelClass) { - $this->sendUpdateIndexMessage(SyncProcessor::REMOVE_ACTION, $args); + $this->scheduledForUpdateIndex[] = [ + 'action' => SyncProcessor::REMOVE_ACTION, + 'id' => $this->extractId($args->getObject()) + ]; + } + } + + public function postFlush(PostFlushEventArgs $event) + { + if (count($this->scheduledForUpdateIndex)) { + foreach ($this->scheduledForUpdateIndex as $updateIndex) { + $this->sendUpdateIndexMessage($updateIndex['action'], $updateIndex['id']); + } + + $this->scheduledForUpdateIndex = []; } } @@ -57,22 +82,16 @@ public function getSubscribedEvents() 'postPersist', 'postUpdate', 'preRemove', + 'postFlush' ]; } /** * @param string $action - * @param LifecycleEventArgs $args + * @param $id */ - private function sendUpdateIndexMessage($action, LifecycleEventArgs $args) + private function sendUpdateIndexMessage($action, $id) { - $object = $args->getObject(); - - $rp = new \ReflectionProperty($object, $this->config['model_id']); - $rp->setAccessible(true); - $id = $rp->getValue($object); - $rp->setAccessible(false); - $queue = $this->context->createQueue(Commands::SYNC_INDEX_WITH_OBJECT_CHANGE); $message = $this->context->createMessage(JSON::encode([ @@ -81,10 +100,24 @@ private function sendUpdateIndexMessage($action, LifecycleEventArgs $args) 'model_id' => $this->config['model_id'], 'id' => $id, 'index_name' => $this->config['index_name'], - 'type_name' => $this->config['type_name'], 'repository_method' => $this->config['repository_method'], ])); $this->context->createProducer()->send($queue, $message); } + + /** + * @param $object + * @return mixed + * @throws \ReflectionException + */ + private function extractId($object) + { + $rp = (new \ReflectionClass($this->modelClass))->getProperty($this->config['model_id']); + $rp->setAccessible(true); + $id = $rp->getValue($object); + $rp->setAccessible(false); + + return $id; + } } diff --git a/Persister/Listener/PurgePopulateQueueListener.php b/Persister/Listener/PurgePopulateQueueListener.php index 052e80b..866b949 100644 --- a/Persister/Listener/PurgePopulateQueueListener.php +++ b/Persister/Listener/PurgePopulateQueueListener.php @@ -3,7 +3,6 @@ use FOS\ElasticaBundle\Persister\Event\PrePersistEvent; use Interop\Queue\Context; -use FOS\ElasticaBundle\Persister\Event\Events; use Symfony\Component\EventDispatcher\EventSubscriberInterface; class PurgePopulateQueueListener implements EventSubscriberInterface @@ -44,7 +43,7 @@ public function purgePopulateQueue(PrePersistEvent $event) public static function getSubscribedEvents() { return [ - Events::PRE_PERSIST => 'purgePopulateQueue', + PrePersistEvent::class => 'purgePopulateQueue', ]; } } diff --git a/Persister/QueuePagerPersister.php b/Persister/QueuePagerPersister.php index f45cb92..9c2c471 100644 --- a/Persister/QueuePagerPersister.php +++ b/Persister/QueuePagerPersister.php @@ -3,7 +3,7 @@ use Enqueue\ElasticaBundle\Queue\Commands; use Enqueue\Util\JSON; -use FOS\ElasticaBundle\Persister\Event\Events; +use FOS\ElasticaBundle\Index\IndexManager; use FOS\ElasticaBundle\Persister\Event\PostAsyncInsertObjectsEvent; use FOS\ElasticaBundle\Persister\Event\PostPersistEvent; use FOS\ElasticaBundle\Persister\Event\PrePersistEvent; @@ -29,11 +29,21 @@ final class QueuePagerPersister implements PagerPersisterInterface */ private $dispatcher; - public function __construct(Context $context, PersisterRegistry $registry, EventDispatcherInterface $dispatcher) - { + /** + * @var IndexManager + */ + private $indexManager; + + public function __construct( + Context $context, + PersisterRegistry $registry, + EventDispatcherInterface $dispatcher, + IndexManager $indexManager + ) { $this->context = $context; $this->dispatcher = $dispatcher; $this->registry = $registry; + $this->indexManager = $indexManager; } /** @@ -43,22 +53,28 @@ public function insert(PagerInterface $pager, array $options = array()) { $pager->setMaxPerPage(empty($options['max_per_page']) ? 100 : $options['max_per_page']); - $options = array_replace([ + $defaultOptions = [ 'max_per_page' => $pager->getMaxPerPage(), 'first_page' => $pager->getCurrentPage(), 'last_page' => $pager->getNbPages(), 'populate_queue' => Commands::POPULATE, 'populate_reply_queue' => null, 'reply_receive_timeout' => 5000, // ms - 'limit_overall_reply_time' => 180, // sec - ], $options); + 'limit_overall_reply_time' => 180 // sec + ]; + $index = $this->indexManager->getIndex($options['indexName']); + if ($index->getName() !== $index->getOriginalName()) { + $defaultOptions['realIndexName'] = $index->getName(); + } + + $options = array_replace($defaultOptions, $options); $pager->setCurrentPage($options['first_page']); - $objectPersister = $this->registry->getPersister($options['indexName'], $options['typeName']); + $objectPersister = $this->registry->getPersister($options['indexName']); $event = new PrePersistEvent($pager, $objectPersister, $options); - $this->dispatcher->dispatch(Events::PRE_PERSIST, $event); + $this->dispatcher->dispatch($event); $pager = $event->getPager(); $options = $event->getOptions(); @@ -122,7 +138,7 @@ public function insert(PagerInterface $pager, array $options = array()) $errorMessage, $data['options'] ); - $this->dispatcher->dispatch(Events::POST_ASYNC_INSERT_OBJECTS, $event); + $this->dispatcher->dispatch($event); } if (microtime(true) > $limitTime) { @@ -131,6 +147,6 @@ public function insert(PagerInterface $pager, array $options = array()) } $event = new PostPersistEvent($pager, $objectPersister, $options); - $this->dispatcher->dispatch(Events::POST_PERSIST, $event); + $this->dispatcher->dispatch($event); } } diff --git a/Queue/PopulateProcessor.php b/Queue/PopulateProcessor.php index 3951a6c..02456ea 100644 --- a/Queue/PopulateProcessor.php +++ b/Queue/PopulateProcessor.php @@ -4,6 +4,7 @@ use Enqueue\Client\CommandSubscriberInterface; use Enqueue\Consumption\QueueSubscriberInterface; use Enqueue\Consumption\Result; +use FOS\ElasticaBundle\Index\IndexManager; use FOS\ElasticaBundle\Persister\InPlacePagerPersister; use FOS\ElasticaBundle\Persister\PagerPersisterRegistry; use FOS\ElasticaBundle\Provider\PagerProviderRegistry; @@ -18,12 +19,16 @@ final class PopulateProcessor implements Processor, CommandSubscriberInterface, private $pagerPersisterRegistry; + private $indexManager; + public function __construct( PagerProviderRegistry $pagerProviderRegistry, - PagerPersisterRegistry $pagerPersisterRegistry + PagerPersisterRegistry $pagerPersisterRegistry, + IndexManager $indexManager ) { $this->pagerPersisterRegistry = $pagerPersisterRegistry; $this->pagerProviderRegistry = $pagerProviderRegistry; + $this->indexManager = $indexManager; } public function process(Message $message, Context $context): Result @@ -48,15 +53,16 @@ public function process(Message $message, Context $context): Result if (!isset($data['options']['indexName'])) { return Result::reply($this->createReplyMessage($context, $message, 0,'The message is invalid. Missing indexName option.')); } - if (!isset($data['options']['typeName'])) { - return Result::reply($this->createReplyMessage($context, $message, 0,'The message is invalid. Missing typeName option.')); - } $options = $data['options']; $options['first_page'] = $data['page']; $options['last_page'] = $data['page']; - $provider = $this->pagerProviderRegistry->getProvider($options['indexName'], $options['typeName']); + if (isset($options['realIndexName'])) { + $this->indexManager->getIndex($options['indexName'])->overrideName($options['realIndexName']); + } + + $provider = $this->pagerProviderRegistry->getProvider($options['indexName']); $pager = $provider->provide($options); $pager->setMaxPerPage($options['max_per_page']); $pager->setCurrentPage($options['first_page']); diff --git a/composer.json b/composer.json index 7391d3d..fe0564e 100644 --- a/composer.json +++ b/composer.json @@ -5,10 +5,13 @@ "keywords": ["elasticsearch", "elastica", "fos", "performance"], "license": "MIT", "require": { - "php": "^7.1", - "symfony/framework-bundle": "^3.4|^4", - "friendsofsymfony/elastica-bundle": "^5", - "enqueue/enqueue-bundle": "^0.9" + "php": "^7.1|^8.0", + "symfony/framework-bundle": "^6.0|^7.0", + "friendsofsymfony/elastica-bundle": "^6.0", + "enqueue/enqueue-bundle": "^0.10" + }, + "require-dev": { + "doctrine/orm": "^2.0" }, "autoload": { "psr-4": { "Enqueue\\ElasticaBundle\\": "" } @@ -16,7 +19,7 @@ "minimum-stability": "dev", "extra": { "branch-alias": { - "dev-master": "0.9.x-dev" + "dev-master": "0.10.x-dev" } } }