Thomascountz
6/15/2018 - 12:51 PM

Sample Code for Patterns as Hammers

class LightBulb
  attr_reader :state

  def initialize
    self.state = :off
  end
  
  def turn_on
    self.state = :on
  end
  
  private
  
  attr_writer :state
end
class LightBulb
  attr_reader :state
  
  def initialize
    self.state = :off
  end
  
  def turn_on
    self.state = :on
  end
  
  def turn_off
    self.state = :off
  end
  
  private
  
  attr_writer :state
end
class LightBulb
  attr_reader :state
  
  def initialize
    self.state = :off
  end
  
  def turn_on
    if state == :on
      false
    else
      self.state = :on
      true
    end
  end
  
  def turn_off
    if state == :off
      false
    else
      self.state = :off
      true
    end
  end
  
  private
  
  attr_writer :state
end
class LightBulb
  def initialize
    self.state = :off
  end

  def turn_on
    state.turn_on(self)
  end

  def turn_off
    state.turn_off(self)
  end

  def state=(state)
    @state = LightBulbState.for(state)
  end

  def state
    @state
  end
end

class LightBulbState
  def self.for(state)
    if state == :on
      LightBulbOnState
    elsif state == :off
      LightBulbOffState
    end.new
  end
end

class LightBulbOnState < LightBulbState
  attr_reader :state
  def initialize
    @state = :on
  end

  def turn_on(light_bulb)
    false
  end

  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end
end

class LightBulbOffState < LightBulbState
  attr_reader :state
  def initialize
    @state = :off
  end

  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  def turn_off(light_bulb)
    false
  end
end
class LightBulb
  attr_reader :state
  def initialize
    self.state = :off
  end

  def turn_on
    if state == :on
      false
    else
      self.state = :on
      true
    end
  end

  def turn_off
    if state == :off
      false
    else
      self.state = :off
      true
    end
  end

  def dim
    if state == :off
      false
    elsif state == :dimmed
      self.state = :on
      true
    else
      self.state = :dimmed
      true
    end
  end

  private

  attr_writer :state
end
class LightBulb
  def initialize
    self.state = :off
  end

  def turn_on
    state.turn_on(self)
  end

  def turn_off
    state.turn_off(self)
  end
  
  def dim
    state.dim(self)
  end

  def state=(state)
    @state = LightBulbState.for(state)
  end

  def state
    @state
  end
end

class LightBulbState
  def self.for(state)
    if state == :on
      LightBulbOnState
    elsif state == :off
      LightBulbOffState
    elsif state == :dimmed
      LightBulbDimmedState
    end.new
  end
end

class LightBulbOnState < LightBulbState
  attr_reader :state
  def initialize
    @state = :on
  end

  def turn_on(light_bulb)
    false
  end

  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end

  def dim(light_bulb)
    light_bulb.state = :dimmed
    true
  end
end

class LightBulbOffState < LightBulbState
  attr_reader :state
  def initialize
    @state = :off
  end

  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  def turn_off(light_bulb)
    false
  end
  def dim(light_bulb)
    false
  end
end

class LightBulbDimmedState < LightBulbState
  attr_reader :state
  def initialize
    @state = :dimmed
  end

  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end

  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end

  def dim(light_bulb)
    light_bulb.state = :on
    true
  end
