Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/util/topological.py: 17%

48 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# util/topological.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Topological sorting algorithms.""" 

9 

10from .. import util 

11from ..exc import CircularDependencyError 

12 

13__all__ = ["sort", "sort_as_subsets", "find_cycles"] 

14 

15 

16def sort_as_subsets(tuples, allitems): 

17 

18 edges = util.defaultdict(set) 

19 for parent, child in tuples: 

20 edges[child].add(parent) 

21 

22 todo = list(allitems) 

23 todo_set = set(allitems) 

24 

25 while todo_set: 

26 output = [] 

27 for node in todo: 

28 if todo_set.isdisjoint(edges[node]): 

29 output.append(node) 

30 

31 if not output: 

32 raise CircularDependencyError( 

33 "Circular dependency detected.", 

34 find_cycles(tuples, allitems), 

35 _gen_edges(edges), 

36 ) 

37 

38 todo_set.difference_update(output) 

39 todo = [t for t in todo if t in todo_set] 

40 yield output 

41 

42 

43def sort(tuples, allitems, deterministic_order=True): 

44 """sort the given list of items by dependency. 

45 

46 'tuples' is a list of tuples representing a partial ordering. 

47 

48 deterministic_order is no longer used, the order is now always 

49 deterministic given the order of "allitems". the flag is there 

50 for backwards compatibility with Alembic. 

51 

52 """ 

53 

54 for set_ in sort_as_subsets(tuples, allitems): 

55 for s in set_: 

56 yield s 

57 

58 

59def find_cycles(tuples, allitems): 

60 # adapted from: 

61 # https://neopythonic.blogspot.com/2009/01/detecting-cycles-in-directed-graph.html 

62 

63 edges = util.defaultdict(set) 

64 for parent, child in tuples: 

65 edges[parent].add(child) 

66 nodes_to_test = set(edges) 

67 

68 output = set() 

69 

70 # we'd like to find all nodes that are 

71 # involved in cycles, so we do the full 

72 # pass through the whole thing for each 

73 # node in the original list. 

74 

75 # we can go just through parent edge nodes. 

76 # if a node is only a child and never a parent, 

77 # by definition it can't be part of a cycle. same 

78 # if it's not in the edges at all. 

79 for node in nodes_to_test: 

80 stack = [node] 

81 todo = nodes_to_test.difference(stack) 

82 while stack: 

83 top = stack[-1] 

84 for node in edges[top]: 

85 if node in stack: 

86 cyc = stack[stack.index(node) :] 

87 todo.difference_update(cyc) 

88 output.update(cyc) 

89 

90 if node in todo: 

91 stack.append(node) 

92 todo.remove(node) 

93 break 

94 else: 

95 node = stack.pop() 

96 return output 

97 

98 

99def _gen_edges(edges): 

100 return set([(right, left) for left in edges for right in edges[left]])