rbngzlv
6/7/2012 - 3:47 AM

Example of using piece_pipe to generate a health summary of multiple nuke plants based on reactor leakage.

Example of using piece_pipe to generate a health summary of multiple nuke plants based on reactor leakage.

# Given an id, load and produce a Person instance.
class LoadPersonById < PiecePipe::Step
  # Looks up the Person for the id that arrived from upstream and sends it down the pipe.
  #
  # person_id - the identifier handed to Person.find
  def process(person_id)
    # Use the method's own argument; the bare name `id` is not bound by this method.
    produce Person.find(person_id)
  end
end
# Filter step: a Person only continues down the pipe when they are local.
class KeepLocals < PiecePipe::Step
  # Produces +person+ when person.lives_in_our_neighborhood? is truthy;
  # otherwise produces nothing for this input.
  def process(person)
    return unless person.lives_in_our_neighborhood?

    produce person
  end
end
# Fan-out step: each incoming Person is passed along, followed by every one of
# their family members.
class IncludeFamilyMembers < PiecePipe::Step
  # Produces +person+ first, then one object per entry in person.family_members.
  def process(person)
    produce person
    person.family_members.each { |relative| produce relative }
  end
end
# For any given power plant, determine the worst reactor.
# Implemented as an AssemblyStep that analyzes inputs[:power_plant] from the prior Step,
# and installs a new key/val pair for :worst_reactor.
class FindWorstReactor < PiecePipe::AssemblyStep
  # inputs - Hash from the prior Step; must contain :power_plant.
  def receive(inputs)
    # Only reactors that are not offline are considered.
    online_reactors = inputs[:power_plant].reactors.reject(&:offline?)

    # Figure out which reactor has the highest radiation levels.
    # "install" works a lot like "produce", but rather than take responsibility for the totality
    # of the produced object, we're just saying "add :worst_reactor to whatever's there and pass it on".
    # NOTE(review): if every reactor is offline this presumably installs nil — confirm downstream handling.
    install worst_reactor: online_reactors.max_by(&:radiation)
  end
end
# Builds a health summary for the power plants of a region by wiring a series of
# PiecePipe steps (all defined below as nested classes) into one pipeline.
class NuclearPowerPlantHealthSummaryGenerator
  # Assembles the pipeline for +region+ and returns the result of calling #to_enum on it.
  #
  # region - the object whose power plants are summarized; it is wrapped in a
  #          one-element Array of Hashes ({region: region}) to act as the pipeline source.
  def generate(region)
    PiecePipe::Pipeline.new.
      source([{region: region}]).
      step(FetchPowerPlantsByRegion).
      step(FindWorstReactor).
      step(DetermineStatusClass).
      step(BuildPlantHealthSummary).
      step(SortByRadiationLevelsDescending).
      collect(:plant_health_summary).
      to_enum
  end

  # Custom Step, overriding #generate_sequence to produce a sequence of power plants for a given region.
  # Each power plant is "produced" as a Hash with one key (so far) heading down the pipeline.
  class FetchPowerPlantsByRegion < PiecePipe::Step
    # Takes no arguments, matching the other #generate_sequence override in this class
    # (SortByRadiationLevelsDescending); the upstream data is read from +source+.
    def generate_sequence
      # The expected interface of any Step's source is that it supports #to_enum.
      source.to_enum.each do |inputs|
        inputs[:region].power_plants.those_still_open.each do |power_plant|
          produce power_plant: power_plant  # Each call to #produce sends another object down the pipe
        end
      end
    end
  end

  # For any given power plant, determine the worst reactor.
  # Implemented as an AssemblyStep that analyzes inputs[:power_plant] from the prior Step,
  # and installs a new key/val pair for :worst_reactor.
  class FindWorstReactor < PiecePipe::AssemblyStep
    # inputs - Hash from the prior Step; must contain :power_plant.
    def receive(inputs)
      # Only reactors that are not offline are considered.
      online_reactors = inputs[:power_plant].reactors.reject(&:offline?)

      # Figure out which reactor has the highest radiation levels.
      # "install" works a lot like "produce", but rather than take responsibility for the totality
      # of the produced object, we're just saying "add :worst_reactor to whatever's there and pass it on".
      # NOTE(review): if every reactor is offline this presumably installs nil, and the
      # later steps call #radiation on it — confirm how that case should be handled.
      install worst_reactor: online_reactors.max_by(&:radiation)
    end
  end

  # Figure out which CSS class corresponds to the radiation from the worst reactor.
  # (At this point, the inputs Hash carries at least :power_plant and :worst_reactor.)
  class DetermineStatusClass < PiecePipe::AssemblyStep
    # inputs - Hash from the prior Step; must contain :worst_reactor.
    def receive(inputs)
      install highlight_css_class: StatusFormatters.determine_css_class(inputs[:worst_reactor].radiation)
    end
  end

  # Composite our details into a line-item structure for our report.
  # Even though we consume most of the interesting values that arrive in the inputs Hash,
  # we're letting them ride as we simply install one more key, :plant_health_summary.
  # (This comes in handy, as we intend to sort these structures in a later step, using values
  # that are present in our transient input Hash, but NOT actually available in the
  # report structure.)
  class BuildPlantHealthSummary < PiecePipe::AssemblyStep
    # inputs - Hash from the prior Steps; reads :power_plant, :worst_reactor
    #          and :highlight_css_class.
    def receive(inputs)
      power_plant = inputs[:power_plant]
      worst_reactor = inputs[:worst_reactor]
      install plant_health_summary: PlantHealthSummary.new(
        power_plant_id: power_plant.id,
        supervisor: power_plant.supervisor,
        reactor_name: worst_reactor.name,
        radiation: StatusFormatters.format_radiation(worst_reactor.radiation),
        css_class: inputs[:highlight_css_class]
      )
    end
  end

  # Sort all the values that come through the pipe based on the radiation of the worst reactor
  # in each power_plant, highest radiation first.
  # Notice this is not an AssemblyStep, and we're overriding #generate_sequence again, this time
  # because we're implementing a sink.  The resulting downstream objects have the same structure
  # they arrived with.
  class SortByRadiationLevelsDescending < PiecePipe::Step
    def generate_sequence
      ascending = source.to_enum.sort_by do |inputs|
        inputs[:worst_reactor].radiation
      end

      # sort_by orders lowest-first; walk the result backwards so the output is
      # descending, as the step's name promises.
      ascending.reverse_each do |inputs|
        produce inputs
      end
    end
  end

  #... and that's it. Recall that the pipeline terminates with .collect(:plant_health_summary), which
  # is shorthand for a special Step that accepts Hashes and uses #produce to spit out only the specified
  # objects.  Downstream of our #collect, only the PlantHealthSummary remains.

end
# Stand-alone form of the pipeline: seed it with a single {region: region} Hash,
# run each step in order, keep only the :plant_health_summary values, and expose
# the result through #to_enum.
PiecePipe::Pipeline.new
  .source([{ region: region }])
  .step(FetchPowerPlantsByRegion)
  .step(FindWorstReactor)
  .step(DetermineStatusClass)
  .step(BuildPlantHealthSummary)
  .step(SortByRadiationLevelsDescending)
  .collect(:plant_health_summary)
  .to_enum