1"""
2Utility classes and functions for network flow algorithms.
3"""
4
5from collections import deque
6
7import networkx as nx
8
# Names exported by ``from <this module> import *``.
__all__ = [
    "CurrentEdge",
    "Level",
    "GlobalRelabelThreshold",
    "build_residual_network",
    "detect_unboundedness",
    "build_flow_dict",
]
17
18
class CurrentEdge:
    """Mechanism for iterating over out-edges incident to a node in a circular
    manner. StopIteration exception is raised when wraparound occurs.

    ``edges`` must expose ``items()`` (e.g. a dict); the cursor walks its
    ``(key, value)`` pairs in iteration order.

    If ``edges`` is empty (falsy) at construction time no cursor is set up,
    so ``get`` and ``move_to_next`` raise ``AttributeError`` on such an
    instance.
    """

    __slots__ = ("_edges", "_it", "_curr")

    def __init__(self, edges):
        self._edges = edges
        # Only position the cursor when there is something to point at.
        if self._edges:
            self._rewind()

    def get(self):
        """Return the ``(key, value)`` pair the cursor currently points at."""
        return self._curr

    def move_to_next(self):
        """Advance the cursor to the next pair.

        Raises
        ------
        StopIteration
            When the cursor was on the last pair. The cursor is rewound to
            the first pair before the exception propagates, so the instance
            stays usable after the caller handles the wraparound.
        """
        try:
            self._curr = next(self._it)
        except StopIteration:
            self._rewind()
            raise

    def _rewind(self):
        """Restart iteration and point the cursor at the first pair."""
        self._it = iter(self._edges.items())
        self._curr = next(self._it)

    def __eq__(self, other):
        """Compare by current pair and underlying edge mapping.

        Returns ``NotImplemented`` for objects that are not ``CurrentEdge``
        instances so that ``==`` / ``!=`` against unrelated types yields a
        normal boolean instead of raising ``AttributeError``.
        """
        if not isinstance(other, CurrentEdge):
            return NotImplemented
        # ``_curr`` is unset for instances built from an empty mapping.
        return (getattr(self, "_curr", None), self._edges) == (
            getattr(other, "_curr", None),
            other._edges,
        )
49
50
class Level:
    """Container holding the active and the inactive node sets of a level.

    Both sets start out empty and are independent objects.
    """

    __slots__ = ("active", "inactive")

    def __init__(self):
        self.active, self.inactive = set(), set()
59
60
class GlobalRelabelThreshold:
    """Work counter that tells when the global relabeling heuristic is due.

    The threshold is ``(n + m) / freq``. A falsy ``freq`` (such as ``0`` or
    ``None``) sets the threshold to infinity, so no finite amount of work
    reaches it.
    """

    def __init__(self, n, m, freq):
        if freq:
            self._threshold = (n + m) / freq
        else:
            self._threshold = float("inf")
        self._work = 0

    def add_work(self, work):
        """Add ``work`` to the accumulated total."""
        self._work += work

    def is_reached(self):
        """Return True once the accumulated work is at least the threshold."""
        return self._work >= self._threshold

    def clear_work(self):
        """Reset the accumulated work to zero."""
        self._work = 0
