New Relic Node Agent - message instrumentation examples
// This is a mock of a Kafka client module. It exports a Consumer and a Producer
// class with a very basic interface that we would want to instrument.
var util = require('util')
var EventEmitter = require('events').EventEmitter
function Consumer() {
EventEmitter.call(this)
var consumer = this
// simulate sending message to consumers every second
setInterval(function() {
consumer.emit('message', 'some message')
}, 1000)
}
util.inherits(Consumer, EventEmitter)
function Producer() {
EventEmitter.call(this)
}
util.inherits(Producer, EventEmitter)
Producer.prototype.send = function(topic, message, callback) {
// simulate work to send the message
setTimeout(function() {
var metadata = {}
callback(null, metadata)
}, 100)
}
module.exports = {
Consumer: Consumer,
Producer: Producer
}
// This file contains the instrumentation function for the Kafka client module.
// This file could be part of a completely separate module from the app, reusable
// between multiple apps that use the Kafka client.
module.exports = instrumentKafkaClient
function instrumentKafkaClient(shim, kafkaModule, moduleName) {
// set name of the message broker, this is required
shim.setLibrary('Kafka')
var consumerProto = kafkaModule.Consumer.prototype
// wrap Consumer.on, so that everytime a message is received, a new
// transaction is started
shim.recordSubscribedConsume(consumerProto, ['on', 'addListener'], {
consumer: shim.LAST,
messageHandler: function(shim, consumer, name, args) {
var message = args[0]
var metadata = args[1]
// get topic name from metadata?
var topicName = 'TODO'
return {
destinationName: topicName,
destinationType: shim.TOPIC,
}
}
})
// instrument Producer.send, so that a new transaction segment is created when
// send() is called.
var producerProto = kafkaModule.Producer.prototype
shim.recordProduce(producerProto, 'send', function(shim, fn, name, args) {
return {
callback: shim.LAST,
destinationName: shim.FIRST,
destinationType: shim.TOPIC
}
})
}
// This example shows recording a segment for sending messages to Kafka using a separate
// instrumentation function.
const newrelic = require('newrelic')
const instrumentKafkaClient = require('./kafka-client-instrumentation')
// register instrumentation for the kafka client module
newrelic.instrumentMessages('./mock-kafka-client', instrumentKafkaClient,
function handleError(err) {
// this is called when the instrumentation itself errors
console.log(err)
})
// now requiring the kafka-client will result in the agent instrumenting it
const Producer = require('./mock-kafka-client').Producer
var producer = new Producer()
// calling producer.send() will only be traced if called in an existing transaction
// (e.g. as a part of a HTTP server). Here we will use the API to start a new background
// transaction to keep the example simple.
newrelic.startBackgroundTransaction('myProduceTransaction', function() {
var tx = newrelic.getTransaction()
producer.send('topic1', 'some message', function(err, metadata) {
tx.end()
})
})
// output the trace when transaction is finished, for debugging
newrelic.agent.on('transactionFinished', function(tx) {
console.log(JSON.stringify(tx.trace.root, null, 2))
})
// this example shows starting transactions within the user app itself
// in this case, the kafka module is not instrumented, but rather the app itself
const newrelic = require('newrelic')
// now requiring the kafka-client will result in the agent instrumenting it
const Consumer = require('./mock-kafka-client').Consumer
var consumer = new Consumer()
consumer.on('message', message => {
newrelic.startBackgroundTransaction('someName', function() {
var tx = newrelic.getTransaction()
// do some work to process the message, and then end the transaction
processMessage(message, function() {
tx.end()
})
})
})
function processMessage(message, callback) {
process.nextTick(callback)
}
// output the trace when transaction is finished, for debugging
newrelic.agent.on('transactionFinished', function(tx) {
console.log(JSON.stringify(tx.trace.root, null, 2))
})
// This example shows using custom instrumentation function.
// The kafka module is instrumented in a central place, separately from the app
// that uses it.
const newrelic = require('newrelic')
const instrumentKafkaClient = require('./kafka-client-instrumentation')
// register instrumentation for the kafka client module
newrelic.instrumentMessages('./mock-kafka-client', instrumentKafkaClient,
function handleError(err) {
// this is called when the instrumentation itself errors
console.log(err)
})
// now requiring the kafka-client will result in the agent instrumenting it
const Consumer = require('./mock-kafka-client').Consumer
var consumer = new Consumer()
consumer.on('message', message => {
// since there is no automatic way of detecting end of async message processing
// we need to get the transaction handle and call end
var tx = newrelic.getTransaction()
// do some work to process the message, and then end the transaction
processMessage(message, function() {
tx.end()
})
})
function processMessage(message, callback) {
process.nextTick(callback)
}
// output the trace when transaction is finished, for debugging
newrelic.agent.on('transactionFinished', function(tx) {
console.log(JSON.stringify(tx.trace.root, null, 2))
})