Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/sql/naming.py: 80%
122 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1# sqlalchemy/naming.py
2# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8"""Establish constraint and index naming conventions.
11"""
13import re
15from . import events # noqa
16from .elements import _NONE_NAME
17from .elements import conv
18from .schema import CheckConstraint
19from .schema import Column
20from .schema import Constraint
21from .schema import ForeignKeyConstraint
22from .schema import Index
23from .schema import PrimaryKeyConstraint
24from .schema import Table
25from .schema import UniqueConstraint
26from .. import event
27from .. import exc
30class ConventionDict(object):
31 def __init__(self, const, table, convention):
32 self.const = const
33 self._is_fk = isinstance(const, ForeignKeyConstraint)
34 self.table = table
35 self.convention = convention
36 self._const_name = const.name
38 def _key_table_name(self):
39 return self.table.name
41 def _column_X(self, idx, attrname):
42 if self._is_fk:
43 try:
44 fk = self.const.elements[idx]
45 except IndexError:
46 return ""
47 else:
48 return getattr(fk.parent, attrname)
49 else:
50 cols = list(self.const.columns)
51 try:
52 col = cols[idx]
53 except IndexError:
54 return ""
55 else:
56 return getattr(col, attrname)
58 def _key_constraint_name(self):
59 if self._const_name in (None, _NONE_NAME):
60 raise exc.InvalidRequestError(
61 "Naming convention including "
62 "%(constraint_name)s token requires that "
63 "constraint is explicitly named."
64 )
65 if not isinstance(self._const_name, conv):
66 self.const.name = None
67 return self._const_name
69 def _key_column_X_key(self, idx):
70 # note this method was missing before
71 # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't
72 # working even though documented.
73 return self._column_X(idx, "key")
75 def _key_column_X_name(self, idx):
76 return self._column_X(idx, "name")
78 def _key_column_X_label(self, idx):
79 return self._column_X(idx, "_ddl_label")
81 def _key_referred_table_name(self):
82 fk = self.const.elements[0]
83 refs = fk.target_fullname.split(".")
84 if len(refs) == 3:
85 refschema, reftable, refcol = refs
86 else:
87 reftable, refcol = refs
88 return reftable
90 def _key_referred_column_X_name(self, idx):
91 fk = self.const.elements[idx]
92 # note that before [ticket:3989], this method was returning
93 # the specification for the :class:`.ForeignKey` itself, which normally
94 # would be using the ``.key`` of the column, not the name.
95 return fk.column.name
97 def __getitem__(self, key):
98 if key in self.convention:
99 return self.convention[key](self.const, self.table)
100 elif hasattr(self, "_key_%s" % key):
101 return getattr(self, "_key_%s" % key)()
102 else:
103 col_template = re.match(r".*_?column_(\d+)(_?N)?_.+", key)
104 if col_template:
105 idx = col_template.group(1)
106 multiples = col_template.group(2)
108 if multiples:
109 if self._is_fk:
110 elems = self.const.elements
111 else:
112 elems = list(self.const.columns)
113 tokens = []
114 for idx, elem in enumerate(elems):
115 attr = "_key_" + key.replace("0" + multiples, "X")
116 try:
117 tokens.append(getattr(self, attr)(idx))
118 except AttributeError:
119 raise KeyError(key)
120 sep = "_" if multiples.startswith("_") else ""
121 return sep.join(tokens)
122 else:
123 attr = "_key_" + key.replace(idx, "X")
124 idx = int(idx)
125 if hasattr(self, attr):
126 return getattr(self, attr)(idx)
127 raise KeyError(key)
130_prefix_dict = {
131 Index: "ix",
132 PrimaryKeyConstraint: "pk",
133 CheckConstraint: "ck",
134 UniqueConstraint: "uq",
135 ForeignKeyConstraint: "fk",
136}
139def _get_convention(dict_, key):
141 for super_ in key.__mro__:
142 if super_ in _prefix_dict and _prefix_dict[super_] in dict_:
143 return dict_[_prefix_dict[super_]]
144 elif super_ in dict_:
145 return dict_[super_]
146 else:
147 return None
150def _constraint_name_for_table(const, table):
151 metadata = table.metadata
152 convention = _get_convention(metadata.naming_convention, type(const))
154 if isinstance(const.name, conv):
155 return const.name
156 elif (
157 convention is not None
158 and not isinstance(const.name, conv)
159 and (
160 const.name is None
161 or "constraint_name" in convention
162 or const.name is _NONE_NAME
163 )
164 ):
165 return conv(
166 convention
167 % ConventionDict(const, table, metadata.naming_convention)
168 )
169 elif convention is _NONE_NAME:
170 return None
173@event.listens_for(
174 PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint"
175)
176def _column_added_to_pk_constraint(pk_constraint, col):
177 if pk_constraint._implicit_generated:
178 # only operate upon the "implicit" pk constraint for now,
179 # as we have to force the name to None to reset it. the
180 # "implicit" constraint will only have a naming convention name
181 # if at all.
182 table = pk_constraint.table
183 pk_constraint.name = None
184 newname = _constraint_name_for_table(pk_constraint, table)
185 if newname:
186 pk_constraint.name = newname
189@event.listens_for(Constraint, "after_parent_attach")
190@event.listens_for(Index, "after_parent_attach")
191def _constraint_name(const, table):
192 if isinstance(table, Column):
193 # this path occurs for a CheckConstraint linked to a Column
195 # for column-attached constraint, set another event
196 # to link the column attached to the table as this constraint
197 # associated with the table.
198 event.listen(
199 table,
200 "after_parent_attach",
201 lambda col, table: _constraint_name(const, table),
202 )
204 elif isinstance(table, Table):
205 if isinstance(const.name, conv) or const.name is _NONE_NAME:
206 return
208 newname = _constraint_name_for_table(const, table)
209 if newname:
210 const.name = newname