1"""
2Utility classes and functions for network flow algorithms.
3"""
4
5from collections import deque
6
7import networkx as nx
8
9__all__ = [
10 "CurrentEdge",
11 "Level",
12 "GlobalRelabelThreshold",
13 "build_residual_network",
14 "detect_unboundedness",
15 "build_flow_dict",
16]
17
18
19class CurrentEdge:
20 """Mechanism for iterating over out-edges incident to a node in a circular
21 manner. StopIteration exception is raised when wraparound occurs.
22 """
23
24 __slots__ = ("_edges", "_it", "_curr")
25
26 def __init__(self, edges):
27 self._edges = edges
28 if self._edges:
29 self._rewind()
30
31 def get(self):
32 return self._curr
33
34 def move_to_next(self):
35 try:
36 self._curr = next(self._it)
37 except StopIteration:
38 self._rewind()
39 raise
40
41 def _rewind(self):
42 self._it = iter(self._edges.items())
43 self._curr = next(self._it)
44
45 def __eq__(self, other):
46 return (getattr(self, "_curr", None), self._edges) == (
47 (getattr(other, "_curr", None), other._edges)
48 )
49
50
51class Level:
52 """Active and inactive nodes in a level."""
53
54 __slots__ = ("active", "inactive")
55
56 def __init__(self):
57 self.active = set()
58 self.inactive = set()
59
60
61class GlobalRelabelThreshold:
62 """Measurement of work before the global relabeling heuristic should be
63 applied.
64 """
65
66 def __init__(self, n, m, freq):
67 self._threshold = (n + m) / freq if freq else float("inf")
68 self._work = 0
69
70 def add_work(self, work):
71 self._work += work
72
73 def is_reached(self):
74 return self._work >= self._threshold
75
76 def clear_work(self):
77 self._work = 0
78
79
80@nx._dispatchable(
81 edge_attrs={"capacity": float("inf")}, returns_graph=True, preserve_edge_attrs=True
82)
83def build_residual_network(G, capacity):
84 """Build a residual network and initialize a zero flow.
85
86 The residual network :samp:`R` from an input graph :samp:`G` has the
87 same nodes as :samp:`G`. :samp:`R` is a DiGraph that contains a pair
88 of edges :samp:`(u, v)` and :samp:`(v, u)` iff :samp:`(u, v)` is not a
89 self-loop, and at least one of :samp:`(u, v)` and :samp:`(v, u)` exists
90 in :samp:`G`.
91
92 For each edge :samp:`(u, v)` in :samp:`R`, :samp:`R[u][v]['capacity']`
93 is equal to the capacity of :samp:`(u, v)` in :samp:`G` if it exists
94 in :samp:`G` or zero otherwise. If the capacity is infinite,
95 :samp:`R[u][v]['capacity']` will have a high arbitrary finite value
96 that does not affect the solution of the problem. This value is stored in
97 :samp:`R.graph['inf']`. For each edge :samp:`(u, v)` in :samp:`R`,
98 :samp:`R[u][v]['flow']` represents the flow function of :samp:`(u, v)` and
99 satisfies :samp:`R[u][v]['flow'] == -R[v][u]['flow']`.
100
101 The flow value, defined as the total flow into :samp:`t`, the sink, is
102 stored in :samp:`R.graph['flow_value']`. If :samp:`cutoff` is not
103 specified, reachability to :samp:`t` using only edges :samp:`(u, v)` such
104 that :samp:`R[u][v]['flow'] < R[u][v]['capacity']` induces a minimum
105 :samp:`s`-:samp:`t` cut.
106
107 """
108 if G.is_multigraph():
109 raise nx.NetworkXError("MultiGraph and MultiDiGraph not supported (yet).")
110
111 R = nx.DiGraph()
112 R.__networkx_cache__ = None # Disable caching
113 R.add_nodes_from(G)
114
115 capacity = _capacity_function(G, capacity)
116 # Extract edges with positive capacities. Self loops excluded.
117 edge_list = [
118 (u, v, cap)
119 for u, v, attr in G.edges(data=True)
120 if u != v and (cap := capacity(u, v, attr)) is not None and cap > 0
121 ]
122
123 # Simulate infinity with three times the sum of the finite edge capacities
124 # or any positive value if the sum is zero. This allows the
125 # infinite-capacity edges to be distinguished for unboundedness detection
126 # and directly participate in residual capacity calculation. If the maximum
127 # flow is finite, these edges cannot appear in the minimum cut and thus
128 # guarantee correctness. Since the residual capacity of an
129 # infinite-capacity edge is always at least 2/3 of inf, while that of an
130 # finite-capacity edge is at most 1/3 of inf, if an operation moves more
131 # than 1/3 of inf units of flow to t, there must be an infinite-capacity
132 # s-t path in G.
133 inf = float("inf")
134 inf = 3 * sum(cap for u, v, cap in edge_list if cap != inf) or 1
135 if G.is_directed():
136 for u, v, cap in edge_list:
137 r = min(cap, inf)
138 if not R.has_edge(u, v):
139 # Both (u, v) and (v, u) must be present in the residual
140 # network.
141 R.add_edge(u, v, capacity=r)
142 R.add_edge(v, u, capacity=0)
143 else:
144 # The edge (u, v) was added when (v, u) was visited.
145 R[u][v]["capacity"] = r
146 else:
147 for u, v, cap in edge_list:
148 # Add a pair of edges with equal residual capacities.
149 r = min(cap, inf)
150 R.add_edge(u, v, capacity=r)
151 R.add_edge(v, u, capacity=r)
152
153 # Record the value simulating infinity.
154 R.graph["inf"] = inf
155
156 return R
157
158
159@nx._dispatchable(
160 graphs="R",
161 preserve_edge_attrs={"R": {"capacity": float("inf")}},
162 preserve_graph_attrs=True,
163)
164def detect_unboundedness(R, s, t):
165 """Detect an infinite-capacity s-t path in R."""
166 q = deque([s])
167 seen = {s}
168 inf = R.graph["inf"]
169 while q:
170 u = q.popleft()
171 for v, attr in R[u].items():
172 if attr["capacity"] == inf and v not in seen:
173 if v == t:
174 raise nx.NetworkXUnbounded(
175 "Infinite capacity path, flow unbounded above."
176 )
177 seen.add(v)
178 q.append(v)
179
180
181@nx._dispatchable(graphs={"G": 0, "R": 1}, preserve_edge_attrs={"R": {"flow": None}})
182def build_flow_dict(G, R):
183 """Build a flow dictionary from a residual network."""
184 flow_dict = {}
185 for u in G:
186 flow_dict[u] = {v: 0 for v in G[u]}
187 flow_dict[u].update(
188 (v, attr["flow"]) for v, attr in R[u].items() if attr["flow"] > 0
189 )
190 return flow_dict
191
192
193def _capacity_function(G, capacity):
194 """Returns a callable that returns the capacity of an edge.
195
196 Parameters
197 ----------
198 G : NetworkX graph
199 MultiGraph and MultiDiGraph are not supported.
200
201 capacity : string or callable
202 If callable, the callable is returned unchanged. For it to work
203 it must accept exactly three positional arguments
204 ``(u, v, edge_attrs)``, where `u` and `v` are the endpoints of
205 an edge and ``edge_attrs`` is the corresponding edge attribute
206 dictionary. The callable must return either None (to hide an edge)
207 or a number representing the capacity.
208
209 If string, it is interpreted as the key of an edge attribute storing
210 the capacity. In this case, a new callable is returned that accepts
211 ``(u, v, edge_attrs)`` and returns ``edge_attrs[capacity]``.
212
213 Returns
214 -------
215 callable
216 A callable with signature ``(u, v, edge_attrs)`` that returns the
217 capacity of the edge joining nodes `u` and `v`.
218
219 Notes
220 -----
221 If `capacity` is a string and the specified attribute is not present on
222 an edge, that edge is assumed to have infinite capacity.
223 """
224
225 inf = float("inf")
226 if callable(capacity):
227 return capacity
228 # If the capacity keyword argument is not callable, we assume it is a
229 # string representing the edge attribute containing the capacity of
230 # the edge.
231 return lambda u, v, data: data.get(capacity, inf)