RabbitMQ integration test
<?xml version="1.0" encoding="UTF-8"?>
<!-- http://phpunit.de/manual/4.1/en/appendixes.configuration.html -->
<phpunit xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="http://schema.phpunit.de/4.1/phpunit.xsd"
backupGlobals="false"
colors="true"
bootstrap="app/autoload.php"
>
<!-- PHP settings applied for the test run. -->
<php>
<!-- An error_reporting value of -1 reports every PHP error level. -->
<ini name="error_reporting" value="-1" />
<!-- Exposes KERNEL_DIR=app/ as a server variable; presumably read by the Symfony kernel test case to locate the kernel (confirm). -->
<server name="KERNEL_DIR" value="app/" />
</php>
<!-- Registers the listener class Zalas\Test\RabbitMQListener with the test runner. -->
<listeners>
<listener class="Zalas\Test\RabbitMQListener"></listener>
</listeners>
<!-- A single suite built from the tests found under the "tests" directory. -->
<testsuites>
<testsuite name="Project Test Suite">
<directory>tests</directory>
</testsuite>
</testsuites>
</phpunit>
<?php
namespace Zalas\Test;
trait RabbitMQVhostSetup
{
    /**
     * Test classes whose vhosts were already set up, keyed by class name.
     *
     * @var array
     */
    private static $rabbitMQVhostsReady = [];

    /**
     * Creates every vhost listed by rabbitMQVhosts() and grants permissions on it.
     *
     * The work is done once per test class; a failed attempt is retried
     * before the next test.
     *
     * @before
     */
    public function setupRabbitMQVhosts()
    {
        // This used to be registered as a beforeClass hook. PHPUnit calls
        // those hooks statically, while this method needs $this, so it is now
        // a per-test hook guarded by a per-class flag.
        $class = get_class($this);
        if (isset(self::$rabbitMQVhostsReady[$class])) {
            return;
        }

        foreach ($this->rabbitMQVhosts() as $vhost) {
            $this->createVhost($vhost);
            $this->configureVhostPermissions($vhost);
        }

        self::$rabbitMQVhostsReady[$class] = true;
    }

    /**
     * Names of the vhosts to create. Override in the test class; empty by default.
     *
     * @return string[]
     */
    protected function rabbitMQVhosts()
    {
        return [];
    }

    /**
     * Connection details for the RabbitMQ management HTTP API.
     *
     * @return array with the keys username, password, host and port
     */
    protected function rabbitMQManagementConfiguration()
    {
        return [
            'username' => 'guest',
            'password' => 'guest',
            'host' => 'localhost',
            'port' => 15672
        ];
    }

    /**
     * Issues a PUT for the given vhost on the management API.
     *
     * @param string $vhost
     */
    private function createVhost($vhost)
    {
        // The name is a single path segment, so it has to be percent-encoded
        // (a vhost called "/" becomes "%2F").
        $this->rabbitMQManagement('PUT', '/api/vhosts/'.rawurlencode($vhost));
    }

    /**
     * Grants configure/write/read on everything in the vhost to the "vagrant" user.
     *
     * NOTE(review): the user is fixed to "vagrant" and is independent of the
     * username in rabbitMQManagementConfiguration() - confirm this is intended.
     *
     * @param string $vhost
     */
    private function configureVhostPermissions($vhost)
    {
        $this->rabbitMQManagement(
            'PUT',
            sprintf('/api/permissions/%s/vagrant', rawurlencode($vhost)),
            '{"configure":".*","write":".*","read":".*"}'
        );
    }

    /**
     * Sends a JSON request with basic authentication to the management API.
     *
     * When the request fails the test is marked as skipped through
     * markTestSkipped(), which the class using this trait has to provide.
     *
     * @param string      $method  HTTP method
     * @param string      $uri     path starting with a slash
     * @param string|null $content request body
     *
     * @return string response body
     */
    private function rabbitMQManagement($method, $uri, $content = null)
    {
        $config = $this->rabbitMQManagementConfiguration();
        $credentials = base64_encode(sprintf('%s:%s', $config['username'], $config['password']));

        $context = stream_context_create(['http' =>
            [
                'method' => $method,
                'header' => implode("\r\n", [
                    'Content-Type: application/json',
                    'Authorization: Basic ' . $credentials,
                ]),
                'content' => $content
            ],
        ]);

        // Warnings are suppressed on purpose: a failure is reported as a skip below.
        $result = @file_get_contents(
            sprintf('http://%s:%d%s', $config['host'], $config['port'], $uri),
            null,
            $context
        );

        if (false === $result) {
            self::markTestSkipped('Failed to configure a RabbitMQ vhost for testing');
        }

        return $result;
    }
}
<?php
namespace Zalas\Test;
use Exception;
use PHPUnit_Framework_AssertionFailedError;
use PHPUnit_Framework_Test;
use PHPUnit_Framework_TestSuite;
/**
 * Test listener that prepares RabbitMQ vhosts through the management HTTP API
 * the first time a suite containing tests of the "rabbitmq" group starts.
 */
