Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/networkx/utils/mapped_queue.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

147 statements  

1"""Priority queue class with updatable priorities.""" 

2 

3import heapq 

4 

5__all__ = ["MappedQueue"] 

6 

7 

class _HeapElement:
    """This proxy class separates the heap element from its priority.

    The idea is that using a 2-tuple (priority, element) works
    for sorting, but not for dict lookup because priorities are
    often floating point values so round-off can mess up equality.

    So, we need inequalities to look at the priority (for sorting)
    and equality (and hash) to look at the element to enable
    updates to the priority.

    Unfortunately, this class can be tricky to work with if you forget that
    `__lt__` compares the priority while `__eq__` compares the element.
    In `greedy_modularity_communities()` the following code is
    used to check that two _HeapElements differ in either element or priority:

        if d_oldmax != row_max or d_oldmax.priority != row_max.priority:

    If the priorities are the same, this implementation uses the element
    as a tiebreaker. This provides compatibility with older systems that
    use tuples to combine priority and elements.

    Parameters
    ----------
    priority : object
        Value used by `__lt__` / `__gt__` for ordering.
    element : hashable
        Value used by `__eq__` and `__hash__`; its hash is computed once
        in `__init__` and stored in `_hash`.
    """

    __slots__ = ["priority", "element", "_hash"]

    def __init__(self, priority, element):
        self.priority = priority
        self.element = element
        # Hash by element only, so a dict keyed on _HeapElement can also be
        # probed with the bare element (see __eq__).
        self._hash = hash(element)

    def __lt__(self, other):
        """Return True if `self` orders before `other`.

        `other` may be another `_HeapElement` or a bare priority value
        (anything without a `priority` attribute). Equal priorities fall
        back to comparing the elements.

        Raises
        ------
        TypeError
            If the priorities are equal and the elements cannot be compared.
            The underlying comparison error is attached as `__cause__`.
        """
        try:
            other_priority = other.priority
        except AttributeError:
            return self.priority < other
        # assume comparing to another _HeapElement
        if self.priority == other_priority:
            try:
                return self.element < other.element
            except TypeError as err:
                # Chain explicitly so the original comparison failure
                # (which names the offending types) is not lost.
                raise TypeError(
                    "Consider using a tuple, with a priority value that can be compared."
                ) from err
        return self.priority < other_priority

    def __gt__(self, other):
        """Return True if `self` orders after `other`.

        Mirror image of `__lt__`: compares priorities first and uses the
        elements as a tiebreaker when the priorities are equal.

        Raises
        ------
        TypeError
            If the priorities are equal and the elements cannot be compared.
            The underlying comparison error is attached as `__cause__`.
        """
        try:
            other_priority = other.priority
        except AttributeError:
            return self.priority > other
        # assume comparing to another _HeapElement
        if self.priority == other_priority:
            try:
                return self.element > other.element
            except TypeError as err:
                raise TypeError(
                    "Consider using a tuple, with a priority value that can be compared."
                ) from err
        return self.priority > other_priority

    def __eq__(self, other):
        """Compare by element only; the priority is ignored.

        `other` may be a `_HeapElement` or a bare element value.
        """
        try:
            return self.element == other.element
        except AttributeError:
            return self.element == other

    def __hash__(self):
        """Return the hash of the element, cached at construction."""
        return self._hash

    def __getitem__(self, indx):
        """Tuple-like access: index 0 is the priority.

        Any other index `indx` is forwarded to the element as
        ``element[indx - 1]``, so the element must be subscriptable
        for those indices.
        """
        return self.priority if indx == 0 else self.element[indx - 1]

    def __iter__(self):
        """Yield the priority, then the element's items.

        If the element is not iterable, the element itself is yielded
        as the single trailing item.
        """
        yield self.priority
        try:
            yield from self.element
        except TypeError:
            yield self.element

    def __repr__(self):
        return f"_HeapElement({self.priority}, {self.element})"

