# A light bulb that starts out off and can be switched on.
class LightBulb
  # The reader stays public; the writer is made private just below so that
  # only the bulb itself can change its state.
  attr_accessor :state
  private :state=

  # A new bulb is off.
  def initialize
    self.state = :off
  end

  # Switches the bulb on.
  #
  # @return [Symbol] :on (the value of the assignment)
  def turn_on
    self.state = :on
  end
end
# A light bulb that starts out off and can be switched on and off.
class LightBulb
  # The reader stays public; the writer is made private just below so that
  # only the bulb itself can change its state.
  attr_accessor :state
  private :state=

  # A new bulb is off.
  def initialize
    self.state = :off
  end

  # Switches the bulb off.
  #
  # @return [Symbol] :off (the value of the assignment)
  def turn_off
    self.state = :off
  end

  # Switches the bulb on.
  #
  # @return [Symbol] :on (the value of the assignment)
  def turn_on
    self.state = :on
  end
end
# A light bulb whose switch methods report whether anything changed.
class LightBulb
  # The reader stays public; the writer is made private just below so that
  # only the bulb itself can change its state.
  attr_accessor :state
  private :state=

  # A new bulb is off.
  def initialize
    self.state = :off
  end

  # Switches the bulb on.
  #
  # @return [Boolean] false when the bulb was already on, true otherwise
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off.
  #
  # @return [Boolean] false when the bulb was already off, true otherwise
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end
end
# A light bulb that delegates its switching behaviour to a state object.
class LightBulb
  # The current state object (whatever LightBulbState.for returned).
  attr_reader :state

  # A new bulb is off.
  def initialize
    self.state = :off
  end

  # Replaces the current state object. The setter is public because the state
  # objects call it on the bulb they are handed.
  #
  # @param state [Symbol] state name, passed to LightBulbState.for
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Asks the current state object to switch the bulb on.
  #
  # @return whatever the state object's #turn_on returns
  def turn_on
    state.turn_on(self)
  end

  # Asks the current state object to switch the bulb off.
  #
  # @return whatever the state object's #turn_off returns
  def turn_off
    state.turn_off(self)
  end
end
# Base class of the light-bulb state objects and factory for them.
class LightBulbState
  # Builds the state object that corresponds to a state name.
  #
  # @param state [Symbol] :on or :off
  # @return [LightBulbState] a new LightBulbOnState or LightBulbOffState
  # @raise [ArgumentError] when +state+ is not a known state name
  def self.for(state)
    case state
    when :on then LightBulbOnState.new
    when :off then LightBulbOffState.new
    else
      # Fail with the offending value instead of calling .new on nil.
      raise ArgumentError, "unknown light bulb state: #{state.inspect}"
    end
  end
end
# State object for a light bulb that is on.
class LightBulbOnState < LightBulbState
  # Symbolic name of this state; always :on.
  attr_reader :state

  def initialize
    @state = :on
  end

  # Switches the given bulb off by assigning :off to its state.
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end

  # The bulb is already on, so it is left untouched.
  #
  # @param light_bulb the bulb (unused)
  # @return [Boolean] always false
  def turn_on(light_bulb)
    false
  end
end
# State object for a light bulb that is off.
class LightBulbOffState < LightBulbState
  # Symbolic name of this state; always :off.
  attr_reader :state

  def initialize
    @state = :off
  end

  # The bulb is already off, so it is left untouched.
  #
  # @param light_bulb the bulb (unused)
  # @return [Boolean] always false
  def turn_off(light_bulb)
    false
  end

  # Switches the given bulb on by assigning :on to its state.
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end
end
# A light bulb with three states (:off, :on, :dimmed) whose methods report
# whether anything changed.
class LightBulb
  # The reader stays public; the writer is made private just below so that
  # only the bulb itself can change its state.
  attr_accessor :state
  private :state=

  # A new bulb is off.
  def initialize
    self.state = :off
  end

  # Switches the bulb fully on (also from :dimmed).
  #
  # @return [Boolean] false when the bulb was already on, true otherwise
  def turn_on
    return false if state == :on

    self.state = :on
    true
  end

  # Switches the bulb off (also from :dimmed).
  #
  # @return [Boolean] false when the bulb was already off, true otherwise
  def turn_off
    return false if state == :off

    self.state = :off
    true
  end

  # Toggles between :dimmed and :on; a bulb that is off cannot be dimmed.
  #
  # @return [Boolean] false when the bulb is off, true otherwise
  def dim
    return false if state == :off

    self.state = (state == :dimmed ? :on : :dimmed)
    true
  end
