Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/sql/ddl.py: 51%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

460 statements  

1# sql/ddl.py 

2# Copyright (C) 2009-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: allow-untyped-defs, allow-untyped-calls 

8 

9""" 

10Provides the hierarchy of DDL-defining schema items as well as routines 

11to invoke them for a create/drop call. 

12 

13""" 

14from __future__ import annotations 

15 

16import contextlib 

17from enum import auto 

18from enum import Flag 

19import typing 

20from typing import Any 

21from typing import Callable 

22from typing import Generic 

23from typing import Iterable 

24from typing import List 

25from typing import Optional 

26from typing import Protocol 

27from typing import Sequence as typing_Sequence 

28from typing import Tuple 

29from typing import TypeVar 

30from typing import Union 

31 

32from . import coercions 

33from . import roles 

34from .base import _generative 

35from .base import Executable 

36from .base import SchemaVisitor 

37from .elements import ClauseElement 

38from .selectable import SelectBase 

39from .selectable import TableClause 

40from .. import exc 

41from .. import util 

42from ..util import topological 

43from ..util.typing import Self 

44 

45if typing.TYPE_CHECKING: 

46 from .compiler import Compiled 

47 from .compiler import DDLCompiler 

48 from .elements import BindParameter 

49 from .schema import Column 

50 from .schema import Constraint 

51 from .schema import ForeignKeyConstraint 

52 from .schema import Index 

53 from .schema import MetaData 

54 from .schema import SchemaItem 

55 from .schema import Sequence as Sequence # noqa: F401 

56 from .schema import Table 

57 from ..engine.base import Connection 

58 from ..engine.interfaces import CacheStats 

59 from ..engine.interfaces import CompiledCacheType 

60 from ..engine.interfaces import Dialect 

61 from ..engine.interfaces import SchemaTranslateMapType 

62 

63_SI = TypeVar("_SI", bound=Union["SchemaItem", str]) 

64 

65 

66class BaseDDLElement(ClauseElement): 

67 """The root of DDL constructs, including those that are sub-elements 

68 within the "create table" and other processes. 

69 

70 .. versionadded:: 2.0 

71 

72 """ 

73 

74 _hierarchy_supports_caching = False 

75 """disable cache warnings for all _DDLCompiles subclasses. """ 

76 

77 def _compiler(self, dialect, **kw): 

78 """Return a compiler appropriate for this ClauseElement, given a 

79 Dialect.""" 

80 

81 return dialect.ddl_compiler(dialect, self, **kw) 

82 

83 def _compile_w_cache( 

84 self, 

85 dialect: Dialect, 

86 *, 

87 compiled_cache: Optional[CompiledCacheType], 

88 column_keys: List[str], 

89 for_executemany: bool = False, 

90 schema_translate_map: Optional[SchemaTranslateMapType] = None, 

91 **kw: Any, 

92 ) -> Tuple[ 

93 Compiled, Optional[typing_Sequence[BindParameter[Any]]], CacheStats 

94 ]: 

95 raise NotImplementedError() 

96 

97 

98class DDLIfCallable(Protocol): 

99 def __call__( 

100 self, 

101 ddl: BaseDDLElement, 

102 target: Union[SchemaItem, str], 

103 bind: Optional[Connection], 

104 tables: Optional[List[Table]] = None, 

105 state: Optional[Any] = None, 

106 *, 

107 dialect: Dialect, 

108 compiler: Optional[DDLCompiler] = ..., 

109 checkfirst: bool, 

110 ) -> bool: ... 

111 

112 

113class DDLIf(typing.NamedTuple): 

114 dialect: Optional[str] 

115 callable_: Optional[DDLIfCallable] 

116 state: Optional[Any] 

117 

118 def _should_execute( 

119 self, 

120 ddl: BaseDDLElement, 

121 target: Union[SchemaItem, str], 

122 bind: Optional[Connection], 

123 compiler: Optional[DDLCompiler] = None, 

124 **kw: Any, 

125 ) -> bool: 

126 if bind is not None: 

127 dialect = bind.dialect 

128 elif compiler is not None: 

129 dialect = compiler.dialect 

130 else: 

131 assert False, "compiler or dialect is required" 

132 

133 if isinstance(self.dialect, str): 

134 if self.dialect != dialect.name: 

135 return False 

136 elif isinstance(self.dialect, (tuple, list, set)): 

137 if dialect.name not in self.dialect: 

138 return False 

139 if self.callable_ is not None and not self.callable_( 

140 ddl, 

141 target, 

142 bind, 

143 state=self.state, 

144 dialect=dialect, 

145 compiler=compiler, 

146 **kw, 

147 ): 

148 return False 

149 

150 return True 

151 

152 

153class ExecutableDDLElement(roles.DDLRole, Executable, BaseDDLElement): 

154 """Base class for standalone executable DDL expression constructs. 

155 

156 This class is the base for the general purpose :class:`.DDL` class, 

157 as well as the various create/drop clause constructs such as 

158 :class:`.CreateTable`, :class:`.DropTable`, :class:`.AddConstraint`, 

159 etc. 

160 

161 .. versionchanged:: 2.0 :class:`.ExecutableDDLElement` is renamed from 

162 :class:`.DDLElement`, which still exists for backwards compatibility. 

163 

164 :class:`.ExecutableDDLElement` integrates closely with SQLAlchemy events, 

165 introduced in :ref:`event_toplevel`. An instance of one is 

166 itself an event receiving callable:: 

167 

168 event.listen( 

169 users, 

170 "after_create", 

171 AddConstraint(constraint).execute_if(dialect="postgresql"), 

172 ) 

173 

174 .. seealso:: 

175 

176 :class:`.DDL` 

177 

178 :class:`.DDLEvents` 

179 

180 :ref:`event_toplevel` 

181 

182 :ref:`schema_ddl_sequences` 

183 

184 """ 

185 

186 _ddl_if: Optional[DDLIf] = None 

187 target: Union[SchemaItem, str, None] = None 

188 

189 def _execute_on_connection( 

190 self, connection, distilled_params, execution_options 

191 ): 

192 return connection._execute_ddl( 

193 self, distilled_params, execution_options 

194 ) 

195 

196 @_generative 

197 def against(self, target: SchemaItem) -> Self: 

198 """Return a copy of this :class:`_schema.ExecutableDDLElement` which 

199 will include the given target. 

200 

201 This essentially applies the given item to the ``.target`` attribute of 

202 the returned :class:`_schema.ExecutableDDLElement` object. This target 

203 is then usable by event handlers and compilation routines in order to 

204 provide services such as tokenization of a DDL string in terms of a 

205 particular :class:`_schema.Table`. 

206 

207 When a :class:`_schema.ExecutableDDLElement` object is established as 

208 an event handler for the :meth:`_events.DDLEvents.before_create` or 

209 :meth:`_events.DDLEvents.after_create` events, and the event then 

210 occurs for a given target such as a :class:`_schema.Constraint` or 

211 :class:`_schema.Table`, that target is established with a copy of the 

212 :class:`_schema.ExecutableDDLElement` object using this method, which 

213 then proceeds to the :meth:`_schema.ExecutableDDLElement.execute` 

214 method in order to invoke the actual DDL instruction. 

215 

216 :param target: a :class:`_schema.SchemaItem` that will be the subject 

217 of a DDL operation. 

218 

219 :return: a copy of this :class:`_schema.ExecutableDDLElement` with the 

220 ``.target`` attribute assigned to the given 

221 :class:`_schema.SchemaItem`. 

222 

223 .. seealso:: 

224 

225 :class:`_schema.DDL` - uses tokenization against the "target" when 

226 processing the DDL string. 

227 

228 """ 

