1# sql/naming.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""Establish constraint and index naming conventions."""
10
11from __future__ import annotations
12
13import re
14
15from . import events # noqa
16from .base import _NONE_NAME
17from .elements import conv as conv
18from .schema import CheckConstraint
19from .schema import Column
20from .schema import Constraint
21from .schema import ForeignKeyConstraint
22from .schema import Index
23from .schema import PrimaryKeyConstraint
24from .schema import Table
25from .schema import UniqueConstraint
26from .. import event
27from .. import exc
28
29
class ConventionDict:
    """Resolve naming-convention tokens for a single constraint or index.

    An instance serves as the mapping operand of ``%`` string formatting
    against a naming convention template; every ``%(token)s`` in the
    template is looked up through :meth:`__getitem__`.

    :param const: the constraint or index being named.
    :param table: the table the constraint is associated with.
    :param convention: the complete naming convention dictionary.  An
     entry keyed by a token name is called as ``fn(const, table)`` to
     produce that token's value.
    """

    def __init__(self, const, table, convention):
        self.const = const
        # column tokens for a foreign key constraint are read from
        # ``const.elements`` (via each element's ``.parent``) instead of
        # ``const.columns``
        self._is_fk = isinstance(const, ForeignKeyConstraint)
        self.table = table
        self.convention = convention
        # keep the name as originally given; _key_constraint_name() may
        # reset ``const.name`` afterwards
        self._const_name = const.name

    def _key_table_name(self):
        """Value of the ``table_name`` token."""
        return self.table.name

    def _column_X(self, idx, attrname):
        """Return attribute ``attrname`` of the column at position ``idx``.

        Returns the empty string when the constraint has no column at
        that position.
        """
        if self._is_fk:
            try:
                fk = self.const.elements[idx]
            except IndexError:
                return ""
            else:
                return getattr(fk.parent, attrname)
        else:
            cols = list(self.const.columns)
            try:
                col = cols[idx]
            except IndexError:
                return ""
            else:
                return getattr(col, attrname)

    def _key_constraint_name(self):
        """Value of the ``constraint_name`` token.

        :raises InvalidRequestError: if the constraint was not given an
         explicit name.
        """
        if self._const_name in (None, _NONE_NAME):
            raise exc.InvalidRequestError(
                "Naming convention including "
                "%(constraint_name)s token requires that "
                "constraint is explicitly named."
            )
        if not isinstance(self._const_name, conv):
            # clear the plain name on the constraint itself; the value
            # captured at construction time is what feeds the template
            self.const.name = None
        return self._const_name

    def _key_column_X_key(self, idx):
        # note this method was missing before
        # [ticket:3989], meaning tokens like ``%(column_0_key)s`` weren't
        # working even though documented.
        return self._column_X(idx, "key")

    def _key_column_X_name(self, idx):
        """Value of the ``column_<idx>_name`` token."""
        return self._column_X(idx, "name")

    def _key_column_X_label(self, idx):
        """Value of the ``column_<idx>_label`` token."""
        return self._column_X(idx, "_ddl_label")

    def _key_referred_table_name(self):
        """Value of the ``referred_table_name`` token.

        The target specification of the first foreign key element is
        split on dots: the last token is the column and the one before
        it is the table.  Any further leading tokens belong to the
        schema, which may itself contain dots (for example
        ``db.owner.table.column``).  A specification with no dot at all
        is taken to be a bare table name.
        """
        fk = self.const.elements[0]
        refs = fk.target_fullname.split(".")
        if len(refs) == 1:
            return refs[0]
        # index from the right so that multi-token schemas do not matter
        return refs[-2]

    def _key_referred_column_X_name(self, idx):
        fk = self.const.elements[idx]
        # note that before [ticket:3989], this method was returning
        # the specification for the :class:`.ForeignKey` itself, which normally
        # would be using the ``.key`` of the column, not the name.
        return fk.column.name

    def __getitem__(self, key):
        """Return the string value for naming convention token ``key``.

        Resolution order:

        1. a callable registered under ``key`` in the convention
           dictionary, invoked as ``fn(const, table)``;
        2. a ``_key_<key>`` method on this object;
        3. a positional column token such as ``column_0_name`` or
           ``referred_column_1_name``, dispatched to the matching
           ``_key_..._X_...`` method with the integer position;
        4. a "all columns" token such as ``column_0N_name`` or
           ``column_0_N_name``, which joins the value for every column,
           using ``_`` as separator in the ``_N`` form and no separator
           in the ``N`` form.

        :raises KeyError: if the token cannot be resolved.
        """
        if key in self.convention:
            return self.convention[key](self.const, self.table)

        direct_attr = "_key_%s" % key
        if hasattr(self, direct_attr):
            return getattr(self, direct_attr)()

        col_template = re.match(r".*_?column_(\d+)(_?N)?_.+", key)
        if col_template is None:
            raise KeyError(key)

        position = col_template.group(1)
        multiples = col_template.group(2)

        if not multiples:
            attr = "_key_" + key.replace(position, "X")
            if hasattr(self, attr):
                return getattr(self, attr)(int(position))
            raise KeyError(key)

        if self._is_fk:
            elems = self.const.elements
        else:
            elems = list(self.const.columns)

        # the method name does not depend on the element being visited
        attr = "_key_" + key.replace("0" + multiples, "X")
        tokens = []
        for elem_idx, _ in enumerate(elems):
            try:
                tokens.append(getattr(self, attr)(elem_idx))
            except AttributeError:
                raise KeyError(key)
        sep = "_" if multiples.startswith("_") else ""
        return sep.join(tokens)
