1# sql/ddl.py
2# Copyright (C) 2009-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: allow-untyped-defs, allow-untyped-calls
8
9"""
10Provides the hierarchy of DDL-defining schema items as well as routines
11to invoke them for a create/drop call.
12
13"""
14from __future__ import annotations
15
16import contextlib
17import typing
18from typing import Any
19from typing import Callable
20from typing import Generic
21from typing import Iterable
22from typing import List
23from typing import Optional
24from typing import Protocol
25from typing import Sequence as typing_Sequence
26from typing import Tuple
27from typing import TypeVar
28from typing import Union
29
30from . import roles
31from .base import _generative
32from .base import Executable
33from .base import SchemaVisitor
34from .elements import ClauseElement
35from .. import exc
36from .. import util
37from ..util import topological
38from ..util.typing import Self
39
40if typing.TYPE_CHECKING:
41 from .compiler import Compiled
42 from .compiler import DDLCompiler
43 from .elements import BindParameter
44 from .schema import Column
45 from .schema import Constraint
46 from .schema import ForeignKeyConstraint
47 from .schema import Index
48 from .schema import SchemaItem
49 from .schema import Sequence as Sequence # noqa: F401
50 from .schema import Table
51 from .selectable import TableClause
52 from ..engine.base import Connection
53 from ..engine.interfaces import CacheStats
54 from ..engine.interfaces import CompiledCacheType
55 from ..engine.interfaces import Dialect
56 from ..engine.interfaces import SchemaTranslateMapType
57
# Type variable for the element targeted by a create/drop construct:
# either a SchemaItem subclass or a plain string (e.g. a schema name).
_SI = TypeVar("_SI", bound=Union["SchemaItem", str])
59
60
class BaseDDLElement(ClauseElement):
    """Root of the hierarchy of DDL constructs.

    Includes both standalone, executable DDL statements as well as the
    sub-elements that participate within larger processes such as
    "create table".

    .. versionadded:: 2.0

    """

    # DDL statements are never cached; this flag suppresses the cache
    # warnings that would otherwise be emitted for every subclass.
    _hierarchy_supports_caching = False
    """disable cache warnings for all _DDLCompiles subclasses. """

    def _compiler(self, dialect, **kw):
        """Return a compiler appropriate for this ClauseElement, given a
        Dialect."""

        # DDL uses the dialect's DDL compiler, not the general
        # statement compiler
        ddl_compiler_cls = dialect.ddl_compiler
        return ddl_compiler_cls(dialect, self, **kw)

    def _compile_w_cache(
        self,
        dialect: Dialect,
        *,
        compiled_cache: Optional[CompiledCacheType],
        column_keys: List[str],
        for_executemany: bool = False,
        schema_translate_map: Optional[SchemaTranslateMapType] = None,
        **kw: Any,
    ) -> Tuple[
        Compiled, Optional[typing_Sequence[BindParameter[Any]]], CacheStats
    ]:
        # compiled-cache participation is not implemented for DDL
        raise NotImplementedError()
91
92
class DDLIfCallable(Protocol):
    """Protocol for the ``callable_`` accepted by
    :meth:`.ExecutableDDLElement.execute_if` and stored on :class:`.DDLIf`.

    The callable returns True if the DDL construct should be invoked for
    the given target/bind combination.
    """

    def __call__(
        self,
        ddl: BaseDDLElement,
        target: Union[SchemaItem, str],
        bind: Optional[Connection],
        tables: Optional[List[Table]] = None,
        state: Optional[Any] = None,
        *,
        dialect: Dialect,
        compiler: Optional[DDLCompiler] = ...,
        checkfirst: bool,
    ) -> bool: ...
106
107
class DDLIf(typing.NamedTuple):
    """Criteria captured by :meth:`.ExecutableDDLElement.execute_if`,
    evaluated at event time by :meth:`._should_execute`."""

    # NOTE: per execute_if(), this may be a single dialect name or a
    # collection of names; the previous Optional[str] annotation was
    # narrower than the runtime behavior below, which also handles
    # tuple / list / set.
    dialect: Union[str, typing_Sequence[str], None]
    callable_: Optional["DDLIfCallable"]
    state: Optional[Any]

    def _should_execute(
        self,
        ddl: "BaseDDLElement",
        target: Union["SchemaItem", str],
        bind: Optional["Connection"],
        compiler: Optional["DDLCompiler"] = None,
        **kw: Any,
    ) -> bool:
        """Return True if the DDL should be invoked.

        :param ddl: the DDL element being considered.
        :param target: schema item (or string) the DDL applies to.
        :param bind: the :class:`_engine.Connection` in use, if any.  One
         of ``bind`` or ``compiler`` must be present to supply a dialect.
        :param compiler: the :class:`.DDLCompiler` in use when the DDL is
         rendered inline within a table, if any.
        """
        if bind is not None:
            dialect = bind.dialect
        elif compiler is not None:
            dialect = compiler.dialect
        else:
            assert False, "compiler or dialect is required"

        # the dialect criteria may be a single name or a collection of
        # names; a non-matching dialect vetoes execution outright
        if isinstance(self.dialect, str):
            if self.dialect != dialect.name:
                return False
        elif isinstance(self.dialect, (tuple, list, set)):
            if dialect.name not in self.dialect:
                return False

        # the user-supplied callable, if any, has the final say
        if self.callable_ is not None and not self.callable_(
            ddl,
            target,
            bind,
            state=self.state,
            dialect=dialect,
            compiler=compiler,
            **kw,
        ):
            return False

        return True