229 self.target = target 

230 return self 

231 

232 @_generative 

233 def execute_if( 

234 self, 

235 dialect: Optional[str] = None, 

236 callable_: Optional[DDLIfCallable] = None, 

237 state: Optional[Any] = None, 

238 ) -> Self: 

239 r"""Return a callable that will execute this 

240 :class:`_ddl.ExecutableDDLElement` conditionally within an event 

241 handler. 

242 

243 Used to provide a wrapper for event listening:: 

244 

245 event.listen( 

246 metadata, 

247 "before_create", 

248 DDL("my_ddl").execute_if(dialect="postgresql"), 

249 ) 

250 

251 :param dialect: May be a string or tuple of strings. 

252 If a string, it will be compared to the name of the 

253 executing database dialect:: 

254 

255 DDL("something").execute_if(dialect="postgresql") 

256 

257 If a tuple, specifies multiple dialect names:: 

258 

259 DDL("something").execute_if(dialect=("postgresql", "mysql")) 

260 

261 :param callable\_: A callable, which will be invoked with 

262 three positional arguments as well as optional keyword 

263 arguments: 

264 

265 :ddl: 

266 This DDL element. 

267 

268 :target: 

269 The :class:`_schema.Table` or :class:`_schema.MetaData` 

270 object which is the 

271 target of this event. May be None if the DDL is executed 

272 explicitly. 

273 

274 :bind: 

275 The :class:`_engine.Connection` being used for DDL execution. 

276 May be None if this construct is being created inline within 

277 a table, in which case ``compiler`` will be present. 

278 

279 :tables: 

280 Optional keyword argument - a list of Table objects which are to 

281 be created/ dropped within a MetaData.create_all() or drop_all() 

282 method call. 

283 

284 :dialect: keyword argument, but always present - the 

285 :class:`.Dialect` involved in the operation. 

286 

287 :compiler: keyword argument. Will be ``None`` for an engine 

288 level DDL invocation, but will refer to a :class:`.DDLCompiler` 

289 if this DDL element is being created inline within a table. 

290 

291 :state: 

292 Optional keyword argument - will be the ``state`` argument 

293 passed to this function. 

294 

295 :checkfirst: 

296 Keyword argument, will be True if the 'checkfirst' flag was 

297 set during the call to ``create()``, ``create_all()``, 

298 ``drop()``, ``drop_all()``. 

299 

300 If the callable returns a True value, the DDL statement will be 

301 executed. 

302 

303 :param state: any value which will be passed to the callable\_ 

304 as the ``state`` keyword argument. 

305 

306 .. seealso:: 

307 

308 :meth:`.SchemaItem.ddl_if` 

309 

310 :class:`.DDLEvents` 

311 

312 :ref:`event_toplevel` 

313 

314 """ 

315 self._ddl_if = DDLIf(dialect, callable_, state) 

316 return self 

317 

318 def _should_execute(self, target, bind, **kw): 

319 if self._ddl_if is None: 

320 return True 

321 else: 

322 return self._ddl_if._should_execute(self, target, bind, **kw) 

323 

324 def _invoke_with(self, bind): 

325 if self._should_execute(self.target, bind): 

326 return bind.execute(self) 

327 

328 def __call__(self, target, bind, **kw): 

329 """Execute the DDL as a ddl_listener.""" 

330 

331 self.against(target)._invoke_with(bind) 

332 

333 def _generate(self): 

334 s = self.__class__.__new__(self.__class__) 

335 s.__dict__ = self.__dict__.copy() 

336 return s 

337 

338 

339DDLElement = ExecutableDDLElement 

340""":class:`.DDLElement` is renamed to :class:`.ExecutableDDLElement`.""" 

341 

342 

343class DDL(ExecutableDDLElement): 

344 """A literal DDL statement. 

345 

346 Specifies literal SQL DDL to be executed by the database. DDL objects 

347 function as DDL event listeners, and can be subscribed to those events 

348 listed in :class:`.DDLEvents`, using either :class:`_schema.Table` or 

349 :class:`_schema.MetaData` objects as targets. 

350 Basic templating support allows 

351 a single DDL instance to handle repetitive tasks for multiple tables. 

352 

353 Examples:: 

354 

355 from sqlalchemy import event, DDL 

356 

357 tbl = Table("users", metadata, Column("uid", Integer)) 

358 event.listen(tbl, "before_create", DDL("DROP TRIGGER users_trigger")) 

359 

360 spow = DDL("ALTER TABLE %(table)s SET secretpowers TRUE") 

361 event.listen(tbl, "after_create", spow.execute_if(dialect="somedb")) 

362 

363 drop_spow = DDL("ALTER TABLE users SET secretpowers FALSE") 

364 connection.execute(drop_spow) 

365 

366 When operating on Table events, the following ``statement`` 

367 string substitutions are available: 

368 

369 .. sourcecode:: text 

370 

371 %(table)s - the Table name, with any required quoting applied 

372 %(schema)s - the schema name, with any required quoting applied 

373 %(fullname)s - the Table name including schema, quoted if needed 

374 

375 The DDL's "context", if any, will be combined with the standard 

376 substitutions noted above. Keys present in the context will override 

377 the standard substitutions. 

378 

379 """ 

380 

381 __visit_name__ = "ddl" 

382 

383 def __init__(self, statement, context=None): 

384 """Create a DDL statement. 

385 

386 :param statement: 

387 A string or unicode string to be executed. Statements will be 

388 processed with Python's string formatting operator using 

389 a fixed set of string substitutions, as well as additional 

390 substitutions provided by the optional :paramref:`.DDL.context` 

391 parameter. 

392 

393 A literal '%' in a statement must be escaped as '%%'. 

394 

395 SQL bind parameters are not available in DDL statements. 

396 

397 :param context: 

398 Optional dictionary, defaults to None. These values will be 

399 available for use in string substitutions on the DDL statement. 

400 

401 .. seealso:: 

402 

403 :class:`.DDLEvents` 

404 

405 :ref:`event_toplevel` 

406 

407 """ 

408 

409 if not isinstance(statement, str): 

410 raise exc.ArgumentError( 

411 "Expected a string or unicode SQL statement, got '%r'" 

412 % statement 

413 ) 

414 

415 self.statement = statement 

416 self.context = context or {} 

417 

418 def __repr__(self): 

419 parts = [repr(self.statement)] 

420 if self.context: 

421 parts.append(f"context={self.context}") 

422 

423 return "<%s@%s; %s>" % ( 

424 type(self).__name__, 

425 id(self), 

426 ", ".join(parts), 

427 ) 

428 

429 

430class _CreateDropBase(ExecutableDDLElement, Generic[_SI]): 

