martinkuba
8/17/2017 - 7:35 PM

New Relic Node Agent - message instrumentation examples

New Relic Node Agent - message instrumentation examples

// This is a mock of a Kafka client module.  It exports a Consumer and a Producer
// class with a very basic interface that we would want to instrument.

var util = require('util')
var EventEmitter = require('events').EventEmitter

function Consumer() {
  EventEmitter.call(this)
  var consumer = this

  // simulate sending message to consumers every second
  setInterval(function() {
    consumer.emit('message', 'some message')
  }, 1000)
}
util.inherits(Consumer, EventEmitter)


function Producer() {
  EventEmitter.call(this)
}
util.inherits(Producer, EventEmitter)

Producer.prototype.send = function(topic, message, callback) {
  // simulate work to send the message
  setTimeout(function() {
    var metadata = {}
    callback(null, metadata)
  }, 100)
}


module.exports = {
  Consumer: Consumer,
  Producer: Producer
}
// This file contains the instrumentation function for the Kafka client module.
// This file could be part of a completely separate module from the app, reusable
// between multiple apps that use the Kafka client.

module.exports = instrumentKafkaClient

function instrumentKafkaClient(shim, kafkaModule, moduleName) {
  // set name of the message broker, this is required
  shim.setLibrary('Kafka')

  var consumerProto = kafkaModule.Consumer.prototype

  // wrap Consumer.on, so that everytime a message is received, a new
  // transaction is started
  shim.recordSubscribedConsume(consumerProto, ['on', 'addListener'], {
    consumer: shim.LAST,
    messageHandler: function(shim, consumer, name, args) {
      var message = args[0]
      var metadata = args[1]

      // get topic name from metadata?
      var topicName = 'TODO'

      return {
        destinationName: topicName,
        destinationType: shim.TOPIC,
      }
    }
  })

  // instrument Producer.send, so that a new transaction segment is created when
  // send() is called.
  var producerProto = kafkaModule.Producer.prototype
  shim.recordProduce(producerProto, 'send', function(shim, fn, name, args) {
    return {
      callback: shim.LAST,
      destinationName: shim.FIRST,
      destinationType: shim.TOPIC
    }
  })
}
// This example shows recording a segment for sending messages to Kafka using a separate
// instrumentation function.

const newrelic = require('newrelic')
const instrumentKafkaClient = require('./kafka-client-instrumentation')

// register instrumentation for the kafka client module
newrelic.instrumentMessages('./mock-kafka-client', instrumentKafkaClient,
  function handleError(err) {
    // this is called when the instrumentation itself errors
    console.log(err)
  })

// now requiring the kafka-client will result in the agent instrumenting it
const Producer = require('./mock-kafka-client').Producer

var producer = new Producer()

// calling producer.send() will only be traced if called in an existing transaction
// (e.g. as a part of a HTTP server). Here we will use the API to start a new background
// transaction to keep the example simple.
newrelic.startBackgroundTransaction('myProduceTransaction', function() {
  var tx = newrelic.getTransaction()

  producer.send('topic1', 'some message', function(err, metadata) {
    tx.end()
  })
})

// output the trace when transaction is finished, for debugging
newrelic.agent.on('transactionFinished', function(tx) {
  console.log(JSON.stringify(tx.trace.root, null, 2))
})
// this example shows starting transactions within the user app itself
// in this case, the kafka module is not instrumented, but rather the app itself

const newrelic = require('newrelic')

// now requiring the kafka-client will result in the agent instrumenting it
const Consumer = require('./mock-kafka-client').Consumer

var consumer = new Consumer()

consumer.on('message', message => {
  newrelic.startBackgroundTransaction('someName', function() {
    var tx = newrelic.getTransaction()

    // do some work to process the message, and then end the transaction
    processMessage(message, function() {
      tx.end()
    })
  })
})

function processMessage(message, callback) {
  process.nextTick(callback)
}

// output the trace when transaction is finished, for debugging
newrelic.agent.on('transactionFinished', function(tx) {
  console.log(JSON.stringify(tx.trace.root, null, 2))
})
// This example shows using custom instrumentation function.
// The kafka module is instrumented in a central place, separately from the app
// that uses it.
const newrelic = require('newrelic')
const instrumentKafkaClient = require('./kafka-client-instrumentation')

// register instrumentation for the kafka client module
newrelic.instrumentMessages('./mock-kafka-client', instrumentKafkaClient,
  function handleError(err) {
    // this is called when the instrumentation itself errors
    console.log(err)
  })

// now requiring the kafka-client will result in the agent instrumenting it
const Consumer = require('./mock-kafka-client').Consumer

var consumer = new Consumer()

consumer.on('message', message => {
  // since there is no automatic way of detecting end of async message processing
  // we need to get the transaction handle and call end
  var tx = newrelic.getTransaction()

  // do some work to process the message, and then end the transaction
  processMessage(message, function() {
    tx.end()
  })
})

function processMessage(message, callback) {
  process.nextTick(callback)
}

// output the trace when transaction is finished, for debugging
newrelic.agent.on('transactionFinished', function(tx) {
  console.log(JSON.stringify(tx.trace.root, null, 2))
})