146
147
class ExecutableDDLElement(roles.DDLRole, Executable, BaseDDLElement):
    """Base class for standalone executable DDL expression constructs.

    This class is the base for the general purpose :class:`.DDL` class,
    as well as the various create/drop clause constructs such as
    :class:`.CreateTable`, :class:`.DropTable`, :class:`.AddConstraint`,
    etc.

    .. versionchanged:: 2.0 :class:`.ExecutableDDLElement` is renamed from
       :class:`.DDLElement`, which still exists for backwards compatibility.

    :class:`.ExecutableDDLElement` integrates closely with SQLAlchemy events,
    introduced in :ref:`event_toplevel`. An instance of one is
    itself an event receiving callable::

        event.listen(
            users,
            "after_create",
            AddConstraint(constraint).execute_if(dialect="postgresql"),
        )

    .. seealso::

        :class:`.DDL`

        :class:`.DDLEvents`

        :ref:`event_toplevel`

        :ref:`schema_ddl_sequences`

    """

    # conditional-execution criteria established via execute_if();
    # None means this DDL executes unconditionally
    _ddl_if: Optional[DDLIf] = None
    # the schema item (or string) this DDL applies to, set via against()
    target: Union[SchemaItem, str, None] = None

    def _execute_on_connection(
        self, connection, distilled_params, execution_options
    ):
        # route through the Connection's dedicated DDL execution path
        return connection._execute_ddl(
            self, distilled_params, execution_options
        )

    @_generative
    def against(self, target: SchemaItem) -> Self:
        """Return a copy of this :class:`_schema.ExecutableDDLElement` which
        will include the given target.

        This essentially applies the given item to the ``.target`` attribute of
        the returned :class:`_schema.ExecutableDDLElement` object. This target
        is then usable by event handlers and compilation routines in order to
        provide services such as tokenization of a DDL string in terms of a
        particular :class:`_schema.Table`.

        When a :class:`_schema.ExecutableDDLElement` object is established as
        an event handler for the :meth:`_events.DDLEvents.before_create` or
        :meth:`_events.DDLEvents.after_create` events, and the event then
        occurs for a given target such as a :class:`_schema.Constraint` or
        :class:`_schema.Table`, that target is established with a copy of the
        :class:`_schema.ExecutableDDLElement` object using this method, which
        then proceeds to the :meth:`_schema.ExecutableDDLElement.execute`
        method in order to invoke the actual DDL instruction.

        :param target: a :class:`_schema.SchemaItem` that will be the subject
         of a DDL operation.

        :return: a copy of this :class:`_schema.ExecutableDDLElement` with the
         ``.target`` attribute assigned to the given
         :class:`_schema.SchemaItem`.

        .. seealso::

            :class:`_schema.DDL` - uses tokenization against the "target" when
            processing the DDL string.

        """
        # note: @_generative means "self" here is already a copy
        self.target = target
        return self

    @_generative
    def execute_if(
        self,
        dialect: Optional[str] = None,
        callable_: Optional[DDLIfCallable] = None,
        state: Optional[Any] = None,
    ) -> Self:
        r"""Return a callable that will execute this
        :class:`_ddl.ExecutableDDLElement` conditionally within an event
        handler.

        Used to provide a wrapper for event listening::

            event.listen(
                metadata,
                "before_create",
                DDL("my_ddl").execute_if(dialect="postgresql"),
            )

        :param dialect: May be a string or tuple of strings.
          If a string, it will be compared to the name of the
          executing database dialect::

            DDL("something").execute_if(dialect="postgresql")

          If a tuple, specifies multiple dialect names::

            DDL("something").execute_if(dialect=("postgresql", "mysql"))

        :param callable\_: A callable, which will be invoked with
          three positional arguments as well as optional keyword
          arguments:

            :ddl:
              This DDL element.

            :target:
              The :class:`_schema.Table` or :class:`_schema.MetaData`
              object which is the
              target of this event. May be None if the DDL is executed
              explicitly.

            :bind:
              The :class:`_engine.Connection` being used for DDL execution.
              May be None if this construct is being created inline within
              a table, in which case ``compiler`` will be present.

            :tables:
              Optional keyword argument - a list of Table objects which are to
              be created/ dropped within a MetaData.create_all() or drop_all()
              method call.

            :dialect: keyword argument, but always present - the
              :class:`.Dialect` involved in the operation.

            :compiler: keyword argument. Will be ``None`` for an engine
              level DDL invocation, but will refer to a :class:`.DDLCompiler`
              if this DDL element is being created inline within a table.

            :state:
              Optional keyword argument - will be the ``state`` argument
              passed to this function.

            :checkfirst:
              Keyword argument, will be True if the 'checkfirst' flag was
              set during the call to ``create()``, ``create_all()``,
              ``drop()``, ``drop_all()``.

          If the callable returns a True value, the DDL statement will be
          executed.

        :param state: any value which will be passed to the callable\_
          as the ``state`` keyword argument.

        .. seealso::

            :meth:`.SchemaItem.ddl_if`

            :class:`.DDLEvents`

            :ref:`event_toplevel`

        """
        # criteria are stored and evaluated later by _should_execute()
        self._ddl_if = DDLIf(dialect, callable_, state)
        return self

    def _should_execute(self, target, bind, **kw):
        # evaluate execute_if() criteria, when present
        if self._ddl_if is None:
            return True
        else:
            return self._ddl_if._should_execute(self, target, bind, **kw)

    def _invoke_with(self, bind):
        # execute on the given bind only if the criteria pass
        if self._should_execute(self.target, bind):
            return bind.execute(self)

    def __call__(self, target, bind, **kw):
        """Execute the DDL as a ddl_listener."""

        # event **kw (tables, checkfirst, etc.) are accepted but not
        # used by this path
        self.against(target)._invoke_with(bind)

    def _generate(self):
        # shallow copy; supports the @_generative methods above
        s = self.__class__.__new__(self.__class__)
        s.__dict__ = self.__dict__.copy()
        return s