431 """Base class for DDL constructs that represent CREATE and DROP or 

432 equivalents. 

433 

434 The common theme of _CreateDropBase is a single 

435 ``element`` attribute which refers to the element 

436 to be created or dropped. 

437 

438 """ 

439 

440 element: _SI 

441 

442 def __init__(self, element: _SI) -> None: 

443 self.element = self.target = element 

444 self._ddl_if = getattr(element, "_ddl_if", None) 

445 

446 @property 

447 def stringify_dialect(self): # type: ignore[override] 

448 assert not isinstance(self.element, str) 

449 return self.element.create_drop_stringify_dialect 

450 

451 def _create_rule_disable(self, compiler): 

452 """Allow disable of _create_rule using a callable. 

453 

454 Pass to _create_rule using 

455 util.portable_instancemethod(self._create_rule_disable) 

456 to retain serializability. 

457 

458 """ 

459 return False 

460 

461 

462class _CreateBase(_CreateDropBase[_SI]): 

463 def __init__(self, element: _SI, if_not_exists: bool = False) -> None: 

464 super().__init__(element) 

465 self.if_not_exists = if_not_exists 

466 

467 

468class _DropBase(_CreateDropBase[_SI]): 

469 def __init__(self, element: _SI, if_exists: bool = False) -> None: 

470 super().__init__(element) 

471 self.if_exists = if_exists 

472 

473 

474class CreateSchema(_CreateBase[str]): 

475 """Represent a CREATE SCHEMA statement. 

476 

477 The argument here is the string name of the schema. 

478 

479 """ 

480 

481 __visit_name__ = "create_schema" 

482 

483 stringify_dialect = "default" 

484 

485 def __init__( 

486 self, 

487 name: str, 

488 if_not_exists: bool = False, 

489 ) -> None: 

490 """Create a new :class:`.CreateSchema` construct.""" 

491 

492 super().__init__(element=name, if_not_exists=if_not_exists) 

493 

494 

495class DropSchema(_DropBase[str]): 

496 """Represent a DROP SCHEMA statement. 

497 

498 The argument here is the string name of the schema. 

499 

500 """ 

501 

502 __visit_name__ = "drop_schema" 

503 

504 stringify_dialect = "default" 

505 

506 def __init__( 

507 self, 

508 name: str, 

509 cascade: bool = False, 

510 if_exists: bool = False, 

511 ) -> None: 

512 """Create a new :class:`.DropSchema` construct.""" 

513 

514 super().__init__(element=name, if_exists=if_exists) 

515 self.cascade = cascade 

516 

517 

518class CreateTable(_CreateBase["Table"]): 

519 """Represent a CREATE TABLE statement.""" 

520 

521 __visit_name__ = "create_table" 

522 

523 def __init__( 

524 self, 

525 element: Table, 

526 include_foreign_key_constraints: Optional[ 

527 typing_Sequence[ForeignKeyConstraint] 

528 ] = None, 

529 if_not_exists: bool = False, 

530 ) -> None: 

531 """Create a :class:`.CreateTable` construct. 

532 

533 :param element: a :class:`_schema.Table` that's the subject 

534 of the CREATE 

535 :param on: See the description for 'on' in :class:`.DDL`. 

536 :param include_foreign_key_constraints: optional sequence of 

537 :class:`_schema.ForeignKeyConstraint` objects that will be included 

538 inline within the CREATE construct; if omitted, all foreign key 

539 constraints that do not specify use_alter=True are included. 

540 

541 :param if_not_exists: if True, an IF NOT EXISTS operator will be 

542 applied to the construct. 

543 

544 .. versionadded:: 1.4.0b2 

545 

546 """ 

547 super().__init__(element, if_not_exists=if_not_exists) 

548 self.columns = [CreateColumn(column) for column in element.columns] 

549 self.include_foreign_key_constraints = include_foreign_key_constraints 

550 

551 

552class CreateTableAs(ExecutableDDLElement): 

553 """Represent a CREATE TABLE ... AS statement. 

554 

555 This creates a new table directly from the output of a SELECT. 

556 The set of columns in the new table is derived from the 

557 SELECT list; constraints, indexes, and defaults are not copied. 

558 

559 E.g.:: 

560 

561 from sqlalchemy import select 

562 from sqlalchemy.sql.ddl import CreateTableAs 

563 

564 # Create a new table from a SELECT 

565 stmt = CreateTableAs( 

566 select(users.c.id, users.c.name).where(users.c.status == "active"), 

567 "active_users", 

568 ) 

569 

570 with engine.begin() as conn: 

571 conn.execute(stmt) 

572 

573 # With optional flags 

574 stmt = CreateTableAs( 

575 select(users.c.id, users.c.name), 

576 "temp_snapshot", 

577 temporary=True, 

578 if_not_exists=True, 

579 ) 

580 

581 The generated table object can be accessed via the :attr:`.table` property, 

582 which will be an instance of :class:`.Table`; by default this is associated 

583 with a local :class:`.MetaData` construct:: 

584 

585 stmt = CreateTableAs(select(users.c.id, users.c.name), "active_users") 

586 active_users_table = stmt.table 

587 

588 To associate the :class:`.Table` with an existing :class:`.MetaData`, 

589 use the :paramref:`_schema.CreateTableAs.metadata` parameter:: 

590 

591 stmt = CreateTableAs( 

592 select(users.c.id, users.c.name), 

593 "active_users", 

594 metadata=some_metadata, 

595 ) 

596 active_users_table = stmt.table 

597 

598 .. versionadded:: 2.1 

599 

600 :param selectable: :class:`_sql.Select` 

601 The SELECT statement providing the columns and rows. 

602 

603 :param table_name: str 

604 Table name as a string. Must be unqualified; use the ``schema`` 

605 argument for qualification. 

606 

607 :param metadata: :class:`_schema.MetaData`, optional 

608 If provided, the :class:`_schema.Table` object available via the 

609 :attr:`.table` attribute will be associated with this 

610 :class:`.MetaData`. Otherwise, a new, empty :class:`.MetaData` 

611 is created. 

612 

613 :param schema: str, optional schema or owner name. 

614 

615 :param temporary: bool, default False. 

616 If True, render ``TEMPORARY`` 

617 

618 :param if_not_exists: bool, default False. 

619 If True, render ``IF NOT EXISTS`` 

620 

621 .. seealso:: 

622 

623 :ref:`tutorial_create_table_as` - in the :ref:`unified_tutorial` 

624 

625 :meth:`_sql.SelectBase.into` - convenience method to create a 

626 :class:`_schema.CreateTableAs` from a SELECT statement 

627 

628 

629 

630 """ 

631 

632 __visit_name__ = "create_table_as" 

633 inherit_cache = False 

634 

635 table: Table 

636 """:class:`.Table` object representing the table that this 

637 :class:`.CreateTableAs` would generate when executed.""" 

638 

639 def __init__( 

640 self, 

641 selectable: SelectBase, 

642 table_name: str, 

643 *, 

644 metadata: Optional["MetaData"] = None, 

645 schema: Optional[str] = None, 

646 temporary: bool = False, 

647 if_not_exists: bool = False, 

648 ): 

649 # Coerce selectable to a Select statement 

650 selectable = coercions.expect(roles.DMLSelectRole, selectable) 

651 

652 if isinstance(table_name, str): 

