# zousu
# 12/15/2015 - 8:52 AM
#
# runner.rb

module MCollective
  # Runs the MCollective server process. It owns the thread that receives
  # requests from the middleware, the optional registration thread and the
  # threads spawned to run agent requests, and drives them through the state
  # machine documented on @state in #initialize.
  class Runner
    # @return [Symbol] the current runner state (see #initialize for values)
    attr_reader :state

    # Loads the server configuration, looks up the stats, connector and
    # security plugins, and installs signal handlers (POSIX) or the Windows
    # sleeper.
    #
    # @param configfile [String] path to the server configuration file; only
    #   loaded when Config has not already been configured
    # @raise any StandardError raised during setup is logged and re-raised
    def initialize(configfile)
      @config = Config.instance
      @config.loadconfig(configfile) unless @config.configured
      @config.mode = :server
      @stats = PluginManager["global_stats"]
      @connection = PluginManager["connector_plugin"]

      # @state describes the current contextual state of the MCollective runner.
      # Valid states are:
      #   :running   - MCollective is alive and receiving messages from the middleware
      #   :stopping  - MCollective is shutting down and in the process of terminating
      #   :stopped   - MCollective is not running
      #   :pausing   - MCollective is going into its paused state
      #   :unpausing - MCollective is waking up from its paused state
      #   :paused    - MCollective is paused and not receiving messages but can be woken up
      @state = :stopped
      # NOTE(review): nothing in this class ever sets this flag to true; it is
      # only checked at the end of each receiver loop iteration.
      @exit_receiver_thread = false
      @registration_thread = nil
      @agent_threads = []

      @security = PluginManager["security_plugin"]
      @security.initiated_by = :node

      if Util.windows?
        Util.setup_windows_sleeper
      else
        install_signal_handlers
      end
    rescue => e
      Log.error("Failed to initialize MCollective runner.")
      Log.error(e)
      Log.error(e.backtrace.join("\n\t"))
      raise e
    end

    # Deprecated alias for #main_loop.
    def run
      Log.warn("The #run method has been deprecated. Use #main_loop instead.")
      main_loop
    end

    # The main runner loop. Starts the receiver thread, then polls @state
    # every 0.1 seconds. It acts on the transitional states requested through
    # #stop, #pause and #resume, and prunes finished agent threads.
    # Returns once the runner has reached :stopped.
    def main_loop
      # Enter the main context
      @receiver_thread = start_receiver_thread
      loop do
        begin
          case @state
          when :stopping
            Log.debug("Stopping MCollective server")

            # If soft_shutdown has been enabled we wait for all running agents to
            # finish, one way or the other.
            soft_shutdown if @config.soft_shutdown

            stop_threads
            @state = :stopped
            return

          # When pausing we stop the receiver (and registration) threads but
          # leave agent threads alone, so running agents run to completion.
          when :pausing
            Log.debug("Pausing MCollective server")
            stop_threads
            @state = :paused

          when :unpausing
            Log.debug("Unpausing MCollective server")
            # Keep a handle on the new receiver thread. Otherwise a later
            # stop/pause would try to kill the dead pre-pause thread and leave
            # this one running.
            @receiver_thread = start_receiver_thread
          end

          # prune dead threads from the agent_threads array
          @agent_threads.reject! { |t| !t.alive? }
          sleep 0.1
        rescue SignalException => e
          # e.g. an Interrupt delivered to the main thread; shut down cleanly.
          Log.info("Exiting after signal: #{e}")
          stop
        rescue => e
          Log.error("A failure occurred in the MCollective runner.")
          Log.error(e)
          Log.error(e.backtrace.join("\n\t"))
          stop
        end
      end
    end

    # Requests a shutdown; the main loop carries it out on its next iteration.
    def stop
      @state = :stopping
    end

    # Requests a pause. Only valid while :running; otherwise an error is
    # logged and the state is left unchanged.
    def pause
      if @state == :running
        @state = :pausing
      else
        Log.error("Cannot pause MCollective while not in a running state")
      end
    end

    # Requests that a paused runner resume. Only valid while :paused;
    # otherwise an error is logged and the state is left unchanged.
    def resume
      if @state == :paused
        @state = :unpausing
      else
        Log.error("Cannot unpause MCollective when it is not paused")
      end
    end

    private

    # Installs the POSIX signal handlers:
    #   USR1  - reload all agents
    #   USR2  - cycle the logging level
    #   WINCH - reopen the log files
    # Each handler does its work on a fresh thread rather than inside the trap
    # block (presumably because that work may need locks, which Ruby does not
    # allow to be taken from trap context).
    def install_signal_handlers
      Signal.trap("USR1") do
        Thread.new do
          Log.info("Reloading all agents after receiving USR1 signal")
          # @agents is only created by the receiver thread; before that there
          # is nothing to reload.
          if @agents
            @agents.loadagents
          else
            Log.warn("Agents have not been loaded yet, ignoring USR1 signal")
          end
        end
      end

      Signal.trap("USR2") do
        Thread.new do
          Log.info("Cycling logging level due to USR2 signal")
          Log.cycle_level
        end
      end

      Signal.trap("WINCH") do
        Thread.new do
          Log.info("Reopening logfiles due to WINCH signal")
          Log.reopen
          Log.info("Reopened logfiles due to WINCH signal")
        end
      end
    end

    # Marks the runner :running and spawns a thread executing #receiver_thread.
    # Any exception escaping the receiver is re-raised in the thread that called
    # this method (the main loop), except while the runner is pausing.
    #
    # @return [Thread] the new receiver thread
    def start_receiver_thread
      @state = :running
      @parent = Thread.current
      Thread.new do
        begin
          receiver_thread
        # Deliberately broad: every failure is forwarded to the parent thread.
        rescue Exception => e
          # While pausing, the receiver is being torn down on purpose, so its
          # errors are not forwarded to the main thread.
          @parent.raise(e) unless @state == :pausing
        end
      end
    end

    # Kills the receiver thread and, if one was started, the registration
    # thread. Agent threads are left running.
    def stop_threads
      @receiver_thread.kill if @receiver_thread && @receiver_thread.alive?
      # Kill the registration thread if it was made and alive
      if @registration_thread && @registration_thread.alive?
        @registration_thread.kill
      end
    end

    # Body of the receiver thread. It connects to the middleware, optionally
    # subscribes to the direct-addressing queue, creates the agents (once),
    # loads data sources and starts the registration plugin. It then loops
    # forever, receiving requests and dispatching each one to an agent.
    def receiver_thread
      # Create internal connection in Connector
      @connection.connect

      # Subscribe to the direct addressing queue if direct_addressing is enabled
      Util.subscribe_to_direct_addressing_queue if @config.direct_addressing

      # Create the agents and let them create their subscriptions.
      # Memoized so that unpausing does not recreate them.
      @agents ||= Agents.new

      # Load data sources
      Data.load_data_sources

      # Start the registration plugin if interval isn't 0
      begin
        if @config.registerinterval != 0
          @registration_thread = PluginManager["registration_plugin"].run(@connection)
        end
      rescue Exception => e
        Log.error("Failed to start registration plugin: #{e}")
      end

      # Start the receiver loop
      loop do
        begin
          request = receive

          @agent_threads << agentmsg(request)
        rescue MsgTTLExpired => e
          Log.warn(e)

        rescue NotTargettedAtUs
          Log.debug("Message does not pass filters, ignoring")

        rescue MessageNotReceived, UnexpectedMessageType => e
          Log.warn(e)
          if e.backoff && @state != :stopping
            Log.info("sleeping for suggested #{e.backoff} seconds")
            sleep e.backoff
          end

        # Catch-all so that one bad message cannot kill the receiver loop.
        rescue Exception => e
          Log.warn("Failed to handle message: #{e} - #{e.class}\n")
          Log.warn(e.backtrace.join("\n\t"))
        end

        return if @exit_receiver_thread
      end
    end

    # Deals with messages directed to agents: hands the request to the agent
    # dispatcher and sends back any non-nil reply the agent yields.
    #
    # @return whatever Agents#dispatch returns. It is stored in @agent_threads
    #   and polled with #alive?, so presumably the agent's Thread (TODO confirm).
    def agentmsg(request)
      Log.debug("Handling message for agent '#{request.agent}' on collective '#{request.collective}'")

      @agents.dispatch(request, @connection) do |reply_message|
        reply(reply_message, request) if reply_message
      end
    end

    # Fetches the next message from the connector, marks it as a request,
    # counts it as received, then decodes and validates it.
    #
    # @return the decoded, validated request message
    def receive
      request = @connection.receive
      request.type = :request

      @stats.received

      request.decode!
      request.validate

      request
    end

    # Wraps an agent's reply body in a Message tied to the original request,
    # encodes and publishes it, and counts it as sent.
    def reply(msg, request)
      msg = Message.new(msg, nil, :request => request)
      msg.encode!
      msg.publish

      @stats.sent
    end

    # Waits for all agent threads to complete.
    # If soft_shutdown_timeout has been defined it waits for at most the
    # configured grace period, then kills any agent threads still running.
    # A non-positive timeout disables the wait entirely.
    def soft_shutdown
      timeout = @config.soft_shutdown_timeout

      if timeout && timeout <= 0
        Log.warn("soft_shutdown_timeout has been set to '#{timeout}'. soft_shutdown_timeout must be > 0")
        Log.warn("Shutting down normally.")
        return
      end

      if Util.windows?
        windows_soft_shutdown(timeout)
      else
        posix_soft_shutdown(timeout)
      end
    end

    # Implements soft shutdown on the Windows platform.
    # Logs and returns without doing anything if a timeout hasn't been
    # specified, since waiting for long-running threads to exit on Windows can
    # put the MCollective service in a broken state.
    def windows_soft_shutdown(timeout)
      unless timeout
        Log.warn("soft_shutdown specified but not soft_shutdown_timeout specified.")
        Log.warn("To enable soft_shutdown on windows a soft_shutdown_timeout must be specified.")
        Log.warn("Shutting down normally.")
        return
      end

      shutdown_with_timeout(timeout)
    end

    # Implements soft shutdown on POSIX systems: waits with a timeout when one
    # is configured, otherwise waits indefinitely for agent threads.
    def posix_soft_shutdown(timeout)
      if timeout
        shutdown_with_timeout(timeout)
      else
        stop_agent_threads
      end
    end

    # Waits up to +timeout+ seconds for agent threads to finish. Any still
    # running afterwards are killed.
    def shutdown_with_timeout(timeout)
      Log.debug("Shutting down agents with a timeout of '#{timeout}' seconds")
      begin
        Timeout.timeout(timeout) do
          stop_agent_threads
        end
      rescue Timeout::Error
        Log.warn("soft_shutdown_timeout reached. Terminating all running agent threads.")
        terminate_agent_threads
      end
    end

    # Kills every agent thread that is still alive.
    def terminate_agent_threads
      @agent_threads.each { |t| t.kill if t.alive? }
    end

    # Blocks until every live agent thread has finished (despite the name, it
    # only joins the threads; it does not stop them).
    def stop_agent_threads
      Log.debug("Waiting for all running agents to finish or timeout.")
      @agent_threads.each { |t| t.join if t.alive? }
      Log.debug("All running agents have completed. Stopping.")
    end
  end
end