332
333
# Backwards-compatibility alias; the class was renamed in SQLAlchemy 2.0.
DDLElement = ExecutableDDLElement
""":class:`.DDLElement` is renamed to :class:`.ExecutableDDLElement`."""
336
337
class DDL(ExecutableDDLElement):
    """A literal DDL statement.

    Represents a string of SQL DDL emitted to the database as-is.
    :class:`.DDL` objects function as DDL event listeners and may be
    subscribed to the events listed in :class:`.DDLEvents`, using either
    :class:`_schema.Table` or :class:`_schema.MetaData` objects as
    targets.  Basic string templating allows a single DDL instance to
    handle repetitive tasks for multiple tables.

    Examples::

        from sqlalchemy import event, DDL

        tbl = Table("users", metadata, Column("uid", Integer))
        event.listen(tbl, "before_create", DDL("DROP TRIGGER users_trigger"))

        spow = DDL("ALTER TABLE %(table)s SET secretpowers TRUE")
        event.listen(tbl, "after_create", spow.execute_if(dialect="somedb"))

        drop_spow = DDL("ALTER TABLE users SET secretpowers FALSE")
        connection.execute(drop_spow)

    When operating on Table events, the following ``statement``
    string substitutions are available:

    .. sourcecode:: text

        %(table)s - the Table name, with any required quoting applied
        %(schema)s - the schema name, with any required quoting applied
        %(fullname)s - the Table name including schema, quoted if needed

    Keys present in the optional "context" dictionary are combined with,
    and take precedence over, the standard substitutions above.

    """

    __visit_name__ = "ddl"

    def __init__(self, statement, context=None):
        """Create a DDL statement.

        :param statement:
          A string to be executed.  Statements are processed with
          Python's string formatting operator using a fixed set of string
          substitutions, as well as additional substitutions provided by
          the optional :paramref:`.DDL.context` parameter.

          A literal '%' in a statement must be escaped as '%%'.

          SQL bind parameters are not available in DDL statements.

        :param context:
          Optional dictionary, defaults to None.  These values will be
          available for use in string substitutions on the DDL statement.

        .. seealso::

            :class:`.DDLEvents`

            :ref:`event_toplevel`

        """

        # reject non-string payloads up front; there is no other means
        # of rendering this construct
        if not isinstance(statement, str):
            raise exc.ArgumentError(
                "Expected a string or unicode SQL statement, got '%r'"
                % statement
            )

        self.statement = statement
        self.context = context or {}

    def __repr__(self):
        parts = [repr(self.statement)]
        if self.context:
            parts.append(f"context={self.context}")

        joined = ", ".join(parts)
        return f"<{type(self).__name__}@{id(self)}; {joined}>"
423
424
class _CreateDropBase(ExecutableDDLElement, Generic[_SI]):
    """Common base for DDL constructs representing CREATE / DROP or
    their equivalents.

    All subclasses share a single ``element`` attribute which names the
    schema element to be created or dropped.

    """

    element: _SI

    def __init__(self, element: _SI) -> None:
        # the element doubles as the event target for this DDL
        self.element = self.target = element
        # inherit conditional-execution criteria established via
        # SchemaItem.ddl_if(), when present on the element
        self._ddl_if = getattr(element, "_ddl_if", None)

    @property
    def stringify_dialect(self):  # type: ignore[override]
        # string elements (e.g. schema names) are expected to use the
        # class-level override instead of reaching this property
        assert not isinstance(self.element, str)
        return self.element.create_drop_stringify_dialect

    def _create_rule_disable(self, compiler):
        """Allow disable of _create_rule using a callable.

        Pass to _create_rule using
        util.portable_instancemethod(self._create_rule_disable)
        to retain serializability.

        """
        return False
455
456
class _CreateBase(_CreateDropBase[_SI]):
    """Adds the common ``if_not_exists`` flag to CREATE constructs."""

    def __init__(self, element: _SI, if_not_exists: bool = False) -> None:
        super().__init__(element)
        # when True, renders "IF NOT EXISTS" where the dialect supports it
        self.if_not_exists = if_not_exists
461
462
class _DropBase(_CreateDropBase[_SI]):
    """Adds the common ``if_exists`` flag to DROP constructs."""

    def __init__(self, element: _SI, if_exists: bool = False) -> None:
        super().__init__(element)
        # when True, renders "IF EXISTS" where the dialect supports it
        self.if_exists = if_exists
467
468
class CreateSchema(_CreateBase[str]):
    """Represent a CREATE SCHEMA statement.

    The argument here is the string name of the schema.

    """

    __visit_name__ = "create_schema"

    stringify_dialect = "default"

    def __init__(
        self,
        name: str,
        if_not_exists: bool = False,
    ) -> None:
        """Create a new :class:`.CreateSchema` construct.

        :param name: string name of the schema to create.
        :param if_not_exists: if True, an IF NOT EXISTS operator will be
         applied to the construct.
        """
        super().__init__(element=name, if_not_exists=if_not_exists)
488
489
class DropSchema(_DropBase[str]):
    """Represent a DROP SCHEMA statement.

    The argument here is the string name of the schema.

    """

    __visit_name__ = "drop_schema"

    stringify_dialect = "default"

    def __init__(
        self,
        name: str,
        cascade: bool = False,
        if_exists: bool = False,
    ) -> None:
        """Create a new :class:`.DropSchema` construct.

        :param name: string name of the schema to drop.
        :param cascade: if True, a backend-specific CASCADE directive is
         rendered where available.
        :param if_exists: if True, an IF EXISTS operator will be applied
         to the construct.
        """
        super().__init__(element=name, if_exists=if_exists)
        self.cascade = cascade
511
512
class CreateTable(_CreateBase["Table"]):
    """Represent a CREATE TABLE statement."""

    __visit_name__ = "create_table"

    def __init__(
        self,
        element: Table,
        include_foreign_key_constraints: Optional[
            typing_Sequence[ForeignKeyConstraint]
        ] = None,
        if_not_exists: bool = False,
    ) -> None:
        """Create a :class:`.CreateTable` construct.

        :param element: a :class:`_schema.Table` that's the subject
         of the CREATE
        :param include_foreign_key_constraints: optional sequence of
         :class:`_schema.ForeignKeyConstraint` objects that will be included
         inline within the CREATE construct; if omitted, all foreign key
         constraints that do not specify use_alter=True are included.

        :param if_not_exists: if True, an IF NOT EXISTS operator will be
         applied to the construct.

         .. versionadded:: 1.4.0b2

        """
        super().__init__(element, if_not_exists=if_not_exists)
        # pre-build a CreateColumn fragment per column so that each may
        # be individually intercepted by compiler extensions
        self.columns = [CreateColumn(col) for col in element.columns]
        self.include_foreign_key_constraints = include_foreign_key_constraints
545
546
class _DropView(_DropBase["Table"]):
    """Semi-public 'DROP VIEW' construct.

    Used by the test suite for dialect-agnostic drops of views.
    This object will eventually be part of a public "view" API.

    """

    # reuses the _DropBase machinery; the element is a Table standing
    # in for a view
    __visit_name__ = "drop_view"
556
557
class CreateConstraint(BaseDDLElement):
    """Represent a :class:`.Constraint` as a DDL sub-element.

    A non-executable construct (note the :class:`.BaseDDLElement` base);
    consumed during larger DDL processes such as CREATE TABLE.
    """

    # the constraint to be rendered
    element: Constraint

    def __init__(self, element: Constraint) -> None:
        self.element = element