final class RabbitMQListener implements \PHPUnit_Framework_TestListener
{
    const GROUP = 'rabbitmq';

    /**
     * True once vhost setup has been attempted, whether or not it succeeded.
     *
     * @var bool
     */
    private $initialised = false;

    /**
     * @param PHPUnit_Framework_Test $test
     * @param Exception $e
     * @param float $time
     */
    public function addError(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * @param PHPUnit_Framework_Test $test
     * @param PHPUnit_Framework_AssertionFailedError $e
     * @param float $time
     */
    public function addFailure(PHPUnit_Framework_Test $test, PHPUnit_Framework_AssertionFailedError $e, $time)
    {
    }

    /**
     * @param PHPUnit_Framework_Test $test
     * @param Exception $e
     * @param float $time
     */
    public function addIncompleteTest(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * @param PHPUnit_Framework_Test $test
     * @param Exception $e
     * @param float $time
     */
    public function addRiskyTest(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * @param PHPUnit_Framework_Test $test
     * @param Exception $e
     * @param float $time
     */
    public function addSkippedTest(PHPUnit_Framework_Test $test, Exception $e, $time)
    {
    }

    /**
     * Triggers the vhost setup for the first suite that contains the rabbitmq group.
     *
     * @param PHPUnit_Framework_TestSuite $suite
     */
    public function startTestSuite(PHPUnit_Framework_TestSuite $suite)
    {
        if (!$this->initialised && in_array(self::GROUP, $suite->getGroups(), true)) {
            $this->setUpVhosts();
        }
    }

    /**
     * @param PHPUnit_Framework_TestSuite $suite
     */
    public function endTestSuite(PHPUnit_Framework_TestSuite $suite)
    {
    }

    /**
     * @param PHPUnit_Framework_Test $test
     */
    public function startTest(PHPUnit_Framework_Test $test)
    {
    }

    /**
     * @param PHPUnit_Framework_Test $test
     * @param float $time
     */
    public function endTest(PHPUnit_Framework_Test $test, $time)
    {
    }

    /**
     * Creates each configured vhost and grants permissions on it, stopping at
     * the first request that fails.
     */
    private function setUpVhosts()
    {
        // Record the attempt up front so an unreachable broker is probed once
        // per run instead of once for every later suite with the group.
        $this->initialised = true;

        foreach ($this->rabbitMQVhosts() as $vhost) {
            if (!$this->createVhost($vhost) || !$this->configureVhostPermissions($vhost)) {
                return;
            }
        }
    }

    /**
     * @return string[] names of the vhosts to create
     */
    protected function rabbitMQVhosts()
    {
        return ['test_scraper'];
    }

    /**
     * @return array management API settings: username, password, host and port
     */
    private function rabbitMQManagementConfiguration()
    {
        return [
            'username' => 'vagrant',
            'password' => 'vagrant',
            'host' => 'rabbitmq',
            'port' => 15672
        ];
    }

    /**
     * @param string $vhost
     *
     * @return bool whether the request succeeded
     */
    private function createVhost($vhost)
    {
        // The vhost name is one path segment and must be percent-encoded.
        return false !== $this->rabbitMQManagement('PUT', '/api/vhosts/'.rawurlencode($vhost));
    }

    /**
     * Grants configure/write/read on everything in the vhost to the "vagrant" user.
     *
     * @param string $vhost
     *
     * @return bool whether the request succeeded
     */
    private function configureVhostPermissions($vhost)
    {
        return false !== $this->rabbitMQManagement(
            'PUT',
            sprintf('/api/permissions/%s/vagrant', rawurlencode($vhost)),
            '{"configure":".*","write":".*","read":".*"}'
        );
    }

    /**
     * Sends a JSON request with basic authentication to the management API.
     *
     * A failure is written to stderr and reported through the return value.
     * This class does not extend a test case, so it has no markTestSkipped()
     * to call.
     *
     * @param string      $method  HTTP method
     * @param string      $uri     path starting with a slash
     * @param string|null $content request body
     *
     * @return string|false response body, or false when the request failed
     */
    private function rabbitMQManagement($method, $uri, $content = null)
    {
        $config = $this->rabbitMQManagementConfiguration();
        $credentials = base64_encode(sprintf('%s:%s', $config['username'], $config['password']));

        $context = stream_context_create(['http' =>
            [
                'method' => $method,
                'header' => implode("\r\n", [
                    'Content-Type: application/json',
                    'Authorization: Basic ' . $credentials,
                ]),
                'content' => $content
            ],
        ]);

        // Warnings are suppressed on purpose; the failure is reported below.
        $result = @file_get_contents(
            sprintf('http://%s:%d%s', $config['host'], $config['port'], $uri),
            null,
            $context
        );

        if (false === $result) {
            file_put_contents(
                'php://stderr',
                sprintf("Failed to configure a RabbitMQ vhost for testing (%s %s)\n", $method, $uri)
            );
        }

        return $result;
    }
}
<?php
namespace Zalas\Infrastructure\Scraper;
use OldSound\RabbitMqBundle\RabbitMq\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Connection\AbstractConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use PhpAmqpLib\Exception\AMQPTimeoutException;
use PhpAmqpLib\Message\AMQPMessage;
use Symfony\Bundle\FrameworkBundle\Test\KernelTestCase;
/**
 * Checks that the "scraper" service publishes a scrape request to RabbitMQ.
 *
 * @group integration
 * @group rabbitmq
 */
class QueueScraperTest extends KernelTestCase
{
    /**
     * Producer service fetched from the container during setUp().
     *
     * @var ProducerInterface
     */
    private $queueProducer;

    /**
     * Last message handed to handleMessage().
     *
     * @var AMQPMessage
     */
    private $consumedMessage;

    /**
     * Boots the kernel and fetches the producer service.
     */
    protected function setUp()
    {
        self::bootKernel();

        $this->queueProducer = $this->container()->get('old_sound_rabbit_mq.scrape_product_producer');
    }

    /**
     * Scrapes a fixed uuid and expects the consumed message body to be that uuid.
     */
    public function testItPushesScrapeRequestToTheQueue()
    {
        $this->requiresRabbitMQ();

        $uuid = new Uuid('1f8192fe-1349-49c7-a04d-240455af1063');
        $this->container()->get('scraper')->scrape($uuid);

        $this->consumeMessage();

        $message = $this->consumedMessage;
        $this->assertInstanceOf(AMQPMessage::class, $message);
        $this->assertSame((string) $uuid, $message->getBody());
    }

    /**
     * Consumer callback: remembers the delivered message for the assertions.
     *
     * @param AMQPMessage $message
     */
    public function handleMessage(AMQPMessage $message)
    {
        $this->consumedMessage = $message;
    }

    /**
     * Skips the test unless the connection reports being connected.
     */
    private function requiresRabbitMQ()
    {
        $connection = $this->getConnection();

        if ($connection instanceof AMQPLazyConnection) {
            try {
                $connection->reconnect();
            } catch (\ErrorException $ignored) {
                // Deliberately ignored: the isConnected() check below decides.
            }
        }

        if ($connection->isConnected()) {
            return;
        }

        $this->markTestSkipped('RabbitMQ is not available.');
    }

    /**
     * Asks a fresh consumer for one message; a timeout fails the test.
     */
    private function consumeMessage()
    {
        try {
            $consumer = $this->getConsumer();
            $consumer->consume(1);
        } catch (AMQPTimeoutException $timeout) {
            $this->fail('Failed to consume a message: ' . $timeout->getMessage());
        }
    }

    /**
     * Builds a consumer on the "scrape-product" exchange and queue that
     * delivers messages to handleMessage().
     *
     * @return ConsumerInterface
     */
    private function getConsumer()
    {
        $exchange = ['name' => 'scrape-product', 'type' => 'direct'];
        $queue = ['name' => 'scrape-product'];

        $consumer = new Consumer($this->getConnection());
        $consumer->setExchangeOptions($exchange);
        $consumer->setQueueOptions($queue);
        $consumer->setCallback([$this, 'handleMessage']);
        $consumer->setIdleTimeout(2);

        return $consumer;
    }

    /**
     * @return AbstractConnection
     */
    private function getConnection()
    {
        return $this->container()->get('old_sound_rabbit_mq.connection.checkout');
    }

    /**
     * Container of the kernel booted in setUp().
     */
    private function container()
    {
        return self::$kernel->getContainer();
    }
}