#!/usr/bin/env python
"""
This simple Python program implements a gossip network protocol. It is
intended only for fun and learning.
Written and directed by Stephane Bortzmeyer
<bortz@users.sourceforge.net> Licence as liberal as you want but no
warranty.
Gossip protocols are network protocols where each machine, each
*peer*, does not have a complete list of all peers. Instead, it knows
only a subset of them. In order to spread a message to all the peers,
every "gossiper" transmits the message to all the peers it knows, in
turn, each transmits it to all the peers it knows and so on. As long
as the set of peers is connected, the message will eventually reach
everyone.
The best-known examples of gossip protocols are the Network News (RFC
1036) and BGP (RFC 4271).
An important part of a gossip protocol is the history: peers must
remember which messages they sent, to avoid wasting time (or, worse,
creating endless loops) with peers which already know the message.
Note that a successful gossip protocol does not require every pair of
peers to communicate by the same means (Network News is a good
example: not everyone uses NNTP). But, in this simple example, the
protocol between two peers is fixed. Every peer has an ID, set at
startup. The "server" (the peer which replied to the connection) sends
its ID followed by a comma. The "client" (the peer which initiated the
connection) sends its ID followed by a comma and by the message (one
line only). In this implementation, for each peer, only a tuple (IP
address, port) is used to connect but it is not imposed by the
protocol (machines are identified by the ID, not by the IP address).
Peers remember the messages they have seen (in the global history) and
the messages they sent to each peer (in a per-peer history).
To use it, see the output of the -h option.
"""
import socket
import SocketServer
import time
import sys
import os
import signal
import threading
import Queue
import re
import getopt
import random
# Default TCP port: used for the listening socket and for any peer given
# without an explicit port
DEFAULT_PORT = 30480
# Default delay, in seconds, between two "Time stamp" log lines
DEFAULT_TIMESTAMP_DELAY = 300
# Default upper bound, in seconds, of the random delay slept before
# connecting to a peer (simulates network delays)
DEFAULT_SIMULATION_DELAY = 60
# Default upper bound, in seconds, of the random delay slept after a failed
# connection before trying again
DEFAULT_RETRY_DELAY = 120
# A sender stops retrying a message once its attempt count exceeds this value
DEFAULT_RETRY_ATTEMPTS = 10
# Maximum number of characters of a message shown in a log line
DEFAULT_MAX_DISPLAY = 40
def current_time():
    """Return the current local date and time as "YYYY-MM-DD HH:MM:SS"."""
    now = time.localtime()
    return time.strftime("%Y-%m-%d %H:%M:%S", now)
def log(msg, peer=None, size=None):
    """Write one timestamped line on standard output.

    msg  -- the text to log.
    peer -- optional sequence whose first item (the remote IP address, for
            the client_address tuples passed by this file) is shown before
            the message.
    size -- optional number of bytes, shown before the message.

    Each optional part carries its own " - " separator, so the line is
    correctly delimited whichever combination of peer and size is given.
    When both are given the output is "TIME ADDRESS - N bytes - MSG".
    """
    prefix = ""
    if peer is not None:
        prefix += "%s - " % peer[0]
    if size is not None:
        prefix += "%i bytes - " % size
    sys.stdout.write("%s %s%s\n" % (current_time(), prefix, msg))
def usage(msg=None):
    """Write a short usage reminder on standard error.

    msg -- optional extra line (typically the reason of the failure),
           written after the reminder.
    """
    emit = sys.stderr.write
    emit("Usage: %s -i N [-p N] peer...\n" % sys.argv[0])
    emit("Use -h to get detailed help\n")
    if msg is None:
        return
    emit("%s\n" % msg)
def help(short=True):
    """Write the basic help text on standard output.

    short -- when true (the default), end with a pointer to --long-help.
    """
    # The backslashes of the shell example are spelled "\\" so that the
    # literal does not depend on the unrecognised escape sequences "\[" and
    # "\]"; the text written is unchanged.
    sys.stdout.write("""To use this program, there is one mandatory option,
-i (or --id) to set the ID of this instance. It must be unique among the swarm.
There is one mandatory argument (at least one must be given), the peer information.
Each peer is to be entered as n,X[:Y] where n is the ID of the peer, X, its IP
address and Y the facultative port. If X is an IPv6 address, it must be entered
between brackets. If the port is ommitted, it defaults to %i.
Three examples of legal peer information are 33,[2001:db8:1::dead:babe] (peer
ID 33, IPv6 address, default port), 629,192.0.2.1:8080 (peer ID 629, IPv4 address,
port 8080) and 9231,[2001:DB8:99::bad:dcaf]:40000 (peer ID 9231, IPv6 address, port
40000).
The principal facultative option is probably -p which allows you to listen on
a different port than the default one (%i).
Complete example of usage, with three peers:
python gossiper.py -i 2 -p 4242 1,\\[::1\\] 3,1.2.3.4 5,\\[::1\\]:6666
\n""" % \
                     (DEFAULT_PORT, DEFAULT_PORT))
    if short:
        sys.stdout.write("Use --long-help to obtain help on all the options.\n")