563
564
class CreateColumn(BaseDDLElement):
    """Represent a :class:`_schema.Column`
    as rendered in a CREATE TABLE statement,
    via the :class:`.CreateTable` construct.

    This is provided to support custom column DDL within the generation
    of CREATE TABLE statements, by using the
    compiler extension documented in :ref:`sqlalchemy.ext.compiler_toplevel`
    to extend :class:`.CreateColumn`.

    Typical integration is to examine the incoming :class:`_schema.Column`
    object, and to redirect compilation if a particular flag or condition
    is found::

        from sqlalchemy import schema
        from sqlalchemy.ext.compiler import compiles


        @compiles(schema.CreateColumn)
        def compile(element, compiler, **kw):
            column = element.element

            if "special" not in column.info:
                return compiler.visit_create_column(element, **kw)

            text = "%s SPECIAL DIRECTIVE %s" % (
                column.name,
                compiler.type_compiler.process(column.type),
            )
            default = compiler.get_column_default_string(column)
            if default is not None:
                text += " DEFAULT " + default

            if not column.nullable:
                text += " NOT NULL"

            if column.constraints:
                text += " ".join(
                    compiler.process(const) for const in column.constraints
                )
            return text

    The above construct can be applied to a :class:`_schema.Table`
    as follows::

        from sqlalchemy import Table, MetaData, Column, Integer, String
        from sqlalchemy import schema

        metadata = MetaData()

        table = Table(
            "mytable",
            metadata,
            Column("x", Integer, info={"special": True}, primary_key=True),
            Column("y", String(50)),
            Column("z", String(20), info={"special": True}),
        )

        metadata.create_all(conn)

    Above, the directives we've added to the :attr:`_schema.Column.info`
    collection
    will be detected by our custom compilation scheme:

    .. sourcecode:: sql

        CREATE TABLE mytable (
            x SPECIAL DIRECTIVE INTEGER NOT NULL,
            y VARCHAR(50),
            z SPECIAL DIRECTIVE VARCHAR(20),
            PRIMARY KEY (x)
        )

    The :class:`.CreateColumn` construct can also be used to skip certain
    columns when producing a ``CREATE TABLE``. This is accomplished by
    creating a compilation rule that conditionally returns ``None``.
    This is essentially how to produce the same effect as using the
    ``system=True`` argument on :class:`_schema.Column`, which marks a column
    as an implicitly-present "system" column.

    For example, suppose we wish to produce a :class:`_schema.Table`
    which skips
    rendering of the PostgreSQL ``xmin`` column against the PostgreSQL
    backend, but on other backends does render it, in anticipation of a
    triggered rule. A conditional compilation rule could skip this name only
    on PostgreSQL::

        from sqlalchemy.schema import CreateColumn


        @compiles(CreateColumn, "postgresql")
        def skip_xmin(element, compiler, **kw):
            if element.element.name == "xmin":
                return None
            else:
                return compiler.visit_create_column(element, **kw)


        my_table = Table(
            "mytable",
            metadata,
            Column("id", Integer, primary_key=True),
            Column("xmin", Integer),
        )

    Above, a :class:`.CreateTable` construct will generate a ``CREATE TABLE``
    which only includes the ``id`` column in the string; the ``xmin`` column
    will be omitted, but only against the PostgreSQL backend.

    """

    __visit_name__ = "create_column"

    # the Column to be rendered
    element: Column[Any]

    def __init__(self, element: Column[Any]) -> None:
        self.element = element
682
683
class DropTable(_DropBase["Table"]):
    """Represent a DROP TABLE statement."""

    __visit_name__ = "drop_table"

    def __init__(self, element: Table, if_exists: bool = False) -> None:
        """Create a :class:`.DropTable` construct.

        :param element: a :class:`_schema.Table` that's the subject
         of the DROP.
        :param if_exists: if True, an IF EXISTS operator will be applied to the
         construct.

         .. versionadded:: 1.4.0b2

        """
        super().__init__(element, if_exists=if_exists)
702
703
class CreateSequence(_CreateBase["Sequence"]):
    """Represent a CREATE SEQUENCE statement."""

    # construction (including if_not_exists) is inherited from _CreateBase
    __visit_name__ = "create_sequence"
708
709
class DropSequence(_DropBase["Sequence"]):
    """Represent a DROP SEQUENCE statement."""

    # construction (including if_exists) is inherited from _DropBase
    __visit_name__ = "drop_sequence"
714
715
class CreateIndex(_CreateBase["Index"]):
    """Represent a CREATE INDEX statement."""

    __visit_name__ = "create_index"

    def __init__(self, element: Index, if_not_exists: bool = False) -> None:
        """Create a :class:`.CreateIndex` construct.

        :param element: a :class:`_schema.Index` that's the subject
         of the CREATE.
        :param if_not_exists: if True, an IF NOT EXISTS operator will be
         applied to the construct.

         .. versionadded:: 1.4.0b2

        """
        super().__init__(element, if_not_exists=if_not_exists)
733
734
class DropIndex(_DropBase["Index"]):
    """Represent a DROP INDEX statement."""

    __visit_name__ = "drop_index"

    def __init__(self, element: Index, if_exists: bool = False) -> None:
        """Create a :class:`.DropIndex` construct.

        :param element: a :class:`_schema.Index` that's the subject
         of the DROP.
        :param if_exists: if True, an IF EXISTS operator will be applied to the
         construct.

         .. versionadded:: 1.4.0b2

        """
        super().__init__(element, if_exists=if_exists)
752
753
class AddConstraint(_CreateBase["Constraint"]):
    """Represent an ALTER TABLE ADD CONSTRAINT statement."""

    __visit_name__ = "add_constraint"

    def __init__(
        self,
        element: Constraint,
        *,
        isolate_from_table: bool = True,
    ) -> None:
        """Construct a new :class:`.AddConstraint` construct.

        :param element: a :class:`.Constraint` object

        :param isolate_from_table: optional boolean, defaults to True.  Has
         the effect of the incoming constraint being isolated from being
         included in a CREATE TABLE sequence when associated with a
         :class:`.Table`.

         .. versionadded:: 2.0.39 - added
            :paramref:`.AddConstraint.isolate_from_table`, defaulting
            to True.  Previously, the behavior of this parameter was
            implicitly turned on in all cases.

        """
        super().__init__(element)

        if isolate_from_table:
            # the constraint will be emitted via ALTER, so prevent it
            # from also rendering inline within CREATE TABLE
            element._create_rule = self._create_rule_disable
