hivefans
7/1/2014 - 10:24 AM

out_reloadable_copy.rb

#
# Fluent
#
# Copyright (C) 2011 FURUHASHI Sadayuki
#
#    Licensed under the Apache License, Version 2.0 (the "License");
#    you may not use this file except in compliance with the License.
#    You may obtain a copy of the License at
#
#        http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS,
#    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#    See the License for the specific language governing permissions and
#    limitations under the License.
#
require 'thread'   # Queue / Mutex on older Rubies
require 'ostruct'  # OpenStruct is used to carry queued emits

module Fluent
  # Output plugin that copies every event stream to a list of <store>
  # outputs, where the <store> list is read from a separate config file
  # (+my_config_path+) rather than from the plugin's own config section.
  #
  # Events handed to #emit are pushed onto an in-memory queue and are
  # forwarded to the stores by a single worker thread (#run).
  #
  # Sending SIGINT to the process re-reads +my_config_path+ and swaps the
  # stores. Note that Signal.trap replaces whatever INT handler was
  # installed before this plugin was instantiated.
  class ReloadableCopyOutput < MultiOutput
    Plugin.register_output('reloadable_copy', self)

    config_param :deep_copy, :bool, :default => false
    config_param :my_config_path, :string

    def initialize
      super
      @q = Queue.new
      @outputs = []
      @reload_mutex = Mutex.new

      Signal.trap :INT do
        # A trap handler must not take locks (Ruby >= 2.0 raises
        # ThreadError for Mutex operations in trap context), and shutting
        # down / starting outputs may well do so. Hand the actual work to
        # a regular thread instead.
        Thread.new { reload_my_config }
      end
    end

    # The currently active store outputs.
    attr_reader :outputs

    # Applies the plugin's own parameters, then builds the initial store
    # list from +my_config_path+.
    #
    # Raises ConfigError when a <store> element has no 'type'; errors from
    # reading or parsing the file propagate as well.
    def configure(conf)
      super

      load_my_config
    end

    # Starts the worker thread and every store. Any StandardError raised
    # here is logged as a warning and not re-raised.
    def start
      @thread = Thread.new(&method(:run))

      @outputs.each {|o|
        o.start
      }
    rescue
      $log.warn "raises exception: #{$!.class}, '#{$!.message}'"
    end

    # Stops the worker thread (if one was started) and shuts down every
    # store. Events still sitting in the queue stay there.
    def shutdown
      if @thread
        @thread.kill
        # Wait until the worker is really gone so it cannot still be
        # emitting into a store while that store is being shut down.
        @thread.join
        @thread = nil
      end

      @outputs.each {|o|
        o.shutdown
      }
    end

    # Queues the event stream for the worker thread and returns
    # immediately; +chain+ is advanced later, from #run.
    def emit(tag, es, chain)
      @q.push OpenStruct.new(:tag => tag, :es => es, :chain => chain)
    end

    private

    # Replaces @outputs with stores freshly built from +my_config_path+.
    # @outputs is only assigned once the whole file has been parsed and
    # every store configured, so a failure leaves the previous list intact.
    def load_my_config
      @outputs = build_outputs
    end

    # Handles a reload request: builds the new stores first, and only when
    # that succeeded stops the old ones, swaps the list and starts again.
    # A broken config file therefore leaves the running stores untouched.
    # Failures are logged instead of being raised out of the reload thread.
    def reload_my_config
      # Serialize reloads so that two signals in quick succession do not
      # interleave their shutdown/start sequences.
      @reload_mutex.synchronize do
        $log.warn 'reloadable_copy: reload my config start'
        new_outputs = build_outputs
        shutdown
        @outputs = new_outputs
        start
        $log.warn 'reloadable_copy: reload my config end'
      end
    rescue => e
      $log.warn "reloadable_copy: reload my config failed: #{e.class}, '#{e.message}'"
    end

    # Parses +my_config_path+ and returns one configured (not yet started)
    # output per <store> element, in file order.
    #
    # A path ending in ".rb" is parsed with the Ruby config DSL, anything
    # else with the regular config parser.
    def build_outputs
      path = File.expand_path(@my_config_path)
      my_conf = File.open(path) { |io|
        fname = File.basename(path)
        basepath = File.dirname(path)

        if fname =~ /\.rb$/
          require 'fluent/config/dsl'
          Config::DSL::Parser.parse(io, File.join(basepath, fname))
        else
          Config.parse(io, fname, basepath, false)
        end
      }

      stores = my_conf.elements.select {|e| e.name == 'store' }
      stores.map {|e|
        type = e['type']
        unless type
          raise ConfigError, "Missing 'type' parameter on <store> directive"
        end
        log.debug "adding store type=#{type.dump}"

        output = Plugin.new_output(type)
        output.configure(e)
        output
      }
    end

    # Worker loop: blocks on the queue and forwards each queued event
    # stream to all stores. Errors for a single item are logged and the
    # loop carries on with the next one.
    def run
      loop do
        param = @q.pop
        tag = param.tag
        es = param.es
        chain = param.chain

        begin
          # The stream is iterated once per store, so a non-repeatable
          # stream is first copied into a MultiEventStream.
          unless es.repeatable?
            m = MultiEventStream.new
            es.each {|time,record|
              m.add(time, record)
            }
            es = m
          end
          if @deep_copy
            chain = CopyOutputChain.new(@outputs, tag, es, chain)
          else
            chain = OutputChain.new(@outputs, tag, es, chain)
          end
          chain.next
        rescue
          $log.warn "raises exception: #{$!.class}, '#{$!.message}, #{param}'"
        end
      end
    end
  end
end