bogdanrada
1/24/2015 - 6:47 PM

workers.rb

require 'rubygems'
require 'bundler/setup'
require 'celluloid'

# Number of workers to start: the first command-line argument converted with
# to_i, or 1 when no argument was given.
WORKERS_COUNT = ARGV.fetch(0, 1).to_i

# Actor wrapping console output; the script registers one instance under the
# name :out and the other actors print through it.
class Output
  include Celluloid

  # Writes +msg+ to standard output, followed by a newline.
  #
  # @param msg [Object] value to print
  # @return [nil]
  def puts(msg)
    $stdout.puts(msg)
  end
end

# Actor that keeps asking a job source for work until it is told to exit.
class Worker
  include Celluloid

  # Schedules #run through the actor's async proxy as soon as the worker is
  # created.
  #
  # @param source [#get_job, #async] object that hands out jobs
  def initialize(source)
    async.run(source)
  end

  # Pulls jobs from +source+ until it answers :exit, then notifies the source
  # that this worker has finished.
  #
  # A nil answer is skipped and the source is asked again.
  #
  # @param source [#get_job, #async] object that hands out jobs
  def run(source)
    loop do
      job = source.get_job
      case job
      when :exit then break
      when nil then next
      else handle(job)
      end
    end

    console.puts "[#{thread_id}] Done."
    source.async.work_completed
  end

  private

  # Logs the start of +job+, sleeps for half a second, then logs the end.
  def handle(job)
    console.puts "[#{thread_id}] Got job '#{job}'"
    sleep(0.5)
    console.puts "[#{thread_id}] Completed job '#{job}'"
  end

  # The actor registered under the name :out.
  def console
    Actor[:out]
  end

  # Hexadecimal object id of the current thread, used as a log prefix.
  def thread_id
    format('%x', Thread.current.object_id)
  end
end



# Actor that owns the queue of pending jobs and counts finished workers.
class Store
  include Celluloid

  # @param content [Array] jobs that are available from the start
  def initialize(*content)
    @left = content
    @done = false
    @workers_left = WORKERS_COUNT
  end

  # Hands out the next job, waiting for the :new_jobs signal when the queue
  # is empty and the store has not been marked done.
  #
  # @return [Object, Symbol, nil] the next job; :exit when the queue is empty
  #   and the store is done; nil when the caller was woken up but the queue
  #   was (still) empty, in which case the caller is expected to ask again
  def get_job
    return :exit if @left.empty? && @done

    wait(:new_jobs) if @left.empty?
    @left.shift
  end

  # Queues +value+ and signals :new_jobs so that waiting #get_job calls can
  # continue.
  #
  # @param value [Object] the job to enqueue
  def add_job(value)
    @left << value
    signal(:new_jobs)
  end

  # Marks the store as finished and wakes up waiting #get_job calls so they
  # can notice it.
  def done
    @done = true
    signal(:new_jobs)
  end

  # Records that one worker has finished; signals :all_job_completed once the
  # counter reaches zero.
  def work_completed
    @workers_left -= 1
    signal(:all_job_completed) if @workers_left.zero?
  end

  # Blocks until every worker has reported completion, then prints a
  # shutdown notice through the :out actor.
  def wait_completion
    # If all workers have already reported in, :all_job_completed was
    # signalled before anyone was waiting for it and will not be sent again;
    # waiting at that point would never return, so only wait while workers
    # are still outstanding.
    wait(:all_job_completed) if @workers_left > 0
    Actor[:out].puts "Shutting down..."
  end
end

# Register the output actor under the name :out; the other actors look it up
# via Actor[:out].
Output.supervise_as(:out)

started_at = Time.now

store = Store.new("job 0", "job 1", "job 3", "job 4")

# A single worker is created directly; a pool is only used for other counts
# (presumably because a pool needs more than one worker -- confirm against
# the Celluloid version in use).
if WORKERS_COUNT == 1
  Worker.new(store)
else
  Worker.pool(size: WORKERS_COUNT, args: [store])
end