784
785
class DropConstraint(_DropBase["Constraint"]):
    """Represent an ALTER TABLE DROP CONSTRAINT statement."""

    __visit_name__ = "drop_constraint"

    def __init__(
        self,
        element: Constraint,
        *,
        cascade: bool = False,
        if_exists: bool = False,
        isolate_from_table: bool = True,
        **kw: Any,
    ) -> None:
        """Construct a new :class:`.DropConstraint` construct.

        :param element: a :class:`.Constraint` object
        :param cascade: optional boolean, indicates backend-specific
         "CASCADE CONSTRAINT" directive should be rendered if available
        :param if_exists: optional boolean, indicates backend-specific
         "IF EXISTS" directive should be rendered if available
        :param isolate_from_table: optional boolean, defaults to True.  Has
         the effect of the incoming constraint being isolated from being
         included in a CREATE TABLE sequence when associated with a
         :class:`.Table`.

         .. versionadded:: 2.0.39 - added
            :paramref:`.DropConstraint.isolate_from_table`, defaulting
            to True.  Previously, the behavior of this parameter was
            implicitly turned on in all cases.

        """
        self.cascade = cascade
        super().__init__(element, if_exists=if_exists, **kw)

        if isolate_from_table:
            # the constraint is being managed via ALTER, so prevent it
            # from rendering inline within CREATE TABLE
            element._create_rule = self._create_rule_disable
823
824
class SetTableComment(_CreateDropBase["Table"]):
    """Represent a COMMENT ON TABLE IS statement.

    Emitted for a :class:`_schema.Table` whose ``.comment`` is set, on
    dialects that do not render comments inline in CREATE TABLE.
    """

    __visit_name__ = "set_table_comment"
829
830
class DropTableComment(_CreateDropBase["Table"]):
    """Represent a COMMENT ON TABLE '' statement.

    Note this varies a lot across database backends.

    """

    # removes a previously-set table comment
    __visit_name__ = "drop_table_comment"
839
840
class SetColumnComment(_CreateDropBase["Column[Any]"]):
    """Represent a COMMENT ON COLUMN IS statement.

    Emitted for a :class:`_schema.Column` whose ``.comment`` is set, on
    dialects that do not render comments inline in CREATE TABLE.
    """

    __visit_name__ = "set_column_comment"
845
846
class DropColumnComment(_CreateDropBase["Column[Any]"]):
    """Represent a COMMENT ON COLUMN IS NULL statement."""

    # removes a previously-set column comment
    __visit_name__ = "drop_column_comment"
851
852
class SetConstraintComment(_CreateDropBase["Constraint"]):
    """Represent a COMMENT ON CONSTRAINT IS statement.

    Emitted for a :class:`.Constraint` whose ``.comment`` is set, on
    dialects that support constraint comments.
    """

    __visit_name__ = "set_constraint_comment"
857
858
class DropConstraintComment(_CreateDropBase["Constraint"]):
    """Represent a COMMENT ON CONSTRAINT IS NULL statement."""

    # removes a previously-set constraint comment
    __visit_name__ = "drop_constraint_comment"
863
864
class InvokeDDLBase(SchemaVisitor):
    """Base for schema visitors that emit CREATE / DROP DDL over a
    :class:`_engine.Connection`."""

    def __init__(self, connection, **kw):
        self.connection = connection
        # no keyword arguments are accepted at this level; subclasses
        # consume their own kwargs before calling up
        assert not kw, f"Unexpected keywords: {kw.keys()}"

    @contextlib.contextmanager
    def with_ddl_events(self, target, **kw):
        """helper context manager that will apply appropriate DDL events
        to a CREATE or DROP operation."""

        # subclasses emit the before_*/after_* events around the yield
        raise NotImplementedError()
876
877
class InvokeCreateDDLBase(InvokeDDLBase):
    @contextlib.contextmanager
    def with_ddl_events(self, target, **kw):
        """helper context manager that will apply appropriate DDL events
        to a CREATE or DROP operation."""

        # emit before_create, run the enclosed CREATE operations, then
        # emit after_create
        target.dispatch.before_create(
            target, self.connection, _ddl_runner=self, **kw
        )
        yield
        target.dispatch.after_create(
            target, self.connection, _ddl_runner=self, **kw
        )
891
892
class InvokeDropDDLBase(InvokeDDLBase):
    @contextlib.contextmanager
    def with_ddl_events(self, target, **kw):
        """helper context manager that will apply appropriate DDL events
        to a CREATE or DROP operation."""

        # emit before_drop, run the enclosed DROP operations, then emit
        # after_drop
        target.dispatch.before_drop(
            target, self.connection, _ddl_runner=self, **kw
        )
        yield
        target.dispatch.after_drop(
            target, self.connection, _ddl_runner=self, **kw
        )