end
class WebSocket(object):
    def connect(self, uri, origin=None, protocols=[]):
        """Establishes a new connection to a WebSocket server.
        This method connects to the host specified by uri and
        negotiates a WebSocket connection. origin should be specified
        in accordance with RFC 6454 if known. A list of valid
        sub-protocols can be specified in the protocols argument.
        The data will be sent in the clear if the "ws" scheme is used,
        and encrypted if the "wss" scheme is used.
        Both WebSocketWantReadError and WebSocketWantWriteError can be
        raised whilst negotiating the connection. Repeated calls to
        connect() must retain the same arguments.
        """

        self.client = True;

        uri = urlparse(uri)

        port = uri.port
        if uri.scheme in ("ws", "http"):
            if not port:
                port = 80
        elif uri.scheme in ("wss", "https"):
            if not port:
                port = 443
        else:
            raise Exception("Unknown scheme '%s'" % uri.scheme)

        # This is a state machine in order to handle
        # WantRead/WantWrite events

        if self._state == "new":
            self.socket = socket.create_connection((uri.hostname, port))

            if uri.scheme in ("wss", "https"):
                self.socket = ssl.wrap_socket(self.socket)
                self._state = "ssl_handshake"
            else:
                self._state = "headers"

        if self._state == "ssl_handshake":
            self.socket.do_handshake()
            self._state = "headers"

        if self._state == "headers":
            self._key = ''
            for i in range(16):
                self._key += chr(random.randrange(256))
            if sys.hexversion >= 0x3000000:
                self._key = bytes(self._key, "latin-1")
            self._key = b64encode(self._key).decode("ascii")

            path = uri.path
            if not path:
                path = "/"

            self._queue_str("GET %s HTTP/1.1\r\n" % path)
            self._queue_str("Host: %s\r\n" % uri.hostname)
            self._queue_str("Upgrade: websocket\r\n")
            self._queue_str("Connection: upgrade\r\n")
            self._queue_str("Sec-WebSocket-Key: %s\r\n" % self._key)
            self._queue_str("Sec-WebSocket-Version: 13\r\n")

            if origin is not None:
                self._queue_str("Origin: %s\r\n" % origin)
            if len(protocols) > 0:
                self._queue_str("Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))

            self._queue_str("\r\n")

            self._state = "send_headers"

        if self._state == "send_headers":
            self._flush()
            self._state = "response"

        if self._state == "response":
            if not self._recv():
                raise Exception("Socket closed unexpectedly")

            if self._recv_buffer.find('\r\n\r\n'.encode("ascii")) == -1:
                raise WebSocketWantReadError

            (request, self._recv_buffer) = self._recv_buffer.split('\r\n'.encode("ascii"), 1)
            request = request.decode("latin-1")

            words = request.split()
            if (len(words) < 2) or (words[0] != "HTTP/1.1"):
                raise Exception("Invalid response")
            if words[1] != "101":
                raise Exception("WebSocket request denied: %s" % " ".join(words[1:]))

            (headers, self._recv_buffer) = self._recv_buffer.split('\r\n\r\n'.encode("ascii"), 1)
            headers = headers.decode('latin-1') + '\r\n'
            headers = email.message_from_string(headers)

            if headers.get("Upgrade", "").lower() != "websocket":
                print(type(headers))
                raise Exception("Missing or incorrect upgrade header")

            accept = headers.get('Sec-WebSocket-Accept')
            if accept is None:
                raise Exception("Missing Sec-WebSocket-Accept header");

            expected = sha1((self._key + self.GUID).encode("ascii")).digest()
            expected = b64encode(expected).decode("ascii")

            del self._key

            if accept != expected:
                raise Exception("Invalid Sec-WebSocket-Accept header");

            self.protocol = headers.get('Sec-WebSocket-Protocol')
            if len(protocols) == 0:
                if self.protocol is not None:
                    raise Exception("Unexpected Sec-WebSocket-Protocol header")
            else:
                if self.protocol not in protocols:
                    raise Exception("Invalid protocol chosen by server")

            self._state = "done"

            return

        raise Exception("WebSocket is in an invalid state")
    class WebSocketState(object):
    @staticmethod
    def factory(state):
        if state == "new":
            return WebSocketNewState()
        if state == "ssl_handshake":
            return WebSocketSSLHandshakeState()
        if state == "headers":
            return WebSocketHeadersState()
        if state == "send_headers":
            return WebSocketSendHeadersState()
        if state == "response":
            return WebSocketResponseState()
        if state == "done":
            return WebSocketDoneState()
        if state == "flush":
            return WebSocketFlushState()


class WebSocketNewState(WebSocketState):
    @property
    def state(self):
        return "new"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        web_socket.client = True

        uri = urlparse(uri)

        port = uri.port
        if uri.scheme in ("ws", "http"):
            if not port:
                port = 80
        elif uri.scheme in ("wss", "https"):
            if not port:
                port = 443
        else:
            raise Exception("Unknown scheme '%s'" % uri.scheme)

        web_socket.socket = socket.create_connection((uri.hostname, port))

        if uri.scheme in ("wss", "https"):
            web_socket.socket = ssl.wrap_socket(web_socket.socket)
            web_socket.state = "ssl_handshake"
        else:
            web_socket.state = "headers"


class WebSocketSSLHandshakeState(WebSocketState):
    @property
    def state(self):
        return "ssl_handshake"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        web_socket.socket.do_handshake()
        web_socket.state = "headers"


class WebSocketHeadersState(WebSocketState):
    @property
    def state(self):
        return "headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        web_socket._key = ''
        for i in range(16):
            web_socket._key += chr(random.randrange(256))
        if sys.hexversion >= 0x3000000:
            web_socket._key = bytes(web_socket._key, "latin-1")
        web_socket._key = b64encode(web_socket._key).decode("ascii")

        path = uri.path
        if not path:
            path = "/"

        web_socket._queue_str("GET %s HTTP/1.1\r\n" % path)
        web_socket._queue_str("Host: %s\r\n" % uri.hostname)
        web_socket._queue_str("Upgrade: websocket\r\n")
        web_socket._queue_str("Connection: upgrade\r\n")
        web_socket._queue_str("Sec-WebSocket-Key: %s\r\n" % web_socket._key)
        web_socket._queue_str("Sec-WebSocket-Version: 13\r\n")

        if origin is not None:
            web_socket._queue_str("Origin: %s\r\n" % origin)
        if len(protocols) > 0:
            web_socket._queue_str(
                "Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))

        web_socket._queue_str("\r\n")

        web_socket.state = "send_headers"


class WebSocketSendHeadersState(WebSocketState):
    @property
    def state(self):
        return "send_headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        web_socket._flush()
        web_socket.state = "response"


class WebSocketResponseState(WebSocketState):
    @property
    def state(self):
        return "response"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        if not web_socket._recv():
            raise Exception("Socket closed unexpectedly")

        if web_socket._recv_buffer.find('\r\n\r\n'.encode("ascii")) == -1:
            raise WebSocketWantReadError

        (request, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            '\r\n'.encode("ascii"), 1)
        request = request.decode("latin-1")

        words = request.split()
        if (len(words) < 2) or (words[0] != "HTTP/1.1"):
            raise Exception("Invalid response")
        if words[1] != "101":
            raise Exception("WebSocket request denied: %s" %
                            " ".join(words[1:]))

        (headers, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            '\r\n\r\n'.encode("ascii"), 1)
        headers = headers.decode('latin-1') + '\r\n'
        headers = email.message_from_string(headers)

        if headers.get("Upgrade", "").lower() != "websocket":
            print(type(headers))
            raise Exception("Missing or incorrect upgrade header")

        accept = headers.get('Sec-WebSocket-Accept')
        if accept is None:
            raise Exception("Missing Sec-WebSocket-Accept header")

        expected = sha1(
            (web_socket._key + web_socket.GUID).encode("ascii")).digest()
        expected = b64encode(expected).decode("ascii")

        del web_socket._key

        if accept != expected:
            raise Exception("Invalid Sec-WebSocket-Accept header")

        web_socket.protocol = headers.get('Sec-WebSocket-Protocol')
        if len(protocols) == 0:
            if web_socket.protocol is not None:
                raise Exception("Unexpected Sec-WebSocket-Protocol header")
        else:
            if web_socket.protocol not in protocols:
                raise Exception("Invalid protocol chosen by server")

        web_socket.state = "done"


class WebSocketDoneState(WebSocketState):
    @property
    def state(self):
        return "done"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        raise Exception("WebSocket is in an invalid state")


class WebSocketFlushState(WebSocketState):
    @property
    def state(self):
        return "flush"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        raise Exception("WebSocket is in an invalid state")


class WebSocket(object):
    @property
    def state_obj(self):
        return self._state

    @property
    def state(self):
        return self._state.state
    #

    @state.setter
    def state(self, value):
        self._state = WebSocketState.factory(value)
    #

    @state.deleter
    def state(self):  # again, name must be the same
        del self._state

    def connect(self, uri, origin=None, protocols=[]):

        self.state_obj.connect(self, uri, origin=None, protocols=[])