scandel

Reputation: 1832

How to access the EntityManager in a RabbitMQBundle custom producer class?

I am using RabbitMQBundle in a Symfony 2.8 project, and I would like to use a custom producer class that persists an entity (Message) to the database before publishing the RabbitMQ message.

I defined the custom producer class in config.yml:

old_sound_rabbit_mq:
  ...
  producers:
    myproducer:
      class: AppBundle\Services\GenericProducer
      connection: default
      exchange_options: {name: 'my_exchange', type: direct}

And the custom Producer class:

<?php

namespace AppBundle\Services;

use AppBundle\Entity\Message;
use OldSound\RabbitMqBundle\RabbitMq\Producer;

/**
 * Customised Producer, that publishes AMQP Messages
 * but also:
 * - writes an entry in the Message table
 */
class GenericProducer extends Producer
{
    /**
     * Entity Manager
     */
    protected $em;


    public function setEntityManager($entityManager)
    {
        $this->em = $entityManager;

        return $this;
    }

    /**
     * Publishes the message and merges additional properties with basic properties
     * And also:
     * - writes an entry in the Message table
     *
     * @param string $action
     * @param array $parameters
     * @param string $routingKey
     * @param array $additionalProperties
     * @param array|null $headers
     */
    public function publish($action, $parameters = array(), $routingKey = '', $additionalProperties = array(), array $headers = null)
    {
        $message = new Message();
        $message->setAction($action)
            ->setParameters($parameters);
        $this->em->persist($message);
        $this->em->flush();

        $msgBody = array(
            'action' => $action,
            'parameters' => $parameters
        );
        // The parent producer expects a string body, so serialize the array first
        parent::publish(serialize($msgBody), $routingKey, $additionalProperties, $headers);
    }
}

How can I call GenericProducer->setEntityManager(), given that the producer is not defined in services.yml like other services?

Is there another way to achieve this?

Thanks for your time.

Upvotes: 1

Views: 476

Answers (2)

scandel

Reputation: 1832

Following @lordrhodos's suggestion, I decorated the producer service generated by RabbitMQBundle. Here is the complete code:

config.yml (nothing special to do):

old_sound_rabbit_mq:
  ...
  producers:
    myproducer:
      connection: default
      exchange_options: {name: 'my_exchange', type: direct} 

services.yml (here you define a decorating service):

app.decorating_myproducer_producer:
      class: AppBundle\Services\GenericProducer
      decorates: old_sound_rabbit_mq.myproducer_producer
      arguments: ['@app.decorating_myproducer_producer.inner', '@doctrine.orm.entity_manager']
      public: false

decorator class:

<?php

namespace AppBundle\Services;

use AppBundle\Entity\Message;
use Doctrine\ORM\EntityManager;
use OldSound\RabbitMqBundle\RabbitMq\Producer;

/**
 * Customised Producer, that publishes AMQP Messages
 * but also:
 * - writes an entry in the Message table
 */
class GenericProducer extends Producer
{
    /**
     * @var Producer
     */
    protected $producer;

    /**
     * @var EntityManager
     */
    protected $em;

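    /**
     * GenericProducer constructor.
     *
     * The parent constructor is not called here: publishing is delegated
     * to the decorated producer injected by the container.
     *
     * @param Producer $producer the original (decorated) producer service
     * @param EntityManager $entityManager
     */
    public function __construct(Producer $producer, EntityManager $entityManager)
    {
        $this->producer = $producer;
        $this->em = $entityManager;
    }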


    /**
     * Publishes the message and merges additional properties with basic properties
     * And also:
     * - writes an entry in the Message table
     *
     * @param string $action
     * @param array $parameters
     * @param string $routingKey
     * @param array $additionalProperties
     * @param array|null $headers
     */
    public function publish($action, $parameters = array(), $routingKey = '', $additionalProperties = array(), array $headers = null)
    {
        $message = new Message();
        $message->setAction($action)
            ->setParameters($parameters);
        $this->em->persist($message);
        $this->em->flush();

        $msgBody = array(
            'action' => $action,
            'parameters' => $parameters
        );
        // Delegate the actual publishing to the decorated producer
        $this->producer->publish(serialize($msgBody), $routingKey, $additionalProperties, $headers);
    }
}

Finally, call the producer from a controller through its original service id, which now resolves to the decorator:

$this->get('old_sound_rabbit_mq.myproducer_producer')->publish('wait', ['time' => 30]);

Upvotes: 0

lordrhodos

Reputation: 2745

The producer service definition is generated dynamically by the bundle's Dependency Injection extension, which is why it does not appear in your services.yml.

You can either decorate the existing service, or create a compiler pass that fetches the existing service definition and extends it with a call to setEntityManager().
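
For the compiler pass route, here is a minimal sketch. It assumes the generated service id is old_sound_rabbit_mq.myproducer_producer (the id used for decoration elsewhere in this thread) and that the producer class exposes setEntityManager() as in the question. The class name ProducerEntityManagerPass and its namespace are hypothetical; adapt them to your bundle.

```php
<?php

namespace AppBundle\DependencyInjection\Compiler;

use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
use Symfony\Component\DependencyInjection\ContainerBuilder;
use Symfony\Component\DependencyInjection\Reference;

/**
 * Hypothetical compiler pass: adds a setEntityManager() call to the
 * producer definition that RabbitMQBundle generates at compile time.
 */
class ProducerEntityManagerPass implements CompilerPassInterface
{
    public function process(ContainerBuilder $container)
    {
        // Assumed service id, following the bundle's "<name>_producer" naming
        $producerId = 'old_sound_rabbit_mq.myproducer_producer';

        // Do nothing if the bundle did not register this producer
        if (!$container->hasDefinition($producerId)) {
            return;
        }

        // Ask the container to call setEntityManager() right after
        // the producer has been instantiated
        $container
            ->getDefinition($producerId)
            ->addMethodCall(
                'setEntityManager',
                array(new Reference('doctrine.orm.entity_manager'))
            );
    }
}
```

The pass still has to be registered, typically by overriding build(ContainerBuilder $container) in your bundle class (AppBundle here) and calling $container->addCompilerPass(new ProducerEntityManagerPass()). With this approach you keep the class option of the producer in config.yml pointing at your custom producer, and no decorating service is needed.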

Upvotes: 1
