# Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.9/dist-packages/networkx/utils/mapped_queue.py: 28%
# 146 statements
# coverage.py v7.3.2, created at 2023-10-20 07:00 +0000
"""Priority queue class with updatable priorities.
"""

import heapq

# Only the queue itself is public; `_HeapElement` is an implementation detail.
__all__ = ["MappedQueue"]
9class _HeapElement:
10 """This proxy class separates the heap element from its priority.
12 The idea is that using a 2-tuple (priority, element) works
13 for sorting, but not for dict lookup because priorities are
14 often floating point values so round-off can mess up equality.
16 So, we need inequalities to look at the priority (for sorting)
17 and equality (and hash) to look at the element to enable
18 updates to the priority.
20 Unfortunately, this class can be tricky to work with if you forget that
21 `__lt__` compares the priority while `__eq__` compares the element.
22 In `greedy_modularity_communities()` the following code is
23 used to check that two _HeapElements differ in either element or priority:
25 if d_oldmax != row_max or d_oldmax.priority != row_max.priority:
27 If the priorities are the same, this implementation uses the element
28 as a tiebreaker. This provides compatibility with older systems that
29 use tuples to combine priority and elements.
30 """
32 __slots__ = ["priority", "element", "_hash"]
34 def __init__(self, priority, element):
35 self.priority = priority
36 self.element = element
37 self._hash = hash(element)
39 def __lt__(self, other):
40 try:
41 other_priority = other.priority
42 except AttributeError:
43 return self.priority < other
44 # assume comparing to another _HeapElement
45 if self.priority == other_priority:
46 try:
47 return self.element < other.element
48 except TypeError as err:
49 raise TypeError(
50 "Consider using a tuple, with a priority value that can be compared."
51 )
52 return self.priority < other_priority
54 def __gt__(self, other):
55 try:
56 other_priority = other.priority
57 except AttributeError:
58 return self.priority > other
59 # assume comparing to another _HeapElement
60 if self.priority == other_priority:
61 try:
62 return self.element > other.element
63 except TypeError as err:
64 raise TypeError(
65 "Consider using a tuple, with a priority value that can be compared."
66 )
67 return self.priority > other_priority
69 def __eq__(self, other):
70 try:
71 return self.element == other.element
72 except AttributeError:
73 return self.element == other
75 def __hash__(self):
76 return self._hash
78 def __getitem__(self, indx):
79 return self.priority if indx == 0 else self.element[indx - 1]
81 def __iter__(self):
82 yield self.priority
83 try:
84 yield from self.element
85 except TypeError:
86 yield self.element
88 def __repr__(self):
89 return f"_HeapElement({self.priority}, {self.element})"
class MappedQueue:
    """The MappedQueue class implements a min-heap with removal and update-priority.

    The min heap uses heapq as well as custom written _siftup and _siftdown
    methods to allow the heap positions to be tracked by an additional dict
    keyed by element to position. The smallest element can be read in O(1)
    time (it is ``heap[0]``) and popped in O(log n) time, new elements can be
    pushed in O(log n) time, and any element can be removed or updated in
    O(log n) time. The queue cannot contain duplicate elements
    and an attempt to push an element already in the queue will have no effect.

    MappedQueue complements the heapq package from the python standard
    library. While MappedQueue is designed for maximum compatibility with
    heapq, it adds element removal, lookup, and priority update.

    Parameters
    ----------
    data : dict or iterable
        A dict maps each element to its priority. Any other iterable supplies
        the elements themselves, which are then ordered by direct comparison.

    Examples
    --------

    A `MappedQueue` can be created empty, or optionally, given a dictionary
    of initial elements and priorities. The methods `push`, `pop`,
    `remove`, and `update` operate on the queue.

    >>> colors_nm = {'red':665, 'blue': 470, 'green': 550}
    >>> q = MappedQueue(colors_nm)
    >>> q.remove('red')
    >>> q.update('green', 'violet', 400)
    >>> q.push('indigo', 425)
    True
    >>> [q.pop().element for i in range(len(q.heap))]
    ['violet', 'indigo', 'blue']

    A `MappedQueue` can also be initialized with a list or other iterable. The priority is assumed
    to be the sort order of the items in the list.

    >>> q = MappedQueue([916, 50, 4609, 493, 237])
    >>> q.remove(493)
    >>> q.update(237, 1117)
    >>> [q.pop() for i in range(len(q.heap))]
    [50, 916, 1117, 4609]

    An exception is raised if the elements are not comparable.

    >>> q = MappedQueue([100, 'a'])
    Traceback (most recent call last):
    ...
    TypeError: '<' not supported between instances of 'int' and 'str'

    To avoid the exception, use a dictionary to assign priorities to the elements.

    >>> q = MappedQueue({100: 0, 'a': 1 })

    References
    ----------
    .. [1] Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2001).
       Introduction to algorithms second edition.
    .. [2] Knuth, D. E. (1997). The art of computer programming (Vol. 3).
       Pearson Education.
    """

    def __init__(self, data=None):
        """Priority queue class with updatable priorities."""
        if data is None:
            self.heap = []
        elif isinstance(data, dict):
            # dict input: keys are elements, values are their priorities
            self.heap = [_HeapElement(v, k) for k, v in data.items()]
        else:
            self.heap = list(data)
        self.position = {}
        self._heapify()

    def _heapify(self):
        """Restore heap invariant and recalculate map.

        Raises
        ------
        AssertionError
            If two heap entries compare equal (same hash and `==`), because
            they would collapse into one key of the position map.
        """
        heapq.heapify(self.heap)
        self.position = {elt: pos for pos, elt in enumerate(self.heap)}
        if len(self.heap) != len(self.position):
            raise AssertionError("Heap contains duplicate elements")

    def __len__(self):
        """Return the number of elements in the queue."""
        return len(self.heap)

    def push(self, elt, priority=None):
        """Add an element to the queue.

        When `priority` is not None the element is wrapped in a
        `_HeapElement` carrying that priority.

        Returns
        -------
        bool
            False if the element was already queued (nothing changes),
            True if it was added.
        """
        if priority is not None:
            elt = _HeapElement(priority, elt)
        # If element is already in queue, do nothing
        if elt in self.position:
            return False
        # Add element to heap and dict
        pos = len(self.heap)
        self.heap.append(elt)
        self.position[elt] = pos
        # Restore invariant by sifting down
        self._siftdown(0, pos)
        return True

    def pop(self):
        """Remove and return the smallest element in the queue.

        Raises
        ------
        IndexError
            If the queue is empty.
        """
        # Remove smallest element
        elt = self.heap[0]
        del self.position[elt]
        # If elt is last item, remove and return
        if len(self.heap) == 1:
            self.heap.pop()
            return elt
        # Replace root with last element
        last = self.heap.pop()
        self.heap[0] = last
        self.position[last] = 0
        # Restore invariant by sifting up
        self._siftup(0)
        # Return smallest element
        return elt

    def update(self, elt, new, priority=None):
        """Replace an element in the queue with a new one.

        When `priority` is not None, `new` is wrapped in a `_HeapElement`
        carrying that priority. `new` may compare equal to `elt` (a pure
        priority change). No check is made that `new` is absent from the
        rest of the queue: passing an element that is already queued at a
        different slot overwrites its position entry and leaves the queue
        inconsistent, so callers must avoid it.

        Raises
        ------
        KeyError
            If `elt` is not in the queue.
        """
        if priority is not None:
            new = _HeapElement(priority, new)
        # Replace
        pos = self.position[elt]
        self.heap[pos] = new
        del self.position[elt]
        self.position[new] = pos
        # Restore invariant by sifting up
        self._siftup(pos)

    def remove(self, elt):
        """Remove an element from the queue.

        Raises
        ------
        KeyError
            If `elt` is not in the queue.
        """
        # Look up and forget the slot in one step; a missing element
        # propagates the KeyError to the caller.
        pos = self.position.pop(elt)
        # If elt is last item, remove and return
        if pos == len(self.heap) - 1:
            self.heap.pop()
            return
        # Replace elt with last element
        last = self.heap.pop()
        self.heap[pos] = last
        self.position[last] = pos
        # Restore invariant by sifting up
        self._siftup(pos)

    def _siftup(self, pos):
        """Move smaller child up until hitting a leaf.

        Built to mimic code for heapq._siftup
        only updating position dict too.
        """
        heap, position = self.heap, self.position
        end_pos = len(heap)
        newitem = heap[pos]
        # Shift up the smaller child until hitting a leaf
        child_pos = (pos << 1) + 1  # start with leftmost child position
        while child_pos < end_pos:
            # Set child_pos to index of smaller child.
            child = heap[child_pos]
            right_pos = child_pos + 1
            if right_pos < end_pos:
                right = heap[right_pos]
                if not child < right:
                    child = right
                    child_pos = right_pos
            # Move the smaller child up.
            heap[pos] = child
            position[child] = pos
            pos = child_pos
            child_pos = (pos << 1) + 1
        # pos is a leaf position. Put newitem there, and bubble it up
        # to its final resting place (by sifting its parents down).
        # The walk may continue all the way to the root, so an item that is
        # smaller than its original ancestors also ends up correctly placed.
        while pos > 0:
            parent_pos = (pos - 1) >> 1
            parent = heap[parent_pos]
            if not newitem < parent:
                break
            heap[pos] = parent
            position[parent] = pos
            pos = parent_pos
        heap[pos] = newitem
        position[newitem] = pos

    def _siftdown(self, start_pos, pos):
        """Restore invariant. keep swapping with parent until smaller.

        Built to mimic code for heapq._siftdown
        only updating position dict too.
        """
        heap, position = self.heap, self.position
        newitem = heap[pos]
        # Follow the path to the root, moving parents down until finding a place
        # newitem fits.
        while pos > start_pos:
            parent_pos = (pos - 1) >> 1
            parent = heap[parent_pos]
            if not newitem < parent:
                break
            heap[pos] = parent
            position[parent] = pos
            pos = parent_pos
        heap[pos] = newitem
        position[newitem] = pos