653 if not table_name: 

654 raise exc.ArgumentError("Table name must be non-empty") 

655 

656 if "." in table_name: 

657 raise exc.ArgumentError( 

658 "Target string must be unqualified (use schema=)." 

659 ) 

660 

661 self.schema = schema 

662 self.selectable = selectable 

663 self.temporary = bool(temporary) 

664 self.if_not_exists = bool(if_not_exists) 

665 self.metadata = metadata 

666 self.table_name = table_name 

667 self._gen_table() 

668 

669 @util.preload_module("sqlalchemy.sql.schema") 

670 def _gen_table(self): 

671 MetaData = util.preloaded.sql_schema.MetaData 

672 Column = util.preloaded.sql_schema.Column 

673 Table = util.preloaded.sql_schema.Table 

674 MetaData = util.preloaded.sql_schema.MetaData 

675 

676 column_name_type_pairs = ( 

677 (name, col_element.type) 

678 for _, name, _, col_element, _ in ( 

679 self.selectable._generate_columns_plus_names( 

680 anon_for_dupe_key=False 

681 ) 

682 ) 

683 ) 

684 

685 if self.metadata is None: 

686 self.metadata = metadata = MetaData() 

687 else: 

688 metadata = self.metadata 

689 

690 self.table = Table( 

691 self.table_name, 

692 metadata, 

693 *(Column(name, typ) for name, typ in column_name_type_pairs), 

694 schema=self.schema, 

695 ) 

696 

697 

698class _DropView(_DropBase["Table"]): 

699 """Semi-public 'DROP VIEW' construct. 

700 

701 Used by the test suite for dialect-agnostic drops of views. 

702 This object will eventually be part of a public "view" API. 

703 

704 """ 

705 

706 __visit_name__ = "drop_view" 

707 

708 

709class CreateConstraint(BaseDDLElement): 

710 element: Constraint 

711 

712 def __init__(self, element: Constraint) -> None: 

713 self.element = element 

714 

715 

716class CreateColumn(BaseDDLElement): 

717 """Represent a :class:`_schema.Column` 

718 as rendered in a CREATE TABLE statement, 

719 via the :class:`.CreateTable` construct. 

720 

721 This is provided to support custom column DDL within the generation 

722 of CREATE TABLE statements, by using the 

723 compiler extension documented in :ref:`sqlalchemy.ext.compiler_toplevel` 

724 to extend :class:`.CreateColumn`. 

725 

726 Typical integration is to examine the incoming :class:`_schema.Column` 

727 object, and to redirect compilation if a particular flag or condition 

728 is found:: 

729 

730 from sqlalchemy import schema 

731 from sqlalchemy.ext.compiler import compiles 

732 

733 

734 @compiles(schema.CreateColumn) 

735 def compile(element, compiler, **kw): 

736 column = element.element 

737 

738 if "special" not in column.info: 

739 return compiler.visit_create_column(element, **kw) 

740 

741 text = "%s SPECIAL DIRECTIVE %s" % ( 

742 column.name, 

743 compiler.type_compiler.process(column.type), 

744 ) 

745 default = compiler.get_column_default_string(column) 

746 if default is not None: 

747 text += " DEFAULT " + default 

748 

749 if not column.nullable: 

750 text += " NOT NULL" 

751 

752 if column.constraints: 

753 text += " ".join( 

754 compiler.process(const) for const in column.constraints 

755 ) 

756 return text 

757 

758 The above construct can be applied to a :class:`_schema.Table` 

759 as follows:: 

760 

761 from sqlalchemy import Table, Metadata, Column, Integer, String 

762 from sqlalchemy import schema 

763 

764 metadata = MetaData() 

765 

766 table = Table( 

767 "mytable", 

768 MetaData(), 

769 Column("x", Integer, info={"special": True}, primary_key=True), 

770 Column("y", String(50)), 

771 Column("z", String(20), info={"special": True}), 

772 ) 

773 

774 metadata.create_all(conn) 

775 

776 Above, the directives we've added to the :attr:`_schema.Column.info` 

777 collection 

778 will be detected by our custom compilation scheme: 

779 

780 .. sourcecode:: sql 

781 

782 CREATE TABLE mytable ( 

783 x SPECIAL DIRECTIVE INTEGER NOT NULL, 

784 y VARCHAR(50), 

785 z SPECIAL DIRECTIVE VARCHAR(20), 

786 PRIMARY KEY (x) 

787 ) 

788 

789 The :class:`.CreateColumn` construct can also be used to skip certain 

790 columns when producing a ``CREATE TABLE``. This is accomplished by 

791 creating a compilation rule that conditionally returns ``None``. 

792 This is essentially how to produce the same effect as using the 

793 ``system=True`` argument on :class:`_schema.Column`, which marks a column 

794 as an implicitly-present "system" column. 

795 

796 For example, suppose we wish to produce a :class:`_schema.Table` 

797 which skips 

798 rendering of the PostgreSQL ``xmin`` column against the PostgreSQL 

799 backend, but on other backends does render it, in anticipation of a 

800 triggered rule. A conditional compilation rule could skip this name only 

801 on PostgreSQL:: 

802 

803 from sqlalchemy.schema import CreateColumn 

804 

805 

806 @compiles(CreateColumn, "postgresql") 

807 def skip_xmin(element, compiler, **kw): 

808 if element.element.name == "xmin": 

809 return None 

810 else: 

811 return compiler.visit_create_column(element, **kw) 

812 

813 

814 my_table = Table( 

815 "mytable", 

816 metadata, 

817 Column("id", Integer, primary_key=True), 

818 Column("xmin", Integer), 

819 ) 

820 

821 Above, a :class:`.CreateTable` construct will generate a ``CREATE TABLE`` 

822 which only includes the ``id`` column in the string; the ``xmin`` column 

823 will be omitted, but only against the PostgreSQL backend. 

824 

825 """ 

826 

827 __visit_name__ = "create_column" 

828 

829 element: Column[Any] 

830 

831 def __init__(self, element: Column[Any]) -> None: 

832 self.element = element 

833 

834 

835class DropTable(_DropBase["Table"]): 

836 """Represent a DROP TABLE statement.""" 

837 

838 __visit_name__ = "drop_table" 

839 

840 def __init__(self, element: Table, if_exists: bool = False) -> None: 

841 """Create a :class:`.DropTable` construct. 

842 

843 :param element: a :class:`_schema.Table` that's the subject 

844 of the DROP. 

845 :param on: See the description for 'on' in :class:`.DDL`. 

846 :param if_exists: if True, an IF EXISTS operator will be applied to the 

847 construct. 

848 

849 .. versionadded:: 1.4.0b2 

850 

851 """ 

852 super().__init__(element, if_exists=if_exists) 

853 

854 

855class CreateSequence(_CreateBase["Sequence"]): 

856 """Represent a CREATE SEQUENCE statement.""" 

857 

858 __visit_name__ = "create_sequence" 

859 

860 

861class DropSequence(_DropBase["Sequence"]): 

862 """Represent a DROP SEQUENCE statement.""" 

863 

864 __visit_name__ = "drop_sequence" 

865 

866 

867class CreateIndex(_CreateBase["Index"]): 

868 """Represent a CREATE INDEX statement.""" 