def long_help():
    """Write the basic help text followed by the list of all the options."""
    help(short=False)
    # The long form of -r is "--retry" (no space after the dashes), as
    # registered with getopt.
    sys.stdout.write("""The complete list of possible options is:
* -h or --help: get help
* -p N or --port=N: sets the listening port
* -i N or --id=N: sets the ID of this instance
* -d N or --delay=N: sets the artifical delay we use before connecting
to a peer (to makes things more realistic). The default
is %i seconds.
* -r N or --retry=N: sets the delay we wait when a connection fails, before
retrying. the default is %i seconds.
* -t N or --timestamp=N: sets the delay between two timestamps log messages.
The default is %i seconds.
\n""" % (DEFAULT_SIMULATION_DELAY, DEFAULT_RETRY_DELAY, DEFAULT_TIMESTAMP_DELAY))
class Entity:
    """ A peer on the network: its ID and the place where it can be reached """

    def __init__(self, family, address, port, id):
        # family is the socket address family (parse() passes
        # socket.AF_INET or socket.AF_INET6)
        self.id = id
        self.family = family
        # TODO: allows a list of (address, port), not just one
        self.address, self.port = address, port
class InvalidPeerID(Exception):
    """Raised by parse() when no integer peer ID can be extracted."""
class InvalidAddress(Exception):
    """Raised by parse() when the address part is not a legal IP address."""
class InvalidPort(Exception):
    """Raised by parse() when the port part cannot be used as a port (for
    instance, when it is not an integer)."""
def parse(str):
    """Turn one peer specification into an Entity.

    str -- text of the form "ID,ADDRESS" or "ID,ADDRESS:PORT", where ADDRESS
           is an IPv4 address or an IPv6 address between brackets. When the
           port is absent, DEFAULT_PORT is used.

    Returns an Entity with the address family, the address (as text), the
    port and the ID (both as integers).

    Raises InvalidPeerID when the text is not made of exactly two
    comma-separated fields starting with an integer, InvalidAddress when the
    address is missing or not legal for its family, and InvalidPort when the
    port is not an integer between 0 and 65535.
    """
    spec = str.strip()
    try:
        (id, loc) = spec.split(",")
        id = int(id)
    except ValueError:
        raise InvalidPeerID
    # TODO: raw IPv6 addresses without brackets
    if loc.startswith('['):
        family = socket.AF_INET6
        match = re.search(r"^\[([0-9A-Za-z:]+)\](:(.*))?$", loc)
        if not match:
            raise InvalidAddress
        address = match.group(1)
        port = match.group(3)
    else:
        family = socket.AF_INET
        # Cut at the first colon only: anything after it is handed to the
        # port check, so extra colons are reported instead of crashing the
        # unpacking. An empty location ends up as an empty (illegal) address.
        (address, colon, port) = loc.partition(':')
        if not colon:
            port = None
    # The address is checked before the port, so that a broken address is
    # reported as such even when the port is wrong too.
    try:
        socket.inet_pton(family, address)
    except (socket.error, ValueError):
        raise InvalidAddress
    if port is None:
        port = DEFAULT_PORT
    else:
        try:
            port = int(port)
        except ValueError:
            raise InvalidPort
    if not 0 <= port <= 65535:
        raise InvalidPort
    return Entity(family, address, port, id)
def pretty(server):
    """Return a printable "address:port" form of a peer.

    server -- object with family, address and port attributes. IPv6
              addresses are displayed between brackets; for any other family
              than IPv4 and IPv6, a text naming the unknown family is
              returned instead.
    """
    family = server.family
    if family == socket.AF_INET:
        template = "%s:%i"
    elif family == socket.AF_INET6:
        template = "[%s]:%i"
    else:
        return "Unknown address family %i" % family
    return template % (server.address, server.port)
class Timestamper(threading.Thread):
    """ Thread which writes a "Time stamp" line in the log at regular
    intervals """

    def __init__(self, delay=DEFAULT_TIMESTAMP_DELAY):
        # delay is the number of seconds slept between two log lines
        threading.Thread.__init__(self)
        self.delay = delay

    def run(self):
        # Never returns: sleep, log, and start again
        pause = time.sleep
        while True:
            pause(self.delay)
            log("Time stamp")