end
# A dimmable light bulb that delegates its behaviour to a state object.
class LightBulb
  # The current state object (whatever LightBulbState.for returned).
  attr_reader :state

  # A new bulb is off.
  def initialize
    self.state = :off
  end

  # Replaces the current state object. The setter is public because the state
  # objects call it on the bulb they are handed.
  #
  # @param state [Symbol] state name, passed to LightBulbState.for
  def state=(state)
    @state = LightBulbState.for(state)
  end

  # Asks the current state object to switch the bulb on.
  #
  # @return whatever the state object's #turn_on returns
  def turn_on
    state.turn_on(self)
  end

  # Asks the current state object to switch the bulb off.
  #
  # @return whatever the state object's #turn_off returns
  def turn_off
    state.turn_off(self)
  end

  # Asks the current state object to dim the bulb.
  #
  # @return whatever the state object's #dim returns
  def dim
    state.dim(self)
  end
end
# Base class of the light-bulb state objects and factory for them.
class LightBulbState
  # Builds the state object that corresponds to a state name.
  #
  # @param state [Symbol] :on, :off or :dimmed
  # @return [LightBulbState] a new LightBulbOnState, LightBulbOffState or
  #   LightBulbDimmedState
  # @raise [ArgumentError] when +state+ is not a known state name
  def self.for(state)
    case state
    when :on then LightBulbOnState.new
    when :off then LightBulbOffState.new
    when :dimmed then LightBulbDimmedState.new
    else
      # Fail with the offending value instead of calling .new on nil.
      raise ArgumentError, "unknown light bulb state: #{state.inspect}"
    end
  end
end
# State object for a dimmable light bulb that is fully on.
class LightBulbOnState < LightBulbState
  # Symbolic name of this state; always :on.
  attr_reader :state

  def initialize
    @state = :on
  end

  # Dims the given bulb by assigning :dimmed to its state.
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def dim(light_bulb)
    light_bulb.state = :dimmed
    true
  end

  # Switches the given bulb off by assigning :off to its state.
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end

  # The bulb is already on, so it is left untouched.
  #
  # @param light_bulb the bulb (unused)
  # @return [Boolean] always false
  def turn_on(light_bulb)
    false
  end
end
# State object for a dimmable light bulb that is off.
class LightBulbOffState < LightBulbState
  # Symbolic name of this state; always :off.
  attr_reader :state

  def initialize
    @state = :off
  end

  # A bulb that is off cannot be dimmed; it is left untouched.
  #
  # @param light_bulb the bulb (unused)
  # @return [Boolean] always false
  def dim(light_bulb)
    false
  end

  # The bulb is already off, so it is left untouched.
  #
  # @param light_bulb the bulb (unused)
  # @return [Boolean] always false
  def turn_off(light_bulb)
    false
  end

  # Switches the given bulb on by assigning :on to its state.
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end
end
# State object for a light bulb that is dimmed.
class LightBulbDimmedState < LightBulbState
  # Symbolic name of this state; always :dimmed.
  attr_reader :state

  def initialize
    @state = :dimmed
  end

  # Dimming a dimmed bulb brings it back to full brightness (:on).
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def dim(light_bulb)
    light_bulb.state = :on
    true
  end

  # Switches the given bulb off by assigning :off to its state.
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def turn_off(light_bulb)
    light_bulb.state = :off
    true
  end

  # Switches the given bulb fully on by assigning :on to its state.
  #
  # @param light_bulb [#state=] the bulb to update
  # @return [Boolean] always true
  def turn_on(light_bulb)
    light_bulb.state = :on
    true
  end