869 

870 __visit_name__ = "create_index" 

871 

872 def __init__(self, element: Index, if_not_exists: bool = False) -> None: 

873 """Create a :class:`.Createindex` construct. 

874 

875 :param element: a :class:`_schema.Index` that's the subject 

876 of the CREATE. 

877 :param if_not_exists: if True, an IF NOT EXISTS operator will be 

878 applied to the construct. 

879 

880 .. versionadded:: 1.4.0b2 

881 

882 """ 

883 super().__init__(element, if_not_exists=if_not_exists) 

884 

885 

886class DropIndex(_DropBase["Index"]): 

887 """Represent a DROP INDEX statement.""" 

888 

889 __visit_name__ = "drop_index" 

890 

891 def __init__(self, element: Index, if_exists: bool = False) -> None: 

892 """Create a :class:`.DropIndex` construct. 

893 

894 :param element: a :class:`_schema.Index` that's the subject 

895 of the DROP. 

896 :param if_exists: if True, an IF EXISTS operator will be applied to the 

897 construct. 

898 

899 .. versionadded:: 1.4.0b2 

900 

901 """ 

902 super().__init__(element, if_exists=if_exists) 

903 

904 

905class AddConstraint(_CreateBase["Constraint"]): 

906 """Represent an ALTER TABLE ADD CONSTRAINT statement.""" 

907 

908 __visit_name__ = "add_constraint" 

909 

910 def __init__( 

911 self, 

912 element: Constraint, 

913 *, 

914 isolate_from_table: bool = True, 

915 ) -> None: 

916 """Construct a new :class:`.AddConstraint` construct. 

917 

918 :param element: a :class:`.Constraint` object 

919 

920 :param isolate_from_table: optional boolean, defaults to True. Has 

921 the effect of the incoming constraint being isolated from being 

922 included in a CREATE TABLE sequence when associated with a 

923 :class:`.Table`. 

924 

925 .. versionadded:: 2.0.39 - added 

926 :paramref:`.AddConstraint.isolate_from_table`, defaulting 

927 to True. Previously, the behavior of this parameter was implicitly 

928 turned on in all cases. 

929 

930 """ 

931 super().__init__(element) 

932 

933 if isolate_from_table: 

934 element._create_rule = self._create_rule_disable 

935 

936 

937class DropConstraint(_DropBase["Constraint"]): 

938 """Represent an ALTER TABLE DROP CONSTRAINT statement.""" 

939 

940 __visit_name__ = "drop_constraint" 

941 

942 def __init__( 

943 self, 

944 element: Constraint, 

945 *, 

946 cascade: bool = False, 

947 if_exists: bool = False, 

948 isolate_from_table: bool = True, 

949 **kw: Any, 

950 ) -> None: 

951 """Construct a new :class:`.DropConstraint` construct. 

952 

953 :param element: a :class:`.Constraint` object 

954 :param cascade: optional boolean, indicates backend-specific 

955 "CASCADE CONSTRAINT" directive should be rendered if available 

956 :param if_exists: optional boolean, indicates backend-specific 

957 "IF EXISTS" directive should be rendered if available 

958 :param isolate_from_table: optional boolean, defaults to True. Has 

959 the effect of the incoming constraint being isolated from being 

960 included in a CREATE TABLE sequence when associated with a 

961 :class:`.Table`. 

962 

963 .. versionadded:: 2.0.39 - added 

964 :paramref:`.DropConstraint.isolate_from_table`, defaulting 

965 to True. Previously, the behavior of this parameter was implicitly 

966 turned on in all cases. 

967 

968 """ 

969 self.cascade = cascade 

970 super().__init__(element, if_exists=if_exists, **kw) 

971 

972 if isolate_from_table: 

973 element._create_rule = self._create_rule_disable 

974 

975 

976class SetTableComment(_CreateDropBase["Table"]): 

977 """Represent a COMMENT ON TABLE IS statement.""" 

978 

979 __visit_name__ = "set_table_comment" 

980 

981 

982class DropTableComment(_CreateDropBase["Table"]): 

983 """Represent a COMMENT ON TABLE '' statement. 

984 

985 Note this varies a lot across database backends. 

986 

987 """ 

988 

989 __visit_name__ = "drop_table_comment" 

990 

991 

992class SetColumnComment(_CreateDropBase["Column[Any]"]): 

993 """Represent a COMMENT ON COLUMN IS statement.""" 

994 

995 __visit_name__ = "set_column_comment" 

996 

997 

998class DropColumnComment(_CreateDropBase["Column[Any]"]): 

999 """Represent a COMMENT ON COLUMN IS NULL statement.""" 

1000 

1001 __visit_name__ = "drop_column_comment" 

1002 

1003 

1004class SetConstraintComment(_CreateDropBase["Constraint"]): 

1005 """Represent a COMMENT ON CONSTRAINT IS statement.""" 

1006 

1007 __visit_name__ = "set_constraint_comment" 

1008 

1009 

1010class DropConstraintComment(_CreateDropBase["Constraint"]): 

1011 """Represent a COMMENT ON CONSTRAINT IS NULL statement.""" 

1012 

1013 __visit_name__ = "drop_constraint_comment" 

1014 

1015 

1016class InvokeDDLBase(SchemaVisitor): 

1017 def __init__(self, connection, **kw): 

1018 self.connection = connection 

1019 assert not kw, f"Unexpected keywords: {kw.keys()}" 

1020 

1021 @contextlib.contextmanager 

1022 def with_ddl_events(self, target, **kw): 

1023 """helper context manager that will apply appropriate DDL events 

1024 to a CREATE or DROP operation.""" 

1025 

1026 raise NotImplementedError() 

1027 

1028 

1029class InvokeCreateDDLBase(InvokeDDLBase): 

1030 @contextlib.contextmanager 

1031 def with_ddl_events(self, target, **kw): 

1032 """helper context manager that will apply appropriate DDL events 

1033 to a CREATE or DROP operation.""" 

1034 

1035 target.dispatch.before_create( 

1036 target, self.connection, _ddl_runner=self, **kw 

1037 ) 

1038 yield 

1039 target.dispatch.after_create( 

1040 target, self.connection, _ddl_runner=self, **kw 

1041 ) 

1042 

1043 

1044class InvokeDropDDLBase(InvokeDDLBase): 

1045 @contextlib.contextmanager 

1046 def with_ddl_events(self, target, **kw): 

1047 """helper context manager that will apply appropriate DDL events 

1048 to a CREATE or DROP operation.""" 

1049 

1050 target.dispatch.before_drop( 

1051 target, self.connection, _ddl_runner=self, **kw 

1052 ) 

1053 yield 

1054 target.dispatch.after_drop( 

1055 target, self.connection, _ddl_runner=self, **kw 

1056 ) 

1057 

1058 

1059class CheckFirst(Flag): 

1060 """Enumeration for the :paramref:`.MetaData.create_all.checkfirst` 

1061 parameter passed to methods like :meth:`.MetaData.create_all`, 

1062 :meth:`.MetaData.drop_all`, :meth:`.Table.create`, :meth:`.Table.drop` and 

1063 others. 

1064 

1065 This enumeration indicates what kinds of objects should be "checked" 

1066 with a separate query before emitting CREATE or DROP for that object. 

1067 

1068 Can use ``CheckFirst(bool_value)`` to convert from a boolean value. 

1069 

1070 .. versionadded:: 2.1 

1071 

1072 """ 

