GJaminon
1/7/2017 - 5:15 PM

RabbitMQ RPC

using System;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
using System.Text;
using System.Diagnostics;

namespace QueueTests
{
    [TestClass]
    public class RpcTests
    {
        private string _hostName;
        private string _queueName;

        private RPCClient _clientA;
        private RPCClient _clientB;

        private string _messageForA;
        private string _messageForB;

        private string _responseFromA;
        private string _responseFromB;

        private readonly string _corIdA = Guid.NewGuid().ToString();
        private readonly string _coridB = Guid.NewGuid().ToString();

        private IConnection _connection;
        private IModel _channel;

        private string _testPrefix;

        [TestInitialize]
        public void Initialize()
        {
            _testPrefix = "Handled : ";
            _messageForA = "Message A ";
            _messageForB = "Message B ";
            _hostName = "localhost";
            _queueName = "Rpc_Demo";

            Trace.WriteLine("Corelation ID for A : " + _corIdA);
            Trace.WriteLine("Corelation ID for B : " + _coridB);

            //Listen to the RPC Quene
            StartServerListener(_hostName, _queueName);

            //Create the 2 RPC Clients
            _clientA = new RPCClient(HandleResponseForA, _hostName);
            _clientB = new RPCClient(HandleResponseForB, _hostName);
        }

        /*
          The server listen to the queue
          The server read the reply-queue name in the message and send the processed response to this queue
        */
        private void StartServerListener(string hostName, string queueName)
        {
            //Create the connection
            var factory = new ConnectionFactory() { HostName = hostName };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();

            //Declare the RPC queue to ensure it exist
            _channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
            _channel.BasicQos(0, 1, false);

            //Listen to the RPC quene
            var consumer = new EventingBasicConsumer(_channel);
            _channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
            Trace.WriteLine("Server listening to RPC calls....");

            //Describe what to do when a message is coming from the RPC Queue
            consumer.Received += (model, ea) =>
            {
                string processedMessage = null;

                //Read the content of the received message
                var body = ea.Body;
                var props = ea.BasicProperties;

                //Get the replyTo QueneName generated by the client
                var replyQueueName = props.ReplyTo;

                //Create the properties of the reply message and assign the received CorrelationId 
                //to it so the client can link the response with the request
                var receivedCorelationid = props.CorrelationId;
                var replyProps = _channel.CreateBasicProperties();
                replyProps.CorrelationId = receivedCorelationid;

                try
                {
                    //Process the received message 
                    processedMessage = HandleRpcRequest(body);
                    Trace.WriteLine("The server processed : " + processedMessage);
                }
                catch (Exception e)
                {
                    Trace.WriteLine(" [.] " + e.Message);
                    processedMessage = "ERROR";
                }
                finally
                {
                    //Publish the response to the replyQueue generated by the client
                    if (processedMessage != null)
                    {
                        var responseBytes = Encoding.UTF8.GetBytes(processedMessage);
                        _channel.BasicPublish(exchange: "", routingKey: replyQueueName, basicProperties: replyProps, body: responseBytes);
                    }
                    _channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                }
            };

            //Do not close the connection. Other message could be processed
            //Close the connection when the test is done (ReleaseConnections());
        }

        //Process the response received from the remote server for each RPCClient
        private void HandleResponseForA(string message)
        {
            _responseFromA = message;
        }
        private void HandleResponseForB(string message)
        {
            _responseFromB = message;
        }

        [TestMethod]
        public void RpcTest()
        {
            // Each client send a message to the RpcQueue
            _clientB.Call(_messageForB, _queueName, _coridB);
            _clientA.Call(_messageForA, _queueName, _corIdA);
           

            bool finished = false;
            Stopwatch stopwatch = Stopwatch.StartNew();

            //Wait for the response during maximum 15 sec
            while (stopwatch.Elapsed <= new TimeSpan(0, 0, 0, 15) && finished == false)
            {
                if (_responseFromA != null && _responseFromB != null)
                {
                    Trace.WriteLine("Message received from A: " + _responseFromA);
                    Trace.WriteLine("Message received from B: " + _responseFromB);
                    finished = true;
                    var checkA = _testPrefix + _messageForA + _corIdA;
                    var checkB = _testPrefix + _messageForB + _coridB;

                    var result = _responseFromA == checkA;
                    if (result)
                    {
                        result = _responseFromB == checkB;
                    }
                    ReleaseConnections();
                    Assert.IsTrue(result);
                }
            }
        }


        private string HandleRpcRequest(Byte[] message)
        {
            return _testPrefix + Encoding.UTF8.GetString(message);
        }
        private void ReleaseConnections()
        {
            _connection.Close();
            _clientA.Close();
            _clientB.Close();
        }

    }

    public class RPCClient
    {
        private readonly IConnection _connection;
        private readonly IModel _channel;
        private readonly string _replyQueueName;
        private readonly string _hostname;

        //Create the RPC Client for a defined QueueServer and give the action to invoke when the server response
        public RPCClient(Action<string> handler, string hostName)
        {
            //Create the client connection
            var factory = new ConnectionFactory() { HostName = hostName };
            _connection = factory.CreateConnection();
            _channel = _connection.CreateModel();
            _hostname = hostName;

            //Create the reply queue
            var replyQueue = _channel.QueueDeclare();

            //Get the generate quene name
            _replyQueueName = replyQueue.QueueName;

            //Listen to the replyQueue and configure what to do after that the server has sended his reponse
            var consumer = new EventingBasicConsumer(_channel);
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                //Read the correlationId received in response. It must be the same as the one sended
                var returnedCorelationId = ea.BasicProperties.CorrelationId;

                //Create the response
                var response = Encoding.UTF8.GetString(body) + returnedCorelationId;

                //Invoke the action gived by the client with the response as parameter
                handler.Invoke(response);
            };
            _channel.BasicConsume(queue: _replyQueueName, noAck: true, consumer: consumer);
        }

        public void Call(string message, string queue, string corrId)
        {
            //The client call the RPCQueue

            //Create the publication properties
            var props = _channel.CreateBasicProperties();

            //Set the replyQueueName generate during the client initialization in the properties
            props.ReplyTo = _replyQueueName;

            //Set the corelationId received by the client;
            props.CorrelationId = corrId;

            //Publish the message to the RPCQueue
            Trace.WriteLine($"Sending message " + message  + "to host " + _hostname  + " on queue " + queue + " with corelationId " + corrId);
            var messageBytes = Encoding.UTF8.GetBytes(message);
            _channel.BasicPublish(exchange: "",
                                 routingKey: queue,
                                 basicProperties: props,
                                 body: messageBytes);

        }


        public void Close()
        {
            //Close the Client Connetion
            _connection.Close();
        }
    }
}