A Python priority queue that inherits from the MutableMapping abstract base class of the collections package and allows adding and removing items.
import collections
import collections.abc
"""
This is based on HeapDict 1.0.0, but includes a few modifications.
https://pypi.org/project/HeapDict/
"""
def doc(s):
    """Build a decorator that sets the ``__doc__`` of whatever it decorates.

    :param s: either the documentation text itself, or a callable whose own
        ``__doc__`` is used as the text
    :return: a decorator that assigns the text to ``g.__doc__`` and returns
        the very same object ``g``
    """
    # Use the built-in callable() rather than probing for a '__call__'
    # attribute by hand.
    if callable(s):
        s = s.__doc__

    def f(g):
        g.__doc__ = s
        return g

    return f
class heapdict(collections.abc.MutableMapping):
    """Priority queue with a dict-like interface.

    Maps keys to values and keeps the entries in a binary min-heap ordered by
    value, so ``popitem()`` / ``peekitem()`` yield the entry with the lowest
    value, while individual keys can still be read, reassigned or deleted.

    Internal state:

    * ``self.heap`` -- list of wrappers ``[value, key, index]`` in heap order,
      where ``index`` is the wrapper's current position in ``self.heap``.
    * ``self.d`` -- dict mapping each key to its wrapper (the same list object
      that is stored in ``self.heap``).

    The base class is taken from ``collections.abc``; the old
    ``collections.MutableMapping`` alias was removed in Python 3.10.
    """

    # Sentinel object; nothing in this class body refers to it.
    __marker = object()

    @staticmethod
    def _parent(i):
        """Return the heap index of the parent of index ``i``."""
        return ((i - 1) >> 1)

    @staticmethod
    def _left(i):
        """Return the heap index of the left child of index ``i``."""
        return ((i << 1) + 1)

    @staticmethod
    def _right(i):
        """Return the heap index of the right child of index ``i``."""
        return ((i + 1) << 1)

    def __init__(self, *args, **kw):
        """Create an empty heapdict, then fill it via ``self.update(*args, **kw)``."""
        self.heap = []
        self.d = {}
        self.update(*args, **kw)

    @doc(dict.clear)
    def clear(self):
        # Both containers are emptied in place.
        self.heap.clear()
        self.d.clear()

    @doc(dict.__setitem__)
    def __setitem__(self, key, value):
        # Re-assigning an existing key: remove the old entry first so the key
        # never appears twice in the heap.
        if key in self.d:
            self.pop(key)
        # len(self) is the index the wrapper gets once appended to the heap.
        wrapper = [value, key, len(self)]
        self.d[key] = wrapper
        self.heap.append(wrapper)
        # Sift the new last element up to restore heap order.
        self._decrease_key(len(self.heap) - 1)

    def _min_heapify(self, i):
        """Sift the entry at index ``i`` down until neither child is smaller."""
        heap = self.heap
        n = len(heap)
        while True:
            left = self._left(i)
            right = self._right(i)
            # Pick the smallest value among the node and its existing children.
            low = i
            if left < n and heap[left][0] < heap[low][0]:
                low = left
            if right < n and heap[right][0] < heap[low][0]:
                low = right
            if low == i:
                return
            self._swap(i, low)
            i = low

    def _decrease_key(self, i):
        """Sift the entry at index ``i`` up towards the root.

        The loop stops only when the parent's value is strictly smaller, so
        an entry whose value equals its parent's still moves up.
        """
        while i:
            parent = self._parent(i)
            if self.heap[parent][0] < self.heap[i][0]:
                break
            self._swap(i, parent)
            i = parent

    def _swap(self, i, j):
        """Exchange the wrappers at heap indices ``i`` and ``j``."""
        self.heap[i], self.heap[j] = self.heap[j], self.heap[i]
        # Keep each wrapper's stored position in sync with its new slot.
        self.heap[i][2] = i
        self.heap[j][2] = j

    @doc(dict.__delitem__)
    def __delitem__(self, key):
        # Raises KeyError here when the key is unknown.
        wrapper = self.d[key]
        # Move the entry up to the root regardless of its value ...
        while wrapper[2]:
            parentpos = self._parent(wrapper[2])
            parent = self.heap[parentpos]
            self._swap(wrapper[2], parent[2])
        # ... then remove the root, which also restores heap order.
        self.popitem()

    @doc(dict.__getitem__)
    def __getitem__(self, key):
        # Slot 0 of the wrapper holds the value.
        return self.d[key][0]

    @doc(dict.__iter__)
    def __iter__(self):
        # Iterates over keys in the order of the backing dict, not heap order.
        return iter(self.d)

    def popitem(self, default=None):
        """
        D.popitem() -> (k, v)
        removes and returns the (key, value) pair with lowest value
        returns `default` value if the heap is empty
        """
        if not self.heap:
            return default
        wrapper = self.heap[0]
        if len(self.heap) == 1:
            self.heap.pop()
        else:
            # Move the last entry to the root and sift it down.
            self.heap[0] = self.heap.pop(-1)
            self.heap[0][2] = 0
            self._min_heapify(0)
        del self.d[wrapper[1]]
        return wrapper[1], wrapper[0]

    @doc(dict.__len__)
    def __len__(self):
        return len(self.d)

    def peekitem(self, default=None):
        """
        D.peekitem() -> (k, v)
        returns the (key, value) pair with lowest value
        returns `default` value if the heap is empty
        """
        if not self.heap:
            return default
        return self.heap[0][1], self.heap[0][0]

    def empty(self) -> bool:
        """
        :return: whether the heap is empty
        """
        return not self.heap
# The helper is only needed while the class body above is executed; drop it
# so it does not remain as a module-level name.
del doc
# Names exported by ``from <module> import *``.
__all__ = ['heapdict']