edubkendo
9/20/2013 - 9:15 PM

bounded_queue.rb

require "thread"

class BoundedQueue
  def initialize(max_size = :infinite)
    @lock  = Mutex.new
    @items = []
    @item_available = ConditionVariable.new
    @max_size = max_size
    @space_available = ConditionVariable.new
  end

  def push(obj, timeout=:never, &timeout_policy)
    timeout_policy ||= -> do
      raise "Push timed out"
    end
    wait_for_condition(
      @space_available,
      ->{!full?},
      timeout,
      timeout_policy) do

      @items.push(obj)
      @item_available.signal
    end
  end

  def pop(timeout = :never, &timeout_policy)
    timeout_policy ||= ->{nil}
    wait_for_condition(
      @item_available,
      ->{@items.any?},
      timeout,
      timeout_policy) do 
      
      @items.shift 
    end
  end

  private

  def full?
    return false if @max_size == :infinite
    @max_size <= @items.size
  end

  def wait_for_condition(
      cv, condition_predicate, timeout=:never, timeout_policy=->{nil})
    deadline = timeout == :never ? :never : Time.now + timeout
    @lock.synchronize do
      loop do
        cv_timeout = timeout == :never ? nil : deadline - Time.now
        if !condition_predicate.call && cv_timeout.to_f >= 0
          cv.wait(@lock, cv_timeout) 
        end
        if condition_predicate.call
          return yield
        elsif deadline == :never || deadline > Time.now
          next
        else
          return timeout_policy.call
        end
      end
    end
  end
end