using System;
using System.Diagnostics;
using System.Text;
using System.Threading;
using Microsoft.VisualStudio.TestTools.UnitTesting;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;
namespace QueueTests
{
[TestClass]
public class RpcTests
{
private string _hostName;
private string _queueName;
private RPCClient _clientA;
private RPCClient _clientB;
private string _messageForA;
private string _messageForB;
private string _responseFromA;
private string _responseFromB;
private readonly string _corIdA = Guid.NewGuid().ToString();
private readonly string _coridB = Guid.NewGuid().ToString();
private IConnection _connection;
private IModel _channel;
private string _testPrefix;
[TestInitialize]
public void Initialize()
{
_testPrefix = "Handled : ";
_messageForA = "Message A ";
_messageForB = "Message B ";
_hostName = "localhost";
_queueName = "Rpc_Demo";
Trace.WriteLine("Corelation ID for A : " + _corIdA);
Trace.WriteLine("Corelation ID for B : " + _coridB);
//Listen to the RPC Quene
StartServerListener(_hostName, _queueName);
//Create the 2 RPC Clients
_clientA = new RPCClient(HandleResponseForA, _hostName);
_clientB = new RPCClient(HandleResponseForB, _hostName);
}
/*
The server listen to the queue
The server read the reply-queue name in the message and send the processed response to this queue
*/
private void StartServerListener(string hostName, string queueName)
{
//Create the connection
var factory = new ConnectionFactory() { HostName = hostName };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
//Declare the RPC queue to ensure it exist
_channel.QueueDeclare(queue: queueName, durable: false, exclusive: false, autoDelete: false, arguments: null);
_channel.BasicQos(0, 1, false);
//Listen to the RPC quene
var consumer = new EventingBasicConsumer(_channel);
_channel.BasicConsume(queue: queueName, noAck: false, consumer: consumer);
Trace.WriteLine("Server listening to RPC calls....");
//Describe what to do when a message is coming from the RPC Queue
consumer.Received += (model, ea) =>
{
string processedMessage = null;
//Read the content of the received message
var body = ea.Body;
var props = ea.BasicProperties;
//Get the replyTo QueneName generated by the client
var replyQueueName = props.ReplyTo;
//Create the properties of the reply message and assign the received CorrelationId
//to it so the client can link the response with the request
var receivedCorelationid = props.CorrelationId;
var replyProps = _channel.CreateBasicProperties();
replyProps.CorrelationId = receivedCorelationid;
try
{
//Process the received message
processedMessage = HandleRpcRequest(body);
Trace.WriteLine("The server processed : " + processedMessage);
}
catch (Exception e)
{
Trace.WriteLine(" [.] " + e.Message);
processedMessage = "ERROR";
}
finally
{
//Publish the response to the replyQueue generated by the client
if (processedMessage != null)
{
var responseBytes = Encoding.UTF8.GetBytes(processedMessage);
_channel.BasicPublish(exchange: "", routingKey: replyQueueName, basicProperties: replyProps, body: responseBytes);
}
_channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
}
};
//Do not close the connection. Other message could be processed
//Close the connection when the test is done (ReleaseConnections());
}
//Process the response received from the remote server for each RPCClient
private void HandleResponseForA(string message)
{
_responseFromA = message;
}
private void HandleResponseForB(string message)
{
_responseFromB = message;
}
[TestMethod]
public void RpcTest()
{
// Each client send a message to the RpcQueue
_clientB.Call(_messageForB, _queueName, _coridB);
_clientA.Call(_messageForA, _queueName, _corIdA);
bool finished = false;
Stopwatch stopwatch = Stopwatch.StartNew();
//Wait for the response during maximum 15 sec
while (stopwatch.Elapsed <= new TimeSpan(0, 0, 0, 15) && finished == false)
{
if (_responseFromA != null && _responseFromB != null)
{
Trace.WriteLine("Message received from A: " + _responseFromA);
Trace.WriteLine("Message received from B: " + _responseFromB);
finished = true;
var checkA = _testPrefix + _messageForA + _corIdA;
var checkB = _testPrefix + _messageForB + _coridB;
var result = _responseFromA == checkA;
if (result)
{
result = _responseFromB == checkB;
}
ReleaseConnections();
Assert.IsTrue(result);
}
}
}
private string HandleRpcRequest(Byte[] message)
{
return _testPrefix + Encoding.UTF8.GetString(message);
}
private void ReleaseConnections()
{
_connection.Close();
_clientA.Close();
_clientB.Close();
}
}
public class RPCClient
{
private readonly IConnection _connection;
private readonly IModel _channel;
private readonly string _replyQueueName;
private readonly string _hostname;
//Create the RPC Client for a defined QueueServer and give the action to invoke when the server response
public RPCClient(Action<string> handler, string hostName)
{
//Create the client connection
var factory = new ConnectionFactory() { HostName = hostName };
_connection = factory.CreateConnection();
_channel = _connection.CreateModel();
_hostname = hostName;
//Create the reply queue
var replyQueue = _channel.QueueDeclare();
//Get the generate quene name
_replyQueueName = replyQueue.QueueName;
//Listen to the replyQueue and configure what to do after that the server has sended his reponse
var consumer = new EventingBasicConsumer(_channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body;
//Read the correlationId received in response. It must be the same as the one sended
var returnedCorelationId = ea.BasicProperties.CorrelationId;
//Create the response
var response = Encoding.UTF8.GetString(body) + returnedCorelationId;
//Invoke the action gived by the client with the response as parameter
handler.Invoke(response);
};
_channel.BasicConsume(queue: _replyQueueName, noAck: true, consumer: consumer);
}
public void Call(string message, string queue, string corrId)
{
//The client call the RPCQueue
//Create the publication properties
var props = _channel.CreateBasicProperties();
//Set the replyQueueName generate during the client initialization in the properties
props.ReplyTo = _replyQueueName;
//Set the corelationId received by the client;
props.CorrelationId = corrId;
//Publish the message to the RPCQueue
Trace.WriteLine($"Sending message " + message + "to host " + _hostname + " on queue " + queue + " with corelationId " + corrId);
var messageBytes = Encoding.UTF8.GetBytes(message);
_channel.BasicPublish(exchange: "",
routingKey: queue,
basicProperties: props,
body: messageBytes);
}
public void Close()
{
//Close the Client Connetion
_connection.Close();
}
}
}