1073 

1074 NONE = 0 # equivalent to False 

1075 """No items should be checked""" 

1076 

1077 # avoid 1 so that bool True doesn't match by value 

1078 TABLES = 2 

1079 """Check for tables""" 

1080 

1081 INDEXES = auto() 

1082 """Check for indexes""" 

1083 

1084 SEQUENCES = auto() 

1085 """Check for sequences""" 

1086 

1087 TYPES = auto() 

1088 """Check for custom datatypes that are created server-side 

1089 

1090 This is currently used by PostgreSQL. 

1091 

1092 """ 

1093 

1094 ALL = TABLES | INDEXES | SEQUENCES | TYPES # equivalent to True 

1095 

1096 @classmethod 

1097 def _missing_(cls, value: object) -> Any: 

1098 if isinstance(value, bool): 

1099 return cls.ALL if value else cls.NONE 

1100 return super()._missing_(value) 

1101 

1102 

1103class SchemaGenerator(InvokeCreateDDLBase): 

1104 def __init__( 

1105 self, 

1106 dialect, 

1107 connection, 

1108 checkfirst=CheckFirst.NONE, 

1109 tables=None, 

1110 **kwargs, 

1111 ): 

1112 super().__init__(connection, **kwargs) 

1113 self.checkfirst = CheckFirst(checkfirst) 

1114 self.tables = tables 

1115 self.preparer = dialect.identifier_preparer 

1116 self.dialect = dialect 

1117 self.memo = {} 

1118 

1119 def _can_create_table(self, table): 

1120 self.dialect.validate_identifier(table.name) 

1121 effective_schema = self.connection.schema_for_object(table) 

1122 if effective_schema: 

1123 self.dialect.validate_identifier(effective_schema) 

1124 return ( 

1125 not self.checkfirst & CheckFirst.TABLES 

1126 or not self.dialect.has_table( 

1127 self.connection, table.name, schema=effective_schema 

1128 ) 

1129 ) 

1130 

1131 def _can_create_index(self, index): 

1132 effective_schema = self.connection.schema_for_object(index.table) 

1133 if effective_schema: 

1134 self.dialect.validate_identifier(effective_schema) 

1135 return ( 

1136 not self.checkfirst & CheckFirst.INDEXES 

1137 or not self.dialect.has_index( 

1138 self.connection, 

1139 index.table.name, 

1140 index.name, 

1141 schema=effective_schema, 

1142 ) 

1143 ) 

1144 

1145 def _can_create_sequence(self, sequence): 

1146 effective_schema = self.connection.schema_for_object(sequence) 

1147 

1148 return self.dialect.supports_sequences and ( 

1149 (not self.dialect.sequences_optional or not sequence.optional) 

1150 and ( 

1151 not self.checkfirst & CheckFirst.SEQUENCES 

1152 or not self.dialect.has_sequence( 

1153 self.connection, sequence.name, schema=effective_schema 

1154 ) 

1155 ) 

1156 ) 

1157 

1158 def visit_metadata(self, metadata): 

1159 if self.tables is not None: 

1160 tables = self.tables 

1161 else: 

1162 tables = list(metadata.tables.values()) 

1163 

1164 collection = sort_tables_and_constraints( 

1165 [t for t in tables if self._can_create_table(t)] 

1166 ) 

1167 

1168 seq_coll = [ 

1169 s 

1170 for s in metadata._sequences.values() 

1171 if s.column is None and self._can_create_sequence(s) 

1172 ] 

1173 

1174 event_collection = [t for (t, fks) in collection if t is not None] 

1175 

1176 with self.with_ddl_events( 

1177 metadata, 

1178 tables=event_collection, 

1179 checkfirst=self.checkfirst, 

1180 ): 

1181 for seq in seq_coll: 

1182 self.traverse_single(seq, create_ok=True) 

1183 

1184 for table, fkcs in collection: 

1185 if table is not None: 

1186 self.traverse_single( 

1187 table, 

1188 create_ok=True, 

1189 include_foreign_key_constraints=fkcs, 

1190 _is_metadata_operation=True, 

1191 ) 

1192 else: 

1193 for fkc in fkcs: 

1194 self.traverse_single(fkc) 

1195 

1196 def visit_table( 

1197 self, 

1198 table, 

1199 create_ok=False, 

1200 include_foreign_key_constraints=None, 

1201 _is_metadata_operation=False, 

1202 ): 

1203 if not create_ok and not self._can_create_table(table): 

1204 return 

1205 

1206 with self.with_ddl_events( 

1207 table, 

1208 checkfirst=self.checkfirst, 

1209 _is_metadata_operation=_is_metadata_operation, 

1210 ): 

1211 for column in table.columns: 

1212 if column.default is not None: 

1213 self.traverse_single(column.default) 

1214 

1215 if not self.dialect.supports_alter: 

1216 # e.g., don't omit any foreign key constraints 

1217 include_foreign_key_constraints = None 

1218 

1219 CreateTable( 

1220 table, 

1221 include_foreign_key_constraints=( 

1222 include_foreign_key_constraints 

1223 ), 

1224 )._invoke_with(self.connection) 

1225 

1226 if hasattr(table, "indexes"): 

1227 for index in table.indexes: 

1228 self.traverse_single(index, create_ok=True) 

1229 

1230 if ( 

1231 self.dialect.supports_comments 

1232 and not self.dialect.inline_comments 

1233 ): 

1234 if table.comment is not None: 

1235 SetTableComment(table)._invoke_with(self.connection) 

1236 

1237 for column in table.columns: 

1238 if column.comment is not None: 

1239 SetColumnComment(column)._invoke_with(self.connection) 

1240 

1241 if self.dialect.supports_constraint_comments: 

1242 for constraint in table.constraints: 

1243 if constraint.comment is not None: 

1244 self.connection.execute( 

1245 SetConstraintComment(constraint) 

1246 ) 

1247 

1248 def visit_foreign_key_constraint(self, constraint): 

1249 if not self.dialect.supports_alter: 

1250 return 

1251 

1252 with self.with_ddl_events(constraint): 

1253 AddConstraint(constraint)._invoke_with(self.connection) 

1254 

1255 def visit_sequence(self, sequence, create_ok=False): 

1256 if not create_ok and not self._can_create_sequence(sequence): 

1257 return 

1258 with self.with_ddl_events(sequence): 

1259 CreateSequence(sequence)._invoke_with(self.connection) 

1260 

1261 def visit_index(self, index, create_ok=False): 

1262 if not create_ok and not self._can_create_index(index): 

1263 return 

1264 with self.with_ddl_events(index): 

1265 CreateIndex(index)._invoke_with(self.connection) 

1266 

1267 

1268class SchemaDropper(InvokeDropDDLBase): 

1269 def __init__( 

1270 self, 

1271 dialect, 

1272 connection, 

1273 checkfirst=CheckFirst.NONE, 

1274 tables=None, 

1275 **kwargs, 

1276 ): 

1277 super().__init__(connection, **kwargs) 

