gustavopaes
1/28/2017 - 8:00 PM

Simple RabbitMQ benchmark

Simple RabbitMQ benchmark

{
  "name": "mq-benchmark",
  "version": "1.0.0",
  "description": "",
  "main": "index.js",
  "scripts": {
    "test": "echo \"Error: no test specified\" && exit 1"
  },
  "author": "Gustavo Paes <gustavo.paes@gmail.com>",
  "license": "ISC",
  "dependencies": {
    "amqplib": "^0.5.1"
  }
}
const amqplib = require('amqplib');
// Promise for the broker connection. The URL can be overridden with the
// AMQP_URL environment variable; the default is the previously hard-coded
// address.
const connect = amqplib.connect(process.env.AMQP_URL || 'amqp://172.17.0.2:5672');

// Name of the queue used for both publishing and consuming.
const QUEUE_NAME = 'benchmark';

// Number of messages used when no (valid) count is given on the command line.
const DEFAULT_TOTAL_MESSAGES = 10;

/**
 * Converts the raw command-line argument into an integer message count.
 *
 * @param {string|undefined} rawValue - Value of `process.argv[2]`, if any.
 * @returns {number} The parsed base-10 integer, or DEFAULT_TOTAL_MESSAGES when
 *   the argument is missing or does not start with a number.
 */
const parseTotalMessages = rawValue => {
  const parsed = Number.parseInt(rawValue, 10);
  return Number.isNaN(parsed) ? DEFAULT_TOTAL_MESSAGES : parsed;
};

// Total number of messages to publish and consume; always a number so that
// the counters below can be compared against it without string coercion.
const TOTAL_MESSAGES = parseTotalMessages(process.argv[2]);

// Progress counters, updated by the publisher and the consumer.
let SEND_MESSAGES = 0;
let RECEIVED_MESSAGES = 0;

// Start timestamps; they stay undefined until publishing/consuming begins.
let sendingStartTime;
let consumerStartTime;

/**
 * Writes a one-line progress report to stdout: sent and received counters
 * against TOTAL_MESSAGES, each with the seconds elapsed since its start
 * timestamp. The line ends with a carriage return so that the next report
 * overwrites it in a terminal.
 */
const log = () => {
  const secondsSince = startTime => (new Date() - startTime) / 1000;

  const sending = `Sending messages: ${SEND_MESSAGES}/${TOTAL_MESSAGES} (${secondsSince(sendingStartTime)}s)`;
  const receiving = `Receiving messages: ${RECEIVED_MESSAGES}/${TOTAL_MESSAGES} (${secondsSince(consumerStartTime)}s)`;

  process.stdout.write(`${sending}; ${receiving}\r`);
};

/**
 * Builds the payload published for every benchmark message.
 *
 * @returns {string} JSON text with the current epoch time in milliseconds as
 *   `id`, plus fixed `message` and `action` fields.
 */
const getSampleMessage = () => {
  const payload = {
    id: Date.now(),
    message: 'benchamrk message',
    action: 'YOU_CAN_DO_IT'
  };

  return JSON.stringify(payload);
};

/**
 * Opens a channel on the given connection by calling its
 * `createConfirmChannel()` method.
 *
 * @param {object} conn - Connection object exposing `createConfirmChannel()`.
 * @returns {*} Whatever `conn.createConfirmChannel()` returns (with amqplib's
 *   promise API, a promise for the confirm channel).
 */
const createChannel = conn => conn.createConfirmChannel();

/**
 * Asserts QUEUE_NAME on the channel and hands the channel on to the next
 * step of the promise chain.
 *
 * @param {object} channel - Channel exposing a promise-returning `assertQueue()`.
 * @returns {Promise<object>} Resolves with the same `channel` once the queue
 *   assertion has completed; rejects if the assertion fails.
 */
const assertQueue = channel => {
  const queueReady = channel.assertQueue(QUEUE_NAME);

  // The assertion reply itself is not needed downstream, only the channel.
  return queueReady.then(() => channel);
};

/**
 * Starts consuming QUEUE_NAME and terminates the process once every expected
 * message has been acknowledged.
 *
 * Each delivery bumps RECEIVED_MESSAGES, refreshes the progress line and is
 * acknowledged after a random delay of 300-999 ms. The process used to exit
 * as soon as the last message was *received*, which dropped the still-pending
 * delayed acks and left those messages unacknowledged on the broker. Now the
 * exit happens only after TOTAL_MESSAGES acks were sent and the channel was
 * closed.
 *
 * @param {object} channel - amqplib channel (prefetch/consume/ack/close).
 * @returns {Promise<object>} Resolves with `channel` once the consumer is
 *   registered; rejects if `prefetch` or `consume` fails, so the caller's
 *   `.catch` sees setup errors.
 */
