Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/naming.py: 80%

122 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1# sqlalchemy/naming.py 

2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7 

8"""Establish constraint and index naming conventions. 

9 

10 

11""" 

12 

13import re 

14 

15from . import events # noqa 

16from .elements import _NONE_NAME 

17from .elements import conv 

18from .schema import CheckConstraint 

19from .schema import Column 

20from .schema import Constraint 

21from .schema import ForeignKeyConstraint 

22from .schema import Index 

23from .schema import PrimaryKeyConstraint 

24from .schema import Table 

25from .schema import UniqueConstraint 

26from .. import event 

27from .. import exc 

28 

29 

class ConventionDict(object):
    """Mapping-style resolver for naming-convention tokens.

    An instance is intended to be used as the right-hand operand of
    ``template % obj``: each ``%(token)s`` in the template triggers a
    ``__getitem__`` call, which computes the value of that token from the
    constraint and table given to the constructor.

    :param const: the constraint (or index) being named.
    :param table: the table the constraint is associated with.
    :param convention: the naming-convention mapping; if a requested token
     is a key of this mapping, its value is called as ``fn(const, table)``.
    """

    def __init__(self, const, table, convention):
        self.const = const
        self._is_fk = isinstance(const, ForeignKeyConstraint)
        self.table = table
        self.convention = convention
        # snapshot of the name at construction time;
        # _key_constraint_name() may reset ``const.name`` afterwards.
        self._const_name = const.name

    def _key_table_name(self):
        """Value of the ``table_name`` token."""
        return self.table.name

    def _column_X(self, idx, attrname):
        """Return attribute ``attrname`` of the column at position ``idx``.

        For a foreign key constraint the column is the ``.parent`` of
        ``const.elements[idx]``; otherwise it is ``const.columns[idx]``.
        An out-of-range ``idx`` yields the empty string rather than an
        error.
        """
        if self._is_fk:
            try:
                fk = self.const.elements[idx]
            except IndexError:
                return ""
            else:
                return getattr(fk.parent, attrname)
        else:
            cols = list(self.const.columns)
            try:
                col = cols[idx]
            except IndexError:
                return ""
            else:
                return getattr(col, attrname)

    def _key_constraint_name(self):
        """Value of the ``constraint_name`` token.

        :raises InvalidRequestError: if the constraint had no explicit
         name (``None`` or ``_NONE_NAME``) when this object was created.
        """
        if self._const_name in (None, _NONE_NAME):
            raise exc.InvalidRequestError(
                "Naming convention including "
                "%(constraint_name)s token requires that "
                "constraint is explicitly named."
            )
        if not isinstance(self._const_name, conv):
            # a plain (non-``conv``) explicit name is cleared on the
            # constraint itself; the snapshot taken in __init__ is what
            # gets substituted into the template.
            self.const.name = None
        return self._const_name

    def _key_column_X_key(self, idx):
        # note this method was missing before
        # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't
        # working even though documented.
        return self._column_X(idx, "key")

    def _key_column_X_name(self, idx):
        """Value of the ``column_<idx>_name`` token."""
        return self._column_X(idx, "name")

    def _key_column_X_label(self, idx):
        """Value of the ``column_<idx>_label`` token (``_ddl_label``)."""
        return self._column_X(idx, "_ddl_label")

    def _key_referred_table_name(self):
        """Value of the ``referred_table_name`` token.

        The ``target_fullname`` of the first foreign key element is split
        on ``"."``; the last token is taken as the column and the one
        before it as the table.  Any further leading tokens are ignored
        (assumed to be the schema, which may itself contain dots, e.g.
        ``otherdb.dbo.foo.bar`` -- TODO confirm against ``ForeignKey``).

        :raises ValueError: if the spec has fewer than two tokens.
        """
        fk = self.const.elements[0]
        refs = fk.target_fullname.split(".")
        # Taking the trailing two tokens handles "table.col",
        # "schema.table.col" and dotted schemas alike; previously a spec
        # with more than three tokens failed to unpack.
        reftable, _refcol = refs[-2:]
        return reftable

    def _key_referred_column_X_name(self, idx):
        fk = self.const.elements[idx]
        # note that before [ticket:3989], this method was returning
        # the specification for the :class:`.ForeignKey` itself, which normally
        # would be using the ``.key`` of the column, not the name.
        return fk.column.name

    def __getitem__(self, key):
        """Resolve a single naming-convention token.

        Lookup order:

        1. a callable stored under ``key`` in the convention mapping;
        2. a ``_key_<key>`` method on this object, called with no
           arguments;
        3. a templated column token such as ``column_0_name`` (single
           column) or ``column_0N_name`` / ``column_0_N_name`` (all
           columns, joined with ``""`` or ``"_"`` respectively).

        :raises KeyError: if the token cannot be resolved.
        """
        if key in self.convention:
            return self.convention[key](self.const, self.table)

        direct = getattr(self, "_key_%s" % key, None)
        if direct is not None:
            return direct()

        col_template = re.match(r".*_?column_(\d+)(_?N)?_.+", key)
        if not col_template:
            raise KeyError(key)

        idx = col_template.group(1)
        multiples = col_template.group(2)

        if not multiples:
            # single-column form: swap the literal index for "X" to get
            # the name of the generic ``_key_..._X_...`` method.
            single = getattr(self, "_key_" + key.replace(idx, "X"), None)
            if single is None:
                raise KeyError(key)
            return single(int(idx))

        if self._is_fk:
            elems = self.const.elements
        else:
            elems = list(self.const.columns)

        # the method name does not depend on the element, so compute it
        # once; only the "0N" / "0_N" spelling is recognized here.
        attr = "_key_" + key.replace("0" + multiples, "X")
        tokens = []
        for position, _elem in enumerate(elems):
            try:
                tokens.append(getattr(self, attr)(position))
            except AttributeError:
                raise KeyError(key)
        sep = "_" if multiples.startswith("_") else ""
        return sep.join(tokens)

