Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/naming.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

123 statements  

1# sql/naming.py 

2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Establish constraint and index naming conventions. 

10 

11 

12""" 

13 

14from __future__ import annotations 

15 

16import re 

17 

18from . import events # noqa 

19from .base import _NONE_NAME 

20from .elements import conv as conv 

21from .schema import CheckConstraint 

22from .schema import Column 

23from .schema import Constraint 

24from .schema import ForeignKeyConstraint 

25from .schema import Index 

26from .schema import PrimaryKeyConstraint 

27from .schema import Table 

28from .schema import UniqueConstraint 

29from .. import event 

30from .. import exc 

31 

32 

class ConventionDict:
    """Resolve naming-convention tokens for a single constraint or index.

    Instances are meant to be used as the right-hand operand of ``%``
    string formatting: each ``%(token)s`` in a convention template is
    looked up through :meth:`__getitem__`.

    :param const: the constraint or index being named.
    :param table: the table the constraint is associated with.
    :param convention: mapping of user-supplied token names to callables
        that accept ``(const, table)``.
    """

    def __init__(self, const, table, convention):
        self.const = const
        self._is_fk = isinstance(const, ForeignKeyConstraint)
        self.table = table
        self.convention = convention
        # Captured up front because _key_constraint_name() may reset
        # ``const.name`` to None later on.
        self._const_name = const.name

    def _key_table_name(self):
        """Return the name of the associated table."""
        return self.table.name

    def _column_X(self, idx, attrname):
        """Return attribute ``attrname`` of the column at position ``idx``.

        For a foreign key constraint the column is the ``parent`` of the
        element at ``idx``; otherwise it is taken from ``const.columns``.
        An out-of-range ``idx`` yields the empty string.
        """
        if self._is_fk:
            try:
                fk = self.const.elements[idx]
            except IndexError:
                return ""
            else:
                return getattr(fk.parent, attrname)
        else:
            cols = list(self.const.columns)
            try:
                col = cols[idx]
            except IndexError:
                return ""
            else:
                return getattr(col, attrname)

    def _key_constraint_name(self):
        """Return the name the constraint was originally given.

        :raises InvalidRequestError: if the constraint has no explicit
            name (``None`` or the ``_NONE_NAME`` marker).
        """
        if self._const_name in (None, _NONE_NAME):
            raise exc.InvalidRequestError(
                "Naming convention including "
                "%(constraint_name)s token requires that "
                "constraint is explicitly named."
            )
        if not isinstance(self._const_name, conv):
            # A plain (non-``conv``) name is cleared on the constraint
            # itself; the captured original is what gets returned.
            self.const.name = None
        return self._const_name

    def _key_column_X_key(self, idx):
        """Return the ``key`` of the column at position ``idx``."""
        # note this method was missing before
        # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't
        # working even though documented.
        return self._column_X(idx, "key")

    def _key_column_X_name(self, idx):
        """Return the ``name`` of the column at position ``idx``."""
        return self._column_X(idx, "name")

    def _key_column_X_label(self, idx):
        """Return the ``_ddl_label`` of the column at position ``idx``."""
        return self._column_X(idx, "_ddl_label")

    def _key_referred_table_name(self):
        """Return the table name targeted by the first foreign key element.

        ``target_fullname`` has the form ``[schema.]table.column``.  The
        schema portion may itself contain dots (a multipart schema such
        as ``database.owner``), so the table name is taken as the
        second-to-last dot-separated token rather than by unpacking a
        fixed number of parts.

        :raises ValueError: if ``target_fullname`` has fewer than two
            dot-separated tokens.
        """
        fk = self.const.elements[0]
        refs = fk.target_fullname.split(".")
        if len(refs) < 2:
            raise ValueError(
                "Expected '[schema.]table.column' target, got %r"
                % (fk.target_fullname,)
            )
        return refs[-2]

    def _key_referred_column_X_name(self, idx):
        """Return the name of the column referred to by element ``idx``."""
        fk = self.const.elements[idx]
        # note that before [ticket:3989], this method was returning
        # the specification for the :class:`.ForeignKey` itself, which normally
        # would be using the ``.key`` of the column, not the name.
        return fk.column.name

    def __getitem__(self, key):
        """Resolve one convention token.

        Lookup order:

        1. a callable stored under ``key`` in ``self.convention``, called
           with ``(const, table)``;
        2. a no-argument method named ``_key_<key>`` on this object;
        3. a column token containing ``column_<digits>``, optionally
           followed by ``N`` or ``_N``, which is mapped onto the matching
           ``_key_..._X_...`` method.  The ``N`` forms join the values for
           every column, using ``"_"`` as separator when written ``_N``.

        :raises KeyError: if the token cannot be resolved.
        """
        if key in self.convention:
            return self.convention[key](self.const, self.table)
        elif hasattr(self, "_key_%s" % key):
            return getattr(self, "_key_%s" % key)()
        else:
            col_template = re.match(r".*_?column_(\d+)(_?N)?_.+", key)
            if col_template:
                idx = col_template.group(1)
                multiples = col_template.group(2)

                if multiples:
                    if self._is_fk:
                        elems = self.const.elements
                    else:
                        elems = list(self.const.columns)
                    # Only the ``0`` + N spelling is substituted; any other
                    # index leaves the key unchanged, so no method matches.
                    attr = "_key_" + key.replace("0" + multiples, "X")
                    tokens = []
                    for position in range(len(elems)):
                        try:
                            tokens.append(getattr(self, attr)(position))
                        except AttributeError:
                            raise KeyError(key)
                    sep = "_" if multiples.startswith("_") else ""
                    return sep.join(tokens)
                else:
                    attr = "_key_" + key.replace(idx, "X")
                    idx = int(idx)
                    if hasattr(self, attr):
                        return getattr(self, attr)(idx)
        raise KeyError(key)

131 

132 

# Maps each schema construct class to the short string key under which a
# naming convention for that class may be stored in a convention dictionary
# (looked up by _get_convention()).
_prefix_dict = {
    Index: "ix",
    PrimaryKeyConstraint: "pk",
    CheckConstraint: "ck",
    UniqueConstraint: "uq",
    ForeignKeyConstraint: "fk",
}

140 

141 

def _get_convention(dict_, key):
    """Find the convention registered for class ``key`` or an ancestor.

    Walks ``key.__mro__`` in order.  For each class, an entry stored in
    ``dict_`` under the class's short prefix (from ``_prefix_dict``) takes
    precedence over an entry stored under the class object itself.
    Returns ``None`` when nothing along the MRO matches.
    """
    for klass in key.__mro__:
        prefix = _prefix_dict.get(klass)
        if prefix is not None and prefix in dict_:
            return dict_[prefix]
        if klass in dict_:
            return dict_[klass]
    return None

150 

151 

def _constraint_name_for_table(const, table):
    """Compute the convention-based name for ``const`` on ``table``.

    Returns the existing name unchanged when it is already a ``conv``
    instance, a new ``conv`` built from the matching convention template
    when one applies, and ``None`` otherwise.
    """
    metadata = table.metadata
    convention = _get_convention(metadata.naming_convention, type(const))

    current_name = const.name
    if isinstance(current_name, conv):
        # Already produced by a convention; hand it back untouched.
        return current_name

    if convention is None:
        return None

    # The template is applied when the constraint is unnamed, or when the
    # template itself consumes the explicitly given name.
    applies = (
        current_name is None
        or "constraint_name" in convention
        or current_name is _NONE_NAME
    )
    if not applies:
        return None

    tokens = ConventionDict(const, table, metadata.naming_convention)
    return conv(convention % tokens)

173 

174 

@event.listens_for(
    PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint"
)
def _column_added_to_pk_constraint(pk_constraint, col):
    """Re-derive the name of an implicit primary key when a column is added.

    Explicitly created primary key constraints are left alone.
    """
    if not pk_constraint._implicit_generated:
        return

    # only operate upon the "implicit" pk constraint for now,
    # as we have to force the name to None to reset it.  the
    # "implicit" constraint will only have a naming convention name
    # if at all.
    owner = pk_constraint.table
    pk_constraint.name = None
    regenerated = _constraint_name_for_table(pk_constraint, owner)
    if regenerated:
        pk_constraint.name = regenerated

189 

190 

@event.listens_for(Constraint, "after_parent_attach")
@event.listens_for(Index, "after_parent_attach")
def _constraint_name(const, table):
    """Apply the naming convention once a constraint or index is attached.

    ``table`` is the parent the construct was attached to.  When that
    parent is a ``Column``, naming is deferred until the column itself
    is attached to its parent.
    """
    if isinstance(table, Column):
        # this path occurs for a CheckConstraint linked to a Column

        # for column-attached constraint, set another event
        # to link the column attached to the table as this constraint
        # associated with the table.
        def _on_column_attached(col, table):
            return _constraint_name(const, table)

        event.listen(table, "after_parent_attach", _on_column_attached)
        return

    if not isinstance(table, Table):
        return

    if isinstance(const.name, conv) or const.name is _NONE_NAME:
        # Name is already final, or naming was explicitly disabled.
        return

    generated = _constraint_name_for_table(const, table)
    if generated:
        const.name = generated