78
79
@nx._dispatchable(edge_attrs={"capacity": float("inf")}, returns_graph=True)
def build_residual_network(G, capacity):
    """Build the residual network of ``G``.

    Parameters
    ----------
    G : NetworkX graph
        Directed or undirected graph. Multigraphs are rejected.
    capacity : string
        Name of the edge attribute of ``G`` holding the capacity. An edge
        without this attribute is treated as having infinite capacity.

    Returns
    -------
    R : nx.DiGraph
        A DiGraph with the same nodes as ``G``. Only edges of ``G`` that are
        not self-loops and whose capacity is positive contribute. For each
        such edge :samp:`(u, v)`, :samp:`R` holds both :samp:`(u, v)` and
        :samp:`(v, u)`:

        * if ``G`` is directed, :samp:`R[u][v]['capacity']` is the capacity
          of :samp:`(u, v)` in ``G`` and the reverse edge gets capacity zero
          unless ``G`` contributes it as well;
        * if ``G`` is undirected, both directions receive the same capacity.

        Infinite capacities are replaced by a finite stand-in value, which
        is recorded in :samp:`R.graph['inf']`. Only the ``capacity`` edge
        attribute is written here; no ``flow`` attributes are set by this
        function.

    Raises
    ------
    nx.NetworkXError
        If ``G`` is a MultiGraph or MultiDiGraph.
    """
    if G.is_multigraph():
        raise nx.NetworkXError("MultiGraph and MultiDiGraph not supported (yet).")

    R = nx.DiGraph()
    R.__networkx_cache__ = None  # Disable caching
    R.add_nodes_from(G)

    true_inf = float("inf")

    # Keep the edges with positive capacity, skipping self loops. The
    # capacity is looked up once per edge; a missing attribute counts as
    # infinite.
    usable = []
    for u, v, data in G.edges(data=True):
        if u == v:
            continue
        cap = data.get(capacity, true_inf)
        if cap > 0:
            usable.append((u, v, cap))

    # Simulate infinity with three times the sum of the finite edge
    # capacities, or any positive value if that sum is zero. This lets the
    # infinite-capacity edges be told apart for unboundedness detection while
    # still taking part directly in residual capacity calculations. If the
    # maximum flow is finite, these edges cannot appear in the minimum cut, so
    # correctness is preserved. Since the residual capacity of an
    # infinite-capacity edge is always at least 2/3 of inf, while that of a
    # finite-capacity edge is at most 1/3 of inf, an operation that moves more
    # than 1/3 of inf units of flow to t implies an infinite-capacity s-t
    # path in G.
    finite_total = sum(cap for _, _, cap in usable if cap != true_inf)
    scaled = 3 * finite_total
    inf = scaled if scaled else 1

    if not G.is_directed():
        for u, v, cap in usable:
            # Undirected edge: the same residual capacity in both directions.
            r = min(cap, inf)
            R.add_edge(u, v, capacity=r)
            R.add_edge(v, u, capacity=r)
    else:
        for u, v, cap in usable:
            r = min(cap, inf)
            if R.has_edge(u, v):
                # (u, v) already exists because (v, u) was handled earlier.
                R[u][v]["capacity"] = r
            else:
                # The residual network needs both (u, v) and (v, u).
                R.add_edge(u, v, capacity=r)
                R.add_edge(v, u, capacity=0)

    # Remember the value that stands in for infinity.
    R.graph["inf"] = inf

    return R
161
162
@nx._dispatchable(
    graphs="R",
    preserve_edge_attrs={"R": {"capacity": float("inf")}},
    preserve_graph_attrs=True,
)
def detect_unboundedness(R, s, t):
    """Detect an infinite-capacity s-t path in R.

    Runs a breadth-first search from ``s`` that only follows edges whose
    ``capacity`` equals ``R.graph['inf']``.

    Raises
    ------
    nx.NetworkXUnbounded
        If ``t`` is reached through such edges.
    """
    big = R.graph["inf"]
    visited = {s}
    frontier = deque([s])
    while frontier:
        node = frontier.popleft()
        for nbr, data in R[node].items():
            # Skip finite-capacity edges and already discovered nodes.
            if data["capacity"] != big or nbr in visited:
                continue
            if nbr == t:
                raise nx.NetworkXUnbounded(
                    "Infinite capacity path, flow unbounded above."
                )
            visited.add(nbr)
            frontier.append(nbr)
183
184
@nx._dispatchable(graphs={"G": 0, "R": 1}, preserve_edge_attrs={"R": {"flow": None}})
def build_flow_dict(G, R):
    """Build a flow dictionary from a residual network.

    For every node ``u`` of ``G`` the result maps each neighbor in ``G[u]``
    to 0, then overrides or adds entries from ``R[u]`` whose ``flow``
    attribute is positive.

    Returns
    -------
    flow_dict : dict of dicts
        ``flow_dict[u][v]`` is the flow recorded for ``(u, v)``.
    """
    flow_dict = {}
    for u in G:
        outgoing = dict.fromkeys(G[u], 0)
        for v, data in R[u].items():
            # Only positive flow is reported.
            if data["flow"] > 0:
                outgoing[v] = data["flow"]
        flow_dict[u] = outgoing
    return flow_dict