Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/networkx/algorithms/flow/utils.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

87 statements  

1""" 

2Utility classes and functions for network flow algorithms. 

3""" 

4 

5from collections import deque 

6 

7import networkx as nx 

8 

9__all__ = [ 

10 "CurrentEdge", 

11 "Level", 

12 "GlobalRelabelThreshold", 

13 "build_residual_network", 

14 "detect_unboundedness", 

15 "build_flow_dict", 

16] 

17 

18 

class CurrentEdge:
    """Mechanism for iterating over out-edges incident to a node in a circular
    manner. StopIteration exception is raised when wraparound occurs.

    Parameters
    ----------
    edges : mapping
        Mapping whose ``items()`` are cycled over. Only truthiness and
        ``items()`` are used, so any dict-like object works.

    Notes
    -----
    When `edges` is empty the cursor is never initialised: ``_it`` and
    ``_curr`` stay unset, so `get` and `move_to_next` raise
    ``AttributeError`` until the instance is rebuilt with a non-empty mapping.
    """

    __slots__ = ("_edges", "_it", "_curr")

    # Instances define value equality and are mutable cursors, so they are
    # unhashable. Defining ``__eq__`` already implies this; it is spelled out
    # here so the intent is visible.
    __hash__ = None

    def __init__(self, edges):
        self._edges = edges
        # An empty mapping has no first item to point at, so skip the rewind.
        if self._edges:
            self._rewind()

    def get(self):
        """Return the ``(key, value)`` item the cursor currently points at."""
        return self._curr

    def move_to_next(self):
        """Advance the cursor to the next item.

        Raises
        ------
        StopIteration
            When the cursor was on the last item. The cursor is reset to the
            first item *before* the exception propagates, so the instance is
            immediately usable again.
        """
        try:
            self._curr = next(self._it)
        except StopIteration:
            self._rewind()
            raise

    def _rewind(self):
        """Restart iteration and place the cursor on the first item."""
        self._it = iter(self._edges.items())
        self._curr = next(self._it)

    def __eq__(self, other):
        """Compare by current item and underlying edge mapping.

        Returns ``NotImplemented`` for operands that are not `CurrentEdge`
        instances, so ``==`` / ``!=`` against unrelated objects yield
        ``False`` / ``True`` instead of raising ``AttributeError``.
        """
        if not isinstance(other, CurrentEdge):
            return NotImplemented
        # ``_curr`` may be unset (empty mapping); treat that as ``None``.
        return (getattr(self, "_curr", None), self._edges) == (
            getattr(other, "_curr", None),
            other._edges,
        )

49 

50 

class Level:
    """Container for the two node sets of one level.

    Attributes are plain sets created empty on construction:

    * ``active`` -- nodes currently marked active in this level.
    * ``inactive`` -- nodes currently marked inactive in this level.
    """

    __slots__ = ("active", "inactive")

    def __init__(self):
        # Two independent, initially empty sets.
        self.active, self.inactive = set(), set()

59 

60 

class GlobalRelabelThreshold:
    """Work counter that signals when the global relabeling heuristic is due.

    Parameters
    ----------
    n, m : numbers
        Summed to form the numerator of the threshold.
    freq : number
        Divisor of ``n + m``. A falsy value (``0``, ``None``) makes the
        threshold infinite, so `is_reached` never reports ``True``.
    """

    def __init__(self, n, m, freq):
        if freq:
            self._threshold = (n + m) / freq
        else:
            # No frequency given: the threshold can never be reached.
            self._threshold = float("inf")
        self._work = 0

    def add_work(self, work):
        """Accumulate `work` units onto the running total."""
        self._work += work

    def is_reached(self):
        """Return True once the accumulated work is at least the threshold."""
        return self._work >= self._threshold

    def clear_work(self):
        """Reset the accumulated work to zero."""
        self._work = 0

78 

79 

