ruxo
5/18/2016 - 4:22 AM

Ruby: IPC Task Scheduler

Ruby: IPC Task Scheduler

require 'uuidtools'
require 'fp'

class EaTaskSchedule < ActiveRecord::Base
  attr_protected :created_at
  
  self.primary_key = 'id'

  def self.schedule(time, task_name, module_object, method_name, parameter = nil)
    fail if task_name.blank? or !module_object.present? or method_name.blank? or !method_name.respond_to?(:to_s)
    now = DateTime.current
    schedule_at = (time.class == ActiveSupport::Duration ? (now + time) : (time || now)).utc
    module_name = module_object.class == String ? module_object : module_object.name

    task = EaTaskSchedule.new(
        task_name: task_name,
        module: module_name,
        method_name: method_name.to_s,
        json_param: parameter.try(&:to_json),
        schedule_at: schedule_at
    )
    task.id = UUIDTools::UUID.random_create.hexdigest
    task.save
    task
  end

  MAX_EXEC_PER_SCAN = 10000
  # void => integer
  def self.execute_scheduled_tasks
    now = DateTime.current.utc
    self.where(executed_at: nil)
        .where('schedule_at <= ?', now)
        .order(schedule_at: :asc)
        .limit(MAX_EXEC_PER_SCAN)
        .each(&:execute)
        .count
  end

  def execute
    return self.json_result if self.executed_at.present?

    self.attempt += 1

    result = call_method
    now = DateTime.current.utc

    if result.succeeded
      self.json_result = result.get
      self.executed_at = now
      self.last_error = nil
    else
      e = result.get
      self.last_error = [e.inspect].concat(e.backtrace).join("\n")[0..250]
      self.executed_at = now  # just stop execution for now. Should we have retry logic? not sure..
    end

    save
    self.json_result
  end

  private

  def call_method
    module_obj = Object.const_get(self.module)
    result = if self.json_param
               module_obj.send(self.method_name, JSON.parse(self.json_param).with_indifferent_access)
             else
               module_obj.send(self.method_name)
             end
    FP::Right.new(result.try &:to_json)
  rescue => e
    FP::Left.new(e)
  end
end
class CreateTaskSchedule < ActiveRecord::Migration
  def change
    create_table :ea_task_schedules, id: false do |t|
      t.string :id, limit: 32, primary: true, null: false
      t.string :task_name, limit: 40, null: false, index: true
      t.string :module, limit: 60, null: false
      t.string :method_name, limit: 40, null: false
      t.text :json_param
      t.text :json_result
      t.datetime :schedule_at, default: nil, index: true
      t.datetime :executed_at, default: nil
      t.integer :attempt, null: false, default: 0
      t.text :last_error, limit: 255

      t.timestamps
    end
    add_index :ea_task_schedules, [:id]
    add_index :ea_task_schedules, [:created_at]
    add_index :ea_task_schedules, [:executed_at, :schedule_at]
  end
end
module FP
  class Either
    def initialize(data)
      @data = data
    end
    def get
      @data
    end
  end
  class Right < Either
    def initialize(data)
      super
    end
    def map
      Right(yield(@data))
    end
    def chain
      yield(@data)
    end
    def succeeded
      true
    end
    def failed
      false
    end
  end
  class Left < Either
    def initialize(data)
      super
    end
    def map(&_)
      self
    end
    def chain(&_)
      self
    end
    def succeeded
      false
    end
    def failed
      true
    end
  end
end