1# util/topological.py
2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: http://www.opensource.org/licenses/mit-license.php
7
8"""Topological sorting algorithms."""
9
10from .. import util
11from ..exc import CircularDependencyError
12
13
# Names exported when this module is star-imported.
__all__ = ["sort", "sort_as_subsets", "find_cycles"]
15
16
def sort_as_subsets(tuples, allitems, deterministic_order=False):
    """Yield groups of items in dependency order, one group per tier.

    ``tuples`` is iterated as ``(parent, child)`` pairs.  An item is
    placed in the next group only when none of its parents is still
    waiting to be emitted; parents that are not among the waiting items
    do not hold an item back.

    :param tuples: iterable of ``(parent, child)`` pairs.  It is passed
     to ``find_cycles`` a second time when a cycle has to be reported.
    :param allitems: iterable of the items to sort.  Also passed on to
     ``find_cycles`` when a cycle has to be reported.
    :param deterministic_order: when true, the working collections are
     ``util.OrderedSet`` instances instead of builtin ``set`` objects.
    :raises CircularDependencyError: when items are still waiting but
     every one of them has a parent that is also still waiting.
    """

    # child -> parents that have to be emitted before it
    parents_of = util.defaultdict(set)
    for parent, child in tuples:
        parents_of[child].add(parent)

    if deterministic_order:
        make_set = util.OrderedSet
    else:
        make_set = set

    pending = make_set(allitems)

    while pending:
        ready = make_set()
        for item in pending:
            blocked = not pending.isdisjoint(parents_of[item])
            if not blocked:
                ready.add(item)

        # nothing could be released although items remain: every
        # remaining item is waiting on another remaining item.
        if not ready:
            raise CircularDependencyError(
                "Circular dependency detected.",
                find_cycles(tuples, allitems),
                _gen_edges(parents_of),
            )

        pending.difference_update(ready)
        yield ready
42
43
def sort(tuples, allitems, deterministic_order=False):
    """Yield the given items one at a time, ordered by dependency.

    This flattens the groups produced by ``sort_as_subsets``, to which
    all three arguments are passed unchanged.

    'tuples' is a list of tuples representing a partial ordering.
    'deterministic_order' keeps items within a dependency tier in list order.
    """

    tiers = sort_as_subsets(tuples, allitems, deterministic_order)
    for tier in tiers:
        for item in tier:
            yield item
54
55
def find_cycles(tuples, allitems):
    """Collect the nodes that lie on a cycle of the graph in ``tuples``.

    ``tuples`` is iterated as ``(parent, child)`` pairs, each read as an
    edge leading from parent to child.  ``allitems`` is accepted but is
    not read by this function.

    Returns a ``set`` of nodes; it is empty when no cycle was found.
    """
    # adapted from:
    # http://neopythonic.blogspot.com/2009/01/detecting-cycles-in-directed-graph.html

    # parent -> children reachable in one step
    graph = util.defaultdict(set)
    for parent, child in tuples:
        graph[parent].add(child)

    # only nodes that appear as a parent are walked: a node that is
    # never a parent (or has no edges at all) has no outgoing edge and
    # so cannot sit on a cycle.
    candidates = set(graph)

    cyclic = set()

    # the walk is restarted from every candidate so that cycles not
    # reached from one starting point are still picked up from another.
    for start in candidates:
        path = [start]

        # candidates not yet placed on the path during this walk
        unvisited = set(candidates)
        unvisited.discard(start)

        while path:
            current = path[-1]
            for successor in graph[current]:
                if successor in path:
                    # edge leads back onto the current path: everything
                    # from that node to the top of the path is a cycle.
                    loop = path[path.index(successor) :]
                    unvisited.difference_update(loop)
                    cyclic.update(loop)

                if successor not in unvisited:
                    continue

                # descend into the first successor not yet walked
                path.append(successor)
                unvisited.remove(successor)
                break
            else:
                # no successor left to descend into; back up one step
                path.pop()
    return cyclic
94
95
96def _gen_edges(edges):
97 return set([(right, left) for left in edges for right in edges[left]])