class Sender(threading.Thread):
    """ Thread in charge of forwarding messages to one peer.

    Messages arrive on a queue as (our ID, ID of the peer we got the message
    from, message) tuples. Each message is sent at most once to the peer: the
    messages successfully sent are remembered in the per-peer history.
    """

    def __init__(self, peer, channel, simulation_delay=DEFAULT_SIMULATION_DELAY,
                 retry_delay=DEFAULT_RETRY_DELAY):
        """
        peer             -- Entity to send the messages to.
        channel          -- queue from which the messages are read.
        simulation_delay -- upper bound (seconds) of the random delay slept
                            before sending a message.
        retry_delay      -- upper bound (seconds) of the random delay slept
                            after a failure, before trying again.
        """
        self.peer = peer
        self.channel = channel
        self.history = {}
        self.simulation_delay = simulation_delay
        self.retry_delay = retry_delay
        threading.Thread.__init__(self)

    def run(self):
        """ Forever: take a message from the queue and send it to the peer,
        unless it was already sent. """
        while True:
            (myid, itsid, msg) = self.channel.get()
            if msg in self.history:
                continue
            # This is to simulate network delays
            time.sleep(generator.randint(1, self.simulation_delay))
            log("Sender task %s received \"%s\" from %i, connecting to %i (%s)" % \
                (self.getName(), msg, itsid, self.peer.id, pretty(self.peer)))
            attempts = 0
            while True:
                attempts += 1
                if self._deliver(myid, msg):
                    self.history[msg] = True
                    break
                if attempts > DEFAULT_RETRY_ATTEMPTS:
                    # Too many failures: drop this message
                    break
                # Integer division, since randint() wants integer bounds
                time.sleep(generator.randint(self.retry_delay // 2,
                                             self.retry_delay))

    def _deliver(self, myid, msg):
        """ Try once to send msg to the peer. Returns True on success and
        False (after logging the reason) on a socket error. The socket is
        closed in every case. """
        # Reset first, so that a failure of socket.socket() itself does not
        # leave us closing a missing (or stale) socket.
        self.s = None
        try:
            self.s = socket.socket(self.peer.family, socket.SOCK_STREAM)
            self.s.connect((self.peer.address, self.peer.port))
            outf = self.s.makefile('w')
            inf = self.s.makefile('r')
            outf.write("%i,%s\n" % (myid, msg))
            outf.close()
            # TODO: read the peer ID and check it to be sure it is
            # the one we wanted to talk with
            inf.close()
            self.s.shutdown(socket.SHUT_RDWR)
            return True
        except socket.error as error_msg:
            log("Cannot connect to %s: %s" % (pretty(self.peer), error_msg))
            return False
        finally:
            if self.s is not None:
                self.s.close()
class RequestHandler(SocketServer.StreamRequestHandler):
    """ Handles one incoming connection from a peer.

    We send our ID followed by a comma, then read one line made of the ID of
    the peer, a comma and the message. A message never seen before is
    recorded in the global history of the server and queued for every known
    peer, except the one it came from.
    """

    def handle(self):
        self.wfile.write("%i,\n" % self.server.id)
        # One line only; the end-of-line is not part of the message (and the
        # line may lack it if the peer closed the connection early).
        line = self.rfile.readline()
        if line.endswith("\n"):
            line = line[:-1]
        size = len(line)
        try:
            # Cut at the first comma only: the message itself may contain
            # commas.
            (peer_id, message) = line.split(',', 1)
            peer_id = int(peer_id)
        except ValueError:  # Not a well-formatted message
            # Only the raw line is available here: the message may not have
            # been extracted at all.
            log("Ignoring badly formatted message \"%s...\"" % \
                line[:DEFAULT_MAX_DISPLAY], self.client_address, size)
            return
        if message in self.server.history:
            log("Ignoring known message from peer %i: \"%s...\"" % \
                (peer_id, message[:DEFAULT_MAX_DISPLAY]),
                self.client_address, size)
            return
        self.server.history[message] = True
        log("NEW message received from peer %i: \"%s...\"" % \
            (peer_id, message[:DEFAULT_MAX_DISPLAY]),
            self.client_address, size)
        # Gossip: pass the message to every peer but the one which sent it
        for (sender_id, sender) in mysenders.items():
            if sender_id != peer_id:
                sender.channel.put((self.server.id, peer_id, message))
# TODO: two servers, for IPv4 and IPv6?
class Server(SocketServer.ThreadingMixIn, SocketServer.TCPServer):
    """ Threaded TCP server which carries the ID of this instance and the
    global history of the messages seen """

    def __init__(self, address, handler, id, num):
        # num (the number of peers) is only used in the startup log line
        self.history = {}
        self.id = id
        SocketServer.TCPServer.__init__(self, address, handler)
        banner = "Starting server %i at address %s, %i peers...\n" % \
            (id, address, num)
        log(banner)
