# USGS Earthquake Data - https://www.google.com/fusiontables/DataSource?snapid=S327323orMC
from datetime import datetime
import csv
import math
import sys
import time
class Record(object):
    """One earthquake observation parsed from a 10-column CSV row.

    Column layout consumed by :meth:`parse`:
    ``year, month, day, hhmmss[.ff], coord0, coord1, coordinates-as-text,
    magnitude, depth, source``.  ``coord0``/``coord1`` are handed to
    :meth:`get_distance`, which treats them as (latitude, longitude) in
    decimal degrees.

    Attributes:
        date: ``datetime`` built from the first four columns.
        coordinate: ``(float, float)`` tuple from columns 4 and 5.
        magnitude: ``float`` from column 7.
        depth: ``int`` from column 8 (0 when the column is empty).
        source: raw text of column 9.
        impact: score computed by :meth:`calculate_impact`.
        distance: great-circle distance in km from ``START_COORDINATE``.
    """

    EARTH_RADIUS_IN_KM = 6371

    # Cheney, WA as (latitude, longitude) in decimal degrees.  Cheney lies in
    # the western hemisphere, so its longitude is negative under the
    # east-positive convention.
    # NOTE(review): assumes the CSV longitudes are east-positive - confirm.
    START_COORDINATE = (47.4875, -117.5747)

    date = None
    coordinate = None
    magnitude = None
    depth = None
    source = None
    impact = None
    distance = None

    def __init__(self):
        super(Record, self).__init__()

    @staticmethod
    def calculate_impact(record):
        """Return ``(1000 - depth) * magnitude`` for *record*, floored at 0.

        Raises:
            TypeError: if *record* is not a ``Record``.
        """
        if not isinstance(record, Record):
            raise TypeError("Invalid type '%s'" % type(record))
        return max(0, (1000 - record.depth) * record.magnitude)

    @staticmethod
    def convert_date(elements):
        """Build a ``datetime`` from ``elements[0:4]`` (year, month, day, time).

        The time token is ``hhmmss`` with an optional ``.ff`` fraction and may
        have lost its leading zeros.  The input sequence is not modified.

        Raises:
            ValueError: if the pieces do not form a valid date/time.
        """
        # Pad only the integer part to six digits: padding the whole token
        # would leave "54624.10" untouched and mis-parse it as 12:34:05.
        whole, dot, fraction = elements[3].partition(".")
        clock = whole.zfill(6) + dot + fraction
        date_str = ",".join(list(elements[0:3]) + [clock])
        # datetime.strptime keeps the fractional seconds and avoids a
        # local-time round-trip through time.mktime().
        return datetime.strptime(date_str, Record.get_date_format(date_str))

    @staticmethod
    def get_date_format(s):
        """Return the strptime format for *s*, with ``.%f`` if it has a dot."""
        if "." in s:
            return "%Y,%m,%d,%H%M%S.%f"
        return "%Y,%m,%d,%H%M%S"

    @staticmethod
    def get_distance(start_coordinate, end_coordinate):
        """Return the haversine distance in km between two coordinates.

        Both arguments are ``(latitude, longitude)`` pairs in degrees.
        """
        lat1, long1 = map(math.radians, start_coordinate)
        lat2, long2 = map(math.radians, end_coordinate)
        dlat = lat2 - lat1
        dlong = long2 - long1
        a = (math.sin(dlat / 2) ** 2 +
             math.cos(lat1) * math.cos(lat2) * math.sin(dlong / 2) ** 2)
        # Rounding can push `a` marginally outside [0, 1] for (near-)antipodal
        # points, which would make sqrt(1 - a) raise a domain error.
        a = min(1.0, max(0.0, a))
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return Record.EARTH_RADIUS_IN_KM * c

    @staticmethod
    def load(filename, limit=None, progress=2500):
        """Read *filename* as CSV and parse every data row into a ``Record``.

        The first row is treated as a header and skipped.  Rows that cannot
        be parsed are skipped silently (best effort).

        Args:
            filename: path of the CSV file.
            limit: maximum number of data rows to read, or ``None`` for all.
            progress: write a "." to stdout every *progress* rows; a falsy
                value disables the progress dots.

        Returns:
            ``(records, magnitudes)`` - the parsed records and a parallel
            list holding each record's magnitude.
        """
        all_magnitudes = []
        data = []
        sys.stdout.write("Loading file")
        with open(filename, "r") as f:
            reader = csv.reader(f)
            # skip headers
            next(reader, None)
            for index, line in enumerate(reader):
                # `index` is zero-based, so `>=` reads exactly `limit` rows.
                if limit is not None and index >= limit:
                    break
                if progress and index % progress == 0:
                    sys.stdout.write(".")
                    sys.stdout.flush()
                try:
                    record = Record.parse(line)
                except (ValueError, OverflowError):
                    # Malformed row: skip it, but let KeyboardInterrupt and
                    # genuine programming errors propagate.
                    continue
                data.append(record)
                all_magnitudes.append(record.magnitude)
        sys.stdout.write("\n")
        return (data, all_magnitudes)

    @staticmethod
    def parse(elements):
        """Build a ``Record`` from one CSV row of exactly 10 fields.

        Raises:
            ValueError: if the row does not have 10 fields or a field cannot
                be converted.
        """
        if len(elements) != 10:
            raise ValueError("Invalid number of elements")
        record = Record()
        record.date = Record.convert_date(elements)
        record.coordinate = tuple(map(float, elements[4:6]))
        # elements[6] holds the coordinates as a string and is not used.
        record.magnitude = float(elements[7])
        record.depth = int(elements[8]) if elements[8] else 0
        record.source = elements[9]
        record.impact = Record.calculate_impact(record)
        record.distance = Record.get_distance(Record.START_COORDINATE,
                                              record.coordinate)
        return record