128
129
# Short string aliases for constraint / index classes.  _get_convention()
# accepts either the alias or the class itself as a key of the naming
# convention dictionary, checking the alias first.
_prefix_dict = {
    Index: "ix",
    PrimaryKeyConstraint: "pk",
    CheckConstraint: "ck",
    UniqueConstraint: "uq",
    ForeignKeyConstraint: "fk",
}
137
138
def _get_convention(dict_, key):
    """Look up the convention entry that applies to class ``key``.

    Walks the method resolution order of ``key``.  For each class, the
    entry stored under its short alias from ``_prefix_dict`` is preferred;
    failing that, an entry stored under the class itself is used.

    :param dict_: the naming convention dictionary.
    :param key: the constraint or index class to look up.
    :return: the first matching entry, or ``None`` when no class in the
     MRO has one.
    """
    for klass in key.__mro__:
        if klass in _prefix_dict:
            alias = _prefix_dict[klass]
            if alias in dict_:
                return dict_[alias]
        if klass in dict_:
            return dict_[klass]
    return None
147
148
def _constraint_name_for_table(const, table):
    """Compute the name ``const`` should carry when attached to ``table``.

    :param const: the constraint or index to name.
    :param table: the table whose ``metadata.naming_convention`` applies.
    :return: ``const.name`` unchanged if it is already a ``conv``; a new
     ``conv`` built from the convention template when a convention exists
     for this constraint type and the constraint is either unnamed or the
     template contains ``constraint_name``; otherwise ``None``.
    """
    naming_convention = table.metadata.naming_convention
    template = _get_convention(naming_convention, type(const))

    current_name = const.name

    # a name already marked as ``conv`` is final
    if isinstance(current_name, conv):
        return current_name

    if template is None:
        return None

    wants_generated_name = (
        current_name is None
        or "constraint_name" in template
        or current_name is _NONE_NAME
    )
    if not wants_generated_name:
        return None

    token_source = ConventionDict(const, table, naming_convention)
    return conv(template % token_source)
170
171
@event.listens_for(
    PrimaryKeyConstraint, "_sa_event_column_added_to_pk_constraint"
)
def _column_added_to_pk_constraint(pk_constraint, col):
    """Recompute the convention name of an implicit primary key.

    Invoked when a column is added to a primary key constraint.  Explicit
    constraints are left untouched.
    """
    if not pk_constraint._implicit_generated:
        return

    # only the "implicit" pk constraint is handled for now, because the
    # name has to be forced to None to reset it; such a constraint only
    # ever carries a naming convention name, if any.
    owning_table = pk_constraint.table
    pk_constraint.name = None
    regenerated = _constraint_name_for_table(pk_constraint, owning_table)
    if regenerated:
        pk_constraint.name = regenerated
186
187
@event.listens_for(Constraint, "after_parent_attach")
@event.listens_for(Index, "after_parent_attach")
def _constraint_name(const, table):
    """Apply the naming convention when ``const`` is attached to a parent.

    :param const: the constraint or index that was just attached.
    :param table: the parent it was attached to; despite the name this is
     either a ``Table`` or, for a column-level constraint, a ``Column``.
    """
    if isinstance(table, Column):
        # this path occurs for a CheckConstraint linked to a Column.
        # The table is not known yet, so defer: once the column itself is
        # attached to its table, run this function again with that table.
        def _name_once_column_attached(col, table):
            return _constraint_name(const, table)

        event.listen(table, "after_parent_attach", _name_once_column_attached)
        return

    if not isinstance(table, Table):
        return

    existing = const.name
    if existing is _NONE_NAME or isinstance(existing, conv):
        return

    generated = _constraint_name_for_table(const, table)
    if generated:
        const.name = generated