mpneuried
5/13/2015 - 6:42 AM

RSMQ simple wildcard queue example

RSMQ simple wildcard queue example

/*
INSTALL:

Initialize the test case using the rsmq-cli module:
$ rsmq -n wildcard create -q wildlife
$ rsmq -n wildcard create -q wildpark
$ rsmq -n wildcard create -q wildcard
$ rsmq -n wildcard create -q wilderness
$ rsmq -n wildcard create -q safari

Install the dependencies of the test case with:
$ npm i rsmq wild


USAGE:
$ node rsmq_wildcard.js [queuewildcard]

examples: 
$ node rsmq_wildcard.js
$ node rsmq_wildcard.js "wild*"
$ node rsmq_wildcard.js "*card"
$ node rsmq_wildcard.js "*a*"
*/

// Wildcard pattern used to select the queues to poll.
// Taken from the first CLI argument; falls back to "*" when none is given.
var WILDCARD = process.argv[2] || "*"

/* INIT */

// Load external modules.
// The RSMQ client is created with the namespace "wildcard".
var rsmq = new ( require( "rsmq" ) )( {ns: "wildcard"} )
var wild = require('wild')
// Build the matcher for the wildcard pattern.
// NOTE(review): the result is used via `regex.test( name )` in loadQueues, so it is
// presumably a RegExp; the meaning of the second argument (`true`) is not visible
// here — confirm against the `wild` module documentation.
var regex = wild(WILDCARD, true)

// Global list of the queue names that matched the wildcard.
// Filled by loadQueues and read by getQueueName.
var queues = []

/* FUNCTIONS */

/**
 * (Re)load the available queues and keep those matching the wildcard.
 *
 * Asks RSMQ for all queue names, filters them with `regex` and stores the
 * result in the global `queues` list. If the lookup fails the error is
 * logged and the previously loaded list is left untouched, so a temporary
 * failure does not wipe the list or crash the process.
 */
var loadQueues = function(){
	rsmq.listQueues( function( err, _queues ){
		if( err ){
			// keep the last known queue list; the next reload will try again
			console.error( err )
			return
		}
		console.log("Loaded queues", _queues)

		// replace the queue list with the names matching the given wildcard
		queues = _queues.filter( function( qname ){
			return regex.test( qname )
		})
		console.log("relevant queues", queues)
	});
}

/**
 * Return the next queue name, rotating through the global `queues` list.
 *
 * @returns {string|null} the next queue name, or `null` when no queue is loaded
 */
var getQueueName = (function(){
	// wrap to define a private index
	var currentIdx = 0
	return function(){
		if( !queues.length ){
			return null
		}
		// the list can shrink between two calls (it is reloaded periodically),
		// so make sure the index still points into the list before reading
		if( currentIdx >= queues.length ){
			currentIdx = 0
		}
		var _qname = queues[ currentIdx ]

		// rotate through the available queues
		currentIdx = ( currentIdx + 1 ) % queues.length
		return _qname
	}
})()

/**
 * Poll a single message from the next queue in the rotation.
 *
 * Logs an error when no queue is available; otherwise logs the queue name,
 * asks RSMQ for a message and logs either the error or the received message.
 */
var receive = function(){
	var queueName = getQueueName()

	if( queueName ){
		console.log( "Receive message from queue:", queueName )
		rsmq.receiveMessage( {qname: queueName}, onReceived )
	} else {
		console.error( "no queue found" )
	}

	// callback for rsmq.receiveMessage: report the failure or the message
	function onReceived( error, message ){
		if( error ){
			console.error( error )
		} else {
			console.log( "Message:", message )
		}
	}
}

/* RUN */

// load the available queues once on startup
loadQueues()
// reload the list of queues every 5 seconds (1000 * 5 ms)
setInterval( loadQueues, 1000 * 5 )

// try to receive a message from the next queue in the rotation every second
setInterval( receive, 1000 )