class Watcher:
    """
    http://code.activestate.com/recipes/496735/
    This class solves two problems with multithreaded
    programs in Python, (1) a signal might be delivered
    to any thread (which is just a malfeature) and (2) if
    the thread that gets the signal is waiting, the signal
    is ignored (which is a bug).
    The watcher is a concurrent process (not thread) that
    waits for a signal and the process that contains the
    threads. See Appendix A of The Little Book of Semaphores.
    http://greenteapress.com/semaphores/
    I have only tested this on Linux. I would expect it to
    work on the Macintosh and not work on Windows.
    """
    def __init__(self):
        """ Forks a child process, in which this constructor simply
        returns (so the caller goes on with the real work). In the
        parent process, waits for the child or for a KeyboardInterrupt;
        on KeyboardInterrupt, the child is killed.
        """
        self.child = os.fork()
        if self.child == 0:
            # In the child: go back to the caller
            return
        else:
            # In the parent: only watch the child
            self.watch()
    def watch(self):
        """ Blocks until a child process terminates or a
        KeyboardInterrupt is received, then exits. """
        try:
            os.wait()
        except KeyboardInterrupt:
            log("KeyBoardInterrupt")
            self.kill()
        # NOTE(review): the indentation of this file was lost; this call is
        # placed after the try/except, as in the recipe referenced above, so
        # that the watching process never runs the code following Watcher().
        sys.exit()
    def kill(self):
        """ Sends SIGKILL to the child process. """
        try:
            os.kill(self.child, signal.SIGKILL)
        # Best effort: an OSError here (presumably the child being already
        # gone) is ignored
        except OSError: pass
# Settings of this instance; the command-line options may override them
port = DEFAULT_PORT
myid = None
timestamp_delay = DEFAULT_TIMESTAMP_DELAY
simulation_delay = DEFAULT_SIMULATION_DELAY
retry_delay = DEFAULT_RETRY_DELAY
try:
    optlist, args = getopt.getopt(sys.argv[1:], "p:i:t:d:r:h",
                                  ["port=", "id=", "delay=", "retry=",
                                   "timestamp=", "help", "long-help"])
    for option, value in optlist:
        if option in ("--help", "-h"):
            help(short=True)
            sys.exit(0)
        elif option == "--long-help":
            long_help()
            sys.exit(0)
        # All the options below take an integer; a conversion failure is
        # reported by the ValueError handler at the end.
        elif option in ("--port", "-p"):
            port = int(value)
        elif option in ("--retry", "-r"):
            retry_delay = int(value)
        elif option in ("--delay", "-d"):
            simulation_delay = int(value)
        elif option in ("--timestamp", "-t"):
            timestamp_delay = int(value)
        elif option in ("--id", "-i"):
            myid = int(value)
        else:
            # Should never occur, it is trapped by getopt
            usage("Unhandled option %s" % option)
            sys.exit(1)
except getopt.error as reason:
    usage(reason)
    sys.exit(1)
except ValueError:
    # Raised by int() inside the loop, so option and value are those of the
    # faulty option
    usage("Option %s requires an integer, not \"%s\"" % (option, value))
    sys.exit(1)
if not args:
    usage("Not enough peers indicated")
    sys.exit(1)
if myid is None:
    usage("No ID indicated")
    sys.exit(1)
# The peers given on the command line, and (filled later) the sender task
# of each peer, indexed by the ID of the peer
peers = []
mysenders = {}
for arg in args:
    # Set only when the argument cannot be parsed
    complaint = None
    try:
        peers.append(parse(arg))
    except InvalidPeerID:
        complaint = "No peer ID in %s (must precede the IP address and a comma)\n"
    except InvalidAddress:
        complaint = "No legal IP address in %s\n"
    except InvalidPort:
        complaint = "No legal port in %s\n"
    if complaint is not None:
        sys.stderr.write(complaint % arg)
        sys.exit(1)
# Source of the random delays used by the sender tasks
generator = random.Random()
# From here, the rest of the program runs in the process forked by Watcher
Watcher()
# The sender tasks are created now but started only once the server socket
# is bound: if the binding fails, no thread is left running and the process
# ends with the error instead of staying alive without a server.
for peer in peers:
    channel = Queue.Queue()
    mysenders[peer.id] = Sender(peer, channel, simulation_delay, retry_delay)
Server.allow_reuse_address = True
# To have as many IP addresses as we want, we use IPV6. Depending on
# the system used and on options like Linux's sys.net.ipv6.bindv6only,
# the use of AF_INET6 may or may not allow also IPv4 connections.
Server.address_family = socket.AF_INET6
# TODO: an option to bind on a specific address
myserver = Server(("", port), RequestHandler, myid, len(mysenders))
for sender in mysenders.values():
    sender.start()
stamper = Timestamper(timestamp_delay)
stamper.start()
run_server = threading.Thread(target=myserver.serve_forever)
run_server.start()