totem3
3/8/2015 - 8:55 PM

in_ganglia.rb

#!/usr/bin/ruby 

# Read-only holder for the attributes of a single host together with the
# list of metrics collected for it. Every constructor argument is stored
# unchanged and exposed through a reader of the same name.
class Host
  attr_reader :name, :ip, :tags, :reported, :tn, :tmax, :dmax,
              :location, :gmond_started, :metrics

  # Stores each positional argument in the instance variable of the same name.
  def initialize(name, ip, tags, reported, tn, tmax, dmax, location, gmond_started, metrics)
    @name, @ip, @tags = name, ip, tags
    @reported = reported
    @tn, @tmax, @dmax = tn, tmax, dmax
    @location = location
    @gmond_started = gmond_started
    @metrics = metrics
  end

  # Renders every field as "field = value", comma separated, in declaration order.
  def to_s
    %i(name ip tags reported tn tmax dmax location gmond_started metrics)
      .map { |field| "#{field} = #{public_send(field)}" }
      .join(", ")
  end
end
# Read-only holder for the attributes of a single metric. Every constructor
# argument is stored unchanged and exposed through a reader of the same name.
class Metric
  attr_reader :name, :val, :type, :units, :tn, :tmax, :dmax,
              :slope, :source, :group, :desc, :title

  # Stores each positional argument in the instance variable of the same name.
  def initialize(name, val, type, units, tn, tmax, dmax, slope, source, group, desc, title)
    @name, @val, @type, @units = name, val, type, units
    @tn, @tmax, @dmax = tn, tmax, dmax
    @slope, @source = slope, source
    @group, @desc, @title = group, desc, title
  end
end

# Parses a Ganglia XML document into Host/Metric objects and reports every
# metric to a callback as a JSON string.
class GangliaParser
  # @param on_message [#call] invoked once per metric with a JSON string of
  #   the form {"hostname":..., "metric_name":..., "value":...}
  def initialize(on_message)
    require 'rexml/document'
    # #call serializes with Hash#to_json, so load json here instead of
    # depending on the caller having required it already.
    require 'json'
    @on_message = on_message
  end

  # Parses +input+ and feeds each metric of each host to the callback.
  #
  # Only hosts located at /GANGLIA_XML/CLUSTER/HOST are considered.
  #
  # @param input [String, IO] XML accepted by REXML::Document.new
  # @return [Array<Host>] the parsed hosts, in document order
  # @raise [NoMethodError] when a HOST lacks NAME or a METRIC lacks
  #   NAME, VAL or TYPE; errors raised by REXML for malformed XML propagate
  def call(input)
    xml = REXML::Document.new(input)
    hosts = REXML::XPath.match(xml, "/GANGLIA_XML/CLUSTER/HOST").map { |node| build_host(node) }
    hosts.each do |host|
      host.metrics.each do |metric|
        payload = {hostname: host.name, metric_name: metric.name, value: metric.val}
        @on_message.call(payload.to_json)
      end
    end
  end

  private

  # Builds a Host from a HOST element. NAME is mandatory because it is the
  # only host attribute used when emitting; the rest become nil when absent.
  def build_host(node)
    Host.new(
      required_attr(node, "NAME"),
      optional_attr(node, "IP"),
      optional_attr(node, "TAGS"),
      optional_attr(node, "REPORTED"),
      optional_attr(node, "TN"),
      optional_attr(node, "TMAX"),
      optional_attr(node, "DMAX"),
      optional_attr(node, "LOCATION"),
      optional_attr(node, "GMOND_STARTED"),
      node.get_elements("METRIC").map { |m| build_metric(m) }
    )
  end

  # Builds a Metric from a METRIC element; NAME, VAL and TYPE are mandatory.
  def build_metric(node)
    extra = extra_data(node)
    Metric.new(
      required_attr(node, "NAME"),
      required_attr(node, "VAL"),
      required_attr(node, "TYPE"),
      optional_attr(node, "UNITS"),
      optional_attr(node, "TN"),
      optional_attr(node, "TMAX"),
      optional_attr(node, "DMAX"),
      optional_attr(node, "SLOPE"),
      optional_attr(node, "SOURCE"),
      extra["GROUP"],
      extra["DESC"],
      extra["TITLE"]
    )
  end

  # Collects the NAME => VAL pairs of the first EXTRA_DATA child, or an
  # empty hash when the metric carries no EXTRA_DATA element at all.
  def extra_data(metric_node)
    container = metric_node.get_elements("EXTRA_DATA").first
    return {} unless container

    container.get_elements("EXTRA_ELEMENT").map do |e|
      [required_attr(e, "NAME"), required_attr(e, "VAL")]
    end.to_h
  end

  # Value of a mandatory attribute; raises NoMethodError when it is missing.
  def required_attr(node, attr_name)
    node.attribute(attr_name).value
  end

  # Value of an attribute, or nil when the element does not carry it.
  def optional_attr(node, attr_name)
    attr = node.attribute(attr_name)
    attr ? attr.value : nil
  end