906
907
class SchemaGenerator(InvokeCreateDDLBase):
    """Schema visitor that emits CREATE statements for schema items
    (MetaData, Table, Index, Sequence, ForeignKeyConstraint)."""

    def __init__(
        self, dialect, connection, checkfirst=False, tables=None, **kwargs
    ):
        # :param checkfirst: when True, probe the database and skip items
        #  that already exist
        # :param tables: optional subset of tables to create when
        #  visiting a MetaData
        super().__init__(connection, **kwargs)
        self.checkfirst = checkfirst
        self.tables = tables
        self.preparer = dialect.identifier_preparer
        self.dialect = dialect
        self.memo = {}

    def _can_create_table(self, table):
        """Return True if the table should be created.

        Validates the table and schema identifiers; when ``checkfirst``
        is set, also skips tables already present in the database.
        """
        self.dialect.validate_identifier(table.name)
        effective_schema = self.connection.schema_for_object(table)
        if effective_schema:
            self.dialect.validate_identifier(effective_schema)
        return not self.checkfirst or not self.dialect.has_table(
            self.connection, table.name, schema=effective_schema
        )

    def _can_create_index(self, index):
        """Return True if the index should be created, honoring
        ``checkfirst``."""
        effective_schema = self.connection.schema_for_object(index.table)
        if effective_schema:
            self.dialect.validate_identifier(effective_schema)
        return not self.checkfirst or not self.dialect.has_index(
            self.connection,
            index.table.name,
            index.name,
            schema=effective_schema,
        )

    def _can_create_sequence(self, sequence):
        """Return True if the sequence should be created.

        Sequences are skipped entirely on dialects without sequence
        support; an "optional" sequence is skipped when the dialect can
        do without it.
        """
        effective_schema = self.connection.schema_for_object(sequence)

        return self.dialect.supports_sequences and (
            (not self.dialect.sequences_optional or not sequence.optional)
            and (
                not self.checkfirst
                or not self.dialect.has_sequence(
                    self.connection, sequence.name, schema=effective_schema
                )
            )
        )

    def visit_metadata(self, metadata):
        """Emit CREATE statements for all creatable elements of a
        MetaData, in foreign-key dependency order."""
        if self.tables is not None:
            tables = self.tables
        else:
            tables = list(metadata.tables.values())

        # entries with a None table carry ForeignKeyConstraints that
        # must be added separately via ALTER (cycles / use_alter)
        collection = sort_tables_and_constraints(
            [t for t in tables if self._can_create_table(t)]
        )

        # standalone sequences only; column-attached sequences are
        # created as part of their table
        seq_coll = [
            s
            for s in metadata._sequences.values()
            if s.column is None and self._can_create_sequence(s)
        ]

        event_collection = [t for (t, fks) in collection if t is not None]

        with self.with_ddl_events(
            metadata,
            tables=event_collection,
            checkfirst=self.checkfirst,
        ):
            for seq in seq_coll:
                self.traverse_single(seq, create_ok=True)

            for table, fkcs in collection:
                if table is not None:
                    self.traverse_single(
                        table,
                        create_ok=True,
                        include_foreign_key_constraints=fkcs,
                        _is_metadata_operation=True,
                    )
                else:
                    # foreign key constraints deferred by the sort above
                    for fkc in fkcs:
                        self.traverse_single(fkc)

    def visit_table(
        self,
        table,
        create_ok=False,
        include_foreign_key_constraints=None,
        _is_metadata_operation=False,
    ):
        """Emit CREATE TABLE for a single table, along with its column
        defaults, indexes and (where supported) comments."""
        if not create_ok and not self._can_create_table(table):
            return

        with self.with_ddl_events(
            table,
            checkfirst=self.checkfirst,
            _is_metadata_operation=_is_metadata_operation,
        ):
            # column-level defaults (e.g. sequences) may need their own
            # DDL before the table exists
            for column in table.columns:
                if column.default is not None:
                    self.traverse_single(column.default)

            if not self.dialect.supports_alter:
                # e.g., don't omit any foreign key constraints
                include_foreign_key_constraints = None

            CreateTable(
                table,
                include_foreign_key_constraints=(
                    include_foreign_key_constraints
                ),
            )._invoke_with(self.connection)

            if hasattr(table, "indexes"):
                for index in table.indexes:
                    self.traverse_single(index, create_ok=True)

            # comments are emitted as separate statements when the
            # dialect does not render them inline in CREATE TABLE
            if (
                self.dialect.supports_comments
                and not self.dialect.inline_comments
            ):
                if table.comment is not None:
                    SetTableComment(table)._invoke_with(self.connection)

                for column in table.columns:
                    if column.comment is not None:
                        SetColumnComment(column)._invoke_with(self.connection)

                if self.dialect.supports_constraint_comments:
                    for constraint in table.constraints:
                        if constraint.comment is not None:
                            self.connection.execute(
                                SetConstraintComment(constraint)
                            )

    def visit_foreign_key_constraint(self, constraint):
        """Emit ALTER TABLE ADD CONSTRAINT for a foreign key, when the
        dialect supports ALTER."""
        if not self.dialect.supports_alter:
            return

        with self.with_ddl_events(constraint):
            AddConstraint(constraint)._invoke_with(self.connection)

    def visit_sequence(self, sequence, create_ok=False):
        """Emit CREATE SEQUENCE."""
        if not create_ok and not self._can_create_sequence(sequence):
            return
        with self.with_ddl_events(sequence):
            CreateSequence(sequence)._invoke_with(self.connection)

    def visit_index(self, index, create_ok=False):
        """Emit CREATE INDEX."""
        if not create_ok and not self._can_create_index(index):
            return
        with self.with_ddl_events(index):
            CreateIndex(index)._invoke_with(self.connection)
