1# sql/naming.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Establish constraint and index naming conventions.
10
11
12"""
13
14from __future__ import annotations
15
16import re
17
18from . import events # noqa
19from .base import _NONE_NAME
20from .elements import conv as conv
21from .schema import CheckConstraint
22from .schema import Column
23from .schema import Constraint
24from .schema import ForeignKeyConstraint
25from .schema import Index
26from .schema import PrimaryKeyConstraint
27from .schema import Table
28from .schema import UniqueConstraint
29from .. import event
30from .. import exc
31
32
class ConventionDict:
    """Resolve naming-convention tokens for a single constraint or index.

    An instance is used as the mapping operand of
    ``convention_string % ConventionDict(...)``; every ``%(token)s`` in
    the convention string is looked up through :meth:`__getitem__`.
    """

    def __init__(self, const, table, convention):
        """Capture the constraint, its table and the convention mapping.

        :param const: the constraint or index being named.
        :param table: the table that ``const`` is being named for.
        :param convention: the naming-convention mapping; an entry whose
         key equals a requested token is called as ``fn(const, table)``.
        """
        self.const = const
        self._is_fk = isinstance(const, ForeignKeyConstraint)
        self.table = table
        self.convention = convention
        # keep the name as originally given; _key_constraint_name() may
        # reset ``const.name`` but still has to return this value.
        self._const_name = const.name

    def _key_table_name(self):
        """Return the value for the ``table_name`` token."""
        return self.table.name

    def _column_X(self, idx, attrname):
        """Return attribute ``attrname`` of the column at position ``idx``.

        For a foreign key constraint the column is the ``parent`` of the
        ``idx``-th element; otherwise it is the ``idx``-th entry of
        ``const.columns``.  An out-of-range ``idx`` yields ``""`` rather
        than raising.
        """
        if self._is_fk:
            try:
                fk = self.const.elements[idx]
            except IndexError:
                return ""
            return getattr(fk.parent, attrname)

        cols = list(self.const.columns)
        try:
            col = cols[idx]
        except IndexError:
            return ""
        return getattr(col, attrname)

    def _key_constraint_name(self):
        """Return the value for the ``constraint_name`` token.

        :raises InvalidRequestError: if the constraint was not given an
         explicit name (its name is ``None`` or ``_NONE_NAME``).
        """
        if self._const_name in (None, _NONE_NAME):
            raise exc.InvalidRequestError(
                "Naming convention including "
                "%(constraint_name)s token requires that "
                "constraint is explicitly named."
            )
        if not isinstance(self._const_name, conv):
            # a plain (non-``conv``) name is cleared on the constraint
            # itself; the value captured in __init__ is still returned.
            self.const.name = None
        return self._const_name

    def _key_column_X_key(self, idx):
        """Return the ``.key`` of column ``idx`` (``column_<n>_key``)."""
        # note this method was missing before
        # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't
        # working even though documented.
        return self._column_X(idx, "key")

    def _key_column_X_name(self, idx):
        """Return the ``.name`` of column ``idx`` (``column_<n>_name``)."""
        return self._column_X(idx, "name")

    def _key_column_X_label(self, idx):
        """Return ``._ddl_label`` of column ``idx`` (``column_<n>_label``)."""
        return self._column_X(idx, "_ddl_label")

    def _key_referred_table_name(self):
        """Return the value for the ``referred_table_name`` token.

        The table name is the second-to-last dot-separated part of the
        first element's ``target_fullname``.  Any number of leading parts
        is accepted, so a schema that itself contains dots (for example
        ``db.owner.table.col``) no longer fails to unpack.

        :raises ValueError: if ``target_fullname`` contains no dot.
        """
        fk = self.const.elements[0]
        *_schema, reftable, _refcol = fk.target_fullname.split(".")
        return reftable

    def _key_referred_column_X_name(self, idx):
        """Return the name of the column referred to by element ``idx``."""
        fk = self.const.elements[idx]
        # note that before [ticket:3989], this method was returning
        # the specification for the :class:`.ForeignKey` itself, which normally
        # would be using the ``.key`` of the column, not the name.
        return fk.column.name

    def __getitem__(self, key):
        """Resolve one convention token.

        Lookup order:

        1. a callable stored under ``key`` in the convention mapping,
           invoked as ``fn(const, table)``;
        2. a ``_key_<key>`` method taking no arguments;
        3. a numbered column token such as ``column_0_name``, dispatched
           to the matching ``_key_..._X_...`` method with the integer
           index;
        4. a "multiple" token such as ``column_0N_name`` or
           ``column_0_N_name``, which joins the per-column values of all
           columns, with ``"_"`` as separator when the token uses the
           ``_N`` spelling and no separator for ``N``.

        :raises KeyError: if ``key`` matches none of the above.
        """
        if key in self.convention:
            return self.convention[key](self.const, self.table)

        direct = "_key_%s" % key
        if hasattr(self, direct):
            return getattr(self, direct)()

        col_template = re.match(r".*_?column_(\d+)(_?N)?_.+", key)
        if col_template:
            idx = col_template.group(1)
            multiples = col_template.group(2)

            if multiples:
                if self._is_fk:
                    elems = self.const.elements
                else:
                    elems = list(self.const.columns)

                # the method name does not depend on the element, so it
                # is computed once; the lookup itself stays inside the
                # loop so an empty constraint still yields "".
                attr = "_key_" + key.replace("0" + multiples, "X")
                tokens = []
                for position, _elem in enumerate(elems):
                    try:
                        tokens.append(getattr(self, attr)(position))
                    except AttributeError:
                        raise KeyError(key)
                sep = "_" if multiples.startswith("_") else ""
                return sep.join(tokens)

            attr = "_key_" + key.replace(idx, "X")
            if hasattr(self, attr):
                return getattr(self, attr)(int(idx))

        raise KeyError(key)
131
132
# Short string prefix for each constraint / index class.
# ``_get_convention`` accepts these prefixes as alternate keys when it
# looks up a class in a naming-convention dictionary.
_prefix_dict = {
    Index: "ix",
    PrimaryKeyConstraint: "pk",
    CheckConstraint: "ck",
    UniqueConstraint: "uq",
    ForeignKeyConstraint: "fk",
}
140
141
def _get_convention(dict_, key):
    """Find the convention registered for class ``key`` in ``dict_``.

    The classes in ``key.__mro__`` are tried in order.  For each one, an
    entry stored under its short prefix from ``_prefix_dict`` takes
    precedence over an entry stored under the class itself.

    :param dict_: naming-convention mapping, keyed by prefix strings
     and/or classes.
    :param key: the constraint or index class to look up.
    :return: the first matching entry, or ``None`` if no class in the
     MRO has one.
    """
    for klass in key.__mro__:
        prefix = _prefix_dict.get(klass)
        if prefix is not None and prefix in dict_:
            return dict_[prefix]
        if klass in dict_:
            return dict_[klass]
    return None
150
151
def _constraint_name_for_table(const, table):
    """Compute the convention-based name of ``const`` for ``table``.

    :param const: the constraint or index to name.
    :param table: the table whose ``metadata.naming_convention`` applies.
    :return: ``const.name`` unchanged if it is already a ``conv``; a new
     ``conv`` built from the matching convention when one exists and the
     constraint is unnamed (``None`` / ``_NONE_NAME``) or the convention
     uses the ``constraint_name`` token; otherwise ``None``.
    """
    metadata = table.metadata
    convention = _get_convention(metadata.naming_convention, type(const))
    name = const.name

    # a ``conv`` name is final and is returned unchanged
    if isinstance(name, conv):
        return name

    # no convention registered for this kind of constraint
    if convention is None:
        return None

    # evaluation order matters: the membership test is only reached when
    # the name is not None.
    applies = (
        name is None
        or "constraint_name" in convention
        or name is _NONE_NAME
    )
    if not applies:
        return None

    tokens = ConventionDict(const, table, metadata.naming_convention)
    return conv(convention % tokens)
173
174
@event.listens_for(
    PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint"
)
def _column_added_to_pk_constraint(pk_constraint, col):
    """Recompute the name of an implicit primary key constraint.

    Registered for the ``_sa_event_column_added_to_pk_constraint`` event
    on ``PrimaryKeyConstraint``.  Constraints that are not flagged
    ``_implicit_generated`` are left untouched.
    """
    # only the "implicit" pk constraint is handled for now: its name has
    # to be forced to None to reset it, and such a constraint will only
    # have a naming convention name, if it has one at all.
    if not pk_constraint._implicit_generated:
        return

    owner = pk_constraint.table
    pk_constraint.name = None
    regenerated = _constraint_name_for_table(pk_constraint, owner)
    if regenerated:
        pk_constraint.name = regenerated
189
190
@event.listens_for(Constraint, "after_parent_attach")
@event.listens_for(Index, "after_parent_attach")
def _constraint_name(const, table):
    """Apply the naming convention when ``const`` is attached to a parent.

    Registered for ``after_parent_attach`` on both ``Constraint`` and
    ``Index``.  ``table`` is whatever parent the object was attached to;
    only ``Column`` and ``Table`` parents are acted upon.
    """
    if isinstance(table, Column):
        # this path occurs for a CheckConstraint linked to a Column.
        # The constraint cannot be named yet, so wait until the column
        # is itself attached to its parent and run this function again
        # with that parent.

        def _on_column_attached(col, parent):
            return _constraint_name(const, parent)

        event.listen(table, "after_parent_attach", _on_column_attached)
        return

    if not isinstance(table, Table):
        return

    # names that are already ``conv`` or explicitly _NONE_NAME are kept
    if isinstance(const.name, conv) or const.name is _NONE_NAME:
        return

    generated = _constraint_name_for_table(const, table)
    if generated:
        const.name = generated