const consume = channel => {
  if (consumerStartTime === undefined) {
    consumerStartTime = new Date();
  }

  // TOTAL_MESSAGES may be a command-line string; compare numerically.
  const expectedMessages = Number(TOTAL_MESSAGES);
  let ackedMessages = 0;
  let isShuttingDown = false;

  // Closes the channel first so the process does not exit with acks unsent.
  const shutdown = () => {
    isShuttingDown = true;
    Promise.resolve(channel.close()).then(
      () => process.exit(0),
      err => {
        console.error(`\nFailed to close the channel: ${err.message}`);
        process.exit(1);
      }
    );
  };

  const onMessage = message => {
    // amqplib invokes the callback with null when the broker cancels the
    // consumer; there is nothing to count or acknowledge in that case.
    if (message === null || isShuttingDown) {
      return;
    }

    RECEIVED_MESSAGES += 1;
    log();

    const ackDelay = Math.max(300, Math.floor(Math.random() * 1000));
    setTimeout(() => {
      // Acking on a channel that is already closing would throw.
      if (isShuttingDown) {
        return;
      }

      channel.ack(message);
      ackedMessages += 1;

      if (ackedMessages >= expectedMessages) {
        shutdown();
      }
    }, ackDelay);
  };

  // At most 10 unacknowledged messages in flight for this consumer.
  return Promise.resolve(channel.prefetch(10))
    .then(() => channel.consume(QUEUE_NAME, onMessage))
    .then(() => channel);
};

/**
 * Publishes benchmark messages to QUEUE_NAME until TOTAL_MESSAGES were sent.
 *
 * The first invocation (recognised by `sendingStartTime` still being unset)
 * records the start time and kicks off 10 publish "workers" by calling itself
 * 10 times. Every worker sends one message and, from the confirmation
 * callback, sends the next one while the total has not been reached.
 *
 * @param {object} channel - Channel whose `sendToQueue` accepts a
 *   confirmation callback (an amqplib confirm channel).
 * @returns {Promise<object>} Always a promise resolving with `channel`; it
 *   does not wait for the messages to be confirmed.
 */
const publish = channel => {
  if (sendingStartTime === undefined) {
    sendingStartTime = new Date();
    const publishWorkers = 10;
    console.log(`Starting ${publishWorkers} publish workers...`);
    for (let i = 0; i < publishWorkers; i++) {
      publish(channel);
    }

    return Promise.resolve(channel);
  }

  if (SEND_MESSAGES >= TOTAL_MESSAGES) {
    return Promise.resolve(channel);
  }

  // Count the message before sending so concurrent workers never overshoot.
  SEND_MESSAGES += 1;

  // Buffer.from replaces the deprecated `new Buffer(string)` constructor.
  channel.sendToQueue(QUEUE_NAME, Buffer.from(getSampleMessage()), {}, err => {
    if (err) {
      // The broker rejected (nacked) the message; report it instead of
      // silently treating it as delivered, then keep the benchmark going.
      console.error(`\nMessage was not confirmed by the broker: ${err.message || err}`);
    }

    log();

    if (SEND_MESSAGES < TOTAL_MESSAGES) {
      publish(channel);
    }
  });

  return Promise.resolve(channel);
};

console.log(`Starting test, publishing ${TOTAL_MESSAGES} messages...`);

// Benchmark pipeline: connect -> open channel -> assert queue -> start the
// publishers -> start the consumer. Any rejection along the way is reported
// here and ends the process with a non-zero exit code.
connect
  .then(createChannel)
  .then(assertQueue)
  .then(publish)
  .then(consume)
  .catch(err => {
    console.error('Error during the test:');

    console.error(`\t${err.message}`);

    if (err.code === 'ECONNREFUSED') {
      // Report the endpoint that actually refused the connection (taken from
      // the socket error) instead of a hard-coded host that may not match the
      // URL the script connects to.
      const target = err.address && err.port
        ? `${err.address}:${err.port}`
        : 'the configured address';
      console.error(`\tMake sure the rabbitmq service is up at ${target}`);
    }

    console.log('');
    process.exit(1);
  });