1278 self.checkfirst = CheckFirst(checkfirst) 

1279 self.tables = tables 

1280 self.preparer = dialect.identifier_preparer 

1281 self.dialect = dialect 

1282 self.memo = {} 

1283 

1284 def visit_metadata(self, metadata): 

1285 if self.tables is not None: 

1286 tables = self.tables 

1287 else: 

1288 tables = list(metadata.tables.values()) 

1289 

1290 try: 

1291 unsorted_tables = [t for t in tables if self._can_drop_table(t)] 

1292 collection = list( 

1293 reversed( 

1294 sort_tables_and_constraints( 

1295 unsorted_tables, 

1296 filter_fn=lambda constraint: ( 

1297 False 

1298 if not self.dialect.supports_alter 

1299 or constraint.name is None 

1300 else None 

1301 ), 

1302 ) 

1303 ) 

1304 ) 

1305 except exc.CircularDependencyError as err2: 

1306 if not self.dialect.supports_alter: 

1307 util.warn( 

1308 "Can't sort tables for DROP; an " 

1309 "unresolvable foreign key " 

1310 "dependency exists between tables: %s; and backend does " 

1311 "not support ALTER. To restore at least a partial sort, " 

1312 "apply use_alter=True to ForeignKey and " 

1313 "ForeignKeyConstraint " 

1314 "objects involved in the cycle to mark these as known " 

1315 "cycles that will be ignored." 

1316 % (", ".join(sorted([t.fullname for t in err2.cycles]))) 

1317 ) 

1318 collection = [(t, ()) for t in unsorted_tables] 

1319 else: 

1320 raise exc.CircularDependencyError( 

1321 err2.args[0], 

1322 err2.cycles, 

1323 err2.edges, 

1324 msg="Can't sort tables for DROP; an " 

1325 "unresolvable foreign key " 

1326 "dependency exists between tables: %s. Please ensure " 

1327 "that the ForeignKey and ForeignKeyConstraint objects " 

1328 "involved in the cycle have " 

1329 "names so that they can be dropped using " 

1330 "DROP CONSTRAINT." 

1331 % (", ".join(sorted([t.fullname for t in err2.cycles]))), 

1332 ) from err2 

1333 

1334 seq_coll = [ 

1335 s 

1336 for s in metadata._sequences.values() 

1337 if self._can_drop_sequence(s) 

1338 ] 

1339 

1340 event_collection = [t for (t, fks) in collection if t is not None] 

1341 

1342 with self.with_ddl_events( 

1343 metadata, 

1344 tables=event_collection, 

1345 checkfirst=self.checkfirst, 

1346 ): 

1347 for table, fkcs in collection: 

1348 if table is not None: 

1349 self.traverse_single( 

1350 table, 

1351 drop_ok=True, 

1352 _is_metadata_operation=True, 

1353 _ignore_sequences=seq_coll, 

1354 ) 

1355 else: 

1356 for fkc in fkcs: 

1357 self.traverse_single(fkc) 

1358 

1359 for seq in seq_coll: 

1360 self.traverse_single(seq, drop_ok=seq.column is None) 

1361 

1362 def _can_drop_table(self, table): 

1363 self.dialect.validate_identifier(table.name) 

1364 effective_schema = self.connection.schema_for_object(table) 

1365 if effective_schema: 

1366 self.dialect.validate_identifier(effective_schema) 

1367 return ( 

1368 not self.checkfirst & CheckFirst.TABLES 

1369 or self.dialect.has_table( 

1370 self.connection, table.name, schema=effective_schema 

1371 ) 

1372 ) 

1373 

1374 def _can_drop_index(self, index): 

1375 effective_schema = self.connection.schema_for_object(index.table) 

1376 if effective_schema: 

1377 self.dialect.validate_identifier(effective_schema) 

1378 return ( 

1379 not self.checkfirst & CheckFirst.INDEXES 

1380 or self.dialect.has_index( 

1381 self.connection, 

1382 index.table.name, 

1383 index.name, 

1384 schema=effective_schema, 

1385 ) 

1386 ) 

1387 

1388 def _can_drop_sequence(self, sequence): 

1389 effective_schema = self.connection.schema_for_object(sequence) 

1390 return self.dialect.supports_sequences and ( 

1391 (not self.dialect.sequences_optional or not sequence.optional) 

1392 and ( 

1393 not self.checkfirst & CheckFirst.SEQUENCES 

1394 or self.dialect.has_sequence( 

1395 self.connection, sequence.name, schema=effective_schema 

1396 ) 

1397 ) 

1398 ) 

1399 

1400 def visit_index(self, index, drop_ok=False): 

1401 if not drop_ok and not self._can_drop_index(index): 

1402 return 

1403 

1404 with self.with_ddl_events(index): 

1405 DropIndex(index)(index, self.connection) 

1406 

1407 def visit_table( 

1408 self, 

1409 table, 

1410 drop_ok=False, 

1411 _is_metadata_operation=False, 

1412 _ignore_sequences=(), 

1413 ): 

1414 if not drop_ok and not self._can_drop_table(table): 

1415 return 

1416 

1417 with self.with_ddl_events( 

1418 table, 

1419 checkfirst=self.checkfirst, 

1420 _is_metadata_operation=_is_metadata_operation, 

1421 ): 

1422 DropTable(table)._invoke_with(self.connection) 

1423 

1424 # traverse client side defaults which may refer to server-side 

1425 # sequences. noting that some of these client side defaults may 

1426 # also be set up as server side defaults 

1427 # (see https://docs.sqlalchemy.org/en/ 

1428 # latest/core/defaults.html 

1429 # #associating-a-sequence-as-the-server-side- 

1430 # default), so have to be dropped after the table is dropped. 

1431 for column in table.columns: 

1432 if ( 

1433 column.default is not None 

1434 and column.default not in _ignore_sequences 

1435 ): 

1436 self.traverse_single(column.default) 

1437 

1438 def visit_foreign_key_constraint(self, constraint): 

1439 if not self.dialect.supports_alter: 

1440 return 

1441 with self.with_ddl_events(constraint): 

1442 DropConstraint(constraint)._invoke_with(self.connection) 

1443 

1444 def visit_sequence(self, sequence, drop_ok=False): 

1445 if not drop_ok and not self._can_drop_sequence(sequence): 

1446 return 

1447 with self.with_ddl_events(sequence): 

1448 DropSequence(sequence)._invoke_with(self.connection) 

1449 

1450 

1451def sort_tables( 

1452 tables: Iterable[TableClause], 

1453 skip_fn: Optional[Callable[[ForeignKeyConstraint], bool]] = None, 

1454 extra_dependencies: Optional[ 

1455 typing_Sequence[Tuple[TableClause, TableClause]] 

1456 ] = None, 

1457) -> List[Table]: 