# Feed ten more jobs from a plain thread after two seconds, then mark the
# store as done two seconds later so the workers can exit.
Thread.new do
  sleep 2
  10.times { |n| store.add_job("new job #{n}") }
  sleep 2
  store.done
end

store.wait_completion
elapsed = format('%.2f', Time.now - started_at)
# Fixed the misspelled "Completd" in the final report.
puts "Completed in #{elapsed} seconds"
require 'rubygems'
require 'bundler/setup'
require 'celluloid'

# How many workers the script starts; taken from the first command-line
# argument (via to_i) and defaulting to 1.
WORKERS_COUNT = (ARGV.first || 1).to_i

# Console-printing actor. The script supervises one instance as :out.
class Output
  include Celluloid

  # Prints +msg+ and a trailing newline on standard output.
  #
  # @param msg [Object] value to print
  # @return [nil]
  def puts(msg)
    $stdout.puts(msg)
  end
end

# Actor that processes jobs from a source until the source runs dry.
class Worker
  include Celluloid

  # Schedules #run through the actor's async proxy right after construction.
  #
  # @param source [#get_job, #async] object that hands out jobs
  def initialize(source)
    async.run(source)
  end

  # Processes jobs from +source+ until #get_job answers nil or false, then
  # tells the source that this worker is finished.
  #
  # @param source [#get_job, #async] object that hands out jobs
  def run(source)
    loop do
      job = source.get_job
      break unless job

      perform(job)
    end

    console.puts "[#{thread_id}] Done."
    source.async.work_completed
  end

  private

  # Logs the start of +job+, sleeps for half a second, then logs the end.
  def perform(job)
    console.puts "[#{thread_id}] Got job '#{job}'"
    sleep(0.5)
    console.puts "[#{thread_id}] Completed job '#{job}'"
  end

  # The actor registered under the name :out.
  def console
    Actor[:out]
  end

  # Hexadecimal object id of the current thread, used as a log prefix.
  def thread_id
    format('%x', Thread.current.object_id)
  end
end



# Actor that owns the queue of pending jobs and counts finished workers.
class Store
  include Celluloid

  # @param content [Array] jobs that are available from the start
  def initialize(*content)
    @left = content
    @done = false
    @workers_left = WORKERS_COUNT
  end

  # Hands out the next job, waiting for the :new_jobs signal while the queue
  # is empty and the store has not been marked done.
  #
  # The condition is re-checked after every wake-up: a caller that is woken
  # but finds no job left for it goes back to waiting instead of receiving
  # nil, which workers interpret as "no more work" and exit on.
  #
  # @return [Object, nil] the next job, or nil once the store is done and the
  #   queue is empty
  def get_job
    wait(:new_jobs) while !@done && @left.empty?
    @left.shift
  end

  # Queues +value+ without waking anyone; call #resume to release waiting
  # #get_job calls.
  #
  # @param value [Object] the job to enqueue
  def add_job(value)
    @left << value
  end

  # Signals :new_jobs so that waiting #get_job calls re-check the queue.
  def resume
    signal(:new_jobs)
  end

  # Marks the store as finished and wakes up waiting #get_job calls so they
  # can notice it.
  def done
    @done = true
    resume
  end

  # Records that one worker has finished; signals :all_job_completed once the
  # counter reaches zero.
  def work_completed
    @workers_left -= 1
    signal(:all_job_completed) if @workers_left.zero?
  end

  # Blocks until every worker has reported completion, then prints a
  # shutdown notice through the :out actor.
  def wait_completion
    # If all workers have already reported in, :all_job_completed was
    # signalled before anyone was waiting for it and will not be sent again;
    # waiting at that point would never return, so only wait while workers
    # are still outstanding.
    wait(:all_job_completed) if @workers_left > 0
    Actor[:out].puts "Shutting down..."
  end
end

# Register the output actor under the name :out; the other actors look it up
# via Actor[:out].
Output.supervise_as(:out)

started_at = Time.now

store = Store.new("job 0", "job 1", "job 3", "job 4")