@nx._dispatchable(
    edge_attrs={"capacity": float("inf")}, returns_graph=True, preserve_edge_attrs=True
)
def build_residual_network(G, capacity):
    """Build a residual network and initialize a zero flow.

    The residual network :samp:`R` has the same nodes as :samp:`G` and is
    always a DiGraph. For every non-self-loop edge of :samp:`G` with a
    positive capacity, :samp:`R` holds both :samp:`(u, v)` and
    :samp:`(v, u)`.

    :samp:`R[u][v]['capacity']` is the capacity of :samp:`(u, v)` in
    :samp:`G` when that edge contributes, and zero for a reverse edge that
    exists only to complete the pair. For an undirected :samp:`G` both
    directions receive the same capacity. Infinite capacities are replaced by
    a large finite stand-in, which is also stored in :samp:`R.graph['inf']`.

    Parameters
    ----------
    G : NetworkX graph
        Input graph. MultiGraph and MultiDiGraph are rejected.

    capacity : string or callable
        Edge attribute key holding the capacity, or a callable
        ``(u, v, edge_attrs)`` returning the capacity (``None`` hides the
        edge).

    Returns
    -------
    R : DiGraph
        The residual network described above.

    Raises
    ------
    NetworkXError
        If :samp:`G` is a multigraph.

    Notes
    -----
    This function writes only the ``'capacity'`` edge attribute and
    :samp:`R.graph['inf']`. It does not itself set any ``'flow'`` edge
    attribute or :samp:`R.graph['flow_value']`.
    """
    if G.is_multigraph():
        raise nx.NetworkXError("MultiGraph and MultiDiGraph not supported (yet).")

    R = nx.DiGraph()
    R.__networkx_cache__ = None  # Disable caching
    R.add_nodes_from(G)

    capacity = _capacity_function(G, capacity)

    # Keep only edges with a positive capacity. Self loops are dropped before
    # the capacity is even looked up.
    edge_list = []
    for u, v, attr in G.edges(data=True):
        if u == v:
            continue
        cap = capacity(u, v, attr)
        if cap is not None and cap > 0:
            edge_list.append((u, v, cap))

    # Infinity is simulated by three times the total finite capacity (or 1 if
    # that total is zero). With this choice an infinite-capacity edge always
    # keeps at least 2/3 of ``inf`` as residual capacity while a
    # finite-capacity edge has at most 1/3 of ``inf``, so infinite edges stay
    # distinguishable for unboundedness detection and can take part in
    # residual capacity arithmetic directly. If the maximum flow is finite
    # they cannot appear in the minimum cut; if an operation pushes more than
    # 1/3 of ``inf`` to t, G must contain an infinite-capacity s-t path.
    true_inf = float("inf")
    finite_total = sum(cap for _, _, cap in edge_list if cap != true_inf)
    inf = 3 * finite_total or 1

    if G.is_directed():
        for u, v, cap in edge_list:
            r = min(cap, inf)
            if R.has_edge(u, v):
                # (u, v) already exists as the zero-capacity partner created
                # when (v, u) was visited; give it its real capacity.
                R[u][v]["capacity"] = r
                continue
            # Both (u, v) and (v, u) must be present in the residual network.
            R.add_edge(u, v, capacity=r)
            R.add_edge(v, u, capacity=0)
    else:
        for u, v, cap in edge_list:
            # An undirected edge yields two arcs with equal residual capacity.
            r = min(cap, inf)
            R.add_edge(u, v, capacity=r)
            R.add_edge(v, u, capacity=r)

    # Record the value simulating infinity.
    R.graph["inf"] = inf

    return R

157 

158 

@nx._dispatchable(
    graphs="R",
    preserve_edge_attrs={"R": {"capacity": float("inf")}},
    preserve_graph_attrs=True,
)
def detect_unboundedness(R, s, t):
    """Detect an infinite-capacity s-t path in R.

    Runs a breadth-first search from `s` that follows only edges whose
    ``'capacity'`` equals ``R.graph['inf']``.

    Raises
    ------
    NetworkXUnbounded
        If `t` is reached through such edges.
    """
    inf = R.graph["inf"]
    visited = {s}
    frontier = deque([s])
    while frontier:
        u = frontier.popleft()
        for v, attr in R[u].items():
            # Skip finite-capacity edges and nodes already discovered.
            if attr["capacity"] != inf or v in visited:
                continue
            if v == t:
                raise nx.NetworkXUnbounded(
                    "Infinite capacity path, flow unbounded above."
                )
            visited.add(v)
            frontier.append(v)

179 

180 

@nx._dispatchable(graphs={"G": 0, "R": 1}, preserve_edge_attrs={"R": {"flow": None}})
def build_flow_dict(G, R):
    """Build a flow dictionary from a residual network.

    Returns a dict keyed by each node `u` of `G`. Each value maps every
    neighbor of `u` in `G` to ``0``, overridden by ``R[u][v]['flow']`` for
    every edge of `R` leaving `u` whose flow is strictly positive.
    """
    flow_dict = {}
    for u in G:
        # Start every edge of G at zero flow ...
        outgoing = dict.fromkeys(G[u], 0)
        # ... then record the positive flows carried in the residual network.
        for v, attr in R[u].items():
            if attr["flow"] > 0:
                outgoing[v] = attr["flow"]
        flow_dict[u] = outgoing
    return flow_dict

191 

192 

193def _capacity_function(G, capacity): 

194 """Returns a callable that returns the capacity of an edge. 

195 

196 Parameters 

197 ---------- 

198 G : NetworkX graph 

199 MultiGraph and MultiDiGraph are not supported. 

200 

201 capacity : string or callable 

202 If callable, the callable is returned unchanged. For it to work 

203 it must accept exactly three positional arguments 

204 ``(u, v, edge_attrs)``, where `u` and `v` are the endpoints of 

205 an edge and ``edge_attrs`` is the corresponding edge attribute 

206 dictionary. The callable must return either None (to hide an edge) 

207 or a number representing the capacity. 

208 

209 If string, it is interpreted as the key of an edge attribute storing 

210 the capacity. In this case, a new callable is returned that accepts 

211 ``(u, v, edge_attrs)`` and returns ``edge_attrs[capacity]``. 

212 

213 Returns 

214 ------- 

215 callable 

216 A callable with signature ``(u, v, edge_attrs)`` that returns the 

217 capacity of the edge joining nodes `u` and `v`. 

218 

219 Notes 

220 ----- 

221 If `capacity` is a string and the specified attribute is not present on 

222 an edge, that edge is assumed to have infinite capacity. 

223 """ 

224 

225 inf = float("inf") 

226 if callable(capacity): 

227 return capacity 

228 # If the capacity keyword argument is not callable, we assume it is a 

229 # string representing the edge attribute containing the capacity of 

230 # the edge. 

231 return lambda u, v, data: data.get(capacity, inf)