1458 """Sort a collection of :class:`_schema.Table` objects based on 

1459 dependency. 

1460 

1461 This is a dependency-ordered sort which will emit :class:`_schema.Table` 

1462 objects such that they will follow their dependent :class:`_schema.Table` 

1463 objects. 

1464 Tables are dependent on another based on the presence of 

1465 :class:`_schema.ForeignKeyConstraint` 

1466 objects as well as explicit dependencies 

1467 added by :meth:`_schema.Table.add_is_dependent_on`. 

1468 

1469 .. warning:: 

1470 

1471 The :func:`._schema.sort_tables` function cannot by itself 

1472 accommodate automatic resolution of dependency cycles between 

1473 tables, which are usually caused by mutually dependent foreign key 

1474 constraints. When these cycles are detected, the foreign keys 

1475 of these tables are omitted from consideration in the sort. 

1476 A warning is emitted when this condition occurs, which will be an 

1477 exception raise in a future release. Tables which are not part 

1478 of the cycle will still be returned in dependency order. 

1479 

1480 To resolve these cycles, the 

1481 :paramref:`_schema.ForeignKeyConstraint.use_alter` parameter may be 

1482 applied to those constraints which create a cycle. Alternatively, 

1483 the :func:`_schema.sort_tables_and_constraints` function will 

1484 automatically return foreign key constraints in a separate 

1485 collection when cycles are detected so that they may be applied 

1486 to a schema separately. 

1487 

1488 :param tables: a sequence of :class:`_schema.Table` objects. 

1489 

1490 :param skip_fn: optional callable which will be passed a 

1491 :class:`_schema.ForeignKeyConstraint` object; if it returns True, this 

1492 constraint will not be considered as a dependency. Note this is 

1493 **different** from the same parameter in 

1494 :func:`.sort_tables_and_constraints`, which is 

1495 instead passed the owning :class:`_schema.ForeignKeyConstraint` object. 

1496 

1497 :param extra_dependencies: a sequence of 2-tuples of tables which will 

1498 also be considered as dependent on each other. 

1499 

1500 .. seealso:: 

1501 

1502 :func:`.sort_tables_and_constraints` 

1503 

1504 :attr:`_schema.MetaData.sorted_tables` - uses this function to sort 

1505 

1506 

1507 """ 

1508 

1509 if skip_fn is not None: 

1510 fixed_skip_fn = skip_fn 

1511 

1512 def _skip_fn(fkc): 

1513 for fk in fkc.elements: 

1514 if fixed_skip_fn(fk): 

1515 return True 

1516 else: 

1517 return None 

1518 

1519 else: 

1520 _skip_fn = None # type: ignore 

1521 

1522 return [ 

1523 t 

1524 for (t, fkcs) in sort_tables_and_constraints( 

1525 tables, 

1526 filter_fn=_skip_fn, 

1527 extra_dependencies=extra_dependencies, 

1528 _warn_for_cycles=True, 

1529 ) 

1530 if t is not None 

1531 ] 

1532 

1533 

1534def sort_tables_and_constraints( 

1535 tables, filter_fn=None, extra_dependencies=None, _warn_for_cycles=False 

1536): 

1537 """Sort a collection of :class:`_schema.Table` / 

1538 :class:`_schema.ForeignKeyConstraint` 

1539 objects. 

1540 

1541 This is a dependency-ordered sort which will emit tuples of 

1542 ``(Table, [ForeignKeyConstraint, ...])`` such that each 

1543 :class:`_schema.Table` follows its dependent :class:`_schema.Table` 

1544 objects. 

1545 Remaining :class:`_schema.ForeignKeyConstraint` 

1546 objects that are separate due to 

1547 dependency rules not satisfied by the sort are emitted afterwards 

1548 as ``(None, [ForeignKeyConstraint ...])``. 

1549 

1550 Tables are dependent on another based on the presence of 

1551 :class:`_schema.ForeignKeyConstraint` objects, explicit dependencies 

1552 added by :meth:`_schema.Table.add_is_dependent_on`, 

1553 as well as dependencies 

1554 stated here using the :paramref:`~.sort_tables_and_constraints.skip_fn` 

1555 and/or :paramref:`~.sort_tables_and_constraints.extra_dependencies` 

1556 parameters. 

1557 

1558 :param tables: a sequence of :class:`_schema.Table` objects. 

1559 

1560 :param filter_fn: optional callable which will be passed a 

1561 :class:`_schema.ForeignKeyConstraint` object, 

1562 and returns a value based on 

1563 whether this constraint should definitely be included or excluded as 

1564 an inline constraint, or neither. If it returns False, the constraint 

1565 will definitely be included as a dependency that cannot be subject 

1566 to ALTER; if True, it will **only** be included as an ALTER result at 

1567 the end. Returning None means the constraint is included in the 

1568 table-based result unless it is detected as part of a dependency cycle. 

1569 

1570 :param extra_dependencies: a sequence of 2-tuples of tables which will 

1571 also be considered as dependent on each other. 

1572 

1573 .. seealso:: 

1574 

1575 :func:`.sort_tables` 

1576 

1577 

1578 """ 

1579 

1580 fixed_dependencies = set() 

1581 mutable_dependencies = set() 

1582 

1583 if extra_dependencies is not None: 

1584 fixed_dependencies.update(extra_dependencies) 

1585 

1586 remaining_fkcs = set() 

1587 for table in tables: 

1588 for fkc in table.foreign_key_constraints: 

1589 if fkc.use_alter is True: 

1590 remaining_fkcs.add(fkc) 

1591 continue 

1592 

1593 if filter_fn: 

1594 filtered = filter_fn(fkc) 

1595 

1596 if filtered is True: 

1597 remaining_fkcs.add(fkc) 

1598 continue 

1599 

1600 dependent_on = fkc.referred_table 

1601 if dependent_on is not table: 

1602 mutable_dependencies.add((dependent_on, table)) 

1603 

1604 fixed_dependencies.update( 

1605 (parent, table) for parent in table._extra_dependencies 

1606 ) 

1607 

1608 try: 

1609 candidate_sort = list( 

1610 topological.sort( 

1611 fixed_dependencies.union(mutable_dependencies), 

1612 tables, 

1613 ) 

1614 ) 

1615 except exc.CircularDependencyError as err: 

1616 if _warn_for_cycles: 

1617 util.warn( 

1618 "Cannot correctly sort tables; there are unresolvable cycles " 

1619 'between tables "%s", which is usually caused by mutually ' 

1620 "dependent foreign key constraints. Foreign key constraints " 

1621 "involving these tables will not be considered; this warning " 

1622 "may raise an error in a future release." 

1623 % (", ".join(sorted(t.fullname for t in err.cycles)),) 

1624 ) 

1625 for edge in err.edges: 

1626 if edge in mutable_dependencies: 

1627 table = edge[1] 

1628 if table not in err.cycles: 

1629 continue 

1630 can_remove = [ 

1631 fkc 

1632 for fkc in table.foreign_key_constraints 

1633 if filter_fn is None or filter_fn(fkc) is not False 

1634 ] 

1635 remaining_fkcs.update(can_remove) 

1636 for fkc in can_remove: 

1637 dependent_on = fkc.referred_table 

1638 if dependent_on is not table: 

1639 mutable_dependencies.discard((dependent_on, table)) 

1640 candidate_sort = list( 

1641 topological.sort( 

1642 fixed_dependencies.union(mutable_dependencies), 

1643 tables, 

1644 ) 

1645 ) 

1646 

1647 return [ 

1648 (table, table.foreign_key_constraints.difference(remaining_fkcs)) 

1649 for table in candidate_sort 

1650 ] + [(None, list(remaining_fkcs))]