# A single worker is created directly; a pool is only used for other counts
# (presumably because a pool needs more than one worker -- confirm against
# the Celluloid version in use).
if WORKERS_COUNT == 1
  Worker.new(store)
else
  Worker.pool(size: WORKERS_COUNT, args: [store])
end

# Feed ten more jobs from a plain thread after two seconds and release the
# waiting workers once for the whole batch, then mark the store as done two
# seconds later so the workers can exit.
Thread.new do
  sleep 2
  10.times { |n| store.add_job("new job #{n}") }
  store.resume
  sleep 2
  store.done
end

store.wait_completion
elapsed = format('%.2f', Time.now - started_at)
# Fixed the misspelled "Completd" in the final report.
puts "Completed in #{elapsed} seconds"
require 'celluloid'

# Worker count: 1 when the script is started without arguments, otherwise the
# first argument converted with to_i.
WORKERS_COUNT = ARGV.empty? ? 1 : ARGV[0].to_i

# Actor through which the other actors print; registered by the script
# under the name :out.
class Output
  include Celluloid

  # Emits +msg+ plus a newline on standard output.
  #
  # @param msg [Object] value to print
  # @return [nil]
  def puts(msg)
    $stdout.puts(msg)
  end
end

# Actor that drains jobs from a source one at a time.
class Worker
  include Celluloid

  # Schedules #run through the actor's async proxy right after construction.
  #
  # @param source [#get_job, #async] object that hands out jobs
  def initialize(source)
    async.run(source)
  end

  # Works through jobs from +source+ until #get_job answers nil or false,
  # then tells the source that this worker is finished.
  #
  # @param source [#get_job, #async] object that hands out jobs
  def run(source)
    loop do
      job = source.get_job
      if job
        work_on(job)
      else
        break
      end
    end

    console.puts "[#{thread_id}] Done."
    source.async.work_completed
  end

  private

  # Logs the start of +job+, sleeps for one second, then logs the end.
  def work_on(job)
    console.puts "[#{thread_id}] Got job '#{job}'"
    sleep(1)
    console.puts "[#{thread_id}] Completed job '#{job}'"
  end

  # The actor registered under the name :out.
  def console
    Actor[:out]
  end

  # Hexadecimal object id of the current thread, used as a log prefix.
  def thread_id
    format('%x', Thread.current.object_id)
  end
end



# Actor that owns a fixed list of jobs and counts finished workers.
class Store
  include Celluloid

  # @param content [Array] the jobs to hand out
  def initialize(*content)
    @left = content
    @workers_left = WORKERS_COUNT
  end

  # Removes and returns the next job.
  #
  # @return [Object, nil] the next job, or nil when none are left
  def get_job
    @left.shift
  end

  # Records that one worker has finished; signals :all_job_completed once the
  # counter reaches zero.
  def work_completed
    @workers_left -= 1
    signal(:all_job_completed) if @workers_left.zero?
  end

  # Blocks until every worker has reported completion, then prints a
  # shutdown notice through the :out actor.
  def wait_completion
    # Workers may finish before this method is called (for example when the
    # job list is empty). In that case :all_job_completed was signalled
    # before anyone was waiting for it and will not be sent again, so only
    # wait while workers are still outstanding.
    wait(:all_job_completed) if @workers_left > 0
    Actor[:out].puts "Shutting down..."
  end
end

# Register the output actor under the name :out; the other actors look it up
# via Actor[:out].
Output.supervise_as(:out)

started_at = Time.now

store = Store.new("a", 45, "hu", 6, 8, 9)

# A single worker is created directly; a pool is only used for other counts
# (presumably because a pool needs more than one worker -- confirm against
# the Celluloid version in use).
if WORKERS_COUNT == 1
  Worker.new(store)
else
  Worker.pool(size: WORKERS_COUNT, args: [store])
end

store.wait_completion
elapsed = format('%.2f', Time.now - started_at)
# Fixed the misspelled "Completd" in the final report.
puts "Completed in #{elapsed} seconds"