1"""Priority queue class with updatable priorities."""
2
3import heapq
4
5__all__ = ["MappedQueue"]
6
7
class _HeapElement:
    """This proxy class separates the heap element from its priority.

    The idea is that using a 2-tuple (priority, element) works
    for sorting, but not for dict lookup because priorities are
    often floating point values so round-off can mess up equality.

    So, we need inequalities to look at the priority (for sorting)
    and equality (and hash) to look at the element to enable
    updates to the priority.

    Unfortunately, this class can be tricky to work with if you forget that
    `__lt__` compares the priority while `__eq__` compares the element.
    In `greedy_modularity_communities()` the following code is
    used to check that two _HeapElements differ in either element or priority:

        if d_oldmax != row_max or d_oldmax.priority != row_max.priority:

    If the priorities are the same, this implementation uses the element
    as a tiebreaker. This provides compatibility with older systems that
    use tuples to combine priority and elements.

    Parameters
    ----------
    priority : object
        Value used by `<` and `>`; must be comparable with the other
        priorities it is ordered against.
    element : hashable
        Value used by `==` and `hash()`.
    """

    __slots__ = ["priority", "element", "_hash"]

    def __init__(self, priority, element):
        self.priority = priority
        self.element = element
        # Computed once so repeated dict lookups do not rehash the element.
        self._hash = hash(element)

    def __lt__(self, other):
        """Return True if `self` sorts before `other`.

        `other` may be another `_HeapElement` or a bare priority value.
        Equal priorities fall back to comparing the elements.

        Raises
        ------
        TypeError
            If the priorities tie and the elements cannot be ordered.
        """
        try:
            other_priority = other.priority
        except AttributeError:
            # `other` has no priority attribute: treat it as a raw priority.
            return self.priority < other
        # assume comparing to another _HeapElement
        if self.priority == other_priority:
            try:
                return self.element < other.element
            except TypeError as err:
                # Chain explicitly so the underlying comparison failure
                # is reported as the direct cause of this error.
                raise TypeError(
                    "Consider using a tuple, with a priority value that can be compared."
                ) from err
        return self.priority < other_priority

    def __gt__(self, other):
        """Return True if `self` sorts after `other`.

        Mirror image of `__lt__`: `other` may be another `_HeapElement` or
        a bare priority value, and equal priorities fall back to comparing
        the elements.

        Raises
        ------
        TypeError
            If the priorities tie and the elements cannot be ordered.
        """
        try:
            other_priority = other.priority
        except AttributeError:
            # `other` has no priority attribute: treat it as a raw priority.
            return self.priority > other
        # assume comparing to another _HeapElement
        if self.priority == other_priority:
            try:
                return self.element > other.element
            except TypeError as err:
                raise TypeError(
                    "Consider using a tuple, with a priority value that can be compared."
                ) from err
        return self.priority > other_priority

    def __eq__(self, other):
        """Compare by element only; the priority is ignored.

        `other` may be another `_HeapElement` or a bare element.
        """
        try:
            return self.element == other.element
        except AttributeError:
            return self.element == other

    def __hash__(self):
        """Return the hash of the wrapped element (cached at construction)."""
        return self._hash

    def __getitem__(self, indx):
        """Index like the tuple ``(priority, *element)``.

        Index 0 is the priority; any other index `i` is forwarded to the
        element as ``element[i - 1]``, so the element must be subscriptable
        for those indices.
        """
        return self.priority if indx == 0 else self.element[indx - 1]

    def __iter__(self):
        """Yield the priority, then the element's items.

        If the element is not iterable, the element itself is yielded
        instead. Note that a string element is iterable and is therefore
        yielded character by character.
        """
        yield self.priority
        try:
            yield from self.element
        except TypeError:
            yield self.element

    def __repr__(self):
        return f"_HeapElement({self.priority}, {self.element})"