1060
1061
class SchemaDropper(InvokeDropDDLBase):
    """A :class:`.SchemaVisitor` that emits DROP DDL for schema items.

    Used by :meth:`.MetaData.drop_all` and :meth:`_schema.Table.drop`.
    Tables are dropped in the reverse of CREATE dependency order; foreign
    key constraints that cannot participate in the inline sort are dropped
    separately via ALTER ... DROP CONSTRAINT, where the backend supports
    ALTER.
    """

    def __init__(
        self, dialect, connection, checkfirst=False, tables=None, **kwargs
    ):
        super().__init__(connection, **kwargs)
        # when True, probe the database and skip DROP for objects that
        # are not present
        self.checkfirst = checkfirst
        # optional explicit collection of tables to drop; None means all
        # tables of the MetaData being visited
        self.tables = tables
        self.preparer = dialect.identifier_preparer
        self.dialect = dialect
        self.memo = {}

    def visit_metadata(self, metadata):
        """Drop all droppable tables, separately-dropped foreign key
        constraints, and sequences for ``metadata``."""
        if self.tables is not None:
            tables = self.tables
        else:
            tables = list(metadata.tables.values())

        try:
            unsorted_tables = [t for t in tables if self._can_drop_table(t)]
            # reverse of the CREATE ordering.  The filter_fn keeps
            # unnamed FKs (or all FKs when ALTER is unsupported) inline
            # (False); named FKs return None, leaving them eligible to be
            # broken out as separate DROP CONSTRAINT entries if a
            # dependency cycle is detected.
            collection = list(
                reversed(
                    sort_tables_and_constraints(
                        unsorted_tables,
                        filter_fn=lambda constraint: (
                            False
                            if not self.dialect.supports_alter
                            or constraint.name is None
                            else None
                        ),
                    )
                )
            )
        except exc.CircularDependencyError as err2:
            if not self.dialect.supports_alter:
                # best effort: without ALTER there is no way to break the
                # cycle, so warn and drop in the original collection order
                util.warn(
                    "Can't sort tables for DROP; an "
                    "unresolvable foreign key "
                    "dependency exists between tables: %s; and backend does "
                    "not support ALTER. To restore at least a partial sort, "
                    "apply use_alter=True to ForeignKey and "
                    "ForeignKeyConstraint "
                    "objects involved in the cycle to mark these as known "
                    "cycles that will be ignored."
                    % (", ".join(sorted([t.fullname for t in err2.cycles])))
                )
                collection = [(t, ()) for t in unsorted_tables]
            else:
                # ALTER is available but the cycle's constraints are
                # unnamed, so DROP CONSTRAINT cannot be emitted; re-raise
                # with a more specific message
                raise exc.CircularDependencyError(
                    err2.args[0],
                    err2.cycles,
                    err2.edges,
                    msg="Can't sort tables for DROP; an "
                    "unresolvable foreign key "
                    "dependency exists between tables: %s. Please ensure "
                    "that the ForeignKey and ForeignKeyConstraint objects "
                    "involved in the cycle have "
                    "names so that they can be dropped using "
                    "DROP CONSTRAINT."
                    % (", ".join(sorted([t.fullname for t in err2.cycles]))),
                ) from err2

        seq_coll = [
            s
            for s in metadata._sequences.values()
            if self._can_drop_sequence(s)
        ]

        event_collection = [t for (t, fks) in collection if t is not None]

        with self.with_ddl_events(
            metadata,
            tables=event_collection,
            checkfirst=self.checkfirst,
        ):
            for table, fkcs in collection:
                if table is not None:
                    self.traverse_single(
                        table,
                        drop_ok=True,
                        _is_metadata_operation=True,
                        _ignore_sequences=seq_coll,
                    )
                else:
                    # a (None, fkcs) entry carries constraints that must
                    # be dropped via ALTER before their tables
                    for fkc in fkcs:
                        self.traverse_single(fkc)

            for seq in seq_coll:
                # sequences not attached to a column are dropped
                # unconditionally here; column-attached ones pass through
                # the _can_drop_sequence gate again in visit_sequence
                self.traverse_single(seq, drop_ok=seq.column is None)

    def _can_drop_table(self, table):
        """Return True if ``table`` should be dropped, honoring
        ``checkfirst`` via a ``has_table()`` probe."""
        self.dialect.validate_identifier(table.name)
        effective_schema = self.connection.schema_for_object(table)
        if effective_schema:
            self.dialect.validate_identifier(effective_schema)
        return not self.checkfirst or self.dialect.has_table(
            self.connection, table.name, schema=effective_schema
        )

    def _can_drop_index(self, index):
        """Return True if ``index`` should be dropped, honoring
        ``checkfirst`` via a ``has_index()`` probe."""
        effective_schema = self.connection.schema_for_object(index.table)
        if effective_schema:
            self.dialect.validate_identifier(effective_schema)
        return not self.checkfirst or self.dialect.has_index(
            self.connection,
            index.table.name,
            index.name,
            schema=effective_schema,
        )

    def _can_drop_sequence(self, sequence):
        """Return True if ``sequence`` should be dropped on this backend,
        honoring optional sequences and ``checkfirst``."""
        effective_schema = self.connection.schema_for_object(sequence)
        return self.dialect.supports_sequences and (
            (not self.dialect.sequences_optional or not sequence.optional)
            and (
                not self.checkfirst
                or self.dialect.has_sequence(
                    self.connection, sequence.name, schema=effective_schema
                )
            )
        )

    def visit_index(self, index, drop_ok=False):
        """Emit DROP INDEX for ``index``."""
        if not drop_ok and not self._can_drop_index(index):
            return

        with self.with_ddl_events(index):
            # use _invoke_with() for consistency with the other visit_*
            # methods; previously this invoked the construct through its
            # event-listener __call__(target, bind) signature
            DropIndex(index)._invoke_with(self.connection)

    def visit_table(
        self,
        table,
        drop_ok=False,
        _is_metadata_operation=False,
        _ignore_sequences=(),
    ):
        """Emit DROP TABLE for ``table``, then drop client-side column
        defaults (e.g. sequences) not handled elsewhere."""
        if not drop_ok and not self._can_drop_table(table):
            return

        with self.with_ddl_events(
            table,
            checkfirst=self.checkfirst,
            _is_metadata_operation=_is_metadata_operation,
        ):
            DropTable(table)._invoke_with(self.connection)

            # traverse client side defaults which may refer to server-side
            # sequences. noting that some of these client side defaults may
            # also be set up as server side defaults
            # (see https://docs.sqlalchemy.org/en/
            # latest/core/defaults.html
            # #associating-a-sequence-as-the-server-side-
            # default), so have to be dropped after the table is dropped.
            for column in table.columns:
                if (
                    column.default is not None
                    and column.default not in _ignore_sequences
                ):
                    self.traverse_single(column.default)

    def visit_foreign_key_constraint(self, constraint):
        """Emit ALTER ... DROP CONSTRAINT for ``constraint``, if the
        backend supports ALTER."""
        if not self.dialect.supports_alter:
            return
        with self.with_ddl_events(constraint):
            DropConstraint(constraint)._invoke_with(self.connection)

    def visit_sequence(self, sequence, drop_ok=False):
        """Emit DROP SEQUENCE for ``sequence``."""
        if not drop_ok and not self._can_drop_sequence(sequence):
            return
        with self.with_ddl_events(sequence):
            DropSequence(sequence)._invoke_with(self.connection)