import math
class Statistics(object):
    """Basic descriptive statistics over a non-empty sequence of numbers."""

    @staticmethod
    def _require_data(elements):
        """Raise ``ValueError`` when *elements* is empty."""
        if len(elements) == 0:
            raise ValueError("statistics require at least one element")

    @staticmethod
    def statistics(elements):
        """Print the mean, median and standard deviation of *elements*.

        Raises:
            ValueError: if *elements* is empty.
        """
        result = (Statistics.mean(elements),
                  Statistics.median(elements),
                  Statistics.std_dev(elements))
        print("\nmean: %.2f\nmedian: %.2f\nstd. dev: %.2f\n" % result)

    @staticmethod
    def mean(elements):
        """Return the arithmetic mean of *elements* as a float.

        Raises:
            ValueError: if *elements* is empty.
        """
        Statistics._require_data(elements)
        # float() keeps integer input from being floor-divided on Python 2.
        return sum(elements) / float(len(elements))

    @staticmethod
    def median(elements):
        """Return the median of *elements*.

        For an even count the two middle values are averaged; for an odd
        count the single middle value is returned unchanged.

        Raises:
            ValueError: if *elements* is empty.
        """
        Statistics._require_data(elements)
        ordered = sorted(elements)
        count = len(ordered)
        midpoint = count // 2
        if count % 2 == 0:
            return (ordered[midpoint - 1] + ordered[midpoint]) / 2.0
        return ordered[midpoint]

    @staticmethod
    def std_dev(elements):
        """Return the population standard deviation (divides by ``len``).

        Raises:
            ValueError: if *elements* is empty.
        """
        average = Statistics.mean(elements)
        squared_diff = sum((x - average) ** 2 for x in elements)
        return math.sqrt(squared_diff / len(elements))
#!/usr/bin/env python
import os
import sys
import operator
from Record import Record
from Statistics import Statistics
class Application(object):
    """Console front end: loads the records once, then serves menu choices.

    Attributes:
        menu: the ``Menu`` whose items are offered to the user.
        data: list of records returned by ``Record.load``.
        all_magnitudes: list of magnitudes returned by ``Record.load``.
    """

    data = None
    all_magnitudes = None

    def __init__(self, filename, limit=None, progress=2500):
        """Build the menu and load *filename* through ``Record.load``.

        *limit* and *progress* are forwarded to ``Record.load`` unchanged.
        """
        super(Application, self).__init__()
        self.menu = Menu()
        self.data, self.all_magnitudes = Record.load(filename, limit, progress)

    def select_option(self):
        """Prompt for menu choices in a loop and run the selected item.

        Unknown or empty choices simply re-display the prompt.  The loop ends
        when an item's action exits the process or when stdin is closed.
        """
        # raw_input only exists on Python 2; input is its Python 3 equivalent.
        try:
            read_line = raw_input
        except NameError:
            read_line = input

        while True:
            try:
                choice = read_line(self.menu.display)
            except EOFError:
                # stdin was closed (e.g. piped input ran out): stop prompting.
                return
            option = choice.strip().upper()
            if not option:
                continue
            item = self.menu.get(option)
            if item:
                # Option "4" is the statistics entry, which works on the bare
                # magnitude list rather than on the records.
                item.comparator(self.data if option != "4" else self.all_magnitudes, item)
