cledoux

8/27/2015 - 9:12 PM

A directed graph with timestamps on edges.

```
#!/usr/bin/env python
"""
Copyright (c) 2015 Charles LeDoux
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import pickle
from collections import defaultdict
from datetime import datetime
from functools import reduce
# Module-level shorthand for datetime.strptime (parses a string into a
# datetime according to a format string).
strptime = datetime.strptime
# A module-level function must be used to initialize the edges data structure
# of TimeStampedDiGraph. This allows the class to be pickled.
def _init_internal_edges_dict():
    """ Return a new, empty ``defaultdict(set)``.

    Used as the ``default_factory`` of the outer edge dictionaries so that
    looking up a new vertex creates a mapping from neighbour vertex to a set
    of timestamps. It is a named module-level function (rather than a lambda
    or nested function) because pickle stores functions by reference to their
    module-level name; a lambda here would make the graph unpicklable.
    """
    return defaultdict(set)
class TimeStampedDiGraph(object):
    r""" A directed graph with timestamps on the edges.

    Definitions
    -----------
    G is a graph
    G = <V,E>
    V = {v_i | v_i is a hashable object}
    E = {e_i | e_i = <v_a, v_b, timestamp_i>; v_a, v_b \in V}
    t_i = timestamp_i

    The same ordered pair of vertices may be connected by several edges, one
    per distinct timestamp. Timestamps are stored as datetime objects.
    """

    def __init__(self):
        """ Initialize an empty graph.

        Internal storage of the graph is a nested dict.
        First level of dict is essentially adjacency list.
        Second level stores the set of timestamps a given edge contains.

        Example
        -------
        Given graph containing only edge <v_1, v_2, t_1>, dict is:
            {v_1: {
                v_2: set(t_1)}}
        Adding edge <v_1, v_2, t_2> results in dict:
            {v_1: {
                v_2: set(t_1, t_2)}}
        Adding edge <v_1, v_3, t_3> results in:
            {v_1: {
                v_2: set(t_1, t_2),
                v_3: set(t_3)}}
        Adding edge <v_4, v_1, t_4> results in:
            {v_1: {
                v_2: set(t_1, t_2),
                v_3: set(t_3)},
             v_4: {
                v_1: set(t_4)}}

        The same structure is kept a second time with the direction of every
        edge flipped, so parents can be looked up as cheaply as children.
        """
        # Edges data structure
        # Store both forward and backward so that we can easily traverse in both directions.
        # Access to edges should be via _edges, not these datastructures.
        self._forward_edges = defaultdict(_init_internal_edges_dict)
        self._backward_edges = defaultdict(_init_internal_edges_dict)
        self._edges = self._forward_edges

    def __eq__(self, other):
        """ Two graphs are equal when they contain exactly the same timestamped edges.

        Returns NotImplemented for objects that are not TimeStampedDiGraph
        instances so that Python falls back to its default comparison instead
        of raising AttributeError.
        """
        if not isinstance(other, TimeStampedDiGraph):
            return NotImplemented
        return self._edges == other._edges

    # Defining __eq__ already makes instances unhashable in Python 3; spell it
    # out so the intent (mutable container, not usable as a dict key) is visible.
    __hash__ = None

    @staticmethod
    def _coerce_timestamp(timestamp, timestamp_format):
        """ Return timestamp as a datetime, parsing it with timestamp_format if it is a str.

        Raises
        ------
        TypeError: timestamp is neither a str nor a datetime (or a subclass of either)
        ValueError: timestamp is a str that does not match timestamp_format
        """
        # isinstance (not type() ==) so that datetime subclasses are accepted.
        if isinstance(timestamp, datetime):
            return timestamp
        if isinstance(timestamp, str):
            return datetime.strptime(timestamp, timestamp_format)
        raise TypeError("timestamp must be a str or datetime.datetime, not %s"
                        % type(timestamp).__name__)

    @staticmethod
    def _discard_timestamp(edges_dict, outer_vertex, inner_vertex, timestamp):
        """ Drop timestamp from edges_dict[outer_vertex][inner_vertex] and prune emptied containers.

        Uses .get() throughout so that nothing is auto-created by the defaultdicts.
        """
        neighbours = edges_dict.get(outer_vertex)
        if neighbours is None:
            return
        timestamps = neighbours.get(inner_vertex)
        if timestamps is not None:
            timestamps.discard(timestamp)
            if not timestamps:
                del neighbours[inner_vertex]
        if not neighbours:
            del edges_dict[outer_vertex]

    def add_edge(self, from_vertex, to_vertex, timestamp,
                 timestamp_format="%Y-%m-%d %H:%M:%S"):
        """ Create the edge in the graph.

        Will add the vertices to the set of vertices.
        This is the only way to add a vertex to the graph. We do not support
        adding disconnected vertices.

        Parameters
        ----------
        from_vertex: hashable
        to_vertex: hashable
        timestamp: str or datetime.datetime If str, will convert to datetime
        timestamp_format: str Format string used to convert timestamp if str

        Raises
        ------
        TypeError: timestamp is neither str nor datetime
        ValueError: timestamp is a str that does not match timestamp_format
        """
        timestamp = self._coerce_timestamp(timestamp, timestamp_format)
        self._forward_edges[from_vertex][to_vertex].add(timestamp)
        self._backward_edges[to_vertex][from_vertex].add(timestamp)

    def remove_edge(self, from_vertex, to_vertex, timestamp,
                    timestamp_format="%Y-%m-%d %H:%M:%S"):
        """ Remove the edge <from_vertex, to_vertex, timestamp> from the graph.

        Containers that become empty are removed as well, so a vertex with no
        remaining edges disappears from the graph.

        Parameters
        ----------
        from_vertex: hashable
        to_vertex: hashable
        timestamp: str or datetime.datetime If str, will convert to datetime
        timestamp_format: str Format string used to convert timestamp if str

        Raises
        ------
        KeyError: the edge is not in the graph (the graph is left untouched)
        TypeError: timestamp is neither str nor datetime
        ValueError: timestamp is a str that does not match timestamp_format
        """
        timestamp = self._coerce_timestamp(timestamp, timestamp_format)
        # Check for existence without indexing the defaultdicts: indexing would
        # silently create empty entries for vertices that are not in the graph.
        existing = self._forward_edges.get(from_vertex, {}).get(to_vertex)
        if existing is None or timestamp not in existing:
            raise KeyError((from_vertex, to_vertex, timestamp))
        self._discard_timestamp(self._forward_edges, from_vertex, to_vertex, timestamp)
        # The backward index is keyed by the edge's *target* vertex.
        self._discard_timestamp(self._backward_edges, to_vertex, from_vertex, timestamp)

    def edges(self, reverse=False):
        """ Iterate over edges.

        Similar in concept to dict.keys(), dict.values(), etc.
        Yields edge tuples as described in class docstring.

        :param reverse: Yields edges in reversed direction, e.g. if edge (a, b) is in graph, yield (b, a)
        """
        edges_dict = self._backward_edges if reverse else self._edges
        # The nested for loops are much easier to read than nested
        # generator comprehension
        # This is O(|E|)
        for from_vertex, neighbours in edges_dict.items():
            for to_vertex, timestamps in neighbours.items():
                for timestamp in timestamps:
                    yield (from_vertex, to_vertex, timestamp)

    def is_connected(self, from_vertex, to_vertex,
                     start=datetime.min, end=datetime.max):
        r""" Is from_vertex connected to to_vertex?

        Formally:
        Given v_a,v_b \in V and start, end that are timestamps.
        is_connected(v_a, v_b, start, end) -> True iff
        \exists [e_i | e_i \in E; 1 \leq i \leq n] s.t.
        e_i = (v_{i-1}, v_i, t_i); e_1 = (v_a, v_1, t_1);
        e_n = (v_{n-1}, v_b, t_n); start \leq t_1; t_n \leq end;
        \forall i, t_{i-1} \leq t_i.

        Note:
        Yes, is_connected is implemented using descendants() despite the fact that descendants() is formally defined
        in terms of is_connected. Isn't computer science fun?

        The search never yields its own starting vertex, so
        is_connected(v, v) is always False.
        """
        # Type checking on timestamps
        assert isinstance(start, datetime)
        assert isinstance(end, datetime)
        # If start > end, we have an empty time window.
        # Return False instead of error so I can easily check connection in both directions.
        if not start <= end:
            return False
        # Stop as soon as the target shows up among the descendants.
        for vertex, _timestamp in self.descendants(from_vertex, start, end):
            if vertex == to_vertex:
                return True
        # We've iterated the whole reachable graph, return false.
        return False

    def _get_next_verticies(self, current_vertex, start=datetime.min, end=datetime.max, reverse=False):
        """ Get a vertex immediate children or parents.

        If reverse, get parents. If not reverse, get children.
        Start and end define a time window in which we will look for the next vertices. Edges outside this window
        will not be considered.

        Yields (vertex, timestamp) where timestamp is the smallest timestamp
        inside [start, end] on the edge between current_vertex and vertex.
        """
        # Type checking on timestamps
        assert isinstance(start, datetime)
        assert isinstance(end, datetime)
        # Sanity check timestamps
        assert start <= end
        # If reverse, treat parents as children
        edges_dict = self._backward_edges if reverse else self._edges
        # .get() instead of indexing: indexing a defaultdict inserts an empty
        # entry, which would make a read-only query modify the graph (and
        # break equality between otherwise identical graphs).
        neighbours = edges_dict.get(current_vertex)
        if not neighbours:
            return
        for vertex, timestamps in neighbours.items():
            # Filter to only timestamps in the proper range.
            in_window = [t for t in timestamps if start <= t <= end]
            # min() raises ValueError on an empty sequence, so check first.
            if in_window:
                yield vertex, min(in_window)

    def children(self, parent, start=datetime.min, end=datetime.max):
        r""" Return immediate children of parent.

        Does not recurse on children. Use descendants() if recursion is desired behavior.
        Yields (vertex, timestamp) where \exists (parent, vertex, timestamp) \in E
        s.t. start <= timestamp <= end
        timestamp will be the smallest timestamp that satisfies the inequality.
        """
        yield from self._get_next_verticies(parent, start, end, reverse=False)

    def parents(self, child, start=datetime.min, end=datetime.max):
        r""" Return immediate parents of child.

        Does not recurse on parents. Use ancestors() if recursion is desired behavior.
        Yields (vertex, timestamp) where \exists (vertex, child, timestamp) \in E
        s.t. start <= timestamp <= end
        timestamp will be the smallest timestamp that satisfies the inequality.
        """
        yield from self._get_next_verticies(child, start, end, reverse=True)

    def descendants(self, parent, start=datetime.min, end=datetime.max, _visited=None):
        """ Yield all children recursively.

        A child is a descendant and any child of a descendant is also a descendant.
        Yields (vertex, timestamp) pairs as produced by _depth_first_search(parent, start, end).

        :param _visited: Unused; kept only so existing callers passing it do not break.

        Note:
        Yes, is_connected is implemented using this function despite the fact that the formal definition of this
        function is in terms of is_connected. Isn't computer science fun?
        """
        yield from self._depth_first_search(parent, start, end)

    def ancestors(self, child, start=datetime.min, end=datetime.max):
        """ Yield all parents recursively

        A direct parent is an ancestor and a parent of an ancestor is also a ancestor.
        Yields (vertex, timestamp) pairs as produced by
        _depth_first_search(child, start, end, reversed=True).
        """
        yield from self._depth_first_search(child, start, end, reversed=True)

    def _depth_first_search(self, start_vertex, start=datetime.min, end=datetime.max, reversed=False, _visited=None):
        """ Perform a depth first search yielding (vertex, timestamp) tuples in preprocessing order.

        Arguments:
        :param start_vertex: Vertex to start from (never yielded itself)
        :param start: Start of timestamp range to consider
        :param end: End of timestamp range to consider
        :param reversed: bool Perform the search by traversing parents instead of children
        :param _visited: Set of vertices to treat as already visited; it is updated in place.
            Internal variable, do not use.

        Yields:
        :return: Yields (vertex, timestamp) tuples in preprocessing order

        The time window shrinks as the search descends: going forward, the
        edges leaving a vertex must not be earlier than the timestamp it was
        reached with; going backward, the edges entering a vertex must not be
        later than that timestamp.

        Implemented with an explicit stack of iterators rather than recursion
        so that long chains do not hit the interpreter's recursion limit.

        Each vertex is expanded at most once, and expanding it scans the
        timestamps on its outgoing (or incoming, if reversed) edges.

        NOTE(review): because a vertex is only ever expanded with the window
        of the first path that reached it, a time-respecting path that would
        reach the same vertex with a more permissive timestamp is not
        explored. Some vertices that are reachable according to the formal
        definition in is_connected() can therefore be missed.
        """
        # Type checking on timestamps
        assert isinstance(start, datetime)
        assert isinstance(end, datetime)
        # Sanity check timestamps
        assert start <= end
        # Deals with default parameters only being set once.
        if _visited is None:
            _visited = set()
        _visited.add(start_vertex)
        # Choose the correct getter function based on whether we are traversing children or parents
        getter = self.parents if reversed else self.children
        # Each stack entry is the partially consumed neighbour iterator of one
        # vertex on the current DFS path.
        pending = [getter(start_vertex, start, end)]
        while pending:
            for next_vertex, timestamp in pending[-1]:
                if next_vertex in _visited:
                    continue
                _visited.add(next_vertex)
                yield next_vertex, timestamp
                # Descend into next_vertex with the narrowed window; the
                # current iterator is resumed once that subtree is exhausted.
                if reversed:
                    pending.append(getter(next_vertex, start, timestamp))
                else:
                    pending.append(getter(next_vertex, timestamp, end))
                break
            else:
                # Iterator exhausted without finding an unvisited vertex.
                pending.pop()
class CacheableTimeStampedDiGraph(TimeStampedDiGraph):
    """ Social graph between identities

    A TimeStampedDiGraph that can additionally be written to, and read back
    from, a pickle file.
    """

    def save_cache(self, cache_location):
        """ Save to a cache

        :param cache_location: str or File. If str, it is treated as a path; the file is opened in
            mode 'wb', written and closed again. If file object, must be opened with mode 'wb'
            (or another accepted binary mode, see the assertion below); it is left open for the caller.
        """
        if isinstance(cache_location, str):
            # We opened the file, so we are responsible for closing it.
            with open(cache_location, 'wb') as cache_file:
                pickle.dump(self, cache_file)
            return
        # Pickle requires binary mode
        assert cache_location.mode in {'wb', 'wb+', 'rb+'}
        pickle.dump(self, cache_location)

    @staticmethod
    def load_cache(cache_location):
        """ Load a graph previously written with save_cache().

        :param cache_location: str or File. If str, it is treated as a path; the file is opened in
            mode 'rb', read and closed again. If file object, must be opened in a readable binary
            mode; it is left open for the caller.
        :return: Whatever object was pickled into the cache.

        SECURITY: unpickling can execute arbitrary code. Only load caches that
        come from a trusted source.
        """
        if isinstance(cache_location, str):
            # We opened the file, so we are responsible for closing it.
            with open(cache_location, 'rb') as cache_file:
                return pickle.load(cache_file)
        # Pickle requires binary mode
        assert cache_location.mode in ['rb', 'wb+', 'rb+']
        return pickle.load(cache_location)
if __name__ == '__main__':
    # This module only provides classes; running it directly is a mistake,
    # so say so on stderr and exit with a non-zero status.
    import sys
    sys.stderr.write("This is not a runnable script\n")
    sys.exit(1)
```