Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/networkx/algorithms/flow/utils.py: 31%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1""" 

2Utility classes and functions for network flow algorithms. 

3""" 

4 

5from collections import deque 

6 

7import networkx as nx 

8 

9__all__ = [ 

10 "CurrentEdge", 

11 "Level", 

12 "GlobalRelabelThreshold", 

13 "build_residual_network", 

14 "detect_unboundedness", 

15 "build_flow_dict", 

16] 

17 

18 

19class CurrentEdge: 

20 """Mechanism for iterating over out-edges incident to a node in a circular 

21 manner. StopIteration exception is raised when wraparound occurs. 

22 """ 

23 

24 __slots__ = ("_edges", "_it", "_curr") 

25 

26 def __init__(self, edges): 

27 self._edges = edges 

28 if self._edges: 

29 self._rewind() 

30 

31 def get(self): 

32 return self._curr 

33 

34 def move_to_next(self): 

35 try: 

36 self._curr = next(self._it) 

37 except StopIteration: 

38 self._rewind() 

39 raise 

40 

41 def _rewind(self): 

42 self._it = iter(self._edges.items()) 

43 self._curr = next(self._it) 

44 

45 def __eq__(self, other): 

46 return (getattr(self, "_curr", None), self._edges) == ( 

47 (getattr(other, "_curr", None), other._edges) 

48 ) 

49 

50 

51class Level: 

52 """Active and inactive nodes in a level.""" 

53 

54 __slots__ = ("active", "inactive") 

55 

56 def __init__(self): 

57 self.active = set() 

58 self.inactive = set() 

59 

60 

61class GlobalRelabelThreshold: 

62 """Measurement of work before the global relabeling heuristic should be 

63 applied. 

64 """ 

65 

66 def __init__(self, n, m, freq): 

67 self._threshold = (n + m) / freq if freq else float("inf") 

68 self._work = 0 

69 

70 def add_work(self, work): 

71 self._work += work 

72 

73 def is_reached(self): 

74 return self._work >= self._threshold 

75 

76 def clear_work(self): 

77 self._work = 0 

78 

79 

80@nx._dispatchable(edge_attrs={"capacity": float("inf")}, returns_graph=True) 

81def build_residual_network(G, capacity): 

82 """Build a residual network and initialize a zero flow. 

83 

84 The residual network :samp:`R` from an input graph :samp:`G` has the 

85 same nodes as :samp:`G`. :samp:`R` is a DiGraph that contains a pair 

86 of edges :samp:`(u, v)` and :samp:`(v, u)` iff :samp:`(u, v)` is not a 

87 self-loop, and at least one of :samp:`(u, v)` and :samp:`(v, u)` exists 

88 in :samp:`G`. 

89 

90 For each edge :samp:`(u, v)` in :samp:`R`, :samp:`R[u][v]['capacity']` 

91 is equal to the capacity of :samp:`(u, v)` in :samp:`G` if it exists 

92 in :samp:`G` or zero otherwise. If the capacity is infinite, 

93 :samp:`R[u][v]['capacity']` will have a high arbitrary finite value 

94 that does not affect the solution of the problem. This value is stored in 

95 :samp:`R.graph['inf']`. For each edge :samp:`(u, v)` in :samp:`R`, 

96 :samp:`R[u][v]['flow']` represents the flow function of :samp:`(u, v)` and 

97 satisfies :samp:`R[u][v]['flow'] == -R[v][u]['flow']`. 

98 

99 The flow value, defined as the total flow into :samp:`t`, the sink, is 

100 stored in :samp:`R.graph['flow_value']`. If :samp:`cutoff` is not 

101 specified, reachability to :samp:`t` using only edges :samp:`(u, v)` such 

102 that :samp:`R[u][v]['flow'] < R[u][v]['capacity']` induces a minimum 

103 :samp:`s`-:samp:`t` cut. 

104 

105 """ 

106 if G.is_multigraph(): 

107 raise nx.NetworkXError("MultiGraph and MultiDiGraph not supported (yet).") 

108 

109 R = nx.DiGraph() 

110 R.__networkx_cache__ = None # Disable caching 

111 R.add_nodes_from(G) 

112 

113 inf = float("inf") 

114 # Extract edges with positive capacities. Self loops excluded. 

115 edge_list = [ 

116 (u, v, attr) 

117 for u, v, attr in G.edges(data=True) 

118 if u != v and attr.get(capacity, inf) > 0 

119 ] 

120 # Simulate infinity with three times the sum of the finite edge capacities 

121 # or any positive value if the sum is zero. This allows the 

122 # infinite-capacity edges to be distinguished for unboundedness detection 

123 # and directly participate in residual capacity calculation. If the maximum 

124 # flow is finite, these edges cannot appear in the minimum cut and thus 

125 # guarantee correctness. Since the residual capacity of an 

126 # infinite-capacity edge is always at least 2/3 of inf, while that of an 

127 # finite-capacity edge is at most 1/3 of inf, if an operation moves more 

128 # than 1/3 of inf units of flow to t, there must be an infinite-capacity 

129 # s-t path in G. 

130 inf = ( 

131 3 

132 * sum( 

133 attr[capacity] 

134 for u, v, attr in edge_list 

135 if capacity in attr and attr[capacity] != inf 

136 ) 

137 or 1 

138 ) 

139 if G.is_directed(): 

140 for u, v, attr in edge_list: 

141 r = min(attr.get(capacity, inf), inf) 

142 if not R.has_edge(u, v): 

143 # Both (u, v) and (v, u) must be present in the residual 

144 # network. 

145 R.add_edge(u, v, capacity=r) 

146 R.add_edge(v, u, capacity=0) 

147 else: 

148 # The edge (u, v) was added when (v, u) was visited. 

149 R[u][v]["capacity"] = r 

150 else: 

151 for u, v, attr in edge_list: 

152 # Add a pair of edges with equal residual capacities. 

153 r = min(attr.get(capacity, inf), inf) 

154 R.add_edge(u, v, capacity=r) 

155 R.add_edge(v, u, capacity=r) 

156 

157 # Record the value simulating infinity. 

158 R.graph["inf"] = inf 

159 

160 return R 

161 

162 

163@nx._dispatchable( 

164 graphs="R", 

165 preserve_edge_attrs={"R": {"capacity": float("inf")}}, 

166 preserve_graph_attrs=True, 

167) 

168def detect_unboundedness(R, s, t): 

169 """Detect an infinite-capacity s-t path in R.""" 

170 q = deque([s]) 

171 seen = {s} 

172 inf = R.graph["inf"] 

173 while q: 

174 u = q.popleft() 

175 for v, attr in R[u].items(): 

176 if attr["capacity"] == inf and v not in seen: 

177 if v == t: 

178 raise nx.NetworkXUnbounded( 

179 "Infinite capacity path, flow unbounded above." 

180 ) 

181 seen.add(v) 

182 q.append(v) 

183 

184 

185@nx._dispatchable(graphs={"G": 0, "R": 1}, preserve_edge_attrs={"R": {"flow": None}}) 

186def build_flow_dict(G, R): 

187 """Build a flow dictionary from a residual network.""" 

188 flow_dict = {} 

189 for u in G: 

190 flow_dict[u] = {v: 0 for v in G[u]} 

191 flow_dict[u].update( 

192 (v, attr["flow"]) for v, attr in R[u].items() if attr["flow"] > 0 

193 ) 

194 return flow_dict