#!/usr/bin/ruby
class Host
attr_reader *%i(name ip tags reported tn tmax dmax location gmond_started metrics)
def initialize(name, ip, tags, reported, tn, tmax, dmax, location, gmond_started, metrics)
@name = name
@ip = ip
@tags = tags
@reported = reported
@tn = tn
@tmax = tmax
@dmax = dmax
@location = location
@gmond_started = gmond_started
@metrics = metrics
end
def to_s
"name = #{self.name}, ip = #{self.ip}, tags = #{self.tags}, reported = #{self.reported}, tn = #{self.tn}, tmax = #{self.tmax}, dmax = #{self.dmax}, location = #{self.location}, gmond_started = #{self.gmond_started}, metrics = #{self.metrics}"
end
end
class Metric
attr_reader *%i(name val type units tn tmax dmax slope source group desc title)
def initialize(name, val, type, units, tn, tmax, dmax, slope, source, group, desc, title)
@name = name
@val = val
@type = type
@units = units
@tn = tn
@tmax = tmax
@dmax = dmax
@slope = slope
@source = source
@group = group
@desc = desc
@title = title
end
end
class GangliaParser
def initialize(on_message)
require 'rexml/document'
@on_message = on_message
end
def call(input)
xml = REXML::Document.new(input)
hosts = REXML::XPath.match(xml, "/GANGLIA_XML/CLUSTER/HOST").map do |h|
name = h.attribute("NAME").value
ip = h.attribute("IP").value
tags = h.attribute("TAGS").value
reported = h.attribute("REPORTED").value
tn = h.attribute("TN").value
tmax = h.attribute("TMAX").value
dmax = h.attribute("DMAX").value
location = h.attribute("LOCATION").value
gmond_started = h.attribute("GMOND_STARTED").value
metrics = h.get_elements("METRIC").map do |m|
metric_name = m.attribute("NAME").value
metric_val = m.attribute("VAL").value
metric_type = m.attribute("TYPE").value
metric_units = m.attribute("UNITS") ? m.attribute("UNITS").value : nil
metric_tn = m.attribute("TN") ? m.attribute("TN").value : nil
metric_tmax = m.attribute("TMAX") ? m.attribute("TMAX").value : nil
metric_dmax = m.attribute("DMAX") ? m.attribute("DMAX").value : nil
metric_slope = m.attribute("SLOPE") ? m.attribute("SLOPE").value : nil
metric_source = m.attribute("SOURCE") ? m.attribute("SOURCE").value : nil
extra_data = m.get_elements("EXTRA_DATA").first.get_elements("EXTRA_ELEMENT").map {|e| [e.attribute("NAME").value, e.attribute("VAL").value]}.to_h
Metric.new(metric_name, metric_val, metric_type, metric_units, metric_tn, metric_tmax, metric_dmax, metric_slope, metric_source, extra_data["GROUP"], extra_data["DESC"], extra_data["TITLE"])
end
Host.new(name, ip, tags, reported, tn, tmax, dmax, location, gmond_started, metrics)
end
hosts.each do |h|
h.metrics.each do |e|
data = {hostname: h.name, metric_name: e.name, value: e.val}.to_json
@on_message.call(data)
end unless h.metrics.nil?
end
end
end
module Fluent
class GangliaInput < Input
Plugin.register_input('ganglia', self)
def initialize
super
require 'fluent/timezone'
require 'socket'
require 'open3'
require 'json'
end
config_param :tag, :string, :default => nil
config_param :run_interval, :time, :default => nil
config_param :gmond_host, :string, :default => "localhost"
config_param :gmond_port, :string, :default => "8649"
def configure(conf)
super
if localtime = conf['localtime']
@localtime = true
elsif utc = conf['utc']
@localtime = false
end
if conf['timezone']
@timezone = conf['timezone']
Fluent::Timezone.validate!(@timezone)
end
if !@tag or !@run_interval
raise ConfigError, "'tag' and 'run_interval' option is required on ganglia input"
end
@parser = GangliaParser.new(method(:on_message))
end
def start
@finished = false
@thread = Thread.new(&method(:run_periodic))
end
def shutdown
@finished = true
@thread.join
end
def run_periodic
until @finished
begin
sleep @run_interval
sock = TCPSocket.open(@gmond_host, @gmond_port)
input = []
begin
while( (data = sock.recv_nonblock(100)) != "")
input << data
end
rescue Errno::EAGAIN
ensure
sock.close
end
@parser.call(input.join)
rescue
log.error "ganglia failed to run or shutdown child process", :error => $!.to_s, :error_class => $!.class.to_s
log.warn_backtrace $!.backtrace
end
end
end
private
def on_message(record)
tag = @tag
time = Engine.now
router.emit(tag, time, record)
rescue => e
log.error "ganglia failed to emit", :error => e.to_s, :error_class => e.class.to_s, :tag => tag, :record => Yajl.dump(record)
end
end
end