class Menu(object):
    """The console menu: its entries, prompt text and the action of each entry.

    Attributes:
        options: dict mapping an option key (e.g. ``"1"``, ``"Q"``) to its
            ``Menu.Item``.
        display: the full prompt text shown to the user.
    """

    class Item(object):
        """One selectable menu entry.

        Attributes:
            option: the key the user types to select the entry.
            title: the text shown next to the key.
            comparator: callable invoked as ``comparator(data, item)`` when
                the entry is selected.
            renderer: callable turning one record into an output line, or
                ``None`` when the entry prints no per-record lines.
        """

        def __init__(self, option, title, comparator=None, renderer=None):
            super(Menu.Item, self).__init__()
            self.option = option
            self.title = title
            self.comparator = comparator
            self.renderer = renderer

        def render(self):
            """Return the menu line for this entry."""
            return " %s. %s" % (self.option, self.title)

    def __init__(self):
        super(Menu, self).__init__()
        items = [
            self.Item("1", "Top 100 Magnitude Earthquakes (date, magnitude, depth)",
                      lambda data, item: Menu.sort("magnitude", data, item, 100),
                      lambda x: "%s\t%.2f\t%d" % (str(x.date), x.magnitude, x.depth)),
            self.Item("2", "Top 50 Shallowest Earthquakes (date, magnitude, depth)",
                      lambda data, item: Menu.sort("depth", data, item, 50, True),
                      lambda x: "%s\t%.2f\t%d" % (str(x.date), x.magnitude, x.depth)),
            self.Item("3", "Top 50 Earthquakes closest to Cheney, WA (date, coordinates, distance from Cheney)",
                      lambda data, item: Menu.sort("distance", data, item, 50, True),
                      lambda x: "%s\t%s\t%.2f" % (str(x.date), x.coordinate, x.distance)),
            self.Item("4", "Mean, Midpoint, Median and Standard Deviation of Earthquake Magnitude",
                      lambda data, item: Statistics.statistics(data),
                      None),
            self.Item("5", "Top 50 Earthquakes by Impact (date, magnitude, depth, impact)",
                      lambda data, item: Menu.sort("impact", data, item, 50),
                      lambda x: "%s\t%.2f\t%d\t%.2f" % (str(x.date), x.magnitude, x.depth, x.impact)),
            self.Item("Q", "Exit",
                      lambda x, y: sys.exit(0)),
        ]
        self.options = dict((x.option, x) for x in items)
        # This instance attribute shadows the `display` method below, so
        # `menu.display` is the prompt string itself on every instance.
        self.display = "\n".join([x.render() for x in items]) + "\n\nSelect an option: "

    def get(self, option):
        """Return the item registered under *option*, or ``None``."""
        return self.options.get(option)

    def display(self):
        """Return the prompt text.

        Shadowed on instances by the ``display`` attribute set in
        ``__init__``; only reachable as ``Menu.display(instance)``.
        """
        return self.display

    # todo: separate
    @staticmethod
    def sort(key_name, data, item, count=50, descending=False):
        """Print *item*'s title, then the first *count* records ordered by *key_name*.

        Args:
            key_name: attribute name to order by (lower-cased before use).
            data: sequence of records; it is not modified.
            item: the ``Menu.Item`` supplying the title and the renderer.
                Nothing beyond the title is printed when it has no renderer.
            count: number of records to print.
            descending: NOTE - works opposite to its name, kept for
                compatibility with existing callers: the default ``False``
                prints the LARGEST values first, ``True`` the smallest first.
        """
        print(item.title)
        if not item.renderer:
            return
        largest_first = not descending
        # sorted() works on a copy, so the caller's list keeps its order.
        ranked = sorted(data,
                        key=operator.attrgetter(key_name.lower()),
                        reverse=largest_first)
        for x in ranked[:count]:
            print(item.renderer(x))
def main():
    """Load ``eqdata.csv``, clear the terminal and start the menu loop."""
    application = Application("eqdata.csv", progress=20000)
    # Windows (os.name == "nt") clears the console with "cls"; every other
    # platform gets "clear".
    if os.name == "nt":
        clear_command = "cls"
    else:
        clear_command = "clear"
    os.system(clear_command)
    application.select_option()


if __name__ == "__main__":
    main()