1232
1233
def sort_tables(
    tables: Iterable[TableClause],
    skip_fn: Optional[Callable[[ForeignKeyConstraint], bool]] = None,
    extra_dependencies: Optional[
        typing_Sequence[Tuple[TableClause, TableClause]]
    ] = None,
) -> List[Table]:
    """Sort a collection of :class:`_schema.Table` objects based on
    dependency.

    This is a dependency-ordered sort which will emit :class:`_schema.Table`
    objects such that they will follow their dependent :class:`_schema.Table`
    objects.
    Tables are dependent on another based on the presence of
    :class:`_schema.ForeignKeyConstraint`
    objects as well as explicit dependencies
    added by :meth:`_schema.Table.add_is_dependent_on`.

    .. warning::

        The :func:`._schema.sort_tables` function cannot by itself
        accommodate automatic resolution of dependency cycles between
        tables, which are usually caused by mutually dependent foreign key
        constraints. When these cycles are detected, the foreign keys
        of these tables are omitted from consideration in the sort.
        A warning is emitted when this condition occurs, which will be an
        exception raise in a future release. Tables which are not part
        of the cycle will still be returned in dependency order.

        To resolve these cycles, the
        :paramref:`_schema.ForeignKeyConstraint.use_alter` parameter may be
        applied to those constraints which create a cycle. Alternatively,
        the :func:`_schema.sort_tables_and_constraints` function will
        automatically return foreign key constraints in a separate
        collection when cycles are detected so that they may be applied
        to a schema separately.

    :param tables: a sequence of :class:`_schema.Table` objects.

    :param skip_fn: optional callable which will be passed a
     :class:`_schema.ForeignKey` object; if it returns True, this
     constraint will not be considered as a dependency. Note this is
     **different** from the same parameter in
     :func:`.sort_tables_and_constraints`, which is
     instead passed the owning :class:`_schema.ForeignKeyConstraint` object.

    :param extra_dependencies: a sequence of 2-tuples of tables which will
     also be considered as dependent on each other.

    .. seealso::

        :func:`.sort_tables_and_constraints`

        :attr:`_schema.MetaData.sorted_tables` - uses this function to sort


    """
    # NOTE(review): the signature annotates skip_fn with
    # ForeignKeyConstraint, but the adapter below passes the individual
    # ForeignKey elements, as the docstring states; the annotation is left
    # as-is since ForeignKey is not imported in this module's
    # TYPE_CHECKING block.

    if skip_fn is not None:
        fixed_skip_fn = skip_fn

        def _skip_fn(fkc):
            # adapt the per-ForeignKey predicate into the per-constraint
            # signature expected by sort_tables_and_constraints(): True
            # if any element should be skipped, else None ("no opinion")
            return (
                True
                if any(fixed_skip_fn(fk) for fk in fkc.elements)
                else None
            )

    else:
        _skip_fn = None  # type: ignore

    # drop the (None, [fkcs]) trailing entry; plain sort_tables() returns
    # only the tables themselves
    return [
        t
        for (t, fkcs) in sort_tables_and_constraints(
            tables,
            filter_fn=_skip_fn,
            extra_dependencies=extra_dependencies,
            _warn_for_cycles=True,
        )
        if t is not None
    ]
1315
1316
def sort_tables_and_constraints(
    tables, filter_fn=None, extra_dependencies=None, _warn_for_cycles=False
):
    """Sort a collection of :class:`_schema.Table` /
    :class:`_schema.ForeignKeyConstraint`
    objects.

    This is a dependency-ordered sort which will emit tuples of
    ``(Table, [ForeignKeyConstraint, ...])`` such that each
    :class:`_schema.Table` follows its dependent :class:`_schema.Table`
    objects.
    Remaining :class:`_schema.ForeignKeyConstraint`
    objects that are separate due to
    dependency rules not satisfied by the sort are emitted afterwards
    as ``(None, [ForeignKeyConstraint ...])``.

    Tables are dependent on another based on the presence of
    :class:`_schema.ForeignKeyConstraint` objects, explicit dependencies
    added by :meth:`_schema.Table.add_is_dependent_on`,
    as well as dependencies
    stated here using the :paramref:`~.sort_tables_and_constraints.skip_fn`
    and/or :paramref:`~.sort_tables_and_constraints.extra_dependencies`
    parameters.

    :param tables: a sequence of :class:`_schema.Table` objects.

    :param filter_fn: optional callable which will be passed a
     :class:`_schema.ForeignKeyConstraint` object,
     and returns a value based on
     whether this constraint should definitely be included or excluded as
     an inline constraint, or neither. If it returns False, the constraint
     will definitely be included as a dependency that cannot be subject
     to ALTER; if True, it will **only** be included as an ALTER result at
     the end. Returning None means the constraint is included in the
     table-based result unless it is detected as part of a dependency cycle.

    :param extra_dependencies: a sequence of 2-tuples of tables which will
     also be considered as dependent on each other.

    .. seealso::

        :func:`.sort_tables`


    """

    # edges that may never be discarded (explicit dependencies), vs.
    # edges derived from FK constraints, which may be removed to break
    # a detected cycle
    hard_edges = set()
    soft_edges = set()
    if extra_dependencies is not None:
        hard_edges.update(extra_dependencies)

    # constraints to be emitted in the trailing (None, [...]) entry
    alter_fkcs = set()

    for table in tables:
        # explicit Table.add_is_dependent_on() dependencies are fixed
        hard_edges.update(
            (parent, table) for parent in table._extra_dependencies
        )

        for fkc in table.foreign_key_constraints:
            # use_alter=True, or filter_fn saying True, forces the
            # constraint out of the inline sort entirely
            if fkc.use_alter is True or (
                filter_fn is not None and filter_fn(fkc) is True
            ):
                alter_fkcs.add(fkc)
                continue

            referred = fkc.referred_table
            if referred is not table:  # self-referential FKs add no edge
                soft_edges.add((referred, table))

    try:
        ordering = list(topological.sort(hard_edges | soft_edges, tables))
    except exc.CircularDependencyError as err:
        if _warn_for_cycles:
            util.warn(
                "Cannot correctly sort tables; there are unresolvable cycles "
                'between tables "%s", which is usually caused by mutually '
                "dependent foreign key constraints. Foreign key constraints "
                "involving these tables will not be considered; this warning "
                "may raise an error in a future release."
                % (", ".join(sorted(t.fullname for t in err.cycles)),)
            )
        # break the cycle: for each FK-derived edge pointing into a
        # cyclic table, move that table's removable constraints to the
        # ALTER collection and discard the corresponding edges
        for edge in err.edges:
            if edge not in soft_edges:
                continue
            cyclic_table = edge[1]
            if cyclic_table not in err.cycles:
                continue
            removable = [
                fkc
                for fkc in cyclic_table.foreign_key_constraints
                if filter_fn is None or filter_fn(fkc) is not False
            ]
            alter_fkcs.update(removable)
            for fkc in removable:
                referred = fkc.referred_table
                if referred is not cyclic_table:
                    soft_edges.discard((referred, cyclic_table))
        # retry with the reduced edge set
        ordering = list(topological.sort(hard_edges | soft_edges, tables))

    return [
        (tbl, tbl.foreign_key_constraints.difference(alter_fkcs))
        for tbl in ordering
    ] + [(None, list(alter_fkcs))]