128 

129 

# Short string keys that may be used in a naming-convention mapping in place
# of the corresponding schema construct class (see ``_get_convention``).
_prefix_dict = dict(
    (
        (Index, "ix"),
        (PrimaryKeyConstraint, "pk"),
        (CheckConstraint, "ck"),
        (UniqueConstraint, "uq"),
        (ForeignKeyConstraint, "fk"),
    )
)

137 

138 

def _get_convention(dict_, key):
    """Find the naming-convention entry in ``dict_`` that applies to ``key``.

    The classes in ``key.__mro__`` are examined in order.  For each class,
    the entry stored under its short prefix from ``_prefix_dict`` is
    preferred; failing that, an entry stored under the class itself is
    used.

    :param dict_: the naming-convention mapping to search.
    :param key: the class whose convention is wanted.
    :return: the first matching entry, or ``None`` if no class in the MRO
     has one.
    """
    for klass in key.__mro__:
        if klass in _prefix_dict:
            short_key = _prefix_dict[klass]
            if short_key in dict_:
                return dict_[short_key]
        if klass in dict_:
            return dict_[klass]
    return None

148 

149 

def _constraint_name_for_table(const, table):
    """Compute the convention-based name for ``const`` on ``table``.

    :param const: the constraint or index to name.
    :param table: the table whose ``metadata.naming_convention`` is
     consulted.
    :return: the existing name if it is already a ``conv``; a new ``conv``
     built from the applicable convention when one exists and the
     constraint is unnamed (``None`` / ``_NONE_NAME``) or the convention
     contains ``"constraint_name"``; otherwise ``None``.
    """
    metadata = table.metadata
    convention = _get_convention(metadata.naming_convention, type(const))

    current = const.name
    if isinstance(current, conv):
        # already a finalized convention name; leave it untouched.
        return current

    if convention is None:
        return None

    wants_generated_name = (
        current is None
        or "constraint_name" in convention
        or current is _NONE_NAME
    )
    if not wants_generated_name:
        return None

    # ConventionDict resolves each %(token)s in the template on demand.
    resolver = ConventionDict(const, table, metadata.naming_convention)
    return conv(convention % resolver)

171 

172 

@event.listens_for(
    PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint"
)
def _column_added_to_pk_constraint(pk_constraint, col):
    """Re-derive the name of an implicit primary key when a column is added.

    Explicitly constructed primary key constraints are left alone.
    """
    if not pk_constraint._implicit_generated:
        return

    # only operate upon the "implicit" pk constraint for now,
    # as we have to force the name to None to reset it.  the
    # "implicit" constraint will only have a naming convention name
    # if at all.
    owning_table = pk_constraint.table
    pk_constraint.name = None
    regenerated = _constraint_name_for_table(pk_constraint, owning_table)
    if regenerated:
        pk_constraint.name = regenerated

187 

188 

@event.listens_for(Constraint, "after_parent_attach")
@event.listens_for(Index, "after_parent_attach")
def _constraint_name(const, table):
    """Apply the naming convention when ``const`` is attached to a parent.

    ``table`` is whatever parent the constraint was attached to; only
    :class:`.Column` and :class:`.Table` parents are acted upon.
    """
    if isinstance(table, Column):
        # this path occurs for a CheckConstraint linked to a Column

        # for column-attached constraint, set another event
        # to link the column attached to the table as this constraint
        # associated with the table.
        def _on_column_attached(col, table):
            return _constraint_name(const, table)

        event.listen(table, "after_parent_attach", _on_column_attached)
        return

    if not isinstance(table, Table):
        return

    existing = const.name
    if existing is _NONE_NAME or isinstance(existing, conv):
        # nothing to do for names that are already finalized or that
        # are explicitly marked as "no name".
        return

    generated = _constraint_name_for_table(const, table)
    if generated:
        const.name = generated