89
90
class MappedQueue:
    """The MappedQueue class implements a min-heap with removal and update-priority.

    The min heap uses heapq as well as custom written _siftup and _siftdown
    methods to allow the heap positions to be tracked by an additional dict
    keyed by element to position. The smallest element can be read in O(1)
    time (it is always ``heap[0]``) and popped in O(log n) time,
    new elements can be pushed in O(log n) time, and any element can be removed
    or updated in O(log n) time. The queue cannot contain duplicate elements
    and an attempt to push an element already in the queue will have no effect.

    MappedQueue complements the heapq package from the python standard
    library. While MappedQueue is designed for maximum compatibility with
    heapq, it adds element removal, lookup, and priority update.

    Parameters
    ----------
    data : dict or iterable, optional
        Initial contents. A dict is read as ``{element: priority}``; any
        other iterable is taken as a collection of mutually comparable,
        hashable elements that act as their own priority.

    Attributes
    ----------
    heap : list
        The elements in heap order; ``heap[0]`` is the smallest.
    position : dict
        Maps each element to its current index in `heap`.

    Examples
    --------

    A `MappedQueue` can be created empty, or optionally, given a dictionary
    of initial elements and priorities. The methods `push`, `pop`,
    `remove`, and `update` operate on the queue.

    >>> colors_nm = {"red": 665, "blue": 470, "green": 550}
    >>> q = MappedQueue(colors_nm)
    >>> q.remove("red")
    >>> q.update("green", "violet", 400)
    >>> q.push("indigo", 425)
    True
    >>> [q.pop().element for i in range(len(q.heap))]
    ['violet', 'indigo', 'blue']

    A `MappedQueue` can also be initialized with a list or other iterable. The priority is assumed
    to be the sort order of the items in the list.

    >>> q = MappedQueue([916, 50, 4609, 493, 237])
    >>> q.remove(493)
    >>> q.update(237, 1117)
    >>> [q.pop() for i in range(len(q.heap))]
    [50, 916, 1117, 4609]

    An exception is raised if the elements are not comparable.

    >>> q = MappedQueue([100, "a"])
    Traceback (most recent call last):
    ...
    TypeError: '<' not supported between instances of 'int' and 'str'

    To avoid the exception, use a dictionary to assign priorities to the elements.

    >>> q = MappedQueue({100: 0, "a": 1})

    References
    ----------
    .. [1] Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2001).
       Introduction to algorithms second edition.
    .. [2] Knuth, D. E. (1997). The art of computer programming (Vol. 3).
       Pearson Education.
    """

    def __init__(self, data=None):
        """Build the queue from optional initial contents.

        Parameters
        ----------
        data : dict or iterable, optional
            A dict of ``{element: priority}``, or an iterable of elements.
            ``None`` creates an empty queue.

        Raises
        ------
        AssertionError
            If `data` contains duplicate elements.
        """
        if data is None:
            self.heap = []
        elif isinstance(data, dict):
            self.heap = [_HeapElement(v, k) for k, v in data.items()]
        else:
            self.heap = list(data)
        self.position = {}
        self._heapify()

    def _heapify(self):
        """Restore heap invariant and recalculate map."""
        heapq.heapify(self.heap)
        self.position = {elt: pos for pos, elt in enumerate(self.heap)}
        # Equal elements collapse to one dict key, so a size mismatch
        # means the heap holds duplicates.
        if len(self.heap) != len(self.position):
            raise AssertionError("Heap contains duplicate elements")

    def __len__(self):
        """Return the number of elements in the queue."""
        return len(self.heap)

    def push(self, elt, priority=None):
        """Add an element to the queue.

        Parameters
        ----------
        elt : hashable
            The element to add.
        priority : object, optional
            If given (not ``None``), `elt` is wrapped together with this
            priority; otherwise `elt` is ordered by its own value.

        Returns
        -------
        bool
            ``True`` if the element was added, ``False`` if an equal
            element was already in the queue (the queue is left unchanged).
        """
        if priority is not None:
            elt = _HeapElement(priority, elt)
        # If element is already in queue, do nothing
        if elt in self.position:
            return False
        # Add element to heap and dict
        pos = len(self.heap)
        self.heap.append(elt)
        self.position[elt] = pos
        # Restore invariant; following heapq naming, `_siftdown` moves the
        # new last item toward the root.
        self._siftdown(0, pos)
        return True

    def pop(self):
        """Remove and return the smallest element in the queue.

        Raises
        ------
        IndexError
            If the queue is empty.
        """
        # Remove smallest element
        elt = self.heap[0]
        del self.position[elt]
        # If elt is last item, remove and return
        if len(self.heap) == 1:
            self.heap.pop()
            return elt
        # Replace root with last element
        last = self.heap.pop()
        self.heap[0] = last
        self.position[last] = 0
        # Restore invariant by sifting up
        self._siftup(0)
        # Return smallest element
        return elt

    def update(self, elt, new, priority=None):
        """Replace an element in the queue with a new one.

        Parameters
        ----------
        elt : hashable
            The element currently in the queue.
        new : hashable
            Its replacement. It may equal `elt` (to change only the
            priority). Whether `new` is already queued elsewhere is not
            checked; the caller must ensure it is not.
        priority : object, optional
            If given (not ``None``), `new` is wrapped together with this
            priority.

        Raises
        ------
        KeyError
            If `elt` is not in the queue.
        """
        if priority is not None:
            new = _HeapElement(priority, new)
        # Replace: drop the old key and reuse its slot for the new element
        pos = self.position.pop(elt)
        self.heap[pos] = new
        self.position[new] = pos
        # Restore invariant by sifting up
        self._siftup(pos)

    def remove(self, elt):
        """Remove an element from the queue.

        Raises
        ------
        KeyError
            If `elt` is not in the queue.
        """
        # Find and remove element; a missing element raises KeyError here,
        # before the heap is touched.
        pos = self.position.pop(elt)
        # If elt is last item, remove and return
        if pos == len(self.heap) - 1:
            self.heap.pop()
            return
        # Replace elt with last element
        last = self.heap.pop()
        self.heap[pos] = last
        self.position[last] = pos
        # Restore invariant by sifting up
        self._siftup(pos)

    def _siftup(self, pos):
        """Move smaller child up until hitting a leaf.

        Built to mimic code for heapq._siftup
        only updating position dict too.

        The item at `pos` is first carried down to a leaf, then bubbled
        back up as far as the root if necessary, so the item settles
        correctly whether it is larger or smaller than its neighbours.
        """
        heap, position = self.heap, self.position
        end_pos = len(heap)
        newitem = heap[pos]
        # Shift up the smaller child until hitting a leaf
        child_pos = (pos << 1) + 1  # start with leftmost child position
        while child_pos < end_pos:
            # Set child_pos to index of smaller child.
            child = heap[child_pos]
            right_pos = child_pos + 1
            if right_pos < end_pos:
                right = heap[right_pos]
                if not child < right:
                    child = right
                    child_pos = right_pos
            # Move the smaller child up.
            heap[pos] = child
            position[child] = pos
            pos = child_pos
            child_pos = (pos << 1) + 1
        # pos is a leaf position. Put newitem there, and bubble it up
        # to its final resting place (by sifting its parents down).
        while pos > 0:
            parent_pos = (pos - 1) >> 1
            parent = heap[parent_pos]
            if not newitem < parent:
                break
            heap[pos] = parent
            position[parent] = pos
            pos = parent_pos
        heap[pos] = newitem
        position[newitem] = pos

    def _siftdown(self, start_pos, pos):
        """Restore invariant. keep swapping with parent until smaller.

        Built to mimic code for heapq._siftdown
        only updating position dict too.

        The item at `pos` moves toward the root, never past `start_pos`.
        """
        heap, position = self.heap, self.position
        newitem = heap[pos]
        # Follow the path to the root, moving parents down until finding a place
        # newitem fits.
        while pos > start_pos:
            parent_pos = (pos - 1) >> 1
            parent = heap[parent_pos]
            if not newitem < parent:
                break
            heap[pos] = parent
            position[parent] = pos
            pos = parent_pos
        heap[pos] = newitem
        position[newitem] = pos