89 

90 

class MappedQueue:
    """The MappedQueue class implements a min-heap with removal and update-priority.

    The min heap uses heapq as well as custom written _siftup and _siftdown
    methods to allow the heap positions to be tracked by an additional dict
    keyed by element to position. The smallest element can be read in O(1)
    time (it is ``heap[0]``) and popped in O(log n) time, new elements can be
    pushed in O(log n) time, and any element can be removed or updated in
    O(log n) time. The queue cannot contain duplicate elements
    and an attempt to push an element already in the queue will have no effect.

    MappedQueue complements the heapq package from the python standard
    library. While MappedQueue is designed for maximum compatibility with
    heapq, it adds element removal, lookup, and priority update.

    Parameters
    ----------
    data : dict or iterable
        A dict is read as ``{element: priority}``. Any other iterable is
        copied into a list and its items are ordered by their own comparison.

    Attributes
    ----------
    heap : list
        The items in heap order; ``heap[0]`` is the smallest.
    position : dict
        Maps each item to its current index in `heap`.

    Examples
    --------

    A `MappedQueue` can be created empty, or optionally, given a dictionary
    of initial elements and priorities. The methods `push`, `pop`,
    `remove`, and `update` operate on the queue.

    >>> colors_nm = {"red": 665, "blue": 470, "green": 550}
    >>> q = MappedQueue(colors_nm)
    >>> q.remove("red")
    >>> q.update("green", "violet", 400)
    >>> q.push("indigo", 425)
    True
    >>> [q.pop().element for i in range(len(q.heap))]
    ['violet', 'indigo', 'blue']

    A `MappedQueue` can also be initialized with a list or other iterable. The priority is assumed
    to be the sort order of the items in the list.

    >>> q = MappedQueue([916, 50, 4609, 493, 237])
    >>> q.remove(493)
    >>> q.update(237, 1117)
    >>> [q.pop() for i in range(len(q.heap))]
    [50, 916, 1117, 4609]

    An exception is raised if the elements are not comparable.

    >>> q = MappedQueue([100, "a"])
    Traceback (most recent call last):
    ...
    TypeError: '<' not supported between instances of 'int' and 'str'

    To avoid the exception, use a dictionary to assign priorities to the elements.

    >>> q = MappedQueue({100: 0, "a": 1})

    References
    ----------
    .. [1] Cormen, T. H., Leiserson, C. E., Rivest, R. L., & Stein, C. (2001).
       Introduction to algorithms second edition.
    .. [2] Knuth, D. E. (1997). The art of computer programming (Vol. 3).
       Pearson Education.
    """

    def __init__(self, data=None):
        """Priority queue class with updatable priorities."""
        if data is None:
            self.heap = []
        elif isinstance(data, dict):
            self.heap = [_HeapElement(v, k) for k, v in data.items()]
        else:
            self.heap = list(data)
        self.position = {}
        self._heapify()

    def _heapify(self):
        """Restore heap invariant and recalculate map.

        Raises
        ------
        AssertionError
            If two items in the heap compare equal (the position map would
            then have fewer entries than the heap).
        """
        heapq.heapify(self.heap)
        self.position = {elt: pos for pos, elt in enumerate(self.heap)}
        if len(self.heap) != len(self.position):
            raise AssertionError("Heap contains duplicate elements")

    def __len__(self):
        return len(self.heap)

    def push(self, elt, priority=None):
        """Add an element to the queue.

        If `priority` is not None the element is wrapped in a
        `_HeapElement` carrying that priority.

        Returns
        -------
        bool
            False if the element was already queued (nothing changes),
            True if it was added.
        """
        if priority is not None:
            elt = _HeapElement(priority, elt)
        # If element is already in queue, do nothing
        if elt in self.position:
            return False
        # Add element to heap and dict
        pos = len(self.heap)
        self.heap.append(elt)
        self.position[elt] = pos
        # Restore invariant: the new leaf moves toward the root as needed
        self._siftdown(0, pos)
        return True

    def pop(self):
        """Remove and return the smallest element in the queue.

        Raises
        ------
        IndexError
            If the queue is empty.
        """
        # Remove smallest element
        elt = self.heap[0]
        del self.position[elt]
        # Take the last slot off the list; if that was the root we are done.
        last = self.heap.pop()
        if not self.heap:
            return elt
        # Replace root with last element
        self.heap[0] = last
        self.position[last] = 0
        # Restore invariant starting from the root
        self._siftup(0)
        # Return smallest element
        return elt

    def update(self, elt, new, priority=None):
        """Replace an element in the queue with a new one.

        If `priority` is not None, `new` is wrapped in a `_HeapElement`
        carrying that priority. Passing the same element for `elt` and `new`
        together with a `priority` changes only the priority.

        Raises
        ------
        KeyError
            If `elt` is not in the queue.
        ValueError
            If `new` is already queued at a different slot than `elt`.
            The queue is left unmodified in that case.
        """
        if priority is not None:
            new = _HeapElement(priority, new)
        pos = self.position[elt]
        # Writing `new` into `elt`'s slot while `new` already lives elsewhere
        # would leave a duplicate in `heap` and drop an entry from `position`.
        # Refuse before touching any state.
        if self.position.get(new, pos) != pos:
            raise ValueError("Cannot update: new element is already in the queue")
        # Replace
        self.heap[pos] = new
        del self.position[elt]
        self.position[new] = pos
        # Restore invariant; _siftup handles moves in either direction
        self._siftup(pos)

    def remove(self, elt):
        """Remove an element from the queue.

        Raises
        ------
        KeyError
            If `elt` is not in the queue.
        """
        # Find and remove element (KeyError propagates when it is absent)
        pos = self.position.pop(elt)
        last = self.heap.pop()
        # If elt occupied the last slot, popping the list already removed it
        if pos == len(self.heap):
            return
        # Replace elt with last element
        self.heap[pos] = last
        self.position[last] = pos
        # Restore invariant; _siftup handles moves in either direction
        self._siftup(pos)

    def _siftup(self, pos):
        """Move smaller child up until hitting a leaf.

        Built to mimic code for heapq._siftup
        only updating position dict too.
        """
        heap, position = self.heap, self.position
        end_pos = len(heap)
        newitem = heap[pos]
        # Shift up the smaller child until hitting a leaf
        child_pos = (pos << 1) + 1  # start with leftmost child position
        while child_pos < end_pos:
            # Set child_pos to index of smaller child.
            child = heap[child_pos]
            right_pos = child_pos + 1
            if right_pos < end_pos:
                right = heap[right_pos]
                if not child < right:
                    child = right
                    child_pos = right_pos
            # Move the smaller child up.
            heap[pos] = child
            position[child] = pos
            pos = child_pos
            child_pos = (pos << 1) + 1
        # pos is a leaf position. Put newitem there, and bubble it up
        # to its final resting place (by sifting its parents down).
        # The walk goes all the way to the root (start_pos=0), not just to
        # the slot we started from, so an item smaller than its old
        # ancestors still ends up in the right place.
        heap[pos] = newitem
        self._siftdown(0, pos)

    def _siftdown(self, start_pos, pos):
        """Restore invariant. keep swapping with parent until smaller.

        Built to mimic code for heapq._siftdown
        only updating position dict too.
        """
        heap, position = self.heap, self.position
        newitem = heap[pos]
        # Follow the path to the root, moving parents down until finding a place
        # newitem fits.
        while pos > start_pos:
            parent_pos = (pos - 1) >> 1
            parent = heap[parent_pos]
            if not newitem < parent:
                break
            heap[pos] = parent
            position[parent] = pos
            pos = parent_pos
        heap[pos] = newitem
        position[newitem] = pos