Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/SQLAlchemy-1.3.25.dev0-py3.11-linux-x86_64.egg/sqlalchemy/util/topological.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

48 statements  

1# util/topological.py 

2# Copyright (C) 2005-2021 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: http://www.opensource.org/licenses/mit-license.php 

7 

8"""Topological sorting algorithms.""" 

9 

10from .. import util 

11from ..exc import CircularDependencyError 

12 

13 

14__all__ = ["sort", "sort_as_subsets", "find_cycles"] 

15 

16 

def sort_as_subsets(tuples, allitems, deterministic_order=False):
    """Yield the items of 'allitems' grouped into dependency tiers.

    'tuples' is an iterable of (parent, child) pairs.  Each yielded
    collection holds every remaining item none of whose parents is still
    waiting to be emitted; those items are then removed and the process
    repeats until nothing is left.

    When 'deterministic_order' is true the tiers are util.OrderedSet
    objects, otherwise plain sets.

    Raises CircularDependencyError when items remain but none of them
    can be emitted.
    """
    # child -> set of the parents that must come out before it
    parents_of = util.defaultdict(set)
    for parent, child in tuples:
        parents_of[child].add(parent)

    if deterministic_order:
        collection = util.OrderedSet
    else:
        collection = set

    remaining = collection(allitems)

    while remaining:
        # an item is ready once none of its parents is still pending
        ready = collection(
            item
            for item in remaining
            if remaining.isdisjoint(parents_of[item])
        )

        if not ready:
            # nothing can make progress: every pending item waits on
            # another pending item
            raise CircularDependencyError(
                "Circular dependency detected.",
                find_cycles(tuples, allitems),
                _gen_edges(parents_of),
            )

        remaining.difference_update(ready)
        yield ready

42 

43 

def sort(tuples, allitems, deterministic_order=False):
    """Yield the given items one at a time in dependency order.

    'tuples' is a list of (parent, child) tuples describing a partial
    ordering over 'allitems'.
    'deterministic_order' keeps items within a dependency tier in list
    order.
    """
    tiers = sort_as_subsets(tuples, allitems, deterministic_order)
    # flatten the tiers, earliest tier first
    for tier in tiers:
        for item in tier:
            yield item

54 

55 

def find_cycles(tuples, allitems):
    """Return the set of all nodes that take part in a cycle.

    'tuples' is an iterable of (parent, child) pairs, each one a
    directed edge from parent to child.
    'allitems' is not consulted; it is accepted so that the signature
    stays the same for existing callers.

    The result is empty when the edges contain no cycle.
    """
    # adapted from:
    # http://neopythonic.blogspot.com/2009/01/detecting-cycles-in-directed-graph.html

    # parent -> set of its children
    children_of = {}
    for parent, child in tuples:
        children_of.setdefault(parent, set()).add(child)

    # we can go just through parent edge nodes.
    # if a node is only a child and never a parent,
    # by definition it can't be part of a cycle.  same
    # if it's not in the edges at all.
    candidates = set(children_of)

    in_cycle = set()

    # we'd like to find all nodes that are
    # involved in cycles, so we do the full
    # pass through the whole thing for each
    # candidate node.
    for start in candidates:
        stack = [start]
        unvisited = candidates.difference(stack)
        while stack:
            top = stack[-1]
            # every stack entry was taken from 'candidates', so it is
            # always a key of 'children_of'
            for child in children_of[top]:
                if child in stack:
                    # an edge back into the current path closes a cycle
                    cycle = stack[stack.index(child):]
                    unvisited.difference_update(cycle)
                    in_cycle.update(cycle)

                if child in unvisited:
                    # descend into the first child not yet explored
                    stack.append(child)
                    unvisited.remove(child)
                    break
            else:
                # no child left to descend into: backtrack
                stack.pop()
    return in_cycle

94 

95 

96def _gen_edges(edges): 

97 return set([(right, left) for left in edges for right in edges[left]])