end
class WebSocket(object):
    def connect(self, uri, origin=None, protocols=[]):
        """Establishes a new connection to a WebSocket server.

        This method connects to the host specified by uri and
        negotiates a WebSocket connection. origin should be specified
        in accordance with RFC 6454 if known. A list of valid
        sub-protocols can be specified in the protocols argument.

        The data will be sent in the clear if the "ws" scheme is used,
        and encrypted if the "wss" scheme is used.

        Both WebSocketWantReadError and WebSocketWantWriteError can be
        raised whilst negotiating the connection. Repeated calls to
        connect() must retain the same arguments.

        The method is driven by self._state and resumes from whichever
        step it had reached on the previous call. It returns None once
        the handshake response has been validated (self._state is then
        "done") and raises Exception on an unknown scheme, a closed
        socket, an invalid or rejected response, or when called in a
        state from which no connection step can run.
        """

        self.client = True

        uri = urlparse(uri)

        # Fall back to the scheme's default port when the URI has none
        port = uri.port
        if uri.scheme in ("ws", "http"):
            if not port:
                port = 80
        elif uri.scheme in ("wss", "https"):
            if not port:
                port = 443
        else:
            raise Exception("Unknown scheme '%s'" % uri.scheme)

        # This is a state machine in order to handle
        # WantRead/WantWrite events. Each step falls through to the
        # next one, so a call proceeds as far as it can.

        if self._state == "new":
            self.socket = socket.create_connection((uri.hostname, port))

            if uri.scheme in ("wss", "https"):
                self.socket = ssl.wrap_socket(self.socket)
                self._state = "ssl_handshake"
            else:
                self._state = "headers"

        if self._state == "ssl_handshake":
            self.socket.do_handshake()
            self._state = "headers"

        if self._state == "headers":
            # 16 random byte values, base64 encoded, form the
            # Sec-WebSocket-Key sent to the server
            self._key = ''
            for i in range(16):
                self._key += chr(random.randrange(256))
            if sys.hexversion >= 0x3000000:
                self._key = bytes(self._key, "latin-1")
            self._key = b64encode(self._key).decode("ascii")

            path = uri.path
            if not path:
                path = "/"

            self._queue_str("GET %s HTTP/1.1\r\n" % path)
            self._queue_str("Host: %s\r\n" % uri.hostname)
            self._queue_str("Upgrade: websocket\r\n")
            self._queue_str("Connection: upgrade\r\n")
            self._queue_str("Sec-WebSocket-Key: %s\r\n" % self._key)
            self._queue_str("Sec-WebSocket-Version: 13\r\n")

            if origin is not None:
                self._queue_str("Origin: %s\r\n" % origin)
            if len(protocols) > 0:
                self._queue_str("Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))

            self._queue_str("\r\n")

            self._state = "send_headers"

        if self._state == "send_headers":
            self._flush()
            self._state = "response"

        if self._state == "response":
            if not self._recv():
                raise Exception("Socket closed unexpectedly")

            # Wait until the complete response header block has arrived
            if self._recv_buffer.find('\r\n\r\n'.encode("ascii")) == -1:
                raise WebSocketWantReadError

            # Status line first...
            (request, self._recv_buffer) = self._recv_buffer.split('\r\n'.encode("ascii"), 1)
            request = request.decode("latin-1")

            words = request.split()
            if (len(words) < 2) or (words[0] != "HTTP/1.1"):
                raise Exception("Invalid response")
            if words[1] != "101":
                raise Exception("WebSocket request denied: %s" % " ".join(words[1:]))

            # ...then the header fields; anything after the blank line
            # stays in the receive buffer
            (headers, self._recv_buffer) = self._recv_buffer.split('\r\n\r\n'.encode("ascii"), 1)
            headers = headers.decode('latin-1') + '\r\n'

            headers = email.message_from_string(headers)

            if headers.get("Upgrade", "").lower() != "websocket":
                raise Exception("Missing or incorrect upgrade header")

            accept = headers.get('Sec-WebSocket-Accept')
            if accept is None:
                raise Exception("Missing Sec-WebSocket-Accept header")

            # The server must answer with base64(sha1(key + GUID))
            expected = sha1((self._key + self.GUID).encode("ascii")).digest()
            expected = b64encode(expected).decode("ascii")

            del self._key

            if accept != expected:
                raise Exception("Invalid Sec-WebSocket-Accept header")

            # The server may only pick a sub-protocol that was offered
            self.protocol = headers.get('Sec-WebSocket-Protocol')
            if len(protocols) == 0:
                if self.protocol is not None:
                    raise Exception("Unexpected Sec-WebSocket-Protocol header")
            else:
                if self.protocol not in protocols:
                    raise Exception("Invalid protocol chosen by server")

            self._state = "done"

            return

        raise Exception("WebSocket is in an invalid state")
class WebSocketState(object):
    """Base class of the WebSocket connection state objects."""

    @staticmethod
    def factory(state):
        """Create the state object that corresponds to a state name.

        Returns a new instance of the matching state class, or None
        when the name is not one of the known states.
        """
        if state == "new":
            state_class = WebSocketNewState
        elif state == "ssl_handshake":
            state_class = WebSocketSSLHandshakeState
        elif state == "headers":
            state_class = WebSocketHeadersState
        elif state == "send_headers":
            state_class = WebSocketSendHeadersState
        elif state == "response":
            state_class = WebSocketResponseState
        elif state == "done":
            state_class = WebSocketDoneState
        elif state == "flush":
            state_class = WebSocketFlushState
        else:
            return None
        return state_class()
class WebSocketNewState(WebSocketState):
    """State of a WebSocket for which no socket has been opened yet."""

    @property
    def state(self):
        """Name of this state."""
        return "new"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Open the socket to the host named in uri.

        Marks web_socket as a client, connects to the URI's host and
        port (80 or 443 by default, depending on the scheme) and moves
        web_socket to "ssl_handshake" for wss/https or to "headers"
        for ws/http. Raises Exception for any other scheme. origin and
        protocols are accepted but not used here.
        """
        web_socket.client = True

        target = urlparse(uri)
        port = target.port
        secure = target.scheme in ("wss", "https")
        if not secure and target.scheme not in ("ws", "http"):
            raise Exception("Unknown scheme '%s'" % target.scheme)
        if not port:
            port = 443 if secure else 80

        web_socket.socket = socket.create_connection((target.hostname, port))
        if not secure:
            web_socket.state = "headers"
            return

        web_socket.socket = ssl.wrap_socket(web_socket.socket)
        web_socket.state = "ssl_handshake"
class WebSocketSSLHandshakeState(WebSocketState):
    """State of a WebSocket whose SSL handshake is still pending."""

    @property
    def state(self):
        """Name of this state."""
        return "ssl_handshake"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Run the handshake on web_socket.socket, then move to "headers".

        uri, origin and protocols are accepted but not used here.
        """
        tls_socket = web_socket.socket
        tls_socket.do_handshake()
        web_socket.state = "headers"
class WebSocketHeadersState(WebSocketState):
    """State of a WebSocket that has to queue its handshake request."""

    @property
    def state(self):
        """Name of this state."""
        return "headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Queue the HTTP upgrade request and move to "send_headers".

        uri may be a URI string or an already parsed URI (anything with
        a hostname attribute). A fresh Sec-WebSocket-Key is generated
        and stored on web_socket._key. An Origin header is added when
        origin is not None, and a Sec-WebSocket-Protocol header when
        protocols is not empty.
        """
        # The caller hands over the URI string it was given; parse it
        # here so that .path and .hostname are available.
        if not hasattr(uri, "hostname"):
            uri = urlparse(uri)

        # 16 random byte values, base64 encoded, form the key
        web_socket._key = ''
        for i in range(16):
            web_socket._key += chr(random.randrange(256))
        if sys.hexversion >= 0x3000000:
            web_socket._key = bytes(web_socket._key, "latin-1")
        web_socket._key = b64encode(web_socket._key).decode("ascii")

        path = uri.path
        if not path:
            path = "/"

        web_socket._queue_str("GET %s HTTP/1.1\r\n" % path)
        web_socket._queue_str("Host: %s\r\n" % uri.hostname)
        web_socket._queue_str("Upgrade: websocket\r\n")
        web_socket._queue_str("Connection: upgrade\r\n")
        web_socket._queue_str("Sec-WebSocket-Key: %s\r\n" % web_socket._key)
        web_socket._queue_str("Sec-WebSocket-Version: 13\r\n")

        if origin is not None:
            web_socket._queue_str("Origin: %s\r\n" % origin)
        if len(protocols) > 0:
            web_socket._queue_str(
                "Sec-WebSocket-Protocol: %s\r\n" % ", ".join(protocols))

        web_socket._queue_str("\r\n")

        web_socket.state = "send_headers"
class WebSocketSendHeadersState(WebSocketState):
    """State of a WebSocket whose queued request still has to be flushed."""

    @property
    def state(self):
        """Name of this state."""
        return "send_headers"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Call web_socket._flush(), then move to "response".

        uri, origin and protocols are accepted but not used here.
        """
        web_socket._flush()

        web_socket.state = "response"
class WebSocketResponseState(WebSocketState):
    """State of a WebSocket that is waiting for the handshake response."""

    @property
    def state(self):
        """Name of this state."""
        return "response"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Read and validate the server's handshake response.

        Raises WebSocketWantReadError until the receive buffer holds a
        complete header block. Raises Exception when the socket was
        closed, the status line is not "HTTP/1.1 101", the Upgrade or
        Sec-WebSocket-Accept header is missing or wrong, or the chosen
        sub-protocol is not acceptable. On success the selected
        sub-protocol is stored on web_socket.protocol and web_socket
        moves to "done". uri and origin are accepted but not used here.
        """
        if not web_socket._recv():
            raise Exception("Socket closed unexpectedly")

        # Wait until the complete response header block has arrived
        if web_socket._recv_buffer.find('\r\n\r\n'.encode("ascii")) == -1:
            raise WebSocketWantReadError

        # Status line first...
        (request, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            '\r\n'.encode("ascii"), 1)
        request = request.decode("latin-1")

        words = request.split()
        if (len(words) < 2) or (words[0] != "HTTP/1.1"):
            raise Exception("Invalid response")
        if words[1] != "101":
            raise Exception("WebSocket request denied: %s" %
                            " ".join(words[1:]))

        # ...then the header fields; anything after the blank line
        # stays in the receive buffer
        (headers, web_socket._recv_buffer) = web_socket._recv_buffer.split(
            '\r\n\r\n'.encode("ascii"), 1)
        headers = headers.decode('latin-1') + '\r\n'

        headers = email.message_from_string(headers)

        if headers.get("Upgrade", "").lower() != "websocket":
            raise Exception("Missing or incorrect upgrade header")

        accept = headers.get('Sec-WebSocket-Accept')
        if accept is None:
            raise Exception("Missing Sec-WebSocket-Accept header")

        # The server must answer with base64(sha1(key + GUID))
        expected = sha1(
            (web_socket._key + web_socket.GUID).encode("ascii")).digest()
        expected = b64encode(expected).decode("ascii")

        del web_socket._key

        if accept != expected:
            raise Exception("Invalid Sec-WebSocket-Accept header")

        # The server may only pick a sub-protocol that was offered
        web_socket.protocol = headers.get('Sec-WebSocket-Protocol')
        if len(protocols) == 0:
            if web_socket.protocol is not None:
                raise Exception("Unexpected Sec-WebSocket-Protocol header")
        else:
            if web_socket.protocol not in protocols:
                raise Exception("Invalid protocol chosen by server")

        web_socket.state = "done"
class WebSocketDoneState(WebSocketState):
    """State of a WebSocket whose handshake has been completed."""

    @property
    def state(self):
        """Name of this state."""
        return "done"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Always raises: no connection step can run in this state."""
        message = "WebSocket is in an invalid state"
        raise Exception(message)
class WebSocketFlushState(WebSocketState):
    """State object registered under the name "flush"."""

    @property
    def state(self):
        """Name of this state."""
        return "flush"

    @staticmethod
    def connect(web_socket, uri, origin=None, protocols=[]):
        """Always raises: no connection step can run in this state."""
        message = "WebSocket is in an invalid state"
        raise Exception(message)
class WebSocket(object):
    """WebSocket whose connection logic lives in per-state objects."""

    @property
    def state_obj(self):
        """The current state object."""
        return self._state

    @property
    def state(self):
        """Name of the current state, as reported by the state object."""
        return self._state.state

    @state.setter
    def state(self, value):
        # States are assigned by name; the factory turns the name into
        # the matching state object.
        self._state = WebSocketState.factory(value)

    @state.deleter
    def state(self):
        del self._state

    def connect(self, uri, origin=None, protocols=[]):
        """Run the connection step of the current state.

        Each call delegates once to the current state object's
        connect(), handing over this WebSocket together with the
        caller's uri, origin and protocols.
        """
        # Forward the caller's values; passing fixed defaults here would
        # silently drop the requested origin and sub-protocols.
        self.state_obj.connect(self, uri, origin=origin, protocols=protocols)