nickwshaw
6/23/2017 - 7:04 AM

RabbitMQ integration test

RabbitMQ integration test

<?xml version="1.0" encoding="UTF-8"?>

<!-- http://phpunit.de/manual/4.1/en/appendixes.configuration.html -->
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/4.1/phpunit.xsd"
         backupGlobals="false"
         colors="true"
         bootstrap="app/autoload.php"
>
    <!-- PHP settings applied for the test run. -->
    <php>
        <!-- A value of -1 reports every PHP error level. -->
        <ini name="error_reporting" value="-1" />
        <!-- NOTE(review): presumably read by Symfony's KernelTestCase to locate the kernel class; confirm. -->
        <server name="KERNEL_DIR" value="app/" />
    </php>

    <!-- Registers the test listener class Zalas\Test\RabbitMQListener with the runner. -->
    <listeners>
        <listener class="Zalas\Test\RabbitMQListener"></listener>
    </listeners>

    <!-- A single suite that collects tests from the "tests" directory. -->
    <testsuites>
        <testsuite name="Project Test Suite">
            <directory>tests</directory>
        </testsuite>
    </testsuites>
</phpunit>
<?php

namespace Zalas\Test;

/**
 * Provisions RabbitMQ vhosts through the management HTTP API before tests run.
 *
 * Intended to be used by PHPUnit test cases: override rabbitMQVhosts() to list
 * the vhosts a test class needs and, if necessary,
 * rabbitMQManagementConfiguration() to point at a different management API.
 * When the API cannot be reached (or rejects the request) the current test is
 * marked as skipped.
 */
trait RabbitMQVhostSetup
{
    /**
     * Names of the test classes whose vhosts have already been provisioned,
     * used as array keys so the HTTP calls are made once per class only.
     *
     * @var bool[]
     */
    private static $rabbitMQProvisionedClasses = [];

    /**
     * Creates and configures every vhost returned by rabbitMQVhosts().
     *
     * This is registered with "@before" rather than "@beforeClass": PHPUnit
     * invokes "@beforeClass" hooks statically, which cannot work for an
     * instance method relying on $this. The guard below keeps the original
     * once-per-class intent.
     *
     * @before
     */
    public function setupRabbitMQVhosts()
    {
        $class = get_class($this);

        if (isset(self::$rabbitMQProvisionedClasses[$class])) {
            return;
        }

        foreach ($this->rabbitMQVhosts() as $vhost) {
            $this->createVhost($vhost);
            $this->configureVhostPermissions($vhost);
        }

        // Only reached when no request failed; a failure skips the test by
        // throwing, so the next test of the class retries (and skips) as well.
        self::$rabbitMQProvisionedClasses[$class] = true;
    }

    /**
     * Names of the vhosts to provision. Override in the using class.
     *
     * @return string[]
     */
    protected function rabbitMQVhosts()
    {
        return [];
    }

    /**
     * Connection settings for the RabbitMQ management HTTP API.
     *
     * @return array with the keys "username", "password", "host" and "port"
     */
    protected function rabbitMQManagementConfiguration()
    {
        return [
            'username' => 'guest',
            'password' => 'guest',
            'host' => 'localhost',
            'port' => 15672
        ];
    }

    /**
     * Issues PUT /api/vhosts/{vhost}.
     *
     * @param string $vhost raw (not yet percent-encoded) vhost name
     */
    private function createVhost($vhost)
    {
        // The vhost is a path segment, so names such as "/" must be percent-encoded.
        $this->rabbitMQManagement('PUT', '/api/vhosts/'.rawurlencode($vhost));
    }

    /**
     * Grants configure/write/read permissions on the vhost to the "vagrant" user.
     *
     * NOTE(review): the user is hard-coded and differs from the default
     * management username ("guest") — confirm this is intended.
     *
     * @param string $vhost raw (not yet percent-encoded) vhost name
     */
    private function configureVhostPermissions($vhost)
    {
        $this->rabbitMQManagement(
            'PUT',
            sprintf('/api/permissions/%s/vagrant', rawurlencode($vhost)),
            '{"configure":".*","write":".*","read":".*"}'
        );
    }

    /**
     * Sends a request to the management API using HTTP basic authentication.
     *
     * @param string      $method  HTTP method
     * @param string      $uri     path (starting with "/") appended to http://host:port
     * @param string|null $content JSON request body, if any
     *
     * @return string the response body
     */
    private function rabbitMQManagement($method, $uri, $content = null)
    {
        $config = $this->rabbitMQManagementConfiguration();

        $context = stream_context_create(['http' =>
            [
                'method' => $method,
                'header' => implode("\r\n", [
                    'Content-Type: application/json',
                    'Authorization: Basic ' . base64_encode(sprintf('%s:%s', $config['username'], $config['password'])),
                ]),
                'content' => $content
            ],
        ]);

        // Warnings are suppressed on purpose: a failed request is turned into a skipped test below.
        $result = @file_get_contents(sprintf('http://%s:%d%s', $config['host'], $config['port'], $uri), null, $context);

        if (false === $result) {
            self::markTestSkipped('Failed to configure a RabbitMQ vhost for testing');
        }

        return $result;
    }
}
<?php

namespace Zalas\Test;

use Exception;
use PHPUnit_Framework_AssertionFailedError;
use PHPUnit_Framework_Test;
use PHPUnit_Framework_TestSuite;

/**
 * PHPUnit test listener that provisions the RabbitMQ vhosts needed by tests
 * in the "rabbitmq" group, using the RabbitMQ management HTTP API.
 *
 * Provisioning is attempted once, when the first suite containing the group
 * starts. A failure is reported on stderr and the run continues: this class
 * is not a test case, so it cannot mark anything as skipped itself.
 */
final class RabbitMQListener implements \PHPUnit_Framework_TestListener
{
    const GROUP = 'rabbitmq';

    /**
     * Whether vhost provisioning has already been attempted during this run.
     *
     * @var bool
     */
    private $initialised = false;

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_Test $test
     * @param Exception              $e
     * @param float                  $time
     */
    public function addError(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_Test                 $test
     * @param PHPUnit_Framework_AssertionFailedError $e
     * @param float                                  $time
     */
    public function addFailure(PHPUnit_Framework_Test $test, PHPUnit_Framework_AssertionFailedError $e, $time)
    {
    }

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_Test $test
     * @param Exception              $e
     * @param float                  $time
     */
    public function addIncompleteTest(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_Test $test
     * @param Exception              $e
     * @param float                  $time
     */
    public function addRiskyTest(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_Test $test
     * @param Exception              $e
     * @param float                  $time
     */
    public function addSkippedTest(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * Provisions the vhosts when the first suite containing the group starts.
     *
     * @param PHPUnit_Framework_TestSuite $suite
     */
    public function startTestSuite(PHPUnit_Framework_TestSuite $suite)
    {
        // Strict comparison avoids loose string/integer matches against group names.
        if ($this->initialised || !in_array(self::GROUP, $suite->getGroups(), true)) {
            return;
        }

        $this->setUpVhosts();
    }

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_TestSuite $suite
     */
    public function endTestSuite(PHPUnit_Framework_TestSuite $suite)
    {
    }

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_Test $test
     */
    public function startTest(PHPUnit_Framework_Test $test)
    {
    }

    /**
     * No-op.
     *
     * @param PHPUnit_Framework_Test $test
     * @param float                  $time
     */
    public function endTest(PHPUnit_Framework_Test $test, $time)
    {
    }

    /**
     * Creates and configures every vhost, stopping at the first failure.
     */
    private function setUpVhosts()
    {
        // Flag the attempt up front so nested suites do not repeat the HTTP
        // calls (and their timeouts) when the management API is unavailable.
        $this->initialised = true;

        foreach ($this->rabbitMQVhosts() as $vhost) {
            if (false === $this->createVhost($vhost) || false === $this->configureVhostPermissions($vhost)) {
                $this->reportFailure($vhost);

                return;
            }
        }
    }

    /**
     * @return string[] names of the vhosts to provision
     */
    protected function rabbitMQVhosts()
    {
        return ['test_scraper'];
    }

    /**
     * @return array management API settings: "username", "password", "host" and "port"
     */
    private function rabbitMQManagementConfiguration()
    {
        return [
            'username' => 'vagrant',
            'password' => 'vagrant',
            'host' => 'rabbitmq',
            'port' => 15672
        ];
    }

    /**
     * Issues PUT /api/vhosts/{vhost}.
     *
     * @param string $vhost raw (not yet percent-encoded) vhost name
     *
     * @return string|false the response body, or false when the request failed
     */
    private function createVhost($vhost)
    {
        // The vhost is a path segment, so names such as "/" must be percent-encoded.
        return $this->rabbitMQManagement('PUT', '/api/vhosts/'.rawurlencode($vhost));
    }

    /**
     * Grants configure/write/read permissions on the vhost to the "vagrant" user.
     *
     * @param string $vhost raw (not yet percent-encoded) vhost name
     *
     * @return string|false the response body, or false when the request failed
     */
    private function configureVhostPermissions($vhost)
    {
        return $this->rabbitMQManagement(
            'PUT',
            sprintf('/api/permissions/%s/vagrant', rawurlencode($vhost)),
            '{"configure":".*","write":".*","read":".*"}'
        );
    }

    /**
     * Sends a request to the management API using HTTP basic authentication.
     *
     * @param string      $method  HTTP method
     * @param string      $uri     path (starting with "/") appended to http://host:port
     * @param string|null $content JSON request body, if any
     *
     * @return string|false the response body, or false when the request failed
     */
    private function rabbitMQManagement($method, $uri, $content = null)
    {
        $config = $this->rabbitMQManagementConfiguration();

        $context = stream_context_create(['http' =>
            [
                'method' => $method,
                'header' => implode("\r\n", [
                    'Content-Type: application/json',
                    'Authorization: Basic ' . base64_encode(sprintf('%s:%s', $config['username'], $config['password'])),
                ]),
                'content' => $content
            ],
        ]);

        // Warnings are suppressed on purpose: the caller inspects the return value instead.
        return @file_get_contents(sprintf('http://%s:%d%s', $config['host'], $config['port'], $uri), null, $context);
    }

    /**
     * Reports a provisioning failure without aborting the test run.
     *
     * A listener has no markTestSkipped() method, so the failure is only
     * written to stderr here.
     *
     * @param string $vhost the vhost that could not be configured
     */
    private function reportFailure($vhost)
    {
        file_put_contents(
            'php://stderr',
            sprintf('Failed to configure a RabbitMQ vhost for testing: %s', $vhost) . PHP_EOL
        );
    }
}
<?php

namespace Zalas\Infrastructure\Scraper;

use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;

/**
 * @group integration
 * @group rabbitmq
 */
class QueueScraperTest extends KernelTestCase
{
    /**
     * Producer fetched from the container in setUp().
     *
     * @var ProducerInterface
     */
    private $queueProducer;

    /**
     * The message handed to handleMessage(), if any was consumed.
     *
     * @var AMQPMessage
     */
    private $consumedMessage;

    /**
     * Boots the kernel first, since the producer is looked up in its container.
     */
    protected function setUp()
    {
        self::bootKernel();

        $this->queueProducer = $this->getProducer();
    }

    /**
     * Scraping a UUID through the "scraper" service must leave a message on
     * the "scrape-product" queue whose body is that UUID.
     */
    public function testItPushesScrapeRequestToTheQueue()
    {
        $this->requiresRabbitMQ();

        $requestId = new Uuid('1f8192fe-1349-49c7-a04d-240455af1063');
        self::$kernel->getContainer()->get('scraper')->scrape($requestId);

        $this->consumeMessage();

        $received = $this->consumedMessage;
        self::assertInstanceOf(AMQPMessage::class, $received);
        self::assertSame((string) $requestId, $received->getBody());
    }

    /**
     * Consumer callback: remembers the delivered message for the assertions.
     *
     * @param AMQPMessage $message
     */
    public function handleMessage(AMQPMessage $message)
    {
        $this->consumedMessage = $message;
    }

    /**
     * Consumes a single message, failing the test if the consumer times out.
     */
    private function consumeMessage()
    {
        $consumer = $this->getConsumer();

        try {
            $consumer->consume(1);
        } catch (AMQPTimeoutException $timeout) {
            self::fail('Failed to consume a message: ' . $timeout->getMessage());
        }
    }

    /**
     * Skips the test unless a connection to RabbitMQ can be established.
     */
    private function requiresRabbitMQ()
    {
        $amqp = $this->getConnection();

        if ($amqp instanceof AMQPLazyConnection) {
            // A lazy connection has to be opened explicitly before its state can be checked.
            try {
                $amqp->reconnect();
            } catch (\ErrorException $ignored) {
                // Deliberately ignored: the connectivity check below decides whether to skip.
            }
        }

        if ($amqp->isConnected()) {
            return;
        }

        self::markTestSkipped('RabbitMQ is not available.');
    }

    /**
     * Builds a consumer for the "scrape-product" direct exchange and queue
     * that delivers to handleMessage() and uses an idle timeout of 2.
     *
     * @return Consumer
     */
    private function getConsumer()
    {
        $exchange = ['name' => 'scrape-product', 'type' => 'direct'];
        $queue = ['name' => 'scrape-product'];

        $scrapeConsumer = new Consumer($this->getConnection());
        $scrapeConsumer->setExchangeOptions($exchange);
        $scrapeConsumer->setQueueOptions($queue);
        $scrapeConsumer->setCallback([$this, 'handleMessage']);
        $scrapeConsumer->setIdleTimeout(2);

        return $scrapeConsumer;
    }

    /**
     * @return ProducerInterface
     */
    private function getProducer()
    {
        $container = self::$kernel->getContainer();

        return $container->get('old_sound_rabbit_mq.scrape_product_producer');
    }

    /**
     * @return AbstractConnection
     */
    private function getConnection()
    {
        $container = self::$kernel->getContainer();

        return $container->get('old_sound_rabbit_mq.connection.checkout');
    }
}