end

module Fluent
  # Input plugin registered as 'ganglia'. A background thread wakes up every
  # +run_interval+, connects to gmond_host:gmond_port over TCP, reads the
  # response and hands it to GangliaParser, which reports one message per
  # metric back to #on_message for emission under +tag+.
  class GangliaInput < Input
    Plugin.register_input('ganglia', self)

    def initialize
      super
      require 'fluent/timezone'
      require 'socket'
      require 'open3'
      require 'json'
    end

    config_param :tag, :string, :default => nil
    config_param :run_interval, :time, :default => nil
    config_param :gmond_host, :string, :default => "localhost"
    config_param :gmond_port, :string, :default => "8649"

    # Validates the configuration and builds the parser.
    #
    # @raise [ConfigError] when 'tag' or 'run_interval' is not set
    def configure(conf)
      super

      if conf['localtime']
        @localtime = true
      elsif conf['utc']
        @localtime = false
      end

      if conf['timezone']
        @timezone = conf['timezone']
        Fluent::Timezone.validate!(@timezone)
      end

      unless @tag && @run_interval
        raise ConfigError, "'tag' and 'run_interval' option is required on ganglia input"
      end

      @parser = GangliaParser.new(method(:on_message))
    end

    # Starts the polling thread.
    def start
      @finished = false
      @thread = Thread.new(&method(:run_periodic))
    end

    # Asks the polling thread to stop and waits for it to finish its
    # current iteration (including the sleep it may be in).
    def shutdown
      @finished = true
      @thread.join
    end

    # Polling loop: sleep, fetch, parse. Any StandardError raised by one
    # iteration is logged and the loop carries on with the next one.
    def run_periodic
      until @finished
        begin
          sleep @run_interval
          @parser.call(read_gmond_xml)
        rescue => e
          log.error "ganglia failed to run or shutdown child process", :error => e.to_s, :error_class => e.class.to_s
          log.warn_backtrace e.backtrace
        end
      end
    end

    private

    # Connects to gmond and returns everything it sends as one String.
    #
    # Uses a blocking read until the peer closes the connection. The earlier
    # non-blocking recv loop treated "no data available yet" as end of input
    # and could hand a truncated or empty document to the parser.
    # NOTE(review): this relies on gmond closing the socket once its XML
    # dump is complete - confirm for the gmond version in use.
    def read_gmond_xml
      sock = TCPSocket.open(@gmond_host, @gmond_port)
      begin
        sock.read
      ensure
        sock.close
      end
    end

    # Emits one record under the configured tag with the current time.
    #
    # @param record [String, Hash] the parser passes JSON text; it is decoded
    #   into a Hash (string keys) because router.emit is given a record, not
    #   serialized text. A Hash is emitted unchanged.
    def on_message(record)
      tag = @tag
      time = Engine.now
      record = JSON.parse(record) if record.is_a?(String)

      router.emit(tag, time, record)
    rescue => e
      log.error "ganglia failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record)
    end
  end
end