Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/naming.py: 44%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

124 statements  

1# sql/naming.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9"""Establish constraint and index naming conventions.""" 

10 

11from __future__ import annotations 

12 

13import re 

14 

15from . import events # noqa 

16from .base import _NONE_NAME 

17from .elements import conv as conv 

18from .schema import CheckConstraint 

19from .schema import Column 

20from .schema import Constraint 

21from .schema import ForeignKeyConstraint 

22from .schema import Index 

23from .schema import PrimaryKeyConstraint 

24from .schema import Table 

25from .schema import UniqueConstraint 

26from .. import event 

27from .. import exc 

28 

29 

30class ConventionDict: 

31 def __init__(self, const, table, convention): 

32 self.const = const 

33 self._is_fk = isinstance(const, ForeignKeyConstraint) 

34 self.table = table 

35 self.convention = convention 

36 self._const_name = const.name 

37 

38 def _key_table_name(self): 

39 return self.table.name 

40 

41 def _column_X(self, idx, attrname): 

42 if self._is_fk: 

43 try: 

44 fk = self.const.elements[idx] 

45 except IndexError: 

46 return "" 

47 else: 

48 return getattr(fk.parent, attrname) 

49 else: 

50 cols = list(self.const.columns) 

51 try: 

52 col = cols[idx] 

53 except IndexError: 

54 return "" 

55 else: 

56 return getattr(col, attrname) 

57 

58 def _key_constraint_name(self): 

59 if self._const_name in (None, _NONE_NAME): 

60 raise exc.InvalidRequestError( 

61 "Naming convention including " 

62 "%(constraint_name)s token requires that " 

63 "constraint is explicitly named." 

64 ) 

65 if not isinstance(self._const_name, conv): 

66 self.const.name = None 

67 return self._const_name 

68 

69 def _key_column_X_key(self, idx): 

70 # note this method was missing before 

71 # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't 

72 # working even though documented. 

73 return self._column_X(idx, "key") 

74 

75 def _key_column_X_name(self, idx): 

76 return self._column_X(idx, "name") 

77 

78 def _key_column_X_label(self, idx): 

79 return self._column_X(idx, "_ddl_label") 

80 

81 def _key_referred_table_name(self): 

82 fk = self.const.elements[0] 

83 refs = fk.target_fullname.split(".") 

84 if len(refs) == 3: 

85 refschema, reftable, refcol = refs 

86 else: 

87 reftable, refcol = refs 

88 return reftable 

89 

90 def _key_referred_column_X_name(self, idx): 

91 fk = self.const.elements[idx] 

92 # note that before [ticket:3989], this method was returning 

93 # the specification for the :class:`.ForeignKey` itself, which normally 

94 # would be using the ``.key`` of the column, not the name. 

95 return fk.column.name 

96 

97 def __getitem__(self, key): 

98 if key in self.convention: 

99 return self.convention[key](self.const, self.table) 

100 elif hasattr(self, "_key_%s" % key): 

101 return getattr(self, "_key_%s" % key)() 

102 else: 

103 col_template = re.match(r".*_?column_(\d+)(_?N)?_.+", key) 

104 if col_template: 

105 idx = col_template.group(1) 

106 multiples = col_template.group(2) 

107 

108 if multiples: 

109 if self._is_fk: 

110 elems = self.const.elements 

111 else: 

112 elems = list(self.const.columns) 

113 tokens = [] 

114 for idx, elem in enumerate(elems): 

115 attr = "_key_" + key.replace("0" + multiples, "X") 

116 try: 

117 tokens.append(getattr(self, attr)(idx)) 

118 except AttributeError: 

119 raise KeyError(key) 

120 sep = "_" if multiples.startswith("_") else "" 

121 return sep.join(tokens) 

122 else: 

123 attr = "_key_" + key.replace(idx, "X") 

124 idx = int(idx) 

125 if hasattr(self, attr): 

126 return getattr(self, attr)(idx) 

127 raise KeyError(key) 

128 

129 

130_prefix_dict = { 

131 Index: "ix", 

132 PrimaryKeyConstraint: "pk", 

133 CheckConstraint: "ck", 

134 UniqueConstraint: "uq", 

135 ForeignKeyConstraint: "fk", 

136} 

137 

138 

139def _get_convention(dict_, key): 

140 for super_ in key.__mro__: 

141 if super_ in _prefix_dict and _prefix_dict[super_] in dict_: 

142 return dict_[_prefix_dict[super_]] 

143 elif super_ in dict_: 

144 return dict_[super_] 

145 else: 

146 return None 

147 

148 

149def _constraint_name_for_table(const, table): 

150 metadata = table.metadata 

151 convention = _get_convention(metadata.naming_convention, type(const)) 

152 

153 if isinstance(const.name, conv): 

154 return const.name 

155 elif ( 

156 convention is not None 

157 and not isinstance(const.name, conv) 

158 and ( 

159 const.name is None 

160 or "constraint_name" in convention 

161 or const.name is _NONE_NAME 

162 ) 

163 ): 

164 return conv( 

165 convention 

166 % ConventionDict(const, table, metadata.naming_convention) 

167 ) 

168 elif convention is _NONE_NAME: 

169 return None 

170 

171 

172@event.listens_for( 

173 PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint" 

174) 

175def _column_added_to_pk_constraint(pk_constraint, col): 

176 if pk_constraint._implicit_generated: 

177 # only operate upon the "implicit" pk constraint for now, 

178 # as we have to force the name to None to reset it. the 

179 # "implicit" constraint will only have a naming convention name 

180 # if at all. 

181 table = pk_constraint.table 

182 pk_constraint.name = None 

183 newname = _constraint_name_for_table(pk_constraint, table) 

184 if newname: 

185 pk_constraint.name = newname 

186 

187 

188@event.listens_for(Constraint, "after_parent_attach") 

189@event.listens_for(Index, "after_parent_attach") 

190def _constraint_name(const, table): 

191 if isinstance(table, Column): 

192 # this path occurs for a CheckConstraint linked to a Column 

193 

194 # for column-attached constraint, set another event 

195 # to link the column attached to the table as this constraint 

196 # associated with the table. 

197 event.listen( 

198 table, 

199 "after_parent_attach", 

200 lambda col, table: _constraint_name(const, table), 

201 ) 

202 

203 elif isinstance(table, Table): 

204 if isinstance(const.name, conv) or const.name is _NONE_NAME: 

205 return 

206 

207 newname = _constraint_name_for_table(const, table) 

208 if newname: 

209 const.name = newname