eprothro
11/6/2017 - 7:54 PM

Sidekiq shared queue stats

Prints stats about all queues being processed by Sidekiq

# Emit key statistics (size and, where available, queuing latency) for every
# Sidekiq queue, plus the scheduled and retry sets, to a stream.
require 'json'
require 'sidekiq/api'

# Prints one record per Sidekiq queue (plus the scheduled and retry sets),
# rendered either as plain text or as JSON.
#
# Usage:
#   SidekiqQueuesPrinter.new(format: :json).run          # writes to $stdout
#   SidekiqQueuesPrinter.new.run(StringIO.new)           # writes elsewhere
class SidekiqQueuesPrinter
  # @return [Symbol, String, nil] the output format requested at construction
  attr_reader :format

  # @param opts [Hash] options
  # @option opts [Symbol, String] :format (:plain) +:json+ selects JSON
  #   output; any other value (including nil) selects the plain-text output.
  def initialize(opts = {})
    @format = opts.fetch(:format) { :plain }
  end

  # Fields merged into every emitted record.
  #
  # @return [Hash]
  def metadata
    { source: "sidekiq" }
  end

  # Writes one formatted record per queue to +io+.
  #
  # @param io [#puts] destination stream; defaults to standard output
  # @return [Array] the queues that were reported
  def run(io = $stdout)
    # Drop the memoized list so every run reports a freshly built queue list.
    @queues = nil
    queues.each do |queue|
      record = metadata.merge(queue_stats(queue))
      io.puts formatter.new(record).inspect
    end
  end

  # Memoized list of reportable queues; rebuilt at the start of each #run.
  #
  # @return [Array]
  def queues
    @queues ||= build_queues
  end

  protected

  # Picks the formatter class for the configured format.
  #
  # Compares on the string form so that Symbols and Strings are treated alike
  # and a nil format falls back to the plain formatter instead of raising.
  #
  # @return [Class] JsonFormatter for "json", SimpleFormatter otherwise
  def formatter
    format.to_s == "json" ? JsonFormatter : SimpleFormatter
  end

  # Collects every Sidekiq queue followed by the scheduled set and retry set.
  #
  # A new array is built rather than appending to the one returned by
  # Sidekiq::Queue.all.
  #
  # @return [Array]
  def build_queues
    Sidekiq::Queue.all + [Sidekiq::ScheduledSet.new, Sidekiq::RetrySet.new]
  end

  # Extracts the reported fields from a single queue-like object.
  #
  # @param queue [#name, #size] queue or set to describe
  # @return [Hash] +:queue+ and +:size+, plus +:queuing_latency+ when the
  #   object responds to +latency+
  def queue_stats(queue)
    stats = { queue: queue.name, size: queue.size }
    stats[:queuing_latency] = queue.latency if queue.respond_to?(:latency)
    stats
  end

  # Base class for record formatters. Subclasses render the record through
  # #inspect.
  #
  # Expected record keys: :source, :queue, :size, :queuing_latency.
  class Formatter
    # NOTE: this accessor shadows Object#hash, so formatter instances should
    # not be used as Hash keys. The name is kept for backward compatibility.
    attr_accessor :hash

    # @param hash [Hash] the record to render
    def initialize(hash)
      @hash = hash
    end
  end

  # Renders a record as a three-line, human-readable text block.
  class SimpleFormatter < Formatter
    # @return [String] upper-cased queue name, size and latency ("n/a" when
    #   the record has no latency), each terminated by a newline
    def inspect
      latency = hash[:queuing_latency] || "n/a"

      "#{hash[:queue].upcase}\n" \
        "     size: #{hash[:size]}\n" \
        "  latency: #{latency}\n"
    end
  end

  # Renders a record as a JSON document.
  class JsonFormatter < Formatter
    # @return [String] the record serialized with +to_json+
    def inspect
      hash.to_json
    end
  end
end

# Script entry point: report every queue using the JSON format.
printer = SidekiqQueuesPrinter.new(format: :json)
printer.run