Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py: 28%

1452 statements  

1# dialects/postgresql/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql 

11 :name: PostgreSQL 

12 :normal_support: 9.6+ 

13 :best_effort: 9+ 

14 

15.. _postgresql_sequences: 

16 

17Sequences/SERIAL/IDENTITY 

18------------------------- 

19 

20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means 

21of creating new primary key values for integer-based primary key columns. When 

22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for 

23integer-based primary key columns, which generates a sequence and server side 

24default corresponding to the column. 
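
The ``SERIAL`` rendering described above can be checked without a database by compiling a ``CreateTable`` construct against the PostgreSQL dialect. The following is a minimal sketch; the table and column names are illustrative only and are not part of the original text.

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Hypothetical table with a plain integer primary key and no explicit
# Sequence or Identity construct.
sometable = Table(
    "sometable",
    metadata,
    Column("id", Integer, primary_key=True),
)

# Compile the DDL against the PostgreSQL dialect; no connection is made.
ddl = str(CreateTable(sometable).compile(dialect=postgresql.dialect()))
print(ddl)
```

The printed DDL should show the ``id`` column rendered with the ``SERIAL`` datatype.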

25 

26To specify a specific named sequence to be used for primary key generation, 

27use the :func:`~sqlalchemy.schema.Sequence` construct:: 

28 

29 Table( 

30 "sometable", 

31 metadata, 

32 Column( 

33 "id", Integer, Sequence("some_id_seq", start=1), primary_key=True 

34 ), 

35 ) 

36 

37When SQLAlchemy issues a single INSERT statement, to fulfill the contract of 

38having the "last insert identifier" available, a RETURNING clause is added to 

39the INSERT statement which specifies the primary key columns should be 

40returned after the statement completes. The RETURNING functionality only takes 

41place if PostgreSQL 8.2 or later is in use. As a fallback approach, the 

42sequence, whether specified explicitly or implicitly via ``SERIAL``, is 

43executed independently beforehand, the returned value to be used in the 

44subsequent insert. Note that when an 

45:func:`~sqlalchemy.sql.expression.insert()` construct is executed using 

46"executemany" semantics, the "last inserted identifier" functionality does not 

47apply; no RETURNING clause is emitted nor is the sequence pre-executed in this 

48case. 

49 

50 

51PostgreSQL 10 and above IDENTITY columns 

52^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

53 

54PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use 

55of SERIAL. The :class:`_schema.Identity` construct in a 

56:class:`_schema.Column` can be used to control its behavior:: 

57 

58 from sqlalchemy import Table, Column, MetaData, Integer, Identity, String 

59 

60 metadata = MetaData() 

61 

62 data = Table( 

63 "data", 

64 metadata, 

65 Column( 

66 "id", Integer, Identity(start=42, cycle=True), primary_key=True 

67 ), 

68 Column("data", String), 

69 ) 

70 

71The CREATE TABLE for the above :class:`_schema.Table` object would be: 

72 

73.. sourcecode:: sql 

74 

75 CREATE TABLE data ( 

76 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), 

77 data VARCHAR, 

78 PRIMARY KEY (id) 

79 ) 
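
As a rough way to confirm the DDL shown above, the same table can be compiled against the PostgreSQL dialect without connecting to a server. This is a sketch that assumes SQLAlchemy 1.4 or later, where ``Identity`` is available.

```python
from sqlalchemy import Column, Identity, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

data = Table(
    "data",
    metadata,
    Column("id", Integer, Identity(start=42, cycle=True), primary_key=True),
    Column("data", String),
)

# Render CREATE TABLE for PostgreSQL only; no database is contacted.
ddl = str(CreateTable(data).compile(dialect=postgresql.dialect()))
print(ddl)
```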

80 

81.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

82 in a :class:`_schema.Column` to specify the option of an autoincrementing 

83 column. 

84 

85.. note:: 

86 

87 Previous versions of SQLAlchemy did not have built-in support for rendering 

88 of IDENTITY, and could use the following compilation hook to replace 

89 occurrences of SERIAL with IDENTITY:: 

90 

91 from sqlalchemy.schema import CreateColumn 

92 from sqlalchemy.ext.compiler import compiles 

93 

94 

95 @compiles(CreateColumn, "postgresql") 

96 def use_identity(element, compiler, **kw): 

97 text = compiler.visit_create_column(element, **kw) 

98 text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY") 

99 return text 

100 

101 Using the above, a table such as:: 

102 

103 t = Table( 

104 "t", m, Column("id", Integer, primary_key=True), Column("data", String) 

105 ) 

106 

107 Will generate on the backing database as: 

108 

109 .. sourcecode:: sql 

110 

111 CREATE TABLE t ( 

112 id INT GENERATED BY DEFAULT AS IDENTITY, 

113 data VARCHAR, 

114 PRIMARY KEY (id) 

115 ) 

116 

117.. _postgresql_ss_cursors: 

118 

119Server Side Cursors 

120------------------- 

121 

122Server-side cursor support is available for the psycopg2 and asyncpg 

123dialects and may also be available in others. 

124 

125Server side cursors are enabled on a per-statement basis by using the 

126:paramref:`.Connection.execution_options.stream_results` connection execution 

127option:: 

128 

129 with engine.connect() as conn: 

130 result = conn.execution_options(stream_results=True).execute( 

131 text("select * from table") 

132 ) 

133 

134Note that some kinds of SQL statements may not be supported with 

135server side cursors; generally, only SQL statements that return rows should be 

136used with this option. 
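
A streamed result is usually consumed in chunks rather than fetched all at once. The sketch below wraps that pattern in a hypothetical helper, ``iter_rows()``, which is not part of SQLAlchemy. So that it runs without a PostgreSQL server, the demonstration uses an in-memory SQLite engine as a stand-in; SQLite accepts the ``stream_results`` option but does not provide a true server-side cursor.

```python
from sqlalchemy import create_engine, text


def iter_rows(conn, sql, chunk_size=1000):
    """Yield rows from a row-returning statement, fetching in chunks.

    Hypothetical helper: requests an unbuffered cursor where the driver
    supports one, then walks the result with Result.partitions().
    """
    result = conn.execution_options(stream_results=True).execute(text(sql))
    for chunk in result.partitions(chunk_size):
        for row in chunk:
            yield row


# Stand-in demonstration using in-memory SQLite (an assumption made so
# the sketch is runnable; with PostgreSQL, pass a postgresql:// URL).
engine = create_engine("sqlite://")
with engine.connect() as conn:
    values = sorted(
        row[0]
        for row in iter_rows(conn, "SELECT 1 UNION ALL SELECT 2", chunk_size=1)
    )
print(values)
```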

137 

138.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated 

139 and will be removed in a future release. Please use the 

140 :paramref:`_engine.Connection.stream_results` execution option for 

141 unbuffered cursor support. 

142 

143.. seealso:: 

144 

145 :ref:`engine_stream_results` 

146 

147.. _postgresql_isolation_level: 

148 

149Transaction Isolation Level 

150--------------------------- 

151 

152Most SQLAlchemy dialects support setting of transaction isolation level 

153using the :paramref:`_sa.create_engine.isolation_level` parameter 

154at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` 

155level via the :paramref:`.Connection.execution_options.isolation_level` 

156parameter. 

157 

158For PostgreSQL dialects, this feature works either by making use of the 

159DBAPI-specific features, such as psycopg2's isolation level flags which will 

160embed the isolation level setting inline with the ``"BEGIN"`` statement, or for 

161DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS 

162TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement 

163emitted by the DBAPI. For the special AUTOCOMMIT isolation level, 

164DBAPI-specific techniques are used, typically an ``.autocommit`` 

165flag on the DBAPI connection object. 

166 

167To set isolation level using :func:`_sa.create_engine`:: 

168 

169 engine = create_engine( 

170 "postgresql+pg8000://scott:tiger@localhost/test", 

171 isolation_level="REPEATABLE READ", 

172 ) 

173 

174To set using per-connection execution options:: 

175 

176 with engine.connect() as conn: 

177 conn = conn.execution_options(isolation_level="REPEATABLE READ") 

178 with conn.begin(): 

179 ... # work with transaction 

180 

181There are also more options for isolation level configurations, such as 

182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

183different isolation level settings. See the discussion at 

184:ref:`dbapi_autocommit` for background. 
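
The "sub-engine" pattern mentioned above can be sketched with :meth:`_engine.Engine.execution_options`, which returns a second engine that shares the original connection pool but applies its own isolation level. The helper name ``with_autocommit()`` is hypothetical, and the in-memory SQLite URL is only a stand-in so the sketch runs without a PostgreSQL driver installed.

```python
from sqlalchemy import create_engine


def with_autocommit(engine):
    """Return a copy of ``engine`` whose connections use AUTOCOMMIT.

    Hypothetical helper; the returned engine shares the pool of the
    engine passed in.
    """
    return engine.execution_options(isolation_level="AUTOCOMMIT")


# Stand-in URL (assumption); a PostgreSQL URL would be used in practice.
main_engine = create_engine("sqlite://")
autocommit_engine = with_autocommit(main_engine)

# Both engines draw connections from the same pool.
shares_pool = autocommit_engine.pool is main_engine.pool
print(shares_pool)
```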

185 

186Valid values for ``isolation_level`` on most PostgreSQL dialects include: 

187 

188* ``READ COMMITTED`` 

189* ``READ UNCOMMITTED`` 

190* ``REPEATABLE READ`` 

191* ``SERIALIZABLE`` 

192* ``AUTOCOMMIT`` 

193 

194.. seealso:: 

195 

196 :ref:`dbapi_autocommit` 

197 

198 :ref:`postgresql_readonly_deferrable` 

199 

200 :ref:`psycopg2_isolation_level` 

201 

202 :ref:`pg8000_isolation_level` 

203 

204.. _postgresql_readonly_deferrable: 

205 

206Setting READ ONLY / DEFERRABLE 

207------------------------------ 

208 

209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" 

210characteristics of the transaction, which is in addition to the isolation level 

211setting. These two attributes can be established either in conjunction with or 

212independently of the isolation level by passing the ``postgresql_readonly`` and 

213``postgresql_deferrable`` flags with 

214:meth:`_engine.Connection.execution_options`. The example below illustrates 

215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting 

216"READ ONLY" and "DEFERRABLE":: 

217 

218 with engine.connect() as conn: 

219 conn = conn.execution_options( 

220 isolation_level="SERIALIZABLE", 

221 postgresql_readonly=True, 

222 postgresql_deferrable=True, 

223 ) 

224 with conn.begin(): 

225 ... # work with transaction 

226 

227Note that some DBAPIs such as asyncpg only support "readonly" with 

228SERIALIZABLE isolation. 

229 

230.. versionadded:: 1.4 added support for the ``postgresql_readonly`` 

231 and ``postgresql_deferrable`` execution options. 

232 

233.. _postgresql_reset_on_return: 

234 

235Temporary Table / Resource Reset for Connection Pooling 

236------------------------------------------------------- 

237 

238The :class:`.QueuePool` connection pool implementation used 

239by the SQLAlchemy :class:`.Engine` object includes 

240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

241the DBAPI ``.rollback()`` method when connections are returned to the pool. 

242While this rollback will clear out the immediate state used by the previous 

243transaction, it does not cover a wider range of session-level state, including 

244temporary tables as well as other server state such as prepared statement 

245handles and statement caches. The PostgreSQL database includes a variety 

246of commands which may be used to reset this state, including 

247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``. 

248 

249 

250To install 

251one or more of these commands as the means of performing reset-on-return, 

252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated 

253in the example below. The implementation 

254will end transactions in progress as well as discard temporary tables 

255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL 

256documentation for background on what each of these statements does. 

257 

258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter 

259is set to ``None`` so that the custom scheme can replace the default behavior 

260completely. The custom hook implementation calls ``.rollback()`` in any case, 

261as it's usually important that the DBAPI's own tracking of commit/rollback 

262will remain consistent with the state of the transaction:: 

263 

264 

265 from sqlalchemy import create_engine 

266 from sqlalchemy import event 

267 

268 postgresql_engine = create_engine( 

269 "postgresql+psycopg2://scott:tiger@hostname/dbname", 

270 # disable default reset-on-return scheme 

271 pool_reset_on_return=None, 

272 ) 

273 

274 

275 @event.listens_for(postgresql_engine, "reset") 

276 def _reset_postgresql(dbapi_connection, connection_record, reset_state): 

277 if not reset_state.terminate_only: 

278 dbapi_connection.execute("CLOSE ALL") 

279 dbapi_connection.execute("RESET ALL") 

280 dbapi_connection.execute("DISCARD TEMP") 

281 

282 # so that the DBAPI itself knows that the connection has been 

283 # reset 

284 dbapi_connection.rollback() 

285 

286.. versionchanged:: 2.0.0b3 Added additional state arguments to 

287 the :meth:`.PoolEvents.reset` event and additionally ensured the event 

288 is invoked for all "reset" occurrences, so that it's appropriate 

289 as a place for custom "reset" handlers. Previous schemes which 

290 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

291 

292.. seealso:: 

293 

294 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

295 

296.. _postgresql_alternate_search_path: 

297 

298Setting Alternate Search Paths on Connect 

299------------------------------------------ 

300 

301The PostgreSQL ``search_path`` variable refers to the list of schema names 

302that will be implicitly referenced when a particular table or other 

303object is referenced in a SQL statement. As detailed in the next section 

304:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around 

305the concept of keeping this variable at its default value of ``public``. 

306However, in order to have it set to any arbitrary name or names when connections 

307are used automatically, the "SET SESSION search_path" command may be invoked 

308for all connections in a pool using the following event handler, as discussed 

309at :ref:`schema_set_default_connections`:: 

310 

311 from sqlalchemy import event 

312 from sqlalchemy import create_engine 

313 

314 engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname") 

315 

316 

317 @event.listens_for(engine, "connect", insert=True) 

318 def set_search_path(dbapi_connection, connection_record): 

319 existing_autocommit = dbapi_connection.autocommit 

320 dbapi_connection.autocommit = True 

321 cursor = dbapi_connection.cursor() 

322 cursor.execute("SET SESSION search_path='%s'" % schema_name) 

323 cursor.close() 

324 dbapi_connection.autocommit = existing_autocommit 

325 

326The reason the recipe is complicated by use of the ``.autocommit`` DBAPI 

327attribute is so that when the ``SET SESSION search_path`` directive is invoked, 

328it is invoked outside of the scope of any transaction and therefore will not 

329be reverted when the DBAPI connection has a rollback. 

330 

331.. seealso:: 

332 

333 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation 

334 

335.. _postgresql_schema_reflection: 

336 

337Remote-Schema Table Introspection and PostgreSQL search_path 

338------------------------------------------------------------ 

339 

340.. admonition:: Section Best Practices Summarized 

341 

342 Keep the ``search_path`` variable set to its default of ``public``, without 

343 any other schema names. Ensure the username used to connect **does not** 

344 match remote schemas, or ensure the ``"$user"`` token is **removed** from 

345 ``search_path``. For other schema names, name these explicitly 

346 within :class:`_schema.Table` definitions. Alternatively, the 

347 ``postgresql_ignore_search_path`` option will cause all reflected 

348 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema` 

349 attribute set up. 

350 

351The PostgreSQL dialect can reflect tables from any schema, as outlined in 

352:ref:`metadata_reflection_schemas`. 

353 

354In all cases, the first thing SQLAlchemy does when reflecting tables is 

355to **determine the default schema for the current database connection**. 

356It does this using the PostgreSQL ``current_schema()`` 

357function, illustrated below using a PostgreSQL client session (i.e. using 

358the ``psql`` tool): 

359 

360.. sourcecode:: sql 

361 

362 test=> select current_schema(); 

363 current_schema 

364 ---------------- 

365 public 

366 (1 row) 

367 

368Above we see that on a plain install of PostgreSQL, the default schema name 

369is ``public``. 

370 

371However, if your database username **matches the name of a schema**, PostgreSQL's 

372default is to then **use that name as the default schema**. Below, we log in 

373using the username ``scott``. When we create a schema named ``scott``, **it 

374implicitly changes the default schema**: 

375 

376.. sourcecode:: sql 

377 

378 test=> select current_schema(); 

379 current_schema 

380 ---------------- 

381 public 

382 (1 row) 

383 

384 test=> create schema scott; 

385 CREATE SCHEMA 

386 test=> select current_schema(); 

387 current_schema 

388 ---------------- 

389 scott 

390 (1 row) 

391 

392The behavior of ``current_schema()`` is derived from the 

393`PostgreSQL search path 

394<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

395variable ``search_path``, which in modern PostgreSQL versions defaults to this: 

396 

397.. sourcecode:: sql 

398 

399 test=> show search_path; 

400 search_path 

401 ----------------- 

402 "$user", public 

403 (1 row) 

404 

405Above, the ``"$user"`` variable will inject the current username as the 

406default schema, if one exists. Otherwise, ``public`` is used. 
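
The resolution rule just described can be modeled in a few lines of plain Python. This is a simplified illustration of the behavior, not PostgreSQL's implementation; the function name and arguments are hypothetical.

```python
def resolve_current_schema(search_path, username, existing_schemas):
    """Return the first schema in ``search_path`` that actually exists.

    Simplified model: the "$user" entry is replaced by the current
    username, and entries naming nonexistent schemas are skipped.
    """
    for entry in search_path:
        name = username if entry == "$user" else entry
        if name in existing_schemas:
            return name
    return None


default_path = ["$user", "public"]

# Before "create schema scott": only public exists.
before = resolve_current_schema(default_path, "scott", {"public"})

# After "create schema scott": the username now matches a schema.
after = resolve_current_schema(default_path, "scott", {"public", "scott"})

print(before, after)
```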

407 

408When a :class:`_schema.Table` object is reflected, if it is present in the 

409schema indicated by the ``current_schema()`` function, **the schema name assigned 

410to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the 

411".schema" attribute will be assigned the string name of that schema. 

412 

413With regard to tables which these :class:`_schema.Table` 

414objects refer to via foreign key constraint, a decision must be made as to how 

415the ``.schema`` is represented in those remote tables, in the case where that 

416remote schema name is also a member of the current ``search_path``. 

417 

418By default, the PostgreSQL dialect mimics the behavior encouraged by 

419PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function 

420returns a sample definition for a particular foreign key constraint, 

421omitting the referenced schema name from that definition when the name is 

422also in the PostgreSQL schema search path. The interaction below 

423illustrates this behavior: 

424 

425.. sourcecode:: sql 

426 

427 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); 

428 CREATE TABLE 

429 test=> CREATE TABLE referring( 

430 test(> id INTEGER PRIMARY KEY, 

431 test(> referred_id INTEGER REFERENCES test_schema.referred(id)); 

432 CREATE TABLE 

433 test=> SET search_path TO public, test_schema; 

434 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

435 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

436 test-> ON n.oid = c.relnamespace 

437 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

438 test-> WHERE c.relname='referring' AND r.contype = 'f' 

439 test-> ; 

440 pg_get_constraintdef 

441 --------------------------------------------------- 

442 FOREIGN KEY (referred_id) REFERENCES referred(id) 

443 (1 row) 

444 

445Above, we created a table ``referred`` as a member of the remote schema 

446``test_schema``, however when we added ``test_schema`` to the 

447PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the 

448``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of 

449the function. 

450 

451On the other hand, if we set the search path back to the typical default 

452of ``public``: 

453 

454.. sourcecode:: sql 

455 

456 test=> SET search_path TO public; 

457 SET 

458 

459The same query against ``pg_get_constraintdef()`` now returns the fully 

460schema-qualified name for us: 

461 

462.. sourcecode:: sql 

463 

464 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

465 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

466 test-> ON n.oid = c.relnamespace 

467 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

468 test-> WHERE c.relname='referring' AND r.contype = 'f'; 

469 pg_get_constraintdef 

470 --------------------------------------------------------------- 

471 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) 

472 (1 row) 

473 

474SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` 

475in order to determine the remote schema name. That is, if our ``search_path`` 

476were set to include ``test_schema``, and we invoked a table 

477reflection process as follows:: 

478 

479 >>> from sqlalchemy import Table, MetaData, create_engine, text 

480 >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

481 >>> with engine.connect() as conn: 

482 ... conn.execute(text("SET search_path TO test_schema, public")) 

483 ... metadata_obj = MetaData() 

484 ... referring = Table("referring", metadata_obj, autoload_with=conn) 

485 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0> 

486 

487The above process would deliver to the :attr:`_schema.MetaData.tables` 

488collection the 

489``referred`` table named **without** the schema:: 

490 

491 >>> metadata_obj.tables["referred"].schema is None 

492 True 

493 

494To alter the behavior of reflection such that the referred schema is 

495maintained regardless of the ``search_path`` setting, use the 

496``postgresql_ignore_search_path`` option, which can be specified as a 

497dialect-specific argument to both :class:`_schema.Table` as well as 

498:meth:`_schema.MetaData.reflect`:: 

499 

500 >>> with engine.connect() as conn: 

501 ... conn.execute(text("SET search_path TO test_schema, public")) 

502 ... metadata_obj = MetaData() 

503 ... referring = Table( 

504 ... "referring", 

505 ... metadata_obj, 

506 ... autoload_with=conn, 

507 ... postgresql_ignore_search_path=True, 

508 ... ) 

509 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0> 

510 

511We will now have ``test_schema.referred`` stored as schema-qualified:: 

512 

513 >>> metadata_obj.tables["test_schema.referred"].schema 

514 'test_schema' 

515 

516.. sidebar:: Best Practices for PostgreSQL Schema reflection 

517 

518 The description of PostgreSQL schema reflection behavior is complex, and 

519 is the product of many years of dealing with widely varied use cases and 

520 user preferences. But in fact, there's no need to understand any of it if 

521 you just stick to the simplest use pattern: leave the ``search_path`` set 

522 to its default of ``public`` only, never refer to the name ``public`` as 

523 an explicit schema name otherwise, and refer to all other schema names 

524 explicitly when building up a :class:`_schema.Table` object. The options 

525 described here are only for those users who can't, or prefer not to, stay 

526 within these guidelines. 

527 

528.. seealso:: 

529 

530 :ref:`reflection_schema_qualified_interaction` - discussion of the issue 

531 from a backend-agnostic perspective 

532 

533 `The Schema Search Path 

534 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

535 - on the PostgreSQL website. 

536 

537INSERT/UPDATE...RETURNING 

538------------------------- 

539 

540The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and 

541``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default 

542for single-row INSERT statements in order to fetch newly generated 

543primary key identifiers. To specify an explicit ``RETURNING`` clause, 

544use the :meth:`._UpdateBase.returning` method on a per-statement basis:: 

545 

546 # INSERT..RETURNING; "connection" is assumed to be an open Connection 

547 result = connection.execute( 

548 table.insert().returning(table.c.col1, table.c.col2).values(name="foo") 

549 ) 

550 print(result.fetchall()) 

551 

552 # UPDATE..RETURNING 

553 result = connection.execute( 

554 table.update() 

555 .returning(table.c.col1, table.c.col2) 

556 .where(table.c.name == "foo") 

557 .values(name="bar") 

558 ) 

559 print(result.fetchall()) 

560 

561 # DELETE..RETURNING 

562 result = connection.execute( 

563 table.delete() 

564 .returning(table.c.col1, table.c.col2) 

565 .where(table.c.name == "foo") 

566 ) 

567 print(result.fetchall()) 
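
To see the SQL that an explicit ``RETURNING`` clause produces, a statement can be compiled against the PostgreSQL dialect without executing it. The sketch below assumes a hypothetical table ``t`` with columns ``col1``, ``col2`` and ``name``; these names are illustrative only.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Hypothetical table used only for this illustration.
table = Table(
    "t",
    metadata,
    Column("col1", Integer, primary_key=True),
    Column("col2", String),
    Column("name", String),
)

stmt = table.insert().returning(table.c.col1, table.c.col2).values(name="foo")

# Compile only; nothing is sent to a database.
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```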

568 

569.. _postgresql_insert_on_conflict: 

570 

571INSERT...ON CONFLICT (Upsert) 

572------------------------------ 

573 

574Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of 

575rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A 

576candidate row will only be inserted if that row does not violate any unique 

577constraints. In the case of a unique constraint violation, a secondary action 

578can occur which can be either "DO UPDATE", indicating that the data in the 

579target row should be updated, or "DO NOTHING", which indicates to silently skip 

580this row. 

581 

582Conflicts are determined using existing unique constraints and indexes. These 

583constraints may be identified either using their name as stated in DDL, 

584or they may be inferred by stating the columns and conditions that comprise 

585the indexes. 

586 

587SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific 

588:func:`_postgresql.insert()` function, which provides 

589the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` 

590and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: 

591 

592.. sourcecode:: pycon+sql 

593 

594 >>> from sqlalchemy.dialects.postgresql import insert 

595 >>> insert_stmt = insert(my_table).values( 

596 ... id="some_existing_id", data="inserted value" 

597 ... ) 

598 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 

599 >>> print(do_nothing_stmt) 

600 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

601 ON CONFLICT (id) DO NOTHING 

602 {stop} 

603 

604 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

605 ... constraint="pk_my_table", set_=dict(data="updated value") 

606 ... ) 

607 >>> print(do_update_stmt) 

608 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

609 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s 

610 

611.. seealso:: 

612 

613 `INSERT .. ON CONFLICT 

614 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ 

615 - in the PostgreSQL documentation. 
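
The snippets in this section refer to ``my_table`` without defining it. A self-contained sketch, assuming a hypothetical two-column definition of ``my_table``, might look like the following; it compiles the statement against the PostgreSQL dialect rather than executing it.

```python
from sqlalchemy import Column, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()

# Hypothetical definition; the original text does not show this table.
my_table = Table(
    "my_table",
    metadata,
    Column("id", String, primary_key=True),
    Column("data", String),
)

insert_stmt = insert(my_table).values(id="some_existing_id", data="inserted value")

# On a conflicting "id", overwrite "data" with the value proposed for insertion.
upsert_stmt = insert_stmt.on_conflict_do_update(
    index_elements=[my_table.c.id],
    set_={"data": insert_stmt.excluded.data},
)

sql = str(upsert_stmt.compile(dialect=postgresql.dialect()))
print(sql)
```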

616 

617Specifying the Target 

618^^^^^^^^^^^^^^^^^^^^^ 

619 

620Both methods supply the "target" of the conflict either by 

621named constraint or by column inference: 

622 

623* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument 

624 specifies a sequence containing string column names, :class:`_schema.Column` 

625 objects, and/or SQL expression elements, which would identify a unique 

626 index: 

627 

628 .. sourcecode:: pycon+sql 

629 

630 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

631 ... index_elements=["id"], set_=dict(data="updated value") 

632 ... ) 

633 >>> print(do_update_stmt) 

634 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

635 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

636 {stop} 

637 

638 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

639 ... index_elements=[my_table.c.id], set_=dict(data="updated value") 

640 ... ) 

641 >>> print(do_update_stmt) 

642 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

643 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

644 

645* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to 

646 infer an index, a partial index can be inferred by also specifying the 

647 :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: 

648 

649 .. sourcecode:: pycon+sql 

650 

651 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 

652 >>> stmt = stmt.on_conflict_do_update( 

653 ... index_elements=[my_table.c.user_email], 

654 ... index_where=my_table.c.user_email.like("%@gmail.com"), 

655 ... set_=dict(data=stmt.excluded.data), 

656 ... ) 

657 >>> print(stmt) 

658 {printsql}INSERT INTO my_table (data, user_email) 

659 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) 

660 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data 

661 

662* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is 

663 used to specify an index directly rather than inferring it. This can be 

664 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: 

665 

666 .. sourcecode:: pycon+sql 

667 

668 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

669 ... constraint="my_table_idx_1", set_=dict(data="updated value") 

670 ... ) 

671 >>> print(do_update_stmt) 

672 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

673 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s 

674 {stop} 

675 

676 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

677 ... constraint="my_table_pk", set_=dict(data="updated value") 

678 ... ) 

679 >>> print(do_update_stmt) 

680 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

681 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s 

682 {stop} 

683 

684* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may 

685 also refer to a SQLAlchemy construct representing a constraint, 

686 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, 

687 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, 

688 if the constraint has a name, it is used directly. Otherwise, if the 

689 constraint is unnamed, then inference will be used, where the expressions 

690 and optional WHERE clause of the constraint will be spelled out in the 

691 construct. This use is especially convenient 

692 to refer to the named or unnamed primary key of a :class:`_schema.Table` 

693 using the 

694 :attr:`_schema.Table.primary_key` attribute: 

695 

696 .. sourcecode:: pycon+sql 

697 

698 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

699 ... constraint=my_table.primary_key, set_=dict(data="updated value") 

700 ... ) 

701 >>> print(do_update_stmt) 

702 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

703 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

704 

705The SET Clause 

706^^^^^^^^^^^^^^^ 

707 

708``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

709existing row, using any combination of new values as well as values 

710from the proposed insertion. These values are specified using the 

711:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This 

712parameter accepts a dictionary which consists of direct values 

713for UPDATE: 

714 

715.. sourcecode:: pycon+sql 

716 

717 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

718 >>> do_update_stmt = stmt.on_conflict_do_update( 

719 ... index_elements=["id"], set_=dict(data="updated value") 

720 ... ) 

721 >>> print(do_update_stmt) 

722 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

723 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

724 

725.. warning:: 

726 

727 The :meth:`_expression.Insert.on_conflict_do_update` 

728 method does **not** take into 

729 account Python-side default UPDATE values or generation functions, e.g. 

730 those specified using :paramref:`_schema.Column.onupdate`. 

731 These values will not be exercised for an ON CONFLICT style of UPDATE, 

732 unless they are manually specified in the 

733 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. 
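
One way to work around the limitation in the warning above is to repeat the ``onupdate`` expression in the ``set_`` dictionary. The sketch below assumes a hypothetical ``updated_at`` column whose ``onupdate`` is ``func.now()``; the table definition is illustrative only.

```python
from sqlalchemy import Column, DateTime, MetaData, String, Table, func
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()

# Hypothetical table with a Python-side onupdate default.
my_table = Table(
    "my_table",
    metadata,
    Column("id", String, primary_key=True),
    Column("data", String),
    Column("updated_at", DateTime, onupdate=func.now()),
)

stmt = insert(my_table).values(id="some_id", data="inserted value")

# The onupdate default is not applied by ON CONFLICT, so the same
# expression is stated explicitly in set_.
stmt = stmt.on_conflict_do_update(
    index_elements=["id"],
    set_={"data": stmt.excluded.data, "updated_at": func.now()},
)

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```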

734 

735Updating using the Excluded INSERT Values 

736^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

737 

738In order to refer to the proposed insertion row, the special alias 

739:attr:`~.postgresql.Insert.excluded` is available as an attribute on 

740the :class:`_postgresql.Insert` object; this object is a 

741:class:`_expression.ColumnCollection` 

742that contains all columns of the target 

743table: 

744 

745.. sourcecode:: pycon+sql 

746 

747 >>> stmt = insert(my_table).values( 

748 ... id="some_id", data="inserted value", author="jlh" 

749 ... ) 

750 >>> do_update_stmt = stmt.on_conflict_do_update( 

751 ... index_elements=["id"], 

752 ... set_=dict(data="updated value", author=stmt.excluded.author), 

753 ... ) 

754 >>> print(do_update_stmt) 

755 {printsql}INSERT INTO my_table (id, data, author) 

756 VALUES (%(id)s, %(data)s, %(author)s) 

757 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

758 

759Additional WHERE Criteria 

760^^^^^^^^^^^^^^^^^^^^^^^^^ 

761 

762The :meth:`_expression.Insert.on_conflict_do_update` method also accepts 

763a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` 

764parameter, which will limit those rows which receive an UPDATE: 

765 

766.. sourcecode:: pycon+sql 

767 

768 >>> stmt = insert(my_table).values( 

769 ... id="some_id", data="inserted value", author="jlh" 

770 ... ) 

771 >>> on_update_stmt = stmt.on_conflict_do_update( 

772 ... index_elements=["id"], 

773 ... set_=dict(data="updated value", author=stmt.excluded.author), 

774 ... where=(my_table.c.status == 2), 

775 ... ) 

776 >>> print(on_update_stmt) 

777 {printsql}INSERT INTO my_table (id, data, author) 

778 VALUES (%(id)s, %(data)s, %(author)s) 

779 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

780 WHERE my_table.status = %(status_1)s 

781 

782Skipping Rows with DO NOTHING 

783^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

784 

785``ON CONFLICT`` may be used to skip inserting a row entirely 

786if any conflict with a unique or exclusion constraint occurs; below 

787this is illustrated using the 

788:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: 

789 

790.. sourcecode:: pycon+sql 

791 

792 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

793 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 

794 >>> print(stmt) 

795 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

796 ON CONFLICT (id) DO NOTHING 

797 

798If ``DO NOTHING`` is used without specifying any columns or constraint, 

799it has the effect of skipping the INSERT for any unique or exclusion 

800constraint violation which occurs: 

801 

802.. sourcecode:: pycon+sql 

803 

804 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

805 >>> stmt = stmt.on_conflict_do_nothing() 

806 >>> print(stmt) 

807 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

808 ON CONFLICT DO NOTHING 

809 

810.. _postgresql_match: 

811 

812Full Text Search 

813---------------- 

814 

815PostgreSQL's full text search system is available through the use of the 

816:data:`.func` namespace, combined with the use of custom operators 

817via the :meth:`.Operators.bool_op` method. For simple cases with some 

818degree of cross-backend compatibility, the :meth:`.Operators.match` operator 

819may also be used. 

820 

821.. _postgresql_simple_match: 

822 

823Simple plain text matching with ``match()`` 

824^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

825 

826The :meth:`.Operators.match` operator provides for cross-compatible simple 

827text matching. For the PostgreSQL backend, it's hardcoded to generate 

828an expression using the ``@@`` operator in conjunction with the 

829``plainto_tsquery()`` PostgreSQL function. 

830 

831On the PostgreSQL dialect, an expression like the following:: 

832 

833 select(sometable.c.text.match("search string")) 

834 

835would emit to the database: 

836 

837.. sourcecode:: sql 

838 

839 SELECT text @@ plainto_tsquery('search string') FROM table 

840 

841Above, passing a plain string to :meth:`.Operators.match` will automatically 

842make use of ``plainto_tsquery()`` to specify the type of tsquery. This 

843establishes basic database cross-compatibility for :meth:`.Operators.match` 

844with other backends. 

845 

846.. versionchanged:: 2.0 The default tsquery generation function used by the 

847 PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. 

848 

849 To render exactly what was rendered in 1.4, use the following form:: 

850 

851 from sqlalchemy import func 

852 

853 select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string"))) 

854 

855 Which would emit: 

856 

857 .. sourcecode:: sql 

858 

859 SELECT text @@ to_tsquery('search string') FROM table 

860 

861Using PostgreSQL full text functions and operators directly 

862^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

863 

864Text search operations beyond the simple use of :meth:`.Operators.match` 

865may make use of the :data:`.func` namespace to generate PostgreSQL full-text 

866functions, in combination with :meth:`.Operators.bool_op` to generate 

867any boolean operator. 

868 

869For example, the query:: 

870 

871 select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat"))) 

872 

873would generate: 

874 

875.. sourcecode:: sql 

876 

877 SELECT to_tsquery('cat') @> to_tsquery('cat & rat') 

878 

879 

880The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: 

881 

882 from sqlalchemy.dialects.postgresql import TSVECTOR 

883 from sqlalchemy import select, cast 

884 

885 select(cast("some text", TSVECTOR)) 

886 

887produces a statement equivalent to: 

888 

889.. sourcecode:: sql 

890 

891 SELECT CAST('some text' AS TSVECTOR) AS anon_1 

892 

893The ``func`` namespace is augmented by the PostgreSQL dialect to set up 

894correct argument and return types for most full text search functions. 

895These functions are used automatically by the :attr:`_sql.func` namespace 

896assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, 

897or :func:`_sa.create_engine` has been invoked using a ``postgresql`` 

898dialect. These functions are documented at: 

899 

900* :class:`_postgresql.to_tsvector` 

901* :class:`_postgresql.to_tsquery` 

902* :class:`_postgresql.plainto_tsquery` 

903* :class:`_postgresql.phraseto_tsquery` 

904* :class:`_postgresql.websearch_to_tsquery` 

905* :class:`_postgresql.ts_headline` 

906 
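As a sketch of how the functions listed above combine with :meth:`.Operators.bool_op`, the example below builds a ``@@`` match against ``websearch_to_tsquery()``. The ``document`` table and its columns are hypothetical names chosen for illustration, and the statement is only compiled, not executed:

```python
from sqlalchemy import Column, Integer, MetaData, Table, Text, func, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# "document", "id" and "body" are hypothetical names for illustration
document = Table(
    "document",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("body", Text),
)

# to_tsvector(body) @@ websearch_to_tsquery(<search string>)
stmt = select(document.c.id).where(
    func.to_tsvector(document.c.body).bool_op("@@")(
        func.websearch_to_tsquery("cat -dog")
    )
)

# Compile against the PostgreSQL dialect without connecting to a database
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The search string is sent as a bound parameter, so it does not appear inline in the rendered SQL.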

907Specifying the "regconfig" with ``match()`` or custom operators 

908^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

909 

910PostgreSQL's ``plainto_tsquery()`` function accepts an optional 

911"regconfig" argument that names the text search configuration to use; it 

912should match the configuration of any GIN or GiST index meant to serve the search. 

913When using :meth:`.Operators.match`, this additional parameter may be 

914specified using the ``postgresql_regconfig`` parameter, such as:: 

915 

916 select(mytable.c.id).where( 

917 mytable.c.title.match("somestring", postgresql_regconfig="english") 

918 ) 

919 

920Which would emit: 

921 

922.. sourcecode:: sql 

923 

924 SELECT mytable.id FROM mytable 

925 WHERE mytable.title @@ plainto_tsquery('english', 'somestring') 

926 

927When using other PostgreSQL search functions with :data:`.func`, the 

928"regconfig" parameter may be passed directly as the initial argument:: 

929 

930 select(mytable.c.id).where( 

931 func.to_tsvector("english", mytable.c.title).bool_op("@@")( 

932 func.to_tsquery("english", "somestring") 

933 ) 

934 ) 

935 

936produces a statement equivalent to: 

937 

938.. sourcecode:: sql 

939 

940 SELECT mytable.id FROM mytable 

941 WHERE to_tsvector('english', mytable.title) @@ 

942 to_tsquery('english', 'somestring') 

943 

944It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from 

945PostgreSQL to ensure that you are generating queries with SQLAlchemy that 

946take full advantage of any indexes you may have created for full text search. 

947 

948.. seealso:: 

949 

950 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation 

951 

952 

953FROM ONLY ... 

954------------- 

955 

956The dialect supports PostgreSQL's ONLY keyword for targeting only a particular 

957table in an inheritance hierarchy. This can be used to produce the 

958``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` 

959syntaxes. It uses SQLAlchemy's hints mechanism:: 

960 

961 # SELECT ... FROM ONLY ... 

962 stmt = table.select().with_hint(table, "ONLY", "postgresql") 

963 print(connection.execute(stmt).fetchall())  # "connection" is an assumed open Connection 

964 

965 # UPDATE ONLY ... 

966 table.update().values(foo="bar").with_hint( 

967 "ONLY", dialect_name="postgresql" 

968 ) 

969 

970 # DELETE FROM ONLY ... 

971 table.delete().with_hint("ONLY", dialect_name="postgresql") 

972 
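The effect of the ``ONLY`` hint can be checked without a database by compiling the statements against the PostgreSQL dialect. This is a minimal sketch; the table ``t`` and its columns are hypothetical:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# hypothetical table for illustration
t = Table("t", metadata, Column("id", Integer), Column("foo", String))

select_stmt = t.select().with_hint(t, "ONLY", "postgresql")
update_stmt = (
    t.update().values(foo="bar").with_hint("ONLY", dialect_name="postgresql")
)

dialect = postgresql.dialect()

# Collapse newlines so the rendered SQL is easier to read on one line
select_sql = " ".join(str(select_stmt.compile(dialect=dialect)).split())
update_sql = " ".join(str(update_stmt.compile(dialect=dialect)).split())

print(select_sql)
print(update_sql)
```

Compiling the same statements with a different dialect omits the keyword, since the hint is registered for ``postgresql`` only.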

973.. _postgresql_indexes: 

974 

975PostgreSQL-Specific Index Options 

976--------------------------------- 

977 

978Several extensions to the :class:`.Index` construct are available, specific 

979to the PostgreSQL dialect. 

980 

981.. _postgresql_covering_indexes: 

982 

983Covering Indexes 

984^^^^^^^^^^^^^^^^ 

985 

986A covering index includes additional columns that are not part of the index key 

987but are stored in the index, allowing PostgreSQL to satisfy queries using only 

988the index without accessing the table (an "index-only scan"). This is 

989indicated on the index using the ``INCLUDE`` clause. The 

990``postgresql_include`` option for :class:`.Index` (as well as 

991:class:`.UniqueConstraint`) renders ``INCLUDE(colname)`` for the given string 

992names:: 

993 

994 Index("my_index", table.c.x, postgresql_include=["y"]) 

995 

996would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``. 

997 

998Note that this feature requires PostgreSQL 11 or later. 

999 

1000.. seealso:: 

1001 

1002 :ref:`postgresql_constraint_options_include` - the same feature implemented 

1003 for :class:`.UniqueConstraint` 

1004 

1005.. versionadded:: 1.4 - support for covering indexes with :class:`.Index`. 

1006 Support for :class:`.UniqueConstraint` was added in 2.0.41. 

1007 

1008.. _postgresql_partial_indexes: 

1009 

1010Partial Indexes 

1011^^^^^^^^^^^^^^^ 

1012 

1013Partial indexes add a criterion to the index definition so that the index is 

1014applied to a subset of rows. These can be specified on :class:`.Index` 

1015using the ``postgresql_where`` keyword argument:: 

1016 

1017 Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10) 

1018 
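To see the DDL that ``postgresql_where`` produces, the index can be compiled with :class:`.CreateIndex` against the PostgreSQL dialect. A minimal sketch, assuming a hypothetical ``my_table`` with ``id`` and ``value`` columns:

```python
from sqlalchemy import Column, Index, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateIndex

metadata = MetaData()

# hypothetical table definition for illustration
my_table = Table(
    "my_table",
    metadata,
    Column("id", Integer),
    Column("value", Integer),
)

idx = Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10)

# Render CREATE INDEX without connecting to a database
ddl = str(CreateIndex(idx).compile(dialect=postgresql.dialect()))
print(ddl)
```

The criterion is rendered inline in the DDL rather than as a bound parameter.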

1019.. _postgresql_operator_classes: 

1020 

1021Operator Classes 

1022^^^^^^^^^^^^^^^^ 

1023 

1024PostgreSQL allows the specification of an *operator class* for each column of 

1025an index (see 

1026https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). 

1027The :class:`.Index` construct allows these to be specified via the 

1028``postgresql_ops`` keyword argument:: 

1029 

1030 Index( 

1031 "my_index", 

1032 my_table.c.id, 

1033 my_table.c.data, 

1034 postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"}, 

1035 ) 

1036 

1037Note that the keys in the ``postgresql_ops`` dictionaries are the 

1038"key" name of the :class:`_schema.Column`, i.e. the name used to access it from 

1039the ``.c`` collection of :class:`_schema.Table`, which can be configured to be 

1040different than the actual name of the column as expressed in the database. 

1041 

1042If ``postgresql_ops`` is to be used against a complex SQL expression such 

1043as a function call, then to apply to the column it must be given a label 

1044that is identified in the dictionary by name, e.g.:: 

1045 

1046 Index( 

1047 "my_index", 

1048 my_table.c.id, 

1049 func.lower(my_table.c.data).label("data_lower"), 

1050 postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"}, 

1051 ) 

1052 

1053Operator classes are also supported by the 

1054:class:`_postgresql.ExcludeConstraint` construct using the 

1055:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for 

1056details. 

1057 

1058.. versionadded:: 1.3.21 added support for operator classes with 

1059 :class:`_postgresql.ExcludeConstraint`. 

1060 

1061 

1062Index Types 

1063^^^^^^^^^^^ 

1064 

1065PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well 

1066as the ability for users to create their own (see 

1067https://www.postgresql.org/docs/current/static/indexes-types.html). These can be 

1068specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: 

1069 

1070 Index("my_index", my_table.c.data, postgresql_using="gin") 

1071 

1072The value passed to the keyword argument will be simply passed through to the 

1073underlying CREATE INDEX command, so it *must* be a valid index type for your 

1074version of PostgreSQL. 

1075 

1076.. _postgresql_index_storage: 

1077 

1078Index Storage Parameters 

1079^^^^^^^^^^^^^^^^^^^^^^^^ 

1080 

1081PostgreSQL allows storage parameters to be set on indexes. The storage 

1082parameters available depend on the index method used by the index. Storage 

1083parameters can be specified on :class:`.Index` using the ``postgresql_with`` 

1084keyword argument:: 

1085 

1086 Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50}) 

1087 

1088PostgreSQL also allows the tablespace in which the index is created to be specified. 

1089The tablespace can be specified on :class:`.Index` using the 

1090``postgresql_tablespace`` keyword argument:: 

1091 

1092 Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace") 

1093 

1094Note that the same option is available on :class:`_schema.Table` as well. 

1095 
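The index type, storage parameter and tablespace options may be combined on one :class:`.Index`. The sketch below assumes a hypothetical ``my_table`` and uses ``btree`` with ``fillfactor`` purely as an illustration; it compiles the DDL rather than executing it:

```python
from sqlalchemy import Column, Index, MetaData, Table, Text
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateIndex

metadata = MetaData()

# hypothetical table definition for illustration
my_table = Table("my_table", metadata, Column("data", Text))

idx = Index(
    "my_index",
    my_table.c.data,
    postgresql_using="btree",
    postgresql_with={"fillfactor": 50},
    postgresql_tablespace="my_tablespace",
)

# Render CREATE INDEX and collapse whitespace for readability
ddl = " ".join(str(CreateIndex(idx).compile(dialect=postgresql.dialect())).split())
print(ddl)
```

As with ``postgresql_using``, these values are passed through to PostgreSQL, so they must be valid for the server version and index method in use.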

1096.. _postgresql_index_concurrently: 

1097 

1098Indexes with CONCURRENTLY 

1099^^^^^^^^^^^^^^^^^^^^^^^^^ 

1100 

1101The PostgreSQL index option CONCURRENTLY is supported by passing the 

1102flag ``postgresql_concurrently`` to the :class:`.Index` construct:: 

1103 

1104 tbl = Table("testtbl", m, Column("data", Integer)) 

1105 

1106 idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True) 

1107 

1108The above index construct will render DDL for CREATE INDEX, assuming 

1109PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as: 

1110 

1111.. sourcecode:: sql 

1112 

1113 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) 

1114 

1115For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for 

1116a connection-less dialect, it will emit: 

1117 

1118.. sourcecode:: sql 

1119 

1120 DROP INDEX CONCURRENTLY test_idx1 

1121 

1122When using CONCURRENTLY, the PostgreSQL database requires that the statement 

1123be invoked outside of a transaction block. The Python DBAPI enforces that 

1124even for a single statement, a transaction is present, so to use this 

1125construct, the DBAPI's "autocommit" mode must be used:: 

1126 

1127 metadata = MetaData() 

1128 table = Table("foo", metadata, Column("id", String)) 

1129 index = Index("foo_idx", table.c.id, postgresql_concurrently=True) 

1130 

1131 with engine.connect() as conn: 

1132 with conn.execution_options(isolation_level="AUTOCOMMIT"): 

1133 table.create(conn) 

1134 

1135.. seealso:: 

1136 

1137 :ref:`postgresql_isolation_level` 

1138 

1139.. _postgresql_index_reflection: 

1140 

1141PostgreSQL Index Reflection 

1142--------------------------- 

1143 

1144The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the 

1145UNIQUE CONSTRAINT construct is used. When inspecting a table using 

1146:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` 

1147and the :meth:`_reflection.Inspector.get_unique_constraints` 

1148will report on these 

1149two constructs distinctly; in the case of the index, the key 

1150``duplicates_constraint`` will be present in the index entry if it is 

1151detected as mirroring a constraint. When performing reflection using 

1152``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned 

1153in :attr:`_schema.Table.indexes` when it is detected as mirroring a 

1154:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection. 

1155 

1156 

1157Special Reflection Options 

1158-------------------------- 

1159 

1160The :class:`_reflection.Inspector` 

1161used for the PostgreSQL backend is an instance 

1162of :class:`.PGInspector`, which offers additional methods:: 

1163 

1164 from sqlalchemy import create_engine, inspect 

1165 

1166 engine = create_engine("postgresql+psycopg2://localhost/test") 

1167 insp = inspect(engine) # will be a PGInspector 

1168 

1169 print(insp.get_enums()) 

1170 

1171.. autoclass:: PGInspector 

1172 :members: 

1173 

1174.. _postgresql_table_options: 

1175 

1176PostgreSQL Table Options 

1177------------------------ 

1178 

1179Several options for CREATE TABLE are supported directly by the PostgreSQL 

1180dialect in conjunction with the :class:`_schema.Table` construct, listed in 

1181the following sections. 

1182 

1183.. seealso:: 

1184 

1185 `PostgreSQL CREATE TABLE options 

1186 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - 

1187 in the PostgreSQL documentation. 

1188 

1189``INHERITS`` 

1190^^^^^^^^^^^^ 

1191 

1192Specifies one or more parent tables from which this table inherits columns and 

1193constraints, enabling table inheritance hierarchies in PostgreSQL. 

1194 

1195:: 

1196 

1197 Table("some_table", metadata, ..., postgresql_inherits="some_supertable") 

1198 

1199 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) 

1200 

1201For schema-qualified parent table names, use :class:`.quoted_name` with 

1202``quote=False`` to prevent the dotted name from being quoted as a single 

1203identifier:: 

1204 

1205 from sqlalchemy.sql import quoted_name 

1206 

1207 Table( 

1208 "some_table", 

1209 metadata, 

1210 ..., 

1211 postgresql_inherits=quoted_name( 

1212 "my_schema.some_supertable", quote=False 

1213 ), 

1214 ) 

1215 

1216``ON COMMIT`` 

1217^^^^^^^^^^^^^ 

1218 

1219Controls the behavior of temporary tables at transaction commit, with options 

1220to preserve rows, delete rows, or drop the table. 

1221 

1222:: 

1223 

1224 Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS") 

1225 

1226``PARTITION BY`` 

1227^^^^^^^^^^^^^^^^ 

1228 

1229Declares the table as a partitioned table using the specified partitioning 

1230strategy (RANGE, LIST, or HASH) on the given column(s). 

1231 

1232:: 

1233 

1234 Table( 

1235 "some_table", 

1236 metadata, 

1237 ..., 

1238 postgresql_partition_by="LIST (part_column)", 

1239 ) 

1240 

1241``TABLESPACE`` 

1242^^^^^^^^^^^^^^ 

1243 

1244Specifies the tablespace where the table will be stored, allowing control over 

1245the physical location of table data on disk. 

1246 

1247:: 

1248 

1249 Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace") 

1250 

1251The above option is also available on the :class:`.Index` construct. 

1252 
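Table options such as ``postgresql_partition_by`` and ``postgresql_tablespace`` can be inspected by compiling :class:`.CreateTable` against the PostgreSQL dialect. This is a sketch only; the column names are hypothetical stand-ins for the ``...`` used in the examples above:

```python
from sqlalchemy import Column, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# "id" and "part_column" are hypothetical columns for illustration
some_table = Table(
    "some_table",
    metadata,
    Column("id", Integer),
    Column("part_column", Integer),
    postgresql_partition_by="LIST (part_column)",
    postgresql_tablespace="some_tablespace",
)

# Render CREATE TABLE without connecting to a database
ddl = str(CreateTable(some_table).compile(dialect=postgresql.dialect()))
print(ddl)
```

Both clauses are appended after the closing parenthesis of the column list.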

1253``USING`` 

1254^^^^^^^^^ 

1255 

1256Specifies the table access method to use for storing table data, such as 

1257``heap`` (the default) or other custom access methods. 

1258 

1259:: 

1260 

1261 Table("some_table", metadata, ..., postgresql_using="heap") 

1262 

1263.. versionadded:: 2.0.26 

1264 

1265``WITH OIDS`` 

1266^^^^^^^^^^^^^ 

1267 

1268Enables the legacy OID (object identifier) system column for the table, which 

1269assigns a unique identifier to each row. 

1270 

1271:: 

1272 

1273 Table("some_table", metadata, ..., postgresql_with_oids=True) 

1274 

1275``WITHOUT OIDS`` 

1276^^^^^^^^^^^^^^^^ 

1277 

1278Explicitly disables the OID system column for the table (the default behavior 

1279in modern PostgreSQL versions). 

1280 

1281:: 

1282 

1283 Table("some_table", metadata, ..., postgresql_with_oids=False) 

1284 

1285.. _postgresql_constraint_options: 

1286 

1287PostgreSQL Constraint Options 

1288----------------------------- 

1289 

1290The following sections indicate options which are supported by the PostgreSQL 

1291dialect in conjunction with selected constraint constructs. 

1292 

1293 

1294``NOT VALID`` 

1295^^^^^^^^^^^^^ 

1296 

1297Allows a constraint to be added without validating existing rows, improving 

1298performance when adding constraints to large tables. This option applies 

1299towards CHECK and FOREIGN KEY constraints when the constraint is being added 

1300to an existing table via ALTER TABLE, and has the effect that existing rows 

1301are not scanned during the ALTER operation against the constraint being added. 

1302 

1303When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_ 

1304that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument 

1305may be specified as an additional keyword argument within the operation 

1306that creates the constraint, as in the following Alembic example:: 

1307 

1308 def upgrade(): 

1309 op.create_foreign_key( 

1310 "fk_user_address", 

1311 "address", 

1312 "user", 

1313 ["user_id"], 

1314 ["id"], 

1315 postgresql_not_valid=True, 

1316 ) 

1317 

1318The keyword is ultimately accepted directly by the 

1319:class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` 

1320and :class:`_schema.ForeignKey` constructs; when using a tool like 

1321Alembic, dialect-specific keyword arguments are passed through to 

1322these constructs from the migration operation directives:: 

1323 

1324 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) 

1325 

1326 ForeignKeyConstraint( 

1327 ["some_id"], ["some_table.some_id"], postgresql_not_valid=True 

1328 ) 

1329 

1330.. versionadded:: 1.4.32 

1331 

1332.. seealso:: 

1333 

1334 `PostgreSQL ALTER TABLE options 

1335 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ - 

1336 in the PostgreSQL documentation. 

1337 

1338.. _postgresql_constraint_options_include: 

1339 

1340``INCLUDE`` 

1341^^^^^^^^^^^ 

1342 

1343This keyword is applicable to both a ``UNIQUE`` constraint as well as an 

1344``INDEX``. The ``postgresql_include`` option available for 

1345:class:`.UniqueConstraint` as well as :class:`.Index` creates a covering index 

1346by including additional columns in the underlying index without making them 

1347part of the key constraint. This option adds one or more columns as a "payload" 

1348to the index created automatically by PostgreSQL for the constraint. For 

1349example, the following table definition:: 

1350 

1351 Table( 

1352 "mytable", 

1353 metadata, 

1354 Column("id", Integer, nullable=False), 

1355 Column("value", Integer, nullable=False), 

1356 UniqueConstraint("id", postgresql_include=["value"]), 

1357 ) 

1358 

1359would produce the DDL statement: 

1360 

1361.. sourcecode:: sql 

1362 

1363 CREATE TABLE mytable ( 

1364 id INTEGER NOT NULL, 

1365 value INTEGER NOT NULL, 

1366 UNIQUE (id) INCLUDE (value) 

1367 ) 

1368 

1369Note that this feature requires PostgreSQL 11 or later. 

1370 

1371.. versionadded:: 2.0.41 - added support for ``postgresql_include`` to 

1372 :class:`.UniqueConstraint`, to complement the existing feature in 

1373 :class:`.Index`. 

1374 

1375.. seealso:: 

1376 

1377 :ref:`postgresql_covering_indexes` - background on ``postgresql_include`` 

1378 for the :class:`.Index` construct. 

1379 

1380 

1381Column list with foreign key ``ON DELETE SET`` actions 

1382^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1383 

1384Allows selective column updates when a foreign key action is triggered, limiting 

1385which columns are set to NULL or DEFAULT upon deletion of a referenced row. 

1386This applies to :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`. On the 

1387PostgreSQL backend only, the :paramref:`.ForeignKey.ondelete` parameter accepts 

1388a list of column names inside parentheses, following the 

1389``SET NULL`` or ``SET DEFAULT`` phrases, which will limit the set of columns 

1390that are subject to the action:: 

1391 

1392 fktable = Table( 

1393 "fktable", 

1394 metadata, 

1395 Column("tid", Integer), 

1396 Column("id", Integer), 

1397 Column("fk_id_del_set_null", Integer), 

1398 ForeignKeyConstraint( 

1399 columns=["tid", "fk_id_del_set_null"], 

1400 refcolumns=[pktable.c.tid, pktable.c.id], 

1401 ondelete="SET NULL (fk_id_del_set_null)", 

1402 ), 

1403 ) 

1404 

1405.. versionadded:: 2.0.40 

1406 

1407 

1408``NULLS NOT DISTINCT`` 

1409^^^^^^^^^^^^^^^^^^^^^^ 

1410 

1411By default, two ``null`` values are not considered equal for unique constraints 

1412and indexes. Therefore, seemingly duplicate rows may be stored if one of the 

1413values in the constraint is ``null``. This default behavior is implementation 

1414defined, so other SQL dialects may behave differently than PostgreSQL. 

1415 

1416The ``NULLS NOT DISTINCT`` clause can be used to change this behavior, treating 

1417null values as equal and preventing unintended duplicate rows. The opposite 

1418``NULLS DISTINCT`` clause can also be used to make PostgreSQL's default behavior 

1419explicit. 

1420 

1421The ``postgresql_nulls_not_distinct`` parameter can be set to ``True`` to 

1422add the ``NULLS NOT DISTINCT`` clause, or ``False`` to add ``NULLS DISTINCT``. 

1423Not setting it, or passing ``None``, will not add a clause and keep the default 

1424behavior. 

1425 

1426This feature requires PostgreSQL 15 or later. 

1427 

1428.. versionadded:: 2.0.16 

1429 

1430 

1431.. _postgresql_table_valued_overview: 

1432 

1433Table values, Table and Column valued functions, Row and Tuple objects 

1434----------------------------------------------------------------------- 

1435 

1436PostgreSQL makes great use of modern SQL forms such as table-valued functions, 

1437tables and rows as values. These constructs are commonly used as part 

1438of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other 

1439datatypes. SQLAlchemy's SQL expression language has native support for 

1440most table-valued and row-valued forms. 

1441 

1442.. _postgresql_table_valued: 

1443 

1444Table-Valued Functions 

1445^^^^^^^^^^^^^^^^^^^^^^^ 

1446 

1447Many PostgreSQL built-in functions are intended to be used in the FROM clause 

1448of a SELECT statement, and are capable of returning table rows or sets of table 

1449rows. A large portion of PostgreSQL's JSON functions for example such as 

1450``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, 

1451``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such 

1452forms. These classes of SQL function calling forms in SQLAlchemy are available 

1453using the :meth:`_functions.FunctionElement.table_valued` method in conjunction 

1454with :class:`_functions.Function` objects generated from the :data:`_sql.func` 

1455namespace. 

1456 

1457Examples from PostgreSQL's reference documentation follow below: 

1458 

1459* ``json_each()``: 

1460 

1461 .. sourcecode:: pycon+sql 

1462 

1463 >>> from sqlalchemy import select, func 

1464 >>> stmt = select( 

1465 ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value") 

1466 ... ) 

1467 >>> print(stmt) 

1468 {printsql}SELECT anon_1.key, anon_1.value 

1469 FROM json_each(:json_each_1) AS anon_1 

1470 

1471* ``json_populate_record()``: 

1472 

1473 .. sourcecode:: pycon+sql 

1474 

1475 >>> from sqlalchemy import select, func, literal_column 

1476 >>> stmt = select( 

1477 ... func.json_populate_record( 

1478 ... literal_column("null::myrowtype"), '{"a":1,"b":2}' 

1479 ... ).table_valued("a", "b", name="x") 

1480 ... ) 

1481 >>> print(stmt) 

1482 {printsql}SELECT x.a, x.b 

1483 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x 

1484 

1485* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived 

1486 columns in the alias, where we may make use of :func:`_sql.column` elements with 

1487 types to produce them. The :meth:`_functions.FunctionElement.table_valued` 

1488 method produces a :class:`_sql.TableValuedAlias` construct, and the 

1489 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived 

1490 columns specification: 

1491 

1492 .. sourcecode:: pycon+sql 

1493 

1494 >>> from sqlalchemy import select, func, column, Integer, Text 

1495 >>> stmt = select( 

1496 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}') 

1497 ... .table_valued( 

1498 ... column("a", Integer), 

1499 ... column("b", Text), 

1500 ... column("d", Text), 

1501 ... ) 

1502 ... .render_derived(name="x", with_types=True) 

1503 ... ) 

1504 >>> print(stmt) 

1505 {printsql}SELECT x.a, x.b, x.d 

1506 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) 

1507 

1508* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an 

1509 ordinal counter to the output of a function and is accepted by a limited set 

1510 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The 

1511 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword 

1512 parameter ``with_ordinality`` for this purpose, which accepts the string name 

1513 that will be applied to the "ordinality" column: 

1514 

1515 .. sourcecode:: pycon+sql 

1516 

1517 >>> from sqlalchemy import select, func 

1518 >>> stmt = select( 

1519 ... func.generate_series(4, 1, -1) 

1520 ... .table_valued("value", with_ordinality="ordinality") 

1521 ... .render_derived() 

1522 ... ) 

1523 >>> print(stmt) 

1524 {printsql}SELECT anon_1.value, anon_1.ordinality 

1525 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) 

1526 WITH ORDINALITY AS anon_1(value, ordinality) 

1527 

1528.. versionadded:: 1.4.0b2 

1529 

1530.. seealso:: 

1531 

1532 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` 

1533 

1534.. _postgresql_column_valued: 

1535 

1536Column Valued Functions 

1537^^^^^^^^^^^^^^^^^^^^^^^ 

1538 

1539Similar to the table valued function, a column valued function is present 

1540in the FROM clause, but delivers itself to the columns clause as a single 

1541scalar value. PostgreSQL functions such as ``json_array_elements()``, 

1542``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the 

1543:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: 

1544 

1545* ``json_array_elements()``: 

1546 

1547 .. sourcecode:: pycon+sql 

1548 

1549 >>> from sqlalchemy import select, func 

1550 >>> stmt = select( 

1551 ... func.json_array_elements('["one", "two"]').column_valued("x") 

1552 ... ) 

1553 >>> print(stmt) 

1554 {printsql}SELECT x 

1555 FROM json_array_elements(:json_array_elements_1) AS x 

1556 

1557* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the 

1558 :func:`_postgresql.array` construct may be used: 

1559 

1560 .. sourcecode:: pycon+sql 

1561 

1562 >>> from sqlalchemy.dialects.postgresql import array 

1563 >>> from sqlalchemy import select, func 

1564 >>> stmt = select(func.unnest(array([1, 2])).column_valued()) 

1565 >>> print(stmt) 

1566 {printsql}SELECT anon_1 

1567 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 

1568 

1569 The function can of course be used against an existing table-bound column 

1570 that's of type :class:`_types.ARRAY`: 

1571 

1572 .. sourcecode:: pycon+sql 

1573 

1574 >>> from sqlalchemy import table, column, ARRAY, Integer 

1575 >>> from sqlalchemy import select, func 

1576 >>> t = table("t", column("value", ARRAY(Integer))) 

1577 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) 

1578 >>> print(stmt) 

1579 {printsql}SELECT unnested_value 

1580 FROM unnest(t.value) AS unnested_value 

1581 

1582.. seealso:: 

1583 

1584 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` 

1585 

1586 

1587Row Types 

1588^^^^^^^^^ 

1589 

1590Built-in support for rendering a ``ROW`` may be approximated using 

1591``func.ROW`` with the :attr:`_sa.func` namespace, or by using the 

1592:func:`_sql.tuple_` construct: 

1593 

1594.. sourcecode:: pycon+sql 

1595 

1596 >>> from sqlalchemy import table, column, func, tuple_ 

1597 >>> t = table("t", column("id"), column("fk")) 

1598 >>> stmt = ( 

1599 ... t.select() 

1600 ... .where(tuple_(t.c.id, t.c.fk) > (1, 2)) 

1601 ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)) 

1602 ... ) 

1603 >>> print(stmt) 

1604 {printsql}SELECT t.id, t.fk 

1605 FROM t 

1606 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) 

1607 

1608.. seealso:: 

1609 

1610 `PostgreSQL Row Constructors 

1611 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_ 

1612 

1613 `PostgreSQL Row Constructor Comparison 

1614 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_ 
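
As a rough illustration of the comparison semantics linked above: for
non-NULL values, PostgreSQL compares row constructors element by element from
left to right, which matches the lexicographic ordering Python applies to
tuples. The sketch below is plain Python rather than SQLAlchemy; the helper
``row_gt`` and the sample rows are hypothetical names used only for
illustration, and NULL handling is deliberately left out.

```python
# Hypothetical pure-Python model of the row-wise ``>`` comparison for
# non-NULL values; this is not part of SQLAlchemy.
def row_gt(left, right):
    """Return True if ``left > right`` under left-to-right row comparison."""
    for lval, rval in zip(left, right):
        if lval != rval:
            # the first unequal pair decides the result
            return lval > rval
    # all pairs are equal, so the rows are equal and ``>`` is false
    return False


# sample (id, fk) rows, compared against (1, 2) as in the WHERE clause above
rows = [(1, 1), (1, 2), (1, 3), (2, 0)]
matching = [row for row in rows if row_gt(row, (1, 2))]
```

Under these assumptions, ``matching`` holds the rows that a predicate such as
``(t.id, t.fk) > (1, 2)`` would be expected to keep.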

1615 

1616Table Types passed to Functions 

1617^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1618 

1619PostgreSQL supports passing a table as an argument to a function, which is 

1620known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects 

1621such as :class:`_schema.Table` support this special form using the 

1622:meth:`_sql.FromClause.table_valued` method, which is comparable to the 

1623:meth:`_functions.FunctionElement.table_valued` method except that the collection 

1624of columns is already established by that of the :class:`_sql.FromClause` 

1625itself: 

1626 

1627.. sourcecode:: pycon+sql 

1628 

1629 >>> from sqlalchemy import table, column, func, select 

1630 >>> a = table("a", column("id"), column("x"), column("y")) 

1631 >>> stmt = select(func.row_to_json(a.table_valued())) 

1632 >>> print(stmt) 

1633 {printsql}SELECT row_to_json(a) AS row_to_json_1 

1634 FROM a 

1635 

1636.. versionadded:: 1.4.0b2 
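
To make the "record" argument more concrete, the following plain-Python
sketch models what ``row_to_json(a)`` produces for each row of the table
``a`` from the example above: one JSON object keyed by the table's column
names. It does not use SQLAlchemy or a database; the helper
``row_to_json_model`` and the sample rows are hypothetical, and the exact
text formatting produced by PostgreSQL may differ.

```python
import json

# column names of the table ``a`` from the example above
columns = ("id", "x", "y")


def row_to_json_model(row):
    """Model one whole row passed to a function as a single "record"."""
    return json.dumps(dict(zip(columns, row)), separators=(",", ":"))


# hypothetical sample rows of ``a``
rows = [(1, 10, 20), (2, 30, 40)]
as_json = [row_to_json_model(row) for row in rows]
```

The point of the sketch is that the function receives the row as a whole,
with its column names, rather than a list of individual column arguments.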

1637 

1638 

1639 

1640""" # noqa: E501 

1641 

1642from __future__ import annotations 

1643 

1644from collections import defaultdict 

1645from functools import lru_cache 

1646import re 

1647from typing import Any 

1648from typing import cast 

1649from typing import Dict 

1650from typing import List 

1651from typing import Optional 

1652from typing import Tuple 

1653from typing import TYPE_CHECKING 

1654from typing import Union 

1655 

1656from . import arraylib as _array 

1657from . import json as _json 

1658from . import pg_catalog 

1659from . import ranges as _ranges 

1660from .ext import _regconfig_fn 

1661from .ext import aggregate_order_by 

1662from .hstore import HSTORE 

1663from .named_types import CreateDomainType as CreateDomainType # noqa: F401 

1664from .named_types import CreateEnumType as CreateEnumType # noqa: F401 

1665from .named_types import DOMAIN as DOMAIN # noqa: F401 

1666from .named_types import DropDomainType as DropDomainType # noqa: F401 

1667from .named_types import DropEnumType as DropEnumType # noqa: F401 

1668from .named_types import ENUM as ENUM # noqa: F401 

1669from .named_types import NamedType as NamedType # noqa: F401 

1670from .types import _DECIMAL_TYPES # noqa: F401 

1671from .types import _FLOAT_TYPES # noqa: F401 

1672from .types import _INT_TYPES # noqa: F401 

1673from .types import BIT as BIT 

1674from .types import BYTEA as BYTEA 

1675from .types import CIDR as CIDR 

1676from .types import CITEXT as CITEXT 

1677from .types import INET as INET 

1678from .types import INTERVAL as INTERVAL 

1679from .types import MACADDR as MACADDR 

1680from .types import MACADDR8 as MACADDR8 

1681from .types import MONEY as MONEY 

1682from .types import OID as OID 

1683from .types import PGBit as PGBit # noqa: F401 

1684from .types import PGCidr as PGCidr # noqa: F401 

1685from .types import PGInet as PGInet # noqa: F401 

1686from .types import PGInterval as PGInterval # noqa: F401 

1687from .types import PGMacAddr as PGMacAddr # noqa: F401 

1688from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401 

1689from .types import PGUuid as PGUuid 

1690from .types import REGCLASS as REGCLASS 

1691from .types import REGCONFIG as REGCONFIG # noqa: F401 

1692from .types import TIME as TIME 

1693from .types import TIMESTAMP as TIMESTAMP 

1694from .types import TSVECTOR as TSVECTOR 

1695from ... import exc 

1696from ... import schema 

1697from ... import select 

1698from ... import sql 

1699from ... import util 

1700from ...engine import characteristics 

1701from ...engine import default 

1702from ...engine import interfaces 

1703from ...engine import ObjectKind 

1704from ...engine import ObjectScope 

1705from ...engine import reflection 

1706from ...engine import URL 

1707from ...engine.reflection import ReflectionDefaults 

1708from ...sql import bindparam 

1709from ...sql import coercions 

1710from ...sql import compiler 

1711from ...sql import elements 

1712from ...sql import expression 

1713from ...sql import functions 

1714from ...sql import roles 

1715from ...sql import sqltypes 

1716from ...sql import util as sql_util 

1717from ...sql.compiler import InsertmanyvaluesSentinelOpts 

1718from ...sql.visitors import InternalTraversal 

1719from ...types import BIGINT 

1720from ...types import BOOLEAN 

1721from ...types import CHAR 

1722from ...types import DATE 

1723from ...types import DOUBLE_PRECISION 

1724from ...types import FLOAT 

1725from ...types import INTEGER 

1726from ...types import NUMERIC 

1727from ...types import REAL 

1728from ...types import SMALLINT 

1729from ...types import TEXT 

1730from ...types import UUID as UUID 

1731from ...types import VARCHAR 

1732from ...util.typing import TypedDict 

1733 

1734IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) 

1735 

1736RESERVED_WORDS = { 

1737 "all", 

1738 "analyse", 

1739 "analyze", 

1740 "and", 

1741 "any", 

1742 "array", 

1743 "as", 

1744 "asc", 

1745 "asymmetric", 

1746 "both", 

1747 "case", 

1748 "cast", 

1749 "check", 

1750 "collate", 

1751 "column", 

1752 "constraint", 

1753 "create", 

1754 "current_catalog", 

1755 "current_date", 

1756 "current_role", 

1757 "current_time", 

1758 "current_timestamp", 

1759 "current_user", 

1760 "default", 

1761 "deferrable", 

1762 "desc", 

1763 "distinct", 

1764 "do", 

1765 "else", 

1766 "end", 

1767 "except", 

1768 "false", 

1769 "fetch", 

1770 "for", 

1771 "foreign", 

1772 "from", 

1773 "grant", 

1774 "group", 

1775 "having", 

1776 "in", 

1777 "initially", 

1778 "intersect", 

1779 "into", 

1780 "leading", 

1781 "limit", 

1782 "localtime", 

1783 "localtimestamp", 

1784 "new", 

1785 "not", 

1786 "null", 

1787 "of", 

1788 "off", 

1789 "offset", 

1790 "old", 

1791 "on", 

1792 "only", 

1793 "or", 

1794 "order", 

1795 "placing", 

1796 "primary", 

1797 "references", 

1798 "returning", 

1799 "select", 

1800 "session_user", 

1801 "some", 

1802 "symmetric", 

1803 "table", 

1804 "then", 

1805 "to", 

1806 "trailing", 

1807 "true", 

1808 "union", 

1809 "unique", 

1810 "user", 

1811 "using", 

1812 "variadic", 

1813 "when", 

1814 "where", 

1815 "window", 

1816 "with", 

1817 "authorization", 

1818 "between", 

1819 "binary", 

1820 "cross", 

1821 "current_schema", 

1822 "freeze", 

1823 "full", 

1824 "ilike", 

1825 "inner", 

1826 "is", 

1827 "isnull", 

1828 "join", 

1829 "left", 

1830 "like", 

1831 "natural", 

1832 "notnull", 

1833 "outer", 

1834 "over", 

1835 "overlaps", 

1836 "right", 

1837 "similar", 

1838 "verbose", 

1839} 

1840 

1841 

1842colspecs = { 

1843 sqltypes.ARRAY: _array.ARRAY, 

1844 sqltypes.Interval: INTERVAL, 

1845 sqltypes.Enum: ENUM, 

1846 sqltypes.JSON.JSONPathType: _json.JSONPATH, 

1847 sqltypes.JSON: _json.JSON, 

1848 sqltypes.Uuid: PGUuid, 

1849} 

1850 

1851 

1852ischema_names = { 

1853 "_array": _array.ARRAY, 

1854 "hstore": HSTORE, 

1855 "json": _json.JSON, 

1856 "jsonb": _json.JSONB, 

1857 "int4range": _ranges.INT4RANGE, 

1858 "int8range": _ranges.INT8RANGE, 

1859 "numrange": _ranges.NUMRANGE, 

1860 "daterange": _ranges.DATERANGE, 

1861 "tsrange": _ranges.TSRANGE, 

1862 "tstzrange": _ranges.TSTZRANGE, 

1863 "int4multirange": _ranges.INT4MULTIRANGE, 

1864 "int8multirange": _ranges.INT8MULTIRANGE, 

1865 "nummultirange": _ranges.NUMMULTIRANGE, 

1866 "datemultirange": _ranges.DATEMULTIRANGE, 

1867 "tsmultirange": _ranges.TSMULTIRANGE, 

1868 "tstzmultirange": _ranges.TSTZMULTIRANGE, 

1869 "integer": INTEGER, 

1870 "bigint": BIGINT, 

1871 "smallint": SMALLINT, 

1872 "character varying": VARCHAR, 

1873 "character": CHAR, 

1874 '"char"': sqltypes.String, 

1875 "name": sqltypes.String, 

1876 "text": TEXT, 

1877 "numeric": NUMERIC, 

1878 "float": FLOAT, 

1879 "real": REAL, 

1880 "inet": INET, 

1881 "cidr": CIDR, 

1882 "citext": CITEXT, 

1883 "uuid": UUID, 

1884 "bit": BIT, 

1885 "bit varying": BIT, 

1886 "macaddr": MACADDR, 

1887 "macaddr8": MACADDR8, 

1888 "money": MONEY, 

1889 "oid": OID, 

1890 "regclass": REGCLASS, 

1891 "double precision": DOUBLE_PRECISION, 

1892 "timestamp": TIMESTAMP, 

1893 "timestamp with time zone": TIMESTAMP, 

1894 "timestamp without time zone": TIMESTAMP, 

1895 "time with time zone": TIME, 

1896 "time without time zone": TIME, 

1897 "date": DATE, 

1898 "time": TIME, 

1899 "bytea": BYTEA, 

1900 "boolean": BOOLEAN, 

1901 "interval": INTERVAL, 

1902 "tsvector": TSVECTOR, 

1903} 

1904 

1905 

1906class PGCompiler(compiler.SQLCompiler): 

1907 def visit_to_tsvector_func(self, element, **kw): 

1908 return self._assert_pg_ts_ext(element, **kw) 

1909 

1910 def visit_to_tsquery_func(self, element, **kw): 

1911 return self._assert_pg_ts_ext(element, **kw) 

1912 

1913 def visit_plainto_tsquery_func(self, element, **kw): 

1914 return self._assert_pg_ts_ext(element, **kw) 

1915 

1916 def visit_phraseto_tsquery_func(self, element, **kw): 

1917 return self._assert_pg_ts_ext(element, **kw) 

1918 

1919 def visit_websearch_to_tsquery_func(self, element, **kw): 

1920 return self._assert_pg_ts_ext(element, **kw) 

1921 

1922 def visit_ts_headline_func(self, element, **kw): 

1923 return self._assert_pg_ts_ext(element, **kw) 

1924 

1925 def _assert_pg_ts_ext(self, element, **kw): 

1926 if not isinstance(element, _regconfig_fn): 

1927 # Other options here include trying to rewrite the function

1928 # with the correct types. However, that means we have to

1929 # "un-SQL-ize" the first argument, which can't work in a

1930 # generalized way. Also, the parent compiler class has already

1931 # added the incorrect return type to the result map. So let's just

1932 # make sure the function we want is used up front.

1933 

1934 raise exc.CompileError( 

1935 f'Can\'t compile "{element.name}()" full text search ' 

1936 f"function construct that does not originate from the " 

1937 f'"sqlalchemy.dialects.postgresql" package. ' 

1938 f'Please ensure "import sqlalchemy.dialects.postgresql" is ' 

1939 f"called before constructing " 

1940 f'"sqlalchemy.func.{element.name}()" to ensure registration ' 

1941 f"of the correct argument and return types." 

1942 ) 

1943 

1944 return f"{element.name}{self.function_argspec(element, **kw)}" 

1945 

1946 def render_bind_cast(self, type_, dbapi_type, sqltext): 

1947 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: 

1948 # use VARCHAR with no length for VARCHAR cast. 

1949 # see #9511 

1950 dbapi_type = sqltypes.STRINGTYPE 

1951 return f"""{sqltext}::{ 

1952 self.dialect.type_compiler_instance.process( 

1953 dbapi_type, identifier_preparer=self.preparer 

1954 ) 

1955 }""" 

1956 

1957 def visit_array(self, element, **kw): 

1958 if not element.clauses and not element.type.item_type._isnull: 

1959 return "ARRAY[]::%s" % element.type.compile(self.dialect) 

1960 return "ARRAY[%s]" % self.visit_clauselist(element, **kw) 

1961 

1962 def visit_slice(self, element, **kw): 

1963 return "%s:%s" % ( 

1964 self.process(element.start, **kw), 

1965 self.process(element.stop, **kw), 

1966 ) 

1967 

1968 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1969 return self._generate_generic_binary(binary, " # ", **kw) 

1970 

1971 def visit_json_getitem_op_binary( 

1972 self, binary, operator, _cast_applied=False, **kw 

1973 ): 

1974 if ( 

1975 not _cast_applied 

1976 and binary.type._type_affinity is not sqltypes.JSON 

1977 ): 

1978 kw["_cast_applied"] = True 

1979 return self.process(sql.cast(binary, binary.type), **kw) 

1980 

1981 kw["eager_grouping"] = True 

1982 

1983 if ( 

1984 not _cast_applied 

1985 and isinstance(binary.left.type, _json.JSONB) 

1986 and self.dialect._supports_jsonb_subscripting 

1987 ): 

1988 left = binary.left 

1989 if isinstance(left, (functions.FunctionElement, elements.Cast)): 

1990 left = elements.Grouping(left) 

1991 

1992 # for JSONB on PostgreSQL 14+, use subscript notation: col['key']

1993 # instead of col -> 'key'

1994 return "%s[%s]" % ( 

1995 self.process(left, **kw), 

1996 self.process(binary.right, **kw), 

1997 ) 

1998 else: 

1999 # Fall back to arrow notation for older versions or when cast 

2000 # is applied 

2001 return self._generate_generic_binary( 

2002 binary, " -> " if not _cast_applied else " ->> ", **kw 

2003 ) 

2004 

2005 def visit_json_path_getitem_op_binary( 

2006 self, binary, operator, _cast_applied=False, **kw 

2007 ): 

2008 if ( 

2009 not _cast_applied 

2010 and binary.type._type_affinity is not sqltypes.JSON 

2011 ): 

2012 kw["_cast_applied"] = True 

2013 return self.process(sql.cast(binary, binary.type), **kw) 

2014 

2015 kw["eager_grouping"] = True 

2016 return self._generate_generic_binary( 

2017 binary, " #> " if not _cast_applied else " #>> ", **kw 

2018 ) 

2019 

2020 def visit_getitem_binary(self, binary, operator, **kw): 

2021 return "%s[%s]" % ( 

2022 self.process(binary.left, **kw), 

2023 self.process(binary.right, **kw), 

2024 ) 

2025 

2026 def visit_aggregate_order_by(self, element, **kw): 

2027 return "%s ORDER BY %s" % ( 

2028 self.process(element.target, **kw), 

2029 self.process(element.order_by, **kw), 

2030 ) 

2031 

2032 def visit_match_op_binary(self, binary, operator, **kw): 

2033 if "postgresql_regconfig" in binary.modifiers: 

2034 regconfig = self.render_literal_value( 

2035 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE 

2036 ) 

2037 if regconfig: 

2038 return "%s @@ plainto_tsquery(%s, %s)" % ( 

2039 self.process(binary.left, **kw), 

2040 regconfig, 

2041 self.process(binary.right, **kw), 

2042 ) 

2043 return "%s @@ plainto_tsquery(%s)" % ( 

2044 self.process(binary.left, **kw), 

2045 self.process(binary.right, **kw), 

2046 ) 

2047 

2048 def visit_ilike_case_insensitive_operand(self, element, **kw): 

2049 return element.element._compiler_dispatch(self, **kw) 

2050 

2051 def visit_ilike_op_binary(self, binary, operator, **kw): 

2052 escape = binary.modifiers.get("escape", None) 

2053 

2054 return "%s ILIKE %s" % ( 

2055 self.process(binary.left, **kw), 

2056 self.process(binary.right, **kw), 

2057 ) + ( 

2058 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

2059 if escape is not None 

2060 else "" 

2061 ) 

2062 

2063 def visit_not_ilike_op_binary(self, binary, operator, **kw): 

2064 escape = binary.modifiers.get("escape", None) 

2065 return "%s NOT ILIKE %s" % ( 

2066 self.process(binary.left, **kw), 

2067 self.process(binary.right, **kw), 

2068 ) + ( 

2069 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

2070 if escape is not None 

2071 else "" 

2072 ) 

2073 

2074 def _regexp_match(self, base_op, binary, operator, kw): 

2075 flags = binary.modifiers["flags"] 

2076 if flags is None: 

2077 return self._generate_generic_binary( 

2078 binary, " %s " % base_op, **kw 

2079 ) 

2080 if flags == "i": 

2081 return self._generate_generic_binary( 

2082 binary, " %s* " % base_op, **kw 

2083 ) 

2084 return "%s %s CONCAT('(?', %s, ')', %s)" % ( 

2085 self.process(binary.left, **kw), 

2086 base_op, 

2087 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

2088 self.process(binary.right, **kw), 

2089 ) 

2090 

2091 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

2092 return self._regexp_match("~", binary, operator, kw) 

2093 

2094 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

2095 return self._regexp_match("!~", binary, operator, kw) 

2096 

2097 def visit_regexp_replace_op_binary(self, binary, operator, **kw): 

2098 string = self.process(binary.left, **kw) 

2099 pattern_replace = self.process(binary.right, **kw) 

2100 flags = binary.modifiers["flags"] 

2101 if flags is None: 

2102 return "REGEXP_REPLACE(%s, %s)" % ( 

2103 string, 

2104 pattern_replace, 

2105 ) 

2106 else: 

2107 return "REGEXP_REPLACE(%s, %s, %s)" % ( 

2108 string, 

2109 pattern_replace, 

2110 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

2111 ) 

2112 

2113 def visit_empty_set_expr(self, element_types, **kw): 

2114 # cast the empty set to the type we are comparing against. If

2115 # we are comparing against the null type, use INTEGER as an

2116 # arbitrary datatype for the empty set

2117 return "SELECT %s WHERE 1!=1" % ( 

2118 ", ".join( 

2119 "CAST(NULL AS %s)" 

2120 % self.dialect.type_compiler_instance.process( 

2121 INTEGER() if type_._isnull else type_ 

2122 ) 

2123 for type_ in element_types or [INTEGER()] 

2124 ), 

2125 ) 

2126 

2127 def render_literal_value(self, value, type_): 

2128 value = super().render_literal_value(value, type_) 

2129 

2130 if self.dialect._backslash_escapes: 

2131 value = value.replace("\\", "\\\\") 

2132 return value 

2133 

2134 def visit_aggregate_strings_func(self, fn, **kw): 

2135 return "string_agg%s" % self.function_argspec(fn) 

2136 

2137 def visit_sequence(self, seq, **kw): 

2138 return "nextval('%s')" % self.preparer.format_sequence(seq) 

2139 

2140 def limit_clause(self, select, **kw): 

2141 text = "" 

2142 if select._limit_clause is not None: 

2143 text += " \n LIMIT " + self.process(select._limit_clause, **kw) 

2144 if select._offset_clause is not None: 

2145 if select._limit_clause is None: 

2146 text += "\n LIMIT ALL" 

2147 text += " OFFSET " + self.process(select._offset_clause, **kw) 

2148 return text 

2149 

2150 def format_from_hint_text(self, sqltext, table, hint, iscrud): 

2151 if hint.upper() != "ONLY": 

2152 raise exc.CompileError("Unrecognized hint: %r" % hint) 

2153 return "ONLY " + sqltext 

2154 

2155 def get_select_precolumns(self, select, **kw): 

2156 # Do not call super().get_select_precolumns because 

2157 # it will warn/raise when distinct on is present 

2158 if select._distinct or select._distinct_on: 

2159 if select._distinct_on: 

2160 return ( 

2161 "DISTINCT ON (" 

2162 + ", ".join( 

2163 [ 

2164 self.process(col, **kw) 

2165 for col in select._distinct_on 

2166 ] 

2167 ) 

2168 + ") " 

2169 ) 

2170 else: 

2171 return "DISTINCT " 

2172 else: 

2173 return "" 

2174 

2175 def for_update_clause(self, select, **kw): 

2176 if select._for_update_arg.read: 

2177 if select._for_update_arg.key_share: 

2178 tmp = " FOR KEY SHARE" 

2179 else: 

2180 tmp = " FOR SHARE" 

2181 elif select._for_update_arg.key_share: 

2182 tmp = " FOR NO KEY UPDATE" 

2183 else: 

2184 tmp = " FOR UPDATE" 

2185 

2186 if select._for_update_arg.of: 

2187 tables = util.OrderedSet() 

2188 for c in select._for_update_arg.of: 

2189 tables.update(sql_util.surface_selectables_only(c)) 

2190 

2191 of_kw = dict(kw) 

2192 of_kw.update(ashint=True, use_schema=False) 

2193 tmp += " OF " + ", ".join( 

2194 self.process(table, **of_kw) for table in tables 

2195 ) 

2196 

2197 if select._for_update_arg.nowait: 

2198 tmp += " NOWAIT" 

2199 if select._for_update_arg.skip_locked: 

2200 tmp += " SKIP LOCKED" 

2201 

2202 return tmp 

2203 

2204 def visit_substring_func(self, func, **kw): 

2205 s = self.process(func.clauses.clauses[0], **kw) 

2206 start = self.process(func.clauses.clauses[1], **kw) 

2207 if len(func.clauses.clauses) > 2: 

2208 length = self.process(func.clauses.clauses[2], **kw) 

2209 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) 

2210 else: 

2211 return "SUBSTRING(%s FROM %s)" % (s, start) 

2212 

2213 def _on_conflict_target(self, clause, **kw): 

2214 if clause.constraint_target is not None: 

2215 # target may be a name of an Index, UniqueConstraint or 

2216 # ExcludeConstraint. While there is a separate 

2217 # "max_identifier_length" for indexes, PostgreSQL uses the same 

2218 # length for all objects so we can use 

2219 # truncate_and_render_constraint_name 

2220 target_text = ( 

2221 "ON CONSTRAINT %s" 

2222 % self.preparer.truncate_and_render_constraint_name( 

2223 clause.constraint_target 

2224 ) 

2225 ) 

2226 elif clause.inferred_target_elements is not None: 

2227 target_text = "(%s)" % ", ".join( 

2228 ( 

2229 self.preparer.quote(c) 

2230 if isinstance(c, str) 

2231 else self.process(c, include_table=False, use_schema=False) 

2232 ) 

2233 for c in clause.inferred_target_elements 

2234 ) 

2235 if clause.inferred_target_whereclause is not None: 

2236 whereclause_kw = dict(kw) 

2237 whereclause_kw.update(include_table=False, use_schema=False) 

2238 target_text += " WHERE %s" % self.process( 

2239 clause.inferred_target_whereclause, 

2240 **whereclause_kw, 

2241 ) 

2242 else: 

2243 target_text = "" 

2244 

2245 return target_text 

2246 

2247 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

2248 target_text = self._on_conflict_target(on_conflict, **kw) 

2249 

2250 if target_text: 

2251 return "ON CONFLICT %s DO NOTHING" % target_text 

2252 else: 

2253 return "ON CONFLICT DO NOTHING" 

2254 

2255 def visit_on_conflict_do_update(self, on_conflict, **kw): 

2256 clause = on_conflict 

2257 

2258 target_text = self._on_conflict_target(on_conflict, **kw) 

2259 

2260 action_set_ops = [] 

2261 

2262 set_parameters = dict(clause.update_values_to_set) 

2263 # create a list of column assignment clauses as tuples 

2264 

2265 insert_statement = self.stack[-1]["selectable"] 

2266 cols = insert_statement.table.c 

2267 set_kw = dict(kw) 

2268 set_kw.update(use_schema=False) 

2269 for c in cols: 

2270 col_key = c.key 

2271 

2272 if col_key in set_parameters: 

2273 value = set_parameters.pop(col_key) 

2274 elif c in set_parameters: 

2275 value = set_parameters.pop(c) 

2276 else: 

2277 continue 

2278 

2279 # TODO: this coercion should be up front. we can't cache 

2280 # SQL constructs with non-bound literals buried in them 

2281 if coercions._is_literal(value): 

2282 value = elements.BindParameter(None, value, type_=c.type) 

2283 

2284 else: 

2285 if ( 

2286 isinstance(value, elements.BindParameter) 

2287 and value.type._isnull 

2288 ): 

2289 value = value._clone() 

2290 value.type = c.type 

2291 value_text = self.process( 

2292 value.self_group(), is_upsert_set=True, **set_kw 

2293 ) 

2294 

2295 key_text = self.preparer.quote(c.name) 

2296 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2297 

2298 # check for names that don't match columns 

2299 if set_parameters: 

2300 util.warn( 

2301 "Additional column names not matching " 

2302 "any column keys in table '%s': %s" 

2303 % ( 

2304 self.current_executable.table.name, 

2305 (", ".join("'%s'" % c for c in set_parameters)), 

2306 ) 

2307 ) 

2308 for k, v in set_parameters.items(): 

2309 key_text = ( 

2310 self.preparer.quote(k) 

2311 if isinstance(k, str) 

2312 else self.process(k, use_schema=False) 

2313 ) 

2314 value_text = self.process( 

2315 coercions.expect(roles.ExpressionElementRole, v), 

2316 is_upsert_set=True, 

2317 **set_kw, 

2318 ) 

2319 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2320 

2321 action_text = ", ".join(action_set_ops) 

2322 if clause.update_whereclause is not None: 

2323 where_kw = dict(kw) 

2324 where_kw.update(include_table=True, use_schema=False) 

2325 action_text += " WHERE %s" % self.process( 

2326 clause.update_whereclause, **where_kw 

2327 ) 

2328 

2329 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

2330 

2331 def update_from_clause( 

2332 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2333 ): 

2334 kw["asfrom"] = True 

2335 return "FROM " + ", ".join( 

2336 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2337 for t in extra_froms 

2338 ) 

2339 

2340 def delete_extra_from_clause( 

2341 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2342 ): 

2343 """Render the DELETE .. USING clause specific to PostgreSQL.""" 

2344 kw["asfrom"] = True 

2345 return "USING " + ", ".join( 

2346 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2347 for t in extra_froms 

2348 ) 

2349 

2350 def fetch_clause(self, select, **kw): 

2351 # pg requires parens for non-literal clauses. They are also required

2352 # for bind parameters if a ::type cast is used by the driver (asyncpg),

2353 # so it's easiest to just always add them

2354 text = "" 

2355 if select._offset_clause is not None: 

2356 text += "\n OFFSET (%s) ROWS" % self.process( 

2357 select._offset_clause, **kw 

2358 ) 

2359 if select._fetch_clause is not None: 

2360 text += "\n FETCH FIRST (%s)%s ROWS %s" % ( 

2361 self.process(select._fetch_clause, **kw), 

2362 " PERCENT" if select._fetch_clause_options["percent"] else "", 

2363 ( 

2364 "WITH TIES" 

2365 if select._fetch_clause_options["with_ties"] 

2366 else "ONLY" 

2367 ), 

2368 ) 

2369 return text 

2370 

2371 

2372class PGDDLCompiler(compiler.DDLCompiler): 

2373 def get_column_specification(self, column, **kwargs): 

2374 colspec = self.preparer.format_column(column) 

2375 impl_type = column.type.dialect_impl(self.dialect) 

2376 if isinstance(impl_type, sqltypes.TypeDecorator): 

2377 impl_type = impl_type.impl 

2378 

2379 has_identity = ( 

2380 column.identity is not None 

2381 and self.dialect.supports_identity_columns 

2382 ) 

2383 

2384 if ( 

2385 column.primary_key 

2386 and column is column.table._autoincrement_column 

2387 and ( 

2388 self.dialect.supports_smallserial 

2389 or not isinstance(impl_type, sqltypes.SmallInteger) 

2390 ) 

2391 and not has_identity 

2392 and ( 

2393 column.default is None 

2394 or ( 

2395 isinstance(column.default, schema.Sequence) 

2396 and column.default.optional 

2397 ) 

2398 ) 

2399 ): 

2400 if isinstance(impl_type, sqltypes.BigInteger): 

2401 colspec += " BIGSERIAL" 

2402 elif isinstance(impl_type, sqltypes.SmallInteger): 

2403 colspec += " SMALLSERIAL" 

2404 else: 

2405 colspec += " SERIAL" 

2406 else: 

2407 colspec += " " + self.dialect.type_compiler_instance.process( 

2408 column.type, 

2409 type_expression=column, 

2410 identifier_preparer=self.preparer, 

2411 ) 

2412 default = self.get_column_default_string(column) 

2413 if default is not None: 

2414 colspec += " DEFAULT " + default 

2415 

2416 if column.computed is not None: 

2417 colspec += " " + self.process(column.computed) 

2418 if has_identity: 

2419 colspec += " " + self.process(column.identity) 

2420 

2421 if not column.nullable and not has_identity: 

2422 colspec += " NOT NULL" 

2423 elif column.nullable and has_identity: 

2424 colspec += " NULL" 

2425 return colspec 

2426 

2427 def _define_constraint_validity(self, constraint): 

2428 not_valid = constraint.dialect_options["postgresql"]["not_valid"] 

2429 return " NOT VALID" if not_valid else "" 

2430 

2431 def _define_include(self, obj): 

2432 includeclause = obj.dialect_options["postgresql"]["include"] 

2433 if not includeclause: 

2434 return "" 

2435 inclusions = [ 

2436 obj.table.c[col] if isinstance(col, str) else col 

2437 for col in includeclause 

2438 ] 

2439 return " INCLUDE (%s)" % ", ".join( 

2440 [self.preparer.quote(c.name) for c in inclusions] 

2441 ) 

2442 

2443 def visit_check_constraint(self, constraint, **kw): 

2444 if constraint._type_bound: 

2445 typ = list(constraint.columns)[0].type 

2446 if ( 

2447 isinstance(typ, sqltypes.ARRAY) 

2448 and isinstance(typ.item_type, sqltypes.Enum) 

2449 and not typ.item_type.native_enum 

2450 ): 

2451 raise exc.CompileError( 

2452 "PostgreSQL dialect cannot produce the CHECK constraint " 

2453 "for ARRAY of non-native ENUM; please specify " 

2454 "create_constraint=False on this Enum datatype." 

2455 ) 

2456 

2457 text = super().visit_check_constraint(constraint) 

2458 text += self._define_constraint_validity(constraint) 

2459 return text 

2460 

2461 def visit_foreign_key_constraint(self, constraint, **kw): 

2462 text = super().visit_foreign_key_constraint(constraint) 

2463 text += self._define_constraint_validity(constraint) 

2464 return text 

2465 

2466 def visit_primary_key_constraint(self, constraint, **kw): 

2467 text = self.define_constraint_preamble(constraint, **kw) 

2468 text += self.define_primary_key_body(constraint, **kw) 

2469 text += self._define_include(constraint) 

2470 text += self.define_constraint_deferrability(constraint) 

2471 return text 

2472 

2473 def visit_unique_constraint(self, constraint, **kw): 

2474 if len(constraint) == 0: 

2475 return "" 

2476 text = self.define_constraint_preamble(constraint, **kw) 

2477 text += self.define_unique_body(constraint, **kw) 

2478 text += self._define_include(constraint) 

2479 text += self.define_constraint_deferrability(constraint) 

2480 return text 

2481 

2482 @util.memoized_property 

2483 def _fk_ondelete_pattern(self): 

2484 return re.compile( 

2485 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?" 

2486 r"|NO ACTION)$", 

2487 re.I, 

2488 ) 

2489 

2490 def define_constraint_ondelete_cascade(self, constraint): 

2491 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

2492 constraint.ondelete, self._fk_ondelete_pattern 

2493 ) 

2494 

2495 def visit_create_enum_type(self, create, **kw): 

2496 type_ = create.element 

2497 

2498 return "CREATE TYPE %s AS ENUM (%s)" % ( 

2499 self.preparer.format_type(type_), 

2500 ", ".join( 

2501 self.sql_compiler.process(sql.literal(e), literal_binds=True) 

2502 for e in type_.enums 

2503 ), 

2504 ) 

2505 

2506 def visit_drop_enum_type(self, drop, **kw): 

2507 type_ = drop.element 

2508 

2509 return "DROP TYPE %s" % (self.preparer.format_type(type_)) 

2510 

2511 def visit_create_domain_type(self, create, **kw): 

2512 domain: DOMAIN = create.element 

2513 

2514 options = [] 

2515 if domain.collation is not None: 

2516 options.append(f"COLLATE {self.preparer.quote(domain.collation)}") 

2517 if domain.default is not None: 

2518 default = self.render_default_string(domain.default) 

2519 options.append(f"DEFAULT {default}") 

2520 if domain.constraint_name is not None: 

2521 name = self.preparer.truncate_and_render_constraint_name( 

2522 domain.constraint_name 

2523 ) 

2524 options.append(f"CONSTRAINT {name}") 

2525 if domain.not_null: 

2526 options.append("NOT NULL") 

2527 if domain.check is not None: 

2528 check = self.sql_compiler.process( 

2529 domain.check, include_table=False, literal_binds=True 

2530 ) 

2531 options.append(f"CHECK ({check})") 

2532 

2533 return ( 

2534 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " 

2535 f"{self.type_compiler.process(domain.data_type)} " 

2536 f"{' '.join(options)}" 

2537 ) 

2538 

2539 def visit_drop_domain_type(self, drop, **kw): 

2540 domain = drop.element 

2541 return f"DROP DOMAIN {self.preparer.format_type(domain)}" 

2542 

2543 def visit_create_index(self, create, **kw): 

2544 preparer = self.preparer 

2545 index = create.element 

2546 self._verify_index_table(index) 

2547 text = "CREATE " 

2548 if index.unique: 

2549 text += "UNIQUE " 

2550 

2551 text += "INDEX " 

2552 

2553 if self.dialect._supports_create_index_concurrently: 

2554 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2555 if concurrently: 

2556 text += "CONCURRENTLY " 

2557 

2558 if create.if_not_exists: 

2559 text += "IF NOT EXISTS " 

2560 

2561 text += "%s ON %s " % ( 

2562 self._prepared_index_name(index, include_schema=False), 

2563 preparer.format_table(index.table), 

2564 ) 

2565 

2566 using = index.dialect_options["postgresql"]["using"] 

2567 if using: 

2568 text += ( 

2569 "USING %s " 

2570 % self.preparer.validate_sql_phrase(using, IDX_USING).lower() 

2571 ) 

2572 

2573 ops = index.dialect_options["postgresql"]["ops"] 

2574 text += "(%s)" % ( 

2575 ", ".join( 

2576 [ 

2577 self.sql_compiler.process( 

2578 ( 

2579 expr.self_group() 

2580 if not isinstance(expr, expression.ColumnClause) 

2581 else expr 

2582 ), 

2583 include_table=False, 

2584 literal_binds=True, 

2585 ) 

2586 + ( 

2587 (" " + ops[expr.key]) 

2588 if hasattr(expr, "key") and expr.key in ops 

2589 else "" 

2590 ) 

2591 for expr in index.expressions 

2592 ] 

2593 ) 

2594 ) 

2595 

2596 text += self._define_include(index) 

2597 

2598 nulls_not_distinct = index.dialect_options["postgresql"][ 

2599 "nulls_not_distinct" 

2600 ] 

2601 if nulls_not_distinct is True: 

2602 text += " NULLS NOT DISTINCT" 

2603 elif nulls_not_distinct is False: 

2604 text += " NULLS DISTINCT" 

2605 

2606 withclause = index.dialect_options["postgresql"]["with"] 

2607 if withclause: 

2608 text += " WITH (%s)" % ( 

2609 ", ".join( 

2610 [ 

2611 "%s = %s" % storage_parameter 

2612 for storage_parameter in withclause.items() 

2613 ] 

2614 ) 

2615 ) 

2616 

2617 tablespace_name = index.dialect_options["postgresql"]["tablespace"] 

2618 if tablespace_name: 

2619 text += " TABLESPACE %s" % preparer.quote(tablespace_name) 

2620 

2621 whereclause = index.dialect_options["postgresql"]["where"] 

2622 if whereclause is not None: 

2623 whereclause = coercions.expect( 

2624 roles.DDLExpressionRole, whereclause 

2625 ) 

2626 

2627 where_compiled = self.sql_compiler.process( 

2628 whereclause, include_table=False, literal_binds=True 

2629 ) 

2630 text += " WHERE " + where_compiled 

2631 

2632 return text 

2633 
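
# Illustrative sketch, not part of SQLAlchemy: the clause order that
# visit_create_index() builds up above, reproduced with plain strings.
# The name render_create_index and its keyword arguments are hypothetical.
# Identifiers are assumed to be quoted already, and operator classes and the
# INCLUDE clause are left out for brevity.

```python
from typing import Mapping, Optional, Sequence


def render_create_index(
    name: str,
    table: str,
    columns: Sequence[str],
    *,
    unique: bool = False,
    concurrently: bool = False,
    if_not_exists: bool = False,
    using: Optional[str] = None,
    nulls_not_distinct: Optional[bool] = None,
    with_: Optional[Mapping[str, object]] = None,
    tablespace: Optional[str] = None,
    where: Optional[str] = None,
) -> str:
    """Assemble CREATE INDEX text in the same clause order as the method above.

    Hypothetical helper: no quoting or validation is performed here.
    """
    text = "CREATE "
    if unique:
        text += "UNIQUE "
    text += "INDEX "
    if concurrently:
        text += "CONCURRENTLY "
    if if_not_exists:
        text += "IF NOT EXISTS "
    text += "%s ON %s " % (name, table)
    if using:
        text += "USING %s " % using.lower()
    text += "(%s)" % ", ".join(columns)
    # None means "emit nothing"; True and False each render a clause.
    if nulls_not_distinct is True:
        text += " NULLS NOT DISTINCT"
    elif nulls_not_distinct is False:
        text += " NULLS DISTINCT"
    if with_:
        # Each (key, value) pair becomes "key = value".
        text += " WITH (%s)" % ", ".join(
            "%s = %s" % item for item in with_.items()
        )
    if tablespace:
        text += " TABLESPACE %s" % tablespace
    if where is not None:
        text += " WHERE " + where
    return text
```

# Note the three-state handling of nulls_not_distinct: only an explicit
# True or False adds a clause, which matches the "is True" / "is False"
# checks in the method above.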

2634 def define_unique_constraint_distinct(self, constraint, **kw): 

2635 nulls_not_distinct = constraint.dialect_options["postgresql"][ 

2636 "nulls_not_distinct" 

2637 ] 

2638 if nulls_not_distinct is True: 

2639 nulls_not_distinct_param = "NULLS NOT DISTINCT " 

2640 elif nulls_not_distinct is False: 

2641 nulls_not_distinct_param = "NULLS DISTINCT " 

2642 else: 

2643 nulls_not_distinct_param = "" 

2644 return nulls_not_distinct_param 

2645 

2646 def visit_drop_index(self, drop, **kw): 

2647 index = drop.element 

2648 

2649 text = "\nDROP INDEX " 

2650 

2651 if self.dialect._supports_drop_index_concurrently: 

2652 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2653 if concurrently: 

2654 text += "CONCURRENTLY " 

2655 

2656 if drop.if_exists: 

2657 text += "IF EXISTS " 

2658 

2659 text += self._prepared_index_name(index, include_schema=True) 

2660 return text 

2661 

2662 def visit_exclude_constraint(self, constraint, **kw): 

2663 text = "" 

2664 if constraint.name is not None: 

2665 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2666 constraint 

2667 ) 

2668 elements = [] 

2669 kw["include_table"] = False 

2670 kw["literal_binds"] = True 

2671 for expr, name, op in constraint._render_exprs: 

2672 exclude_element = self.sql_compiler.process(expr, **kw) + ( 

2673 (" " + constraint.ops[expr.key]) 

2674 if hasattr(expr, "key") and expr.key in constraint.ops 

2675 else "" 

2676 ) 

2677 

2678 elements.append("%s WITH %s" % (exclude_element, op)) 

2679 text += "EXCLUDE USING %s (%s)" % ( 

2680 self.preparer.validate_sql_phrase( 

2681 constraint.using, IDX_USING 

2682 ).lower(), 

2683 ", ".join(elements), 

2684 ) 

2685 if constraint.where is not None: 

2686 text += " WHERE (%s)" % self.sql_compiler.process( 

2687 constraint.where, literal_binds=True 

2688 ) 

2689 text += self.define_constraint_deferrability(constraint) 

2690 return text 

2691 

2692 def post_create_table(self, table): 

2693 table_opts = [] 

2694 pg_opts = table.dialect_options["postgresql"] 

2695 

2696 inherits = pg_opts.get("inherits") 

2697 if inherits is not None: 

2698 if not isinstance(inherits, (list, tuple)): 

2699 inherits = (inherits,) 

2700 table_opts.append( 

2701 "\n INHERITS ( " 

2702 + ", ".join(self.preparer.quote(name) for name in inherits) 

2703 + " )" 

2704 ) 

2705 

2706 if pg_opts["partition_by"]: 

2707 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) 

2708 

2709 if pg_opts["using"]: 

2710 table_opts.append("\n USING %s" % pg_opts["using"]) 

2711 

2712 if pg_opts["with_oids"] is True: 

2713 table_opts.append("\n WITH OIDS") 

2714 elif pg_opts["with_oids"] is False: 

2715 table_opts.append("\n WITHOUT OIDS") 

2716 

2717 if pg_opts["on_commit"]: 

2718 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() 

2719 table_opts.append("\n ON COMMIT %s" % on_commit_options) 

2720 

2721 if pg_opts["tablespace"]: 

2722 tablespace_name = pg_opts["tablespace"] 

2723 table_opts.append( 

2724 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) 

2725 ) 

2726 

2727 return "".join(table_opts) 

2728 

2729 def visit_computed_column(self, generated, **kw): 

2730 if generated.persisted is False: 

2731 raise exc.CompileError( 

2732 "PostgreSQL computed columns do not support 'virtual' "

2733 "persistence; set the 'persisted' flag to None or True for " 

2734 "PostgreSQL support." 

2735 ) 

2736 

2737 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( 

2738 generated.sqltext, include_table=False, literal_binds=True 

2739 ) 

2740 

2741 def visit_create_sequence(self, create, **kw): 

2742 prefix = None 

2743 if create.element.data_type is not None: 

2744 prefix = " AS %s" % self.type_compiler.process( 

2745 create.element.data_type 

2746 ) 

2747 

2748 return super().visit_create_sequence(create, prefix=prefix, **kw) 

2749 

2750 def _can_comment_on_constraint(self, ddl_instance): 

2751 constraint = ddl_instance.element 

2752 if constraint.name is None: 

2753 raise exc.CompileError( 

2754 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2755 "it has no name" 

2756 ) 

2757 if constraint.table is None: 

2758 raise exc.CompileError( 

2759 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2760 "it has no associated table" 

2761 ) 

2762 

2763 def visit_set_constraint_comment(self, create, **kw): 

2764 self._can_comment_on_constraint(create) 

2765 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % ( 

2766 self.preparer.format_constraint(create.element), 

2767 self.preparer.format_table(create.element.table), 

2768 self.sql_compiler.render_literal_value( 

2769 create.element.comment, sqltypes.String() 

2770 ), 

2771 ) 

2772 

2773 def visit_drop_constraint_comment(self, drop, **kw): 

2774 self._can_comment_on_constraint(drop) 

2775 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % ( 

2776 self.preparer.format_constraint(drop.element), 

2777 self.preparer.format_table(drop.element.table), 

2778 ) 

2779 

2780 

2781class PGTypeCompiler(compiler.GenericTypeCompiler): 

2782 def visit_TSVECTOR(self, type_, **kw): 

2783 return "TSVECTOR" 

2784 

2785 def visit_TSQUERY(self, type_, **kw): 

2786 return "TSQUERY" 

2787 

2788 def visit_INET(self, type_, **kw): 

2789 return "INET" 

2790 

2791 def visit_CIDR(self, type_, **kw): 

2792 return "CIDR" 

2793 

2794 def visit_CITEXT(self, type_, **kw): 

2795 return "CITEXT" 

2796 

2797 def visit_MACADDR(self, type_, **kw): 

2798 return "MACADDR" 

2799 

2800 def visit_MACADDR8(self, type_, **kw): 

2801 return "MACADDR8" 

2802 

2803 def visit_MONEY(self, type_, **kw): 

2804 return "MONEY" 

2805 

2806 def visit_OID(self, type_, **kw): 

2807 return "OID" 

2808 

2809 def visit_REGCONFIG(self, type_, **kw): 

2810 return "REGCONFIG" 

2811 

2812 def visit_REGCLASS(self, type_, **kw): 

2813 return "REGCLASS" 

2814 

2815 def visit_FLOAT(self, type_, **kw): 

2816 if not type_.precision: 

2817 return "FLOAT" 

2818 else: 

2819 return "FLOAT(%(precision)s)" % {"precision": type_.precision} 

2820 

2821 def visit_double(self, type_, **kw): 

2822 return self.visit_DOUBLE_PRECISION(type_, **kw)

2823 

2824 def visit_BIGINT(self, type_, **kw): 

2825 return "BIGINT" 

2826 

2827 def visit_HSTORE(self, type_, **kw): 

2828 return "HSTORE" 

2829 

2830 def visit_JSON(self, type_, **kw): 

2831 return "JSON" 

2832 

2833 def visit_JSONB(self, type_, **kw): 

2834 return "JSONB" 

2835 

2836 def visit_INT4MULTIRANGE(self, type_, **kw): 

2837 return "INT4MULTIRANGE" 

2838 

2839 def visit_INT8MULTIRANGE(self, type_, **kw): 

2840 return "INT8MULTIRANGE" 

2841 

2842 def visit_NUMMULTIRANGE(self, type_, **kw): 

2843 return "NUMMULTIRANGE" 

2844 

2845 def visit_DATEMULTIRANGE(self, type_, **kw): 

2846 return "DATEMULTIRANGE" 

2847 

2848 def visit_TSMULTIRANGE(self, type_, **kw): 

2849 return "TSMULTIRANGE" 

2850 

2851 def visit_TSTZMULTIRANGE(self, type_, **kw): 

2852 return "TSTZMULTIRANGE" 

2853 

2854 def visit_INT4RANGE(self, type_, **kw): 

2855 return "INT4RANGE" 

2856 

2857 def visit_INT8RANGE(self, type_, **kw): 

2858 return "INT8RANGE" 

2859 

2860 def visit_NUMRANGE(self, type_, **kw): 

2861 return "NUMRANGE" 

2862 

2863 def visit_DATERANGE(self, type_, **kw): 

2864 return "DATERANGE" 

2865 

2866 def visit_TSRANGE(self, type_, **kw): 

2867 return "TSRANGE" 

2868 

2869 def visit_TSTZRANGE(self, type_, **kw): 

2870 return "TSTZRANGE" 

2871 

2872 def visit_json_int_index(self, type_, **kw): 

2873 return "INT" 

2874 

2875 def visit_json_str_index(self, type_, **kw): 

2876 return "TEXT" 

2877 

2878 def visit_datetime(self, type_, **kw): 

2879 return self.visit_TIMESTAMP(type_, **kw) 

2880 

2881 def visit_enum(self, type_, **kw): 

2882 if not type_.native_enum or not self.dialect.supports_native_enum: 

2883 return super().visit_enum(type_, **kw) 

2884 else: 

2885 return self.visit_ENUM(type_, **kw) 

2886 

2887 def visit_ENUM(self, type_, identifier_preparer=None, **kw): 

2888 if identifier_preparer is None: 

2889 identifier_preparer = self.dialect.identifier_preparer 

2890 return identifier_preparer.format_type(type_) 

2891 

2892 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw): 

2893 if identifier_preparer is None: 

2894 identifier_preparer = self.dialect.identifier_preparer 

2895 return identifier_preparer.format_type(type_) 

2896 

2897 def visit_TIMESTAMP(self, type_, **kw): 

2898 return "TIMESTAMP%s %s" % ( 

2899 ( 

2900 "(%d)" % type_.precision 

2901 if getattr(type_, "precision", None) is not None 

2902 else "" 

2903 ), 

2904 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2905 ) 

2906 

2907 def visit_TIME(self, type_, **kw): 

2908 return "TIME%s %s" % ( 

2909 ( 

2910 "(%d)" % type_.precision 

2911 if getattr(type_, "precision", None) is not None 

2912 else "" 

2913 ), 

2914 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2915 ) 

2916 
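
# Illustrative sketch, not part of SQLAlchemy: how visit_TIMESTAMP() and
# visit_TIME() above combine an optional precision with the time zone flag.
# The helper name render_timestamp is hypothetical. It uses a conditional
# expression in place of the "and/or" idiom, which should behave the same
# for a boolean flag.

```python
from typing import Optional


def render_timestamp(precision: Optional[int] = None, timezone: bool = False) -> str:
    """Render a TIMESTAMP type string from precision and timezone settings."""
    return "TIMESTAMP%s %s" % (
        # A precision of 0 is valid, so test against None rather than truthiness.
        "(%d)" % precision if precision is not None else "",
        ("WITH" if timezone else "WITHOUT") + " TIME ZONE",
    )
```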

2917 def visit_INTERVAL(self, type_, **kw): 

2918 text = "INTERVAL" 

2919 if type_.fields is not None: 

2920 text += " " + type_.fields 

2921 if type_.precision is not None: 

2922 text += " (%d)" % type_.precision 

2923 return text 

2924 

2925 def visit_BIT(self, type_, **kw): 

2926 if type_.varying: 

2927 compiled = "BIT VARYING" 

2928 if type_.length is not None: 

2929 compiled += "(%d)" % type_.length 

2930 else: 

2931 compiled = "BIT(%d)" % type_.length 

2932 return compiled 

2933 

2934 def visit_uuid(self, type_, **kw): 

2935 if type_.native_uuid: 

2936 return self.visit_UUID(type_, **kw) 

2937 else: 

2938 return super().visit_uuid(type_, **kw) 

2939 

2940 def visit_UUID(self, type_, **kw): 

2941 return "UUID" 

2942 

2943 def visit_large_binary(self, type_, **kw): 

2944 return self.visit_BYTEA(type_, **kw) 

2945 

2946 def visit_BYTEA(self, type_, **kw): 

2947 return "BYTEA" 

2948 

2949 def visit_ARRAY(self, type_, **kw): 

2950 inner = self.process(type_.item_type, **kw) 

2951 return re.sub( 

2952 r"((?: COLLATE.*)?)$", 

2953 ( 

2954 r"%s\1" 

2955 % ( 

2956 "[]" 

2957 * (type_.dimensions if type_.dimensions is not None else 1) 

2958 ) 

2959 ), 

2960 inner, 

2961 count=1, 

2962 ) 

2963 
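
# Illustrative sketch, not part of SQLAlchemy: the regular expression used by
# visit_ARRAY() above, isolated so its effect is easier to see. The helper
# name add_array_suffix is hypothetical. The pattern places the "[]" markers
# ahead of a trailing COLLATE clause when one is present, and at the end of
# the string otherwise.

```python
import re
from typing import Optional


def add_array_suffix(inner: str, dimensions: Optional[int] = None) -> str:
    """Append one "[]" per dimension, before any trailing COLLATE clause."""
    brackets = "[]" * (dimensions if dimensions is not None else 1)
    # The optional group captures " COLLATE ..." (or the empty string) at the
    # end of the input; the replacement re-emits it after the brackets.
    return re.sub(r"((?: COLLATE.*)?)$", r"%s\1" % brackets, inner, count=1)
```

# count=1 matters here: without it, the empty match at the very end of the
# string could be replaced a second time.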

2964 def visit_json_path(self, type_, **kw): 

2965 return self.visit_JSONPATH(type_, **kw) 

2966 

2967 def visit_JSONPATH(self, type_, **kw): 

2968 return "JSONPATH" 

2969 

2970 

2971class PGIdentifierPreparer(compiler.IdentifierPreparer): 

2972 reserved_words = RESERVED_WORDS 

2973 

2974 def _unquote_identifier(self, value): 

2975 if value[0] == self.initial_quote: 

2976 value = value[1:-1].replace( 

2977 self.escape_to_quote, self.escape_quote 

2978 ) 

2979 return value 

2980 

2981 def format_type(self, type_, use_schema=True): 

2982 if not type_.name: 

2983 raise exc.CompileError( 

2984 f"PostgreSQL {type_.__class__.__name__} type requires a name." 

2985 ) 

2986 

2987 name = self.quote(type_.name) 

2988 effective_schema = self.schema_for_object(type_) 

2989 

2990 if ( 

2991 not self.omit_schema 

2992 and use_schema 

2993 and effective_schema is not None 

2994 ): 

2995 name = f"{self.quote_schema(effective_schema)}.{name}" 

2996 return name 

2997 

2998 

2999class ReflectedNamedType(TypedDict): 

3000 """Represents a reflected named type.""" 

3001 

3002 name: str 

3003 """Name of the type.""" 

3004 schema: str 

3005 """The schema of the type.""" 

3006 visible: bool 

3007 """Indicates if this type is in the current search path.""" 

3008 

3009 

3010class ReflectedDomainConstraint(TypedDict): 

3011 """Represents a reflected check constraint of a domain."""

3012 

3013 name: str 

3014 """Name of the constraint.""" 

3015 check: str 

3016 """The check constraint text.""" 

3017 

3018 

3019class ReflectedDomain(ReflectedNamedType): 

3020 """Represents a reflected domain."""

3021 

3022 type: str 

3023 """The string name of the underlying data type of the domain.""" 

3024 nullable: bool 

3025 """Indicates if the domain allows null or not.""" 

3026 default: Optional[str] 

3027 """The string representation of the default value of this domain 

3028 or ``None`` if none present. 

3029 """ 

3030 constraints: List[ReflectedDomainConstraint] 

3031 """The constraints defined in the domain, if any. 

3032 The constraints are in order of evaluation by PostgreSQL.

3033 """ 

3034 collation: Optional[str] 

3035 """The collation for the domain.""" 

3036 

3037 

3038class ReflectedEnum(ReflectedNamedType): 

3039 """Represents a reflected enum.""" 

3040 

3041 labels: List[str] 

3042 """The labels that compose the enum.""" 

3043 

3044 

3045class PGInspector(reflection.Inspector): 

3046 dialect: PGDialect 

3047 

3048 def get_table_oid( 

3049 self, table_name: str, schema: Optional[str] = None 

3050 ) -> int: 

3051 """Return the OID for the given table name. 

3052 

3053 :param table_name: string name of the table. For special quoting, 

3054 use :class:`.quoted_name`. 

3055 

3056 :param schema: string schema name; if omitted, uses the default schema 

3057 of the database connection. For special quoting, 

3058 use :class:`.quoted_name`. 

3059 

3060 """ 

3061 

3062 with self._operation_context() as conn: 

3063 return self.dialect.get_table_oid( 

3064 conn, table_name, schema, info_cache=self.info_cache 

3065 ) 

3066 

3067 def get_domains( 

3068 self, schema: Optional[str] = None 

3069 ) -> List[ReflectedDomain]: 

3070 """Return a list of DOMAIN objects. 

3071 

3072 Each member is a dictionary containing these fields: 

3073 

3074 * name - name of the domain 

3075 * schema - the schema name for the domain. 

3076 * visible - boolean, whether or not this domain is visible 

3077 in the default search path. 

3078 * type - the type defined by this domain. 

3079 * nullable - Indicates if this domain can be ``NULL``. 

3080 * default - The default value of the domain or ``None`` if the 

3081 domain has no default. 

3082 * constraints - A list of dicts with the constraints defined by this

3083 domain. Each element contains two keys: ``name`` of the 

3084 constraint and ``check`` with the constraint text. 

3085 

3086 :param schema: schema name. If None, the default schema 

3087 (typically 'public') is used. May also be set to ``'*'`` to 

3088 load domains for all schemas.

3089 

3090 .. versionadded:: 2.0 

3091 

3092 """ 

3093 with self._operation_context() as conn: 

3094 return self.dialect._load_domains( 

3095 conn, schema, info_cache=self.info_cache 

3096 ) 

3097 

3098 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]: 

3099 """Return a list of ENUM objects. 

3100 

3101 Each member is a dictionary containing these fields: 

3102 

3103 * name - name of the enum 

3104 * schema - the schema name for the enum. 

3105 * visible - boolean, whether or not this enum is visible 

3106 in the default search path. 

3107 * labels - a list of string labels that apply to the enum. 

3108 

3109 :param schema: schema name. If None, the default schema 

3110 (typically 'public') is used. May also be set to ``'*'`` to 

3111 load enums for all schemas.

3112 

3113 """ 

3114 with self._operation_context() as conn: 

3115 return self.dialect._load_enums( 

3116 conn, schema, info_cache=self.info_cache 

3117 ) 

3118 

3119 def get_foreign_table_names( 

3120 self, schema: Optional[str] = None 

3121 ) -> List[str]: 

3122 """Return a list of FOREIGN TABLE names. 

3123 

3124 Behavior is similar to that of 

3125 :meth:`_reflection.Inspector.get_table_names`, 

3126 except that the list is limited to those tables that report a 

3127 ``relkind`` value of ``f``. 

3128 

3129 """ 

3130 with self._operation_context() as conn: 

3131 return self.dialect._get_foreign_table_names( 

3132 conn, schema, info_cache=self.info_cache 

3133 ) 

3134 

3135 def has_type( 

3136 self, type_name: str, schema: Optional[str] = None, **kw: Any 

3137 ) -> bool: 

3138 """Return whether the database has the specified type in the provided

3139 schema. 

3140 

3141 :param type_name: the type to check. 

3142 :param schema: schema name. If None, the default schema 

3143 (typically 'public') is used. May also be set to ``'*'`` to 

3144 check in all schemas. 

3145 

3146 .. versionadded:: 2.0 

3147 

3148 """ 

3149 with self._operation_context() as conn: 

3150 return self.dialect.has_type( 

3151 conn, type_name, schema, info_cache=self.info_cache 

3152 ) 

3153 

3154 

3155class PGExecutionContext(default.DefaultExecutionContext): 

3156 def fire_sequence(self, seq, type_): 

3157 return self._execute_scalar( 

3158 ( 

3159 "select nextval('%s')" 

3160 % self.identifier_preparer.format_sequence(seq) 

3161 ), 

3162 type_, 

3163 ) 

3164 

3165 def get_insert_default(self, column): 

3166 if column.primary_key and column is column.table._autoincrement_column: 

3167 if column.server_default and column.server_default.has_argument: 

3168 # pre-execute passive defaults on primary key columns 

3169 return self._execute_scalar( 

3170 "select %s" % column.server_default.arg, column.type 

3171 ) 

3172 

3173 elif column.default is None or ( 

3174 column.default.is_sequence and column.default.optional 

3175 ): 

3176 # execute the sequence associated with a SERIAL primary 

3177 # key column. for non-primary-key SERIAL, the ID just 

3178 # generates server side. 

3179 

3180 try: 

3181 seq_name = column._postgresql_seq_name 

3182 except AttributeError: 

3183 tab = column.table.name 

3184 col = column.name 

3185 tab = tab[0 : 29 + max(0, (29 - len(col)))] 

3186 col = col[0 : 29 + max(0, (29 - len(tab)))] 

3187 name = "%s_%s_seq" % (tab, col) 

3188 column._postgresql_seq_name = seq_name = name 

3189 

3190 if column.table is not None: 

3191 effective_schema = self.connection.schema_for_object( 

3192 column.table 

3193 ) 

3194 else: 

3195 effective_schema = None 

3196 

3197 if effective_schema is not None: 

3198 stmt = 'select nextval(\'"%s"."%s"\')' % (

3199 effective_schema,

3200 seq_name,

3201 )

3202 else:

3203 stmt = "select nextval('\"%s\"')" % (seq_name,)

3204

3205 return self._execute_scalar(stmt, column.type)

3206 

3207 return super().get_insert_default(column) 

3208 
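
# Illustrative sketch, not part of SQLAlchemy: the name truncation that
# get_insert_default() above applies when it has to guess the sequence name
# for a SERIAL primary key column. The helper name
# implicit_serial_sequence_name is hypothetical; the arithmetic is copied
# from the fallback branch above.

```python
def implicit_serial_sequence_name(table_name: str, column_name: str) -> str:
    """Build "<table>_<column>_seq", truncating each part as the code above does."""
    # Each part gets 29 characters, plus whatever the other part leaves unused.
    tab = table_name[0 : 29 + max(0, 29 - len(column_name))]
    col = column_name[0 : 29 + max(0, 29 - len(tab))]
    return "%s_%s_seq" % (tab, col)
```

# With two long names, both parts are cut to 29 characters, so the result is
# 29 + 1 + 29 + 4 = 63 characters long.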

3209 

3210class PGReadOnlyConnectionCharacteristic( 

3211 characteristics.ConnectionCharacteristic 

3212): 

3213 transactional = True 

3214 

3215 def reset_characteristic(self, dialect, dbapi_conn): 

3216 dialect.set_readonly(dbapi_conn, False) 

3217 

3218 def set_characteristic(self, dialect, dbapi_conn, value): 

3219 dialect.set_readonly(dbapi_conn, value) 

3220 

3221 def get_characteristic(self, dialect, dbapi_conn): 

3222 return dialect.get_readonly(dbapi_conn) 

3223 

3224 

3225class PGDeferrableConnectionCharacteristic( 

3226 characteristics.ConnectionCharacteristic 

3227): 

3228 transactional = True 

3229 

3230 def reset_characteristic(self, dialect, dbapi_conn): 

3231 dialect.set_deferrable(dbapi_conn, False) 

3232 

3233 def set_characteristic(self, dialect, dbapi_conn, value): 

3234 dialect.set_deferrable(dbapi_conn, value) 

3235 

3236 def get_characteristic(self, dialect, dbapi_conn): 

3237 return dialect.get_deferrable(dbapi_conn) 

3238 

3239 

3240class PGDialect(default.DefaultDialect): 

3241 name = "postgresql" 

3242 supports_statement_cache = True 

3243 supports_alter = True 

3244 max_identifier_length = 63 

3245 supports_sane_rowcount = True 

3246 

3247 bind_typing = interfaces.BindTyping.RENDER_CASTS 

3248 

3249 supports_native_enum = True 

3250 supports_native_boolean = True 

3251 supports_native_uuid = True 

3252 supports_smallserial = True 

3253 

3254 supports_sequences = True 

3255 sequences_optional = True 

3256 preexecute_autoincrement_sequences = True 

3257 postfetch_lastrowid = False 

3258 use_insertmanyvalues = True 

3259 

3260 returns_native_bytes = True 

3261 

3262 insertmanyvalues_implicit_sentinel = ( 

3263 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT 

3264 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

3265 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS 

3266 ) 

3267 

3268 supports_comments = True 

3269 supports_constraint_comments = True 

3270 supports_default_values = True 

3271 

3272 supports_default_metavalue = True 

3273 

3274 supports_empty_insert = False 

3275 supports_multivalues_insert = True 

3276 

3277 supports_identity_columns = True 

3278 

3279 default_paramstyle = "pyformat" 

3280 ischema_names = ischema_names 

3281 colspecs = colspecs 

3282 

3283 statement_compiler = PGCompiler 

3284 ddl_compiler = PGDDLCompiler 

3285 type_compiler_cls = PGTypeCompiler 

3286 preparer = PGIdentifierPreparer 

3287 execution_ctx_cls = PGExecutionContext 

3288 inspector = PGInspector 

3289 

3290 update_returning = True 

3291 delete_returning = True 

3292 insert_returning = True 

3293 update_returning_multifrom = True 

3294 delete_returning_multifrom = True 

3295 

3296 connection_characteristics = ( 

3297 default.DefaultDialect.connection_characteristics 

3298 ) 

3299 connection_characteristics = connection_characteristics.union( 

3300 { 

3301 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), 

3302 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), 

3303 } 

3304 ) 

3305 

3306 construct_arguments = [ 

3307 ( 

3308 schema.Index, 

3309 { 

3310 "using": False, 

3311 "include": None, 

3312 "where": None, 

3313 "ops": {}, 

3314 "concurrently": False, 

3315 "with": {}, 

3316 "tablespace": None, 

3317 "nulls_not_distinct": None, 

3318 }, 

3319 ), 

3320 ( 

3321 schema.Table, 

3322 { 

3323 "ignore_search_path": False, 

3324 "tablespace": None, 

3325 "partition_by": None, 

3326 "with_oids": None, 

3327 "on_commit": None, 

3328 "inherits": None, 

3329 "using": None, 

3330 }, 

3331 ), 

3332 ( 

3333 schema.CheckConstraint, 

3334 { 

3335 "not_valid": False, 

3336 }, 

3337 ), 

3338 ( 

3339 schema.ForeignKeyConstraint, 

3340 { 

3341 "not_valid": False, 

3342 }, 

3343 ), 

3344 ( 

3345 schema.PrimaryKeyConstraint, 

3346 {"include": None}, 

3347 ), 

3348 ( 

3349 schema.UniqueConstraint, 

3350 { 

3351 "include": None, 

3352 "nulls_not_distinct": None, 

3353 }, 

3354 ), 

3355 ] 

3356 

3357 reflection_options = ("postgresql_ignore_search_path",) 

3358 

3359 _backslash_escapes = True 

3360 _supports_create_index_concurrently = True 

3361 _supports_drop_index_concurrently = True 

3362 _supports_jsonb_subscripting = True 

3363 

3364 def __init__( 

3365 self, 

3366 native_inet_types=None, 

3367 json_serializer=None, 

3368 json_deserializer=None, 

3369 **kwargs, 

3370 ): 

3371 default.DefaultDialect.__init__(self, **kwargs) 

3372 

3373 self._native_inet_types = native_inet_types 

3374 self._json_deserializer = json_deserializer 

3375 self._json_serializer = json_serializer 

3376 

3377 def initialize(self, connection): 

3378 super().initialize(connection) 

3379 

3380 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 

3381 self.supports_smallserial = self.server_version_info >= (9, 2) 

3382 

3383 self._set_backslash_escapes(connection) 

3384 

3385 self._supports_drop_index_concurrently = self.server_version_info >= ( 

3386 9, 

3387 2, 

3388 ) 

3389 self.supports_identity_columns = self.server_version_info >= (10,) 

3390 

3391 self._supports_jsonb_subscripting = self.server_version_info >= (14,) 

3392 

3393 def get_isolation_level_values(self, dbapi_conn): 

3394 # note the generic dialect doesn't have AUTOCOMMIT, however 

3395 # all postgresql dialects should include AUTOCOMMIT. 

3396 return ( 

3397 "SERIALIZABLE", 

3398 "READ UNCOMMITTED", 

3399 "READ COMMITTED", 

3400 "REPEATABLE READ", 

3401 ) 

3402 

3403 def set_isolation_level(self, dbapi_connection, level): 

3404 cursor = dbapi_connection.cursor() 

3405 cursor.execute( 

3406 "SET SESSION CHARACTERISTICS AS TRANSACTION " 

3407 f"ISOLATION LEVEL {level}" 

3408 ) 

3409 cursor.execute("COMMIT") 

3410 cursor.close() 

3411 

3412 def get_isolation_level(self, dbapi_connection): 

3413 cursor = dbapi_connection.cursor() 

3414 cursor.execute("show transaction isolation level") 

3415 val = cursor.fetchone()[0] 

3416 cursor.close() 

3417 return val.upper() 

3418 

3419 def set_readonly(self, connection, value): 

3420 raise NotImplementedError() 

3421 

3422 def get_readonly(self, connection): 

3423 raise NotImplementedError() 

3424 

3425 def set_deferrable(self, connection, value): 

3426 raise NotImplementedError() 

3427 

3428 def get_deferrable(self, connection): 

3429 raise NotImplementedError() 

3430 

3431 def _split_multihost_from_url(self, url: URL) -> Union[ 

3432 Tuple[None, None], 

3433 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], 

3434 ]: 

3435 hosts: Optional[Tuple[Optional[str], ...]] = None 

3436 ports_str: Union[str, Tuple[Optional[str], ...], None] = None 

3437 

3438 integrated_multihost = False 

3439 

3440 if "host" in url.query: 

3441 if isinstance(url.query["host"], (list, tuple)): 

3442 integrated_multihost = True 

3443 hosts, ports_str = zip( 

3444 *[ 

3445 token.split(":") if ":" in token else (token, None) 

3446 for token in url.query["host"] 

3447 ] 

3448 ) 

3449 

3450 elif isinstance(url.query["host"], str): 

3451 hosts = tuple(url.query["host"].split(",")) 

3452 

3453 if ( 

3454 "port" not in url.query 

3455 and len(hosts) == 1 

3456 and ":" in hosts[0] 

3457 ): 

3458 # internet host is alphanumeric plus dots or hyphens. 

3459 # this is essentially rfc1123, which refers to rfc952. 

3460 # https://stackoverflow.com/questions/3523028/ 

3461 # valid-characters-of-a-hostname 

3462 host_port_match = re.match( 

3463 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0] 

3464 ) 

3465 if host_port_match: 

3466 integrated_multihost = True 

3467 h, p = host_port_match.group(1, 2) 

3468 if TYPE_CHECKING: 

3469 assert isinstance(h, str) 

3470 assert isinstance(p, str) 

3471 hosts = (h,) 

3472 ports_str = cast( 

3473 "Tuple[Optional[str], ...]", (p,) if p else (None,) 

3474 ) 

3475 

3476 if "port" in url.query: 

3477 if integrated_multihost: 

3478 raise exc.ArgumentError( 

3479 "Can't mix 'multihost' formats together; use " 

3480 '"host=h1,h2,h3&port=p1,p2,p3" or ' 

3481 '"host=h1:p1&host=h2:p2&host=h3:p3" separately' 

3482 ) 

3483 if isinstance(url.query["port"], (list, tuple)): 

3484 ports_str = url.query["port"] 

3485 elif isinstance(url.query["port"], str): 

3486 ports_str = tuple(url.query["port"].split(",")) 

3487 

3488 ports: Optional[Tuple[Optional[int], ...]] = None 

3489 

3490 if ports_str: 

3491 try: 

3492 ports = tuple(int(x) if x else None for x in ports_str) 

3493 except ValueError: 

3494 raise exc.ArgumentError( 

3495 f"Received non-integer port arguments: {ports_str}" 

3496 ) from None 

3497 

3498 if ports and ( 

3499 (not hosts and len(ports) > 1) 

3500 or ( 

3501 hosts 

3502 and ports 

3503 and len(hosts) != len(ports) 

3504 and (len(hosts) > 1 or len(ports) > 1) 

3505 ) 

3506 ): 

3507 raise exc.ArgumentError("number of hosts and ports don't match") 

3508 

3509 if hosts is not None: 

3510 if ports is None: 

3511 ports = tuple(None for _ in hosts) 

3512 

3513 return hosts, ports # type: ignore 

3514 
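
# Illustrative sketch, not part of SQLAlchemy: a simplified version of the
# two multihost query formats that _split_multihost_from_url() above accepts.
# The helper name split_multihost is hypothetical, it takes a plain dict
# rather than a URL object, it raises ValueError instead of ArgumentError,
# and it omits the single "host=name:port" string case handled above.

```python
from typing import Dict, Optional, Tuple, Union

QueryValue = Union[str, Tuple[str, ...]]
Hosts = Optional[Tuple[Optional[str], ...]]
Ports = Optional[Tuple[Optional[int], ...]]


def split_multihost(query: Dict[str, QueryValue]) -> Tuple[Hosts, Ports]:
    """Return (hosts, ports) from a query mapping, or (None, None)."""
    hosts = None
    ports_str = None
    integrated = False

    host = query.get("host")
    if isinstance(host, (list, tuple)):
        # Repeated form: host=h1:p1&host=h2:p2
        integrated = True
        hosts, ports_str = zip(
            *[tok.split(":") if ":" in tok else (tok, None) for tok in host]
        )
    elif isinstance(host, str):
        # Comma form: host=h1,h2 with ports supplied separately
        hosts = tuple(host.split(","))

    port = query.get("port")
    if port is not None:
        if integrated:
            raise ValueError("can't mix the two multihost formats")
        if isinstance(port, (list, tuple)):
            ports_str = tuple(port)
        else:
            ports_str = tuple(port.split(","))

    ports = None
    if ports_str:
        # Empty or missing entries become None; int() rejects non-integers.
        ports = tuple(int(p) if p else None for p in ports_str)
    if hosts is not None and ports is None:
        ports = tuple(None for _ in hosts)
    if hosts is not None and ports is not None and len(hosts) != len(ports):
        raise ValueError("number of hosts and ports don't match")
    return hosts, ports
```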

3515 def do_begin_twophase(self, connection, xid): 

3516 self.do_begin(connection.connection) 

3517 

3518 def do_prepare_twophase(self, connection, xid): 

3519 connection.execute( 

3520 sql.text("PREPARE TRANSACTION :xid").bindparams( 

3521 sql.bindparam("xid", xid, literal_execute=True) 

3522 ) 

3523 ) 

3524 

3525 def do_rollback_twophase( 

3526 self, connection, xid, is_prepared=True, recover=False 

3527 ): 

3528 if is_prepared: 

3529 if recover: 

3530 # FIXME: ugly hack to get out of transaction 

3531 # context when committing recoverable transactions 

3532 # Need to find a way to make the DBAPI not

3533 # open a transaction.

3534 connection.exec_driver_sql("ROLLBACK") 

3535 connection.execute( 

3536 sql.text("ROLLBACK PREPARED :xid").bindparams( 

3537 sql.bindparam("xid", xid, literal_execute=True) 

3538 ) 

3539 ) 

3540 connection.exec_driver_sql("BEGIN") 

3541 self.do_rollback(connection.connection) 

3542 else: 

3543 self.do_rollback(connection.connection) 

3544 

3545 def do_commit_twophase( 

3546 self, connection, xid, is_prepared=True, recover=False 

3547 ): 

3548 if is_prepared: 

3549 if recover: 

3550 connection.exec_driver_sql("ROLLBACK") 

3551 connection.execute( 

3552 sql.text("COMMIT PREPARED :xid").bindparams( 

3553 sql.bindparam("xid", xid, literal_execute=True) 

3554 ) 

3555 ) 

3556 connection.exec_driver_sql("BEGIN") 

3557 self.do_rollback(connection.connection) 

3558 else: 

3559 self.do_commit(connection.connection) 

3560 

3561 def do_recover_twophase(self, connection): 

3562 return connection.scalars( 

3563 sql.text("SELECT gid FROM pg_prepared_xacts") 

3564 ).all() 

3565 

3566 def _get_default_schema_name(self, connection): 

3567 return connection.exec_driver_sql("select current_schema()").scalar() 

3568 

3569 @reflection.cache 

3570 def has_schema(self, connection, schema, **kw): 

3571 query = select(pg_catalog.pg_namespace.c.nspname).where( 

3572 pg_catalog.pg_namespace.c.nspname == schema 

3573 ) 

3574 return bool(connection.scalar(query)) 

3575 

3576 def _pg_class_filter_scope_schema( 

3577 self, query, schema, scope, pg_class_table=None 

3578 ): 

3579 if pg_class_table is None: 

3580 pg_class_table = pg_catalog.pg_class 

3581 query = query.join( 

3582 pg_catalog.pg_namespace, 

3583 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, 

3584 ) 

3585 

3586 if scope is ObjectScope.DEFAULT: 

3587 query = query.where(pg_class_table.c.relpersistence != "t") 

3588 elif scope is ObjectScope.TEMPORARY: 

3589 query = query.where(pg_class_table.c.relpersistence == "t") 

3590 

3591 if schema is None: 

3592 query = query.where( 

3593 pg_catalog.pg_table_is_visible(pg_class_table.c.oid), 

3594 # ignore pg_catalog schema 

3595 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3596 ) 

3597 else: 

3598 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3599 return query 

3600 

3601 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None): 

3602 if pg_class_table is None: 

3603 pg_class_table = pg_catalog.pg_class 

3604 # uses the ANY form instead of IN; otherwise PostgreSQL complains

3605 # that 'IN could not convert type character to "char"' 

3606 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds)) 

3607 

3608 @lru_cache() 

3609 def _has_table_query(self, schema): 

3610 query = select(pg_catalog.pg_class.c.relname).where( 

3611 pg_catalog.pg_class.c.relname == bindparam("table_name"), 

3612 self._pg_class_relkind_condition( 

3613 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3614 ), 

3615 ) 

3616 return self._pg_class_filter_scope_schema( 

3617 query, schema, scope=ObjectScope.ANY 

3618 ) 

3619 

3620 @reflection.cache 

3621 def has_table(self, connection, table_name, schema=None, **kw): 

3622 self._ensure_has_table_connection(connection) 

3623 query = self._has_table_query(schema) 

3624 return bool(connection.scalar(query, {"table_name": table_name})) 

3625 

3626 @reflection.cache 

3627 def has_sequence(self, connection, sequence_name, schema=None, **kw): 

3628 query = select(pg_catalog.pg_class.c.relname).where( 

3629 pg_catalog.pg_class.c.relkind == "S", 

3630 pg_catalog.pg_class.c.relname == sequence_name, 

3631 ) 

3632 query = self._pg_class_filter_scope_schema( 

3633 query, schema, scope=ObjectScope.ANY 

3634 ) 

3635 return bool(connection.scalar(query)) 

3636 

3637 @reflection.cache 

3638 def has_type(self, connection, type_name, schema=None, **kw): 

3639 query = ( 

3640 select(pg_catalog.pg_type.c.typname) 

3641 .join( 

3642 pg_catalog.pg_namespace, 

3643 pg_catalog.pg_namespace.c.oid 

3644 == pg_catalog.pg_type.c.typnamespace, 

3645 ) 

3646 .where(pg_catalog.pg_type.c.typname == type_name) 

3647 ) 

3648 if schema is None: 

3649 query = query.where( 

3650 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

3651 # ignore pg_catalog schema 

3652 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3653 ) 

3654 elif schema != "*": 

3655 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3656 

3657 return bool(connection.scalar(query)) 

3658 

3659 def _get_server_version_info(self, connection): 

3660 v = connection.exec_driver_sql("select pg_catalog.version()").scalar() 

3661 m = re.match( 

3662 r".*(?:PostgreSQL|EnterpriseDB) " 

3663 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", 

3664 v, 

3665 ) 

3666 if not m: 

3667 raise AssertionError( 

3668 "Could not determine version from string '%s'" % v 

3669 ) 

3670 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) 

3671 
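# Illustrative sketch (not part of the dialect): the version parsing in
# _get_server_version_info above, pulled out into a standalone function so
# the regex can be tried without a connection. The name
# parse_server_version and the sample banners are assumptions made for this
# example; the pattern itself is copied from the method above.

```python
import re

# Same pattern as used by _get_server_version_info.
_VERSION_RE = re.compile(
    r".*(?:PostgreSQL|EnterpriseDB) "
    r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?"
)


def parse_server_version(banner):
    """Return the numeric version components found in a version() banner."""
    m = _VERSION_RE.match(banner)
    if not m:
        raise AssertionError(
            "Could not determine version from string '%s'" % banner
        )
    # Optional groups that did not participate are None and are dropped,
    # so "16beta1" yields a one-element tuple.
    return tuple(int(x) for x in m.group(1, 2, 3) if x is not None)


print(parse_server_version("PostgreSQL 14.5 on x86_64-pc-linux-gnu"))
print(parse_server_version("PostgreSQL 16beta1 on x86_64-pc-linux-gnu"))
```

# Because the result is a tuple of ints, comparisons such as
# server_version_info >= (12,) used elsewhere in this module work by
# ordinary tuple ordering.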

3672 @reflection.cache 

3673 def get_table_oid(self, connection, table_name, schema=None, **kw): 

3674 """Fetch the oid for schema.table_name.""" 

3675 query = select(pg_catalog.pg_class.c.oid).where( 

3676 pg_catalog.pg_class.c.relname == table_name, 

3677 self._pg_class_relkind_condition( 

3678 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3679 ), 

3680 ) 

3681 query = self._pg_class_filter_scope_schema( 

3682 query, schema, scope=ObjectScope.ANY 

3683 ) 

3684 table_oid = connection.scalar(query) 

3685 if table_oid is None: 

3686 raise exc.NoSuchTableError( 

3687 f"{schema}.{table_name}" if schema else table_name 

3688 ) 

3689 return table_oid 

3690 

3691 @reflection.cache 

3692 def get_schema_names(self, connection, **kw): 

3693 query = ( 

3694 select(pg_catalog.pg_namespace.c.nspname) 

3695 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%")) 

3696 .order_by(pg_catalog.pg_namespace.c.nspname) 

3697 ) 

3698 return connection.scalars(query).all() 

3699 

3700 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope): 

3701 query = select(pg_catalog.pg_class.c.relname).where( 

3702 self._pg_class_relkind_condition(relkinds) 

3703 ) 

3704 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3705 return connection.scalars(query).all() 

3706 

3707 @reflection.cache 

3708 def get_table_names(self, connection, schema=None, **kw): 

3709 return self._get_relnames_for_relkinds( 

3710 connection, 

3711 schema, 

3712 pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3713 scope=ObjectScope.DEFAULT, 

3714 ) 

3715 

3716 @reflection.cache 

3717 def get_temp_table_names(self, connection, **kw): 

3718 return self._get_relnames_for_relkinds( 

3719 connection, 

3720 schema=None, 

3721 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3722 scope=ObjectScope.TEMPORARY, 

3723 ) 

3724 

3725 @reflection.cache 

3726 def _get_foreign_table_names(self, connection, schema=None, **kw): 

3727 return self._get_relnames_for_relkinds( 

3728 connection, schema, relkinds=("f",), scope=ObjectScope.ANY 

3729 ) 

3730 

3731 @reflection.cache 

3732 def get_view_names(self, connection, schema=None, **kw): 

3733 return self._get_relnames_for_relkinds( 

3734 connection, 

3735 schema, 

3736 pg_catalog.RELKINDS_VIEW, 

3737 scope=ObjectScope.DEFAULT, 

3738 ) 

3739 

3740 @reflection.cache 

3741 def get_materialized_view_names(self, connection, schema=None, **kw): 

3742 return self._get_relnames_for_relkinds( 

3743 connection, 

3744 schema, 

3745 pg_catalog.RELKINDS_MAT_VIEW, 

3746 scope=ObjectScope.DEFAULT, 

3747 ) 

3748 

3749 @reflection.cache 

3750 def get_temp_view_names(self, connection, schema=None, **kw): 

3751 return self._get_relnames_for_relkinds( 

3752 connection, 

3753 schema, 

3754 # NOTE: do not include temp materialized views (that do not 

3755 # seem to be a thing at least up to version 14) 

3756 pg_catalog.RELKINDS_VIEW, 

3757 scope=ObjectScope.TEMPORARY, 

3758 ) 

3759 

3760 @reflection.cache 

3761 def get_sequence_names(self, connection, schema=None, **kw): 

3762 return self._get_relnames_for_relkinds( 

3763 connection, schema, relkinds=("S",), scope=ObjectScope.ANY 

3764 ) 

3765 

3766 @reflection.cache 

3767 def get_view_definition(self, connection, view_name, schema=None, **kw): 

3768 query = ( 

3769 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid)) 

3770 .select_from(pg_catalog.pg_class) 

3771 .where( 

3772 pg_catalog.pg_class.c.relname == view_name, 

3773 self._pg_class_relkind_condition( 

3774 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW 

3775 ), 

3776 ) 

3777 ) 

3778 query = self._pg_class_filter_scope_schema( 

3779 query, schema, scope=ObjectScope.ANY 

3780 ) 

3781 res = connection.scalar(query) 

3782 if res is None: 

3783 raise exc.NoSuchTableError( 

3784 f"{schema}.{view_name}" if schema else view_name 

3785 ) 

3786 else: 

3787 return res 

3788 

3789 def _value_or_raise(self, data, table, schema): 

3790 try: 

3791 return dict(data)[(schema, table)] 

3792 except KeyError: 

3793 raise exc.NoSuchTableError( 

3794 f"{schema}.{table}" if schema else table 

3795 ) from None 

3796 

3797 def _prepare_filter_names(self, filter_names): 

3798 if filter_names: 

3799 return True, {"filter_names": filter_names} 

3800 else: 

3801 return False, {} 

3802 

3803 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: 

3804 if kind is ObjectKind.ANY: 

3805 return pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3806 relkinds = () 

3807 if ObjectKind.TABLE in kind: 

3808 relkinds += pg_catalog.RELKINDS_TABLE 

3809 if ObjectKind.VIEW in kind: 

3810 relkinds += pg_catalog.RELKINDS_VIEW 

3811 if ObjectKind.MATERIALIZED_VIEW in kind: 

3812 relkinds += pg_catalog.RELKINDS_MAT_VIEW 

3813 return relkinds 

3814 

3815 @reflection.cache 

3816 def get_columns(self, connection, table_name, schema=None, **kw): 

3817 data = self.get_multi_columns( 

3818 connection, 

3819 schema=schema, 

3820 filter_names=[table_name], 

3821 scope=ObjectScope.ANY, 

3822 kind=ObjectKind.ANY, 

3823 **kw, 

3824 ) 

3825 return self._value_or_raise(data, table_name, schema) 

3826 

3827 @lru_cache() 

3828 def _columns_query(self, schema, has_filter_names, scope, kind): 

3829 # NOTE: the query with the default and identity options scalar 

3830 # subquery is faster than trying to use outer joins for them 

3831 generated = ( 

3832 pg_catalog.pg_attribute.c.attgenerated.label("generated") 

3833 if self.server_version_info >= (12,) 

3834 else sql.null().label("generated") 

3835 ) 

3836 if self.server_version_info >= (10,): 

3837 # join lateral performs worse (~2x slower) than a scalar_subquery 

3838 identity = ( 

3839 select( 

3840 sql.func.json_build_object( 

3841 "always", 

3842 pg_catalog.pg_attribute.c.attidentity == "a", 

3843 "start", 

3844 pg_catalog.pg_sequence.c.seqstart, 

3845 "increment", 

3846 pg_catalog.pg_sequence.c.seqincrement, 

3847 "minvalue", 

3848 pg_catalog.pg_sequence.c.seqmin, 

3849 "maxvalue", 

3850 pg_catalog.pg_sequence.c.seqmax, 

3851 "cache", 

3852 pg_catalog.pg_sequence.c.seqcache, 

3853 "cycle", 

3854 pg_catalog.pg_sequence.c.seqcycle, 

3855 type_=sqltypes.JSON(), 

3856 ) 

3857 ) 

3858 .select_from(pg_catalog.pg_sequence) 

3859 .where( 

3860 # attidentity != '' is required or it will reflect also 

3861 # serial columns as identity. 

3862 pg_catalog.pg_attribute.c.attidentity != "", 

3863 pg_catalog.pg_sequence.c.seqrelid 

3864 == sql.cast( 

3865 sql.cast( 

3866 pg_catalog.pg_get_serial_sequence( 

3867 sql.cast( 

3868 sql.cast( 

3869 pg_catalog.pg_attribute.c.attrelid, 

3870 REGCLASS, 

3871 ), 

3872 TEXT, 

3873 ), 

3874 pg_catalog.pg_attribute.c.attname, 

3875 ), 

3876 REGCLASS, 

3877 ), 

3878 OID, 

3879 ), 

3880 ) 

3881 .correlate(pg_catalog.pg_attribute) 

3882 .scalar_subquery() 

3883 .label("identity_options") 

3884 ) 

3885 else: 

3886 identity = sql.null().label("identity_options") 

3887 

3888 # join lateral performs the same as scalar_subquery here 

3889 default = ( 

3890 select( 

3891 pg_catalog.pg_get_expr( 

3892 pg_catalog.pg_attrdef.c.adbin, 

3893 pg_catalog.pg_attrdef.c.adrelid, 

3894 ) 

3895 ) 

3896 .select_from(pg_catalog.pg_attrdef) 

3897 .where( 

3898 pg_catalog.pg_attrdef.c.adrelid 

3899 == pg_catalog.pg_attribute.c.attrelid, 

3900 pg_catalog.pg_attrdef.c.adnum 

3901 == pg_catalog.pg_attribute.c.attnum, 

3902 pg_catalog.pg_attribute.c.atthasdef, 

3903 ) 

3904 .correlate(pg_catalog.pg_attribute) 

3905 .scalar_subquery() 

3906 .label("default") 

3907 ) 

3908 

3909 # get the name of the collation when it's different from the default one 

3910 collate = sql.case( 

3911 ( 

3912 sql.and_( 

3913 pg_catalog.pg_attribute.c.attcollation != 0, 

3914 select(pg_catalog.pg_type.c.typcollation) 

3915 .where( 

3916 pg_catalog.pg_type.c.oid 

3917 == pg_catalog.pg_attribute.c.atttypid, 

3918 ) 

3919 .correlate(pg_catalog.pg_attribute) 

3920 .scalar_subquery() 

3921 != pg_catalog.pg_attribute.c.attcollation, 

3922 ), 

3923 select(pg_catalog.pg_collation.c.collname) 

3924 .where( 

3925 pg_catalog.pg_collation.c.oid 

3926 == pg_catalog.pg_attribute.c.attcollation 

3927 ) 

3928 .correlate(pg_catalog.pg_attribute) 

3929 .scalar_subquery(), 

3930 ), 

3931 else_=sql.null(), 

3932 ).label("collation") 

3933 

3934 relkinds = self._kind_to_relkinds(kind) 

3935 query = ( 

3936 select( 

3937 pg_catalog.pg_attribute.c.attname.label("name"), 

3938 pg_catalog.format_type( 

3939 pg_catalog.pg_attribute.c.atttypid, 

3940 pg_catalog.pg_attribute.c.atttypmod, 

3941 ).label("format_type"), 

3942 default, 

3943 pg_catalog.pg_attribute.c.attnotnull.label("not_null"), 

3944 pg_catalog.pg_class.c.relname.label("table_name"), 

3945 pg_catalog.pg_description.c.description.label("comment"), 

3946 generated, 

3947 identity, 

3948 collate, 

3949 ) 

3950 .select_from(pg_catalog.pg_class) 

3951 # NOTE: PostgreSQL supports tables with no user columns, meaning 

3952 # there is no row with pg_attribute.attnum > 0. use a left outer 

3953 # join to avoid filtering these tables. 

3954 .outerjoin( 

3955 pg_catalog.pg_attribute, 

3956 sql.and_( 

3957 pg_catalog.pg_class.c.oid 

3958 == pg_catalog.pg_attribute.c.attrelid, 

3959 pg_catalog.pg_attribute.c.attnum > 0, 

3960 ~pg_catalog.pg_attribute.c.attisdropped, 

3961 ), 

3962 ) 

3963 .outerjoin( 

3964 pg_catalog.pg_description, 

3965 sql.and_( 

3966 pg_catalog.pg_description.c.objoid 

3967 == pg_catalog.pg_attribute.c.attrelid, 

3968 pg_catalog.pg_description.c.objsubid 

3969 == pg_catalog.pg_attribute.c.attnum, 

3970 ), 

3971 ) 

3972 .where(self._pg_class_relkind_condition(relkinds)) 

3973 .order_by( 

3974 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum 

3975 ) 

3976 ) 

3977 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3978 if has_filter_names: 

3979 query = query.where( 

3980 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

3981 ) 

3982 return query 

3983 

3984 def get_multi_columns( 

3985 self, connection, schema, filter_names, scope, kind, **kw 

3986 ): 

3987 has_filter_names, params = self._prepare_filter_names(filter_names) 

3988 query = self._columns_query(schema, has_filter_names, scope, kind) 

3989 rows = connection.execute(query, params).mappings() 

3990 

3991 # dictionary with (name, ) if default search path or (schema, name) 

3992 # as keys 

3993 domains = { 

3994 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d 

3995 for d in self._load_domains( 

3996 connection, schema="*", info_cache=kw.get("info_cache") 

3997 ) 

3998 } 

3999 

4000 # dictionary with (name, ) if default search path or (schema, name) 

4001 # as keys 

4002 enums = dict( 

4003 ( 

4004 ((rec["name"],), rec) 

4005 if rec["visible"] 

4006 else ((rec["schema"], rec["name"]), rec) 

4007 ) 

4008 for rec in self._load_enums( 

4009 connection, schema="*", info_cache=kw.get("info_cache") 

4010 ) 

4011 ) 

4012 

4013 columns = self._get_columns_info(rows, domains, enums, schema) 

4014 

4015 return columns.items() 

4016 

4017 _format_type_args_pattern = re.compile(r"\((.*)\)") 

4018 _format_type_args_delim = re.compile(r"\s*,\s*") 

4019 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$") 

4020 

4021 def _reflect_type( 

4022 self, 

4023 format_type: Optional[str], 

4024 domains: Dict[str, ReflectedDomain], 

4025 enums: Dict[str, ReflectedEnum], 

4026 type_description: str, 

4027 collation: Optional[str], 

4028 ) -> sqltypes.TypeEngine[Any]: 

4029 """ 

4030 Attempts to reconstruct a column type defined in ischema_names based 

4031 on the information available in the format_type. 

4032 

4033 If the `format_type` cannot be associated with a known `ischema_names`, 

4034 it is treated as a reference to a known PostgreSQL named `ENUM` or 

4035 `DOMAIN` type. 

4036 """ 

4037 type_description = type_description or "unknown type" 

4038 if format_type is None: 

4039 util.warn( 

4040 "PostgreSQL format_type() returned NULL for %s" 

4041 % type_description 

4042 ) 

4043 return sqltypes.NULLTYPE 

4044 

4045 attype_args_match = self._format_type_args_pattern.search(format_type) 

4046 if attype_args_match and attype_args_match.group(1): 

4047 attype_args = self._format_type_args_delim.split( 

4048 attype_args_match.group(1) 

4049 ) 

4050 else: 

4051 attype_args = () 

4052 

4053 match_array_dim = self._format_array_spec_pattern.search(format_type) 

4054 # Each "[]" in array specs corresponds to an array dimension 

4055 array_dim = len(match_array_dim.group(1) or "") // 2 

4056 

4057 # Remove all parameters and array specs from format_type to obtain an 

4058 # ischema_name candidate 

4059 attype = self._format_type_args_pattern.sub("", format_type) 

4060 attype = self._format_array_spec_pattern.sub("", attype) 

4061 

4062 schema_type = self.ischema_names.get(attype.lower(), None) 

4063 args, kwargs = (), {} 

4064 

4065 if attype == "numeric": 

4066 if len(attype_args) == 2: 

4067 precision, scale = map(int, attype_args) 

4068 args = (precision, scale) 

4069 

4070 elif attype == "double precision": 

4071 args = (53,) 

4072 

4073 elif attype == "integer": 

4074 args = () 

4075 

4076 elif attype in ("timestamp with time zone", "time with time zone"): 

4077 kwargs["timezone"] = True 

4078 if len(attype_args) == 1: 

4079 kwargs["precision"] = int(attype_args[0]) 

4080 

4081 elif attype in ( 

4082 "timestamp without time zone", 

4083 "time without time zone", 

4084 "time", 

4085 ): 

4086 kwargs["timezone"] = False 

4087 if len(attype_args) == 1: 

4088 kwargs["precision"] = int(attype_args[0]) 

4089 

4090 elif attype == "bit varying": 

4091 kwargs["varying"] = True 

4092 if len(attype_args) == 1: 

4093 charlen = int(attype_args[0]) 

4094 args = (charlen,) 

4095 

4096 # a domain or enum can start with interval, so be mindful of that. 

4097 elif attype == "interval" or attype.startswith("interval "): 

4098 schema_type = INTERVAL 

4099 

4100 field_match = re.match(r"interval (.+)", attype) 

4101 if field_match: 

4102 kwargs["fields"] = field_match.group(1) 

4103 

4104 if len(attype_args) == 1: 

4105 kwargs["precision"] = int(attype_args[0]) 

4106 

4107 else: 

4108 enum_or_domain_key = tuple(util.quoted_token_parser(attype)) 

4109 

4110 if enum_or_domain_key in enums: 

4111 schema_type = ENUM 

4112 enum = enums[enum_or_domain_key] 

4113 

4114 kwargs["name"] = enum["name"] 

4115 

4116 if not enum["visible"]: 

4117 kwargs["schema"] = enum["schema"] 

4118 args = tuple(enum["labels"]) 

4119 elif enum_or_domain_key in domains: 

4120 schema_type = DOMAIN 

4121 domain = domains[enum_or_domain_key] 

4122 

4123 data_type = self._reflect_type( 

4124 domain["type"], 

4125 domains, 

4126 enums, 

4127 type_description="DOMAIN '%s'" % domain["name"], 

4128 collation=domain["collation"], 

4129 ) 

4130 args = (domain["name"], data_type) 

4131 

4132 kwargs["collation"] = domain["collation"] 

4133 kwargs["default"] = domain["default"] 

4134 kwargs["not_null"] = not domain["nullable"] 

4135 kwargs["create_type"] = False 

4136 

4137 if domain["constraints"]: 

4138 # We only support a single constraint 

4139 check_constraint = domain["constraints"][0] 

4140 

4141 kwargs["constraint_name"] = check_constraint["name"] 

4142 kwargs["check"] = check_constraint["check"] 

4143 

4144 if not domain["visible"]: 

4145 kwargs["schema"] = domain["schema"] 

4146 

4147 else: 

4148 try: 

4149 charlen = int(attype_args[0]) 

4150 args = (charlen, *attype_args[1:]) 

4151 except (ValueError, IndexError): 

4152 args = attype_args 

4153 

4154 if not schema_type: 

4155 util.warn( 

4156 "Did not recognize type '%s' of %s" 

4157 % (attype, type_description) 

4158 ) 

4159 return sqltypes.NULLTYPE 

4160 

4161 if collation is not None: 

4162 kwargs["collation"] = collation 

4163 

4164 data_type = schema_type(*args, **kwargs) 

4165 if array_dim >= 1: 

4166 # postgres does not preserve dimensionality or size of array types. 

4167 data_type = _array.ARRAY(data_type) 

4168 

4169 return data_type 

4170 
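# Illustrative sketch (not part of the dialect): the first stage of
# _reflect_type above, which splits a format_type() string into a base type
# name, its parenthesized arguments, and an array dimension count. The
# function name split_format_type is an assumption made for this example;
# the three patterns are copied from the class attributes above.

```python
import re

_format_type_args_pattern = re.compile(r"\((.*)\)")
_format_type_args_delim = re.compile(r"\s*,\s*")
_format_array_spec_pattern = re.compile(r"((?:\[\])*)$")


def split_format_type(format_type):
    """Return (base_name, args, array_dim) for a format_type() string."""
    args_match = _format_type_args_pattern.search(format_type)
    if args_match and args_match.group(1):
        args = tuple(_format_type_args_delim.split(args_match.group(1)))
    else:
        args = ()

    # Each trailing "[]" is two characters and counts as one dimension.
    array_match = _format_array_spec_pattern.search(format_type)
    array_dim = len(array_match.group(1) or "") // 2

    # Strip the arguments and the array suffix to get the lookup name.
    base = _format_type_args_pattern.sub("", format_type)
    base = _format_array_spec_pattern.sub("", base)
    return base, args, array_dim


print(split_format_type("numeric(10,2)[]"))
print(split_format_type("timestamp(3) with time zone"))
```

# The arguments are removed from the middle of the string, which is why
# "timestamp(3) with time zone" can be compared against the literal
# "timestamp with time zone" in the branches of _reflect_type.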

4171 def _get_columns_info(self, rows, domains, enums, schema): 

4172 columns = defaultdict(list) 

4173 for row_dict in rows: 

4174 # ensure that each table has an entry, even if it has no columns 

4175 if row_dict["name"] is None: 

4176 columns[(schema, row_dict["table_name"])] = ( 

4177 ReflectionDefaults.columns() 

4178 ) 

4179 continue 

4180 table_cols = columns[(schema, row_dict["table_name"])] 

4181 

4182 collation = row_dict["collation"] 

4183 

4184 coltype = self._reflect_type( 

4185 row_dict["format_type"], 

4186 domains, 

4187 enums, 

4188 type_description="column '%s'" % row_dict["name"], 

4189 collation=collation, 

4190 ) 

4191 

4192 default = row_dict["default"] 

4193 name = row_dict["name"] 

4194 generated = row_dict["generated"] 

4195 nullable = not row_dict["not_null"] 

4196 

4197 if isinstance(coltype, DOMAIN): 

4198 if not default: 

4199 # domain can override the default value but 

4200 # can't set it to None 

4201 if coltype.default is not None: 

4202 default = coltype.default 

4203 

4204 nullable = nullable and not coltype.not_null 

4205 

4206 identity = row_dict["identity_options"] 

4207 

4208 # A zero byte or blank string (depending on the driver), or None on 

4209 # older PG versions, means this is not a generated column. 

4210 # Otherwise, s = stored. (Other values might be added in the 

4211 # future.) 

4212 if generated not in (None, "", b"\x00"): 

4213 computed = dict( 

4214 sqltext=default, persisted=generated in ("s", b"s") 

4215 ) 

4216 default = None 

4217 else: 

4218 computed = None 

4219 

4220 # adjust the default value 

4221 autoincrement = False 

4222 if default is not None: 

4223 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) 

4224 if match is not None: 

4225 if issubclass(coltype._type_affinity, sqltypes.Integer): 

4226 autoincrement = True 

4227 # the default is related to a Sequence 

4228 if "." not in match.group(2) and schema is not None: 

4229 # unconditionally quote the schema name. this could 

4230 # later be enhanced to obey quoting rules / 

4231 # "quote schema" 

4232 default = ( 

4233 match.group(1) 

4234 + ('"%s"' % schema) 

4235 + "." 

4236 + match.group(2) 

4237 + match.group(3) 

4238 ) 

4239 

4240 column_info = { 

4241 "name": name, 

4242 "type": coltype, 

4243 "nullable": nullable, 

4244 "default": default, 

4245 "autoincrement": autoincrement or identity is not None, 

4246 "comment": row_dict["comment"], 

4247 } 

4248 if computed is not None: 

4249 column_info["computed"] = computed 

4250 if identity is not None: 

4251 column_info["identity"] = identity 

4252 

4253 table_cols.append(column_info) 

4254 

4255 return columns 

4256 
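# Illustrative sketch (not part of the dialect): the "adjust the default
# value" step of _get_columns_info above, isolated as a pure function. The
# name qualify_nextval_default is an assumption made for this example; the
# regex and the unconditional quoting of the schema name follow the code
# above.

```python
import re


def qualify_nextval_default(default, schema):
    """Prefix the sequence inside nextval('...') with a quoted schema name.

    The default is returned unchanged when it is not a nextval() call, when
    no schema was requested, or when the sequence is already qualified.
    """
    match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
    if match is None or schema is None or "." in match.group(2):
        return default
    return (
        match.group(1)
        + ('"%s"' % schema)
        + "."
        + match.group(2)
        + match.group(3)
    )


print(qualify_nextval_default("nextval('some_id_seq'::regclass)", "myschema"))
```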

4257 @lru_cache() 

4258 def _table_oids_query(self, schema, has_filter_names, scope, kind): 

4259 relkinds = self._kind_to_relkinds(kind) 

4260 oid_q = select( 

4261 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname 

4262 ).where(self._pg_class_relkind_condition(relkinds)) 

4263 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) 

4264 

4265 if has_filter_names: 

4266 oid_q = oid_q.where( 

4267 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4268 ) 

4269 return oid_q 

4270 

4271 @reflection.flexi_cache( 

4272 ("schema", InternalTraversal.dp_string), 

4273 ("filter_names", InternalTraversal.dp_string_list), 

4274 ("kind", InternalTraversal.dp_plain_obj), 

4275 ("scope", InternalTraversal.dp_plain_obj), 

4276 ) 

4277 def _get_table_oids( 

4278 self, connection, schema, filter_names, scope, kind, **kw 

4279 ): 

4280 has_filter_names, params = self._prepare_filter_names(filter_names) 

4281 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) 

4282 result = connection.execute(oid_q, params) 

4283 return result.all() 

4284 

4285 @util.memoized_property 

4286 def _constraint_query(self): 

4287 if self.server_version_info >= (11, 0): 

4288 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4289 else: 

4290 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4291 

4292 if self.server_version_info >= (15,): 

4293 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4294 else: 

4295 indnullsnotdistinct = sql.false().label("indnullsnotdistinct") 

4296 

4297 con_sq = ( 

4298 select( 

4299 pg_catalog.pg_constraint.c.conrelid, 

4300 pg_catalog.pg_constraint.c.conname, 

4301 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4302 sql.func.generate_subscripts( 

4303 pg_catalog.pg_index.c.indkey, 1 

4304 ).label("ord"), 

4305 indnkeyatts, 

4306 indnullsnotdistinct, 

4307 pg_catalog.pg_description.c.description, 

4308 ) 

4309 .join( 

4310 pg_catalog.pg_index, 

4311 pg_catalog.pg_constraint.c.conindid 

4312 == pg_catalog.pg_index.c.indexrelid, 

4313 ) 

4314 .outerjoin( 

4315 pg_catalog.pg_description, 

4316 pg_catalog.pg_description.c.objoid 

4317 == pg_catalog.pg_constraint.c.oid, 

4318 ) 

4319 .where( 

4320 pg_catalog.pg_constraint.c.contype == bindparam("contype"), 

4321 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), 

4322 # NOTE: filtering also on pg_index.indrelid for oids does 

4323 # not seem to have a performance effect, but it may be an 

4324 # option if perf problems are reported 

4325 ) 

4326 .subquery("con") 

4327 ) 

4328 

4329 attr_sq = ( 

4330 select( 

4331 con_sq.c.conrelid, 

4332 con_sq.c.conname, 

4333 con_sq.c.description, 

4334 con_sq.c.ord, 

4335 con_sq.c.indnkeyatts, 

4336 con_sq.c.indnullsnotdistinct, 

4337 pg_catalog.pg_attribute.c.attname, 

4338 ) 

4339 .select_from(pg_catalog.pg_attribute) 

4340 .join( 

4341 con_sq, 

4342 sql.and_( 

4343 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, 

4344 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, 

4345 ), 

4346 ) 

4347 .where( 

4348 # NOTE: restate the condition here, since pg15 otherwise 

4349 # seems to get confused on psycopg2 sometimes, doing 

4350 # a sequential scan of pg_attribute. 

4351 # The condition in the con_sq subquery is not actually needed 

4352 # in pg15, but it may be needed in older versions. Keeping it 

4353 # does not seem to have any impact in any case. 

4354 con_sq.c.conrelid.in_(bindparam("oids")) 

4355 ) 

4356 .subquery("attr") 

4357 ) 

4358 

4359 return ( 

4360 select( 

4361 attr_sq.c.conrelid, 

4362 sql.func.array_agg( 

4363 # NOTE: cast since some postgresql derivatives may 

4364 # not support array_agg on the name type 

4365 aggregate_order_by( 

4366 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord 

4367 ) 

4368 ).label("cols"), 

4369 attr_sq.c.conname, 

4370 sql.func.min(attr_sq.c.description).label("description"), 

4371 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"), 

4372 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label( 

4373 "indnullsnotdistinct" 

4374 ), 

4375 ) 

4376 .group_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4377 .order_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4378 ) 

4379 

4380 def _reflect_constraint( 

4381 self, connection, contype, schema, filter_names, scope, kind, **kw 

4382 ): 

4383 # used to reflect primary key and unique constraints 

4384 table_oids = self._get_table_oids( 

4385 connection, schema, filter_names, scope, kind, **kw 

4386 ) 

4387 batches = list(table_oids) 

4388 is_unique = contype == "u" 

4389 

4390 while batches: 

4391 batch = batches[0:3000] 

4392 batches[0:3000] = [] 

4393 

4394 result = connection.execute( 

4395 self._constraint_query, 

4396 {"oids": [r[0] for r in batch], "contype": contype}, 

4397 ).mappings() 

4398 

4399 result_by_oid = defaultdict(list) 

4400 for row_dict in result: 

4401 result_by_oid[row_dict["conrelid"]].append(row_dict) 

4402 

4403 for oid, tablename in batch: 

4404 for_oid = result_by_oid.get(oid, ()) 

4405 if for_oid: 

4406 for row in for_oid: 

4407 # See note in get_multi_indexes 

4408 all_cols = row["cols"] 

4409 indnkeyatts = row["indnkeyatts"] 

4410 if len(all_cols) > indnkeyatts: 

4411 inc_cols = all_cols[indnkeyatts:] 

4412 cst_cols = all_cols[:indnkeyatts] 

4413 else: 

4414 inc_cols = [] 

4415 cst_cols = all_cols 

4416 

4417 opts = {} 

4418 if self.server_version_info >= (11,): 

4419 opts["postgresql_include"] = inc_cols 

4420 if is_unique: 

4421 opts["postgresql_nulls_not_distinct"] = row[ 

4422 "indnullsnotdistinct" 

4423 ] 

4424 yield ( 

4425 tablename, 

4426 cst_cols, 

4427 row["conname"], 

4428 row["description"], 

4429 opts, 

4430 ) 

4431 else: 

4432 yield tablename, None, None, None, None 

4433 
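# Illustrative sketch (not part of the dialect): the batching loop used by
# _reflect_constraint above, which consumes the list of table oids in
# slices of at most 3000 so that each query binds a bounded number of
# values. The generator name iter_batches and its size parameter are
# assumptions made for this example.

```python
def iter_batches(items, size=3000):
    """Yield successive lists of at most ``size`` items."""
    pending = list(items)
    while pending:
        batch = pending[0:size]
        # Slice assignment removes the consumed items in place.
        pending[0:size] = []
        yield batch


for batch in iter_batches(range(7), size=3):
    print(batch)
```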

4434 @reflection.cache 

4435 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

4436 data = self.get_multi_pk_constraint( 

4437 connection, 

4438 schema=schema, 

4439 filter_names=[table_name], 

4440 scope=ObjectScope.ANY, 

4441 kind=ObjectKind.ANY, 

4442 **kw, 

4443 ) 

4444 return self._value_or_raise(data, table_name, schema) 

4445 

4446 def get_multi_pk_constraint( 

4447 self, connection, schema, filter_names, scope, kind, **kw 

4448 ): 

4449 result = self._reflect_constraint( 

4450 connection, "p", schema, filter_names, scope, kind, **kw 

4451 ) 

4452 

4453 # only a single pk can be present for each table. Return an entry 

4454 # even if a table has no primary key 

4455 default = ReflectionDefaults.pk_constraint 

4456 

4457 def pk_constraint(pk_name, cols, comment, opts): 

4458 info = { 

4459 "constrained_columns": cols, 

4460 "name": pk_name, 

4461 "comment": comment, 

4462 } 

4463 if opts: 

4464 info["dialect_options"] = opts 

4465 return info 

4466 

4467 return ( 

4468 ( 

4469 (schema, table_name), 

4470 ( 

4471 pk_constraint(pk_name, cols, comment, opts) 

4472 if pk_name is not None 

4473 else default() 

4474 ), 

4475 ) 

4476 for table_name, cols, pk_name, comment, opts in result 

4477 ) 

4478 

4479 @reflection.cache 

4480 def get_foreign_keys( 

4481 self, 

4482 connection, 

4483 table_name, 

4484 schema=None, 

4485 postgresql_ignore_search_path=False, 

4486 **kw, 

4487 ): 

4488 data = self.get_multi_foreign_keys( 

4489 connection, 

4490 schema=schema, 

4491 filter_names=[table_name], 

4492 postgresql_ignore_search_path=postgresql_ignore_search_path, 

4493 scope=ObjectScope.ANY, 

4494 kind=ObjectKind.ANY, 

4495 **kw, 

4496 ) 

4497 return self._value_or_raise(data, table_name, schema) 

4498 

4499 @lru_cache() 

4500 def _foreing_key_query(self, schema, has_filter_names, scope, kind): 

4501 pg_class_ref = pg_catalog.pg_class.alias("cls_ref") 

4502 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") 

4503 relkinds = self._kind_to_relkinds(kind) 

4504 query = ( 

4505 select( 

4506 pg_catalog.pg_class.c.relname, 

4507 pg_catalog.pg_constraint.c.conname, 

4508 # NOTE: avoid calling pg_get_constraintdef when not needed 

4509 # to speed up the query 

4510 sql.case( 

4511 ( 

4512 pg_catalog.pg_constraint.c.oid.is_not(None), 

4513 pg_catalog.pg_get_constraintdef( 

4514 pg_catalog.pg_constraint.c.oid, True 

4515 ), 

4516 ), 

4517 else_=None, 

4518 ), 

4519 pg_namespace_ref.c.nspname, 

4520 pg_catalog.pg_description.c.description, 

4521 ) 

4522 .select_from(pg_catalog.pg_class) 

4523 .outerjoin( 

4524 pg_catalog.pg_constraint, 

4525 sql.and_( 

4526 pg_catalog.pg_class.c.oid 

4527 == pg_catalog.pg_constraint.c.conrelid, 

4528 pg_catalog.pg_constraint.c.contype == "f", 

4529 ), 

4530 ) 

4531 .outerjoin( 

4532 pg_class_ref, 

4533 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, 

4534 ) 

4535 .outerjoin( 

4536 pg_namespace_ref, 

4537 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, 

4538 ) 

4539 .outerjoin( 

4540 pg_catalog.pg_description, 

4541 pg_catalog.pg_description.c.objoid 

4542 == pg_catalog.pg_constraint.c.oid, 

4543 ) 

4544 .order_by( 

4545 pg_catalog.pg_class.c.relname, 

4546 pg_catalog.pg_constraint.c.conname, 

4547 ) 

4548 .where(self._pg_class_relkind_condition(relkinds)) 

4549 ) 

4550 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4551 if has_filter_names: 

4552 query = query.where( 

4553 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4554 ) 

4555 return query 

4556 

4557 @util.memoized_property 

4558 def _fk_regex_pattern(self): 

4559 # optionally quoted token 

4560 qtoken = r'(?:"(?:[^"]|"")+"|[\w]+?)' 

4561 

4562 # https://www.postgresql.org/docs/current/static/sql-createtable.html 

4563 return re.compile( 

4564 r"FOREIGN KEY \((.*?)\) " 

4565 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501 

4566 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" 

4567 r"[\s]?(?:ON (UPDATE|DELETE) " 

4568 r"(CASCADE|RESTRICT|NO ACTION|" 

4569 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?" 

4570 r"[\s]?(?:ON (UPDATE|DELETE) " 

4571 r"(CASCADE|RESTRICT|NO ACTION|" 

4572 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?" 

4573 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" 

4574 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" 

4575 ) 

4576 

4577 def _parse_fk(self, condef): 

4578 FK_REGEX = self._fk_regex_pattern 

4579 m = re.search(FK_REGEX, condef).groups() 

4580 

4581 ( 

4582 constrained_columns, 

4583 referred_schema, 

4584 referred_table, 

4585 referred_columns, 

4586 _, 

4587 match, 

4588 upddelkey1, 

4589 upddelval1, 

4590 upddelkey2, 

4591 upddelval2, 

4592 deferrable, 

4593 _, 

4594 initially, 

4595 ) = m 

4596 

4597 onupdate = ( 

4598 upddelval1 

4599 if upddelkey1 == "UPDATE" 

4600 else upddelval2 if upddelkey2 == "UPDATE" else None 

4601 ) 

4602 ondelete = ( 

4603 upddelval1 

4604 if upddelkey1 == "DELETE" 

4605 else upddelval2 if upddelkey2 == "DELETE" else None 

4606 ) 

4607 

4608 return ( 

4609 constrained_columns, 

4610 referred_schema, 

4611 referred_table, 

4612 referred_columns, 

4613 match, 

4614 onupdate, 

4615 ondelete, 

4616 deferrable, 

4617 initially, 

4618 ) 

4619 
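# Illustrative sketch, not part of base.py: the pattern from
# ``_fk_regex_pattern`` is copied here so it can be run on its own against a
# sample string shaped like pg_get_constraintdef() output. The function name
# ``parse_fk_sketch`` and the sample constraint text are assumptions made for
# the example.

```python
import re

# Optionally quoted identifier token, copied from the listing above.
qtoken = r'(?:"(?:[^"]|"")+"|[\w]+?)'

FK_REGEX = re.compile(
    r"FOREIGN KEY \((.*?)\) "
    rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)"
    r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
    r"[\s]?(?:ON (UPDATE|DELETE) "
    r"(CASCADE|RESTRICT|NO ACTION|"
    r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
    r"[\s]?(?:ON (UPDATE|DELETE) "
    r"(CASCADE|RESTRICT|NO ACTION|"
    r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
    r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
    r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
)


def parse_fk_sketch(condef):
    """Split a FOREIGN KEY definition into its raw (still quoted) parts."""
    (
        constrained, rschema, rtable, rcols,
        _, match, key1, val1, key2, val2,
        deferrable, _, initially,
    ) = FK_REGEX.search(condef).groups()
    # ON UPDATE / ON DELETE may appear in either order, so check both slots.
    onupdate = val1 if key1 == "UPDATE" else val2 if key2 == "UPDATE" else None
    ondelete = val1 if key1 == "DELETE" else val2 if key2 == "DELETE" else None
    return {
        "constrained_columns": constrained,
        "referred_schema": rschema,
        "referred_table": rtable,
        "referred_columns": rcols,
        "match": match,
        "onupdate": onupdate,
        "ondelete": ondelete,
        "deferrable": deferrable,
        "initially": initially,
    }


sample = (
    'FOREIGN KEY (a, b) REFERENCES "My Schema".parent(x, y) '
    "ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED"
)
parsed = parse_fk_sketch(sample)
```

# The schema comes back still quoted; the listing unquotes it later in
# ``get_multi_foreign_keys`` through the identifier preparer.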

4620 def get_multi_foreign_keys( 

4621 self, 

4622 connection, 

4623 schema, 

4624 filter_names, 

4625 scope, 

4626 kind, 

4627 postgresql_ignore_search_path=False, 

4628 **kw, 

4629 ): 

4630 preparer = self.identifier_preparer 

4631 

4632 has_filter_names, params = self._prepare_filter_names(filter_names) 

4633 query = self._foreing_key_query(schema, has_filter_names, scope, kind) 

4634 result = connection.execute(query, params) 

4635 

4636 fkeys = defaultdict(list) 

4637 default = ReflectionDefaults.foreign_keys 

4638 for table_name, conname, condef, conschema, comment in result: 

4639 # ensure that each table has an entry, even if it has 

4640 # no foreign keys 

4641 if conname is None: 

4642 fkeys[(schema, table_name)] = default() 

4643 continue 

4644 table_fks = fkeys[(schema, table_name)] 

4645 

4646 ( 

4647 constrained_columns, 

4648 referred_schema, 

4649 referred_table, 

4650 referred_columns, 

4651 match, 

4652 onupdate, 

4653 ondelete, 

4654 deferrable, 

4655 initially, 

4656 ) = self._parse_fk(condef) 

4657 

4658 if deferrable is not None: 

4659 deferrable = deferrable == "DEFERRABLE" 

4660 constrained_columns = [ 

4661 preparer._unquote_identifier(x) 

4662 for x in re.split(r"\s*,\s*", constrained_columns) 

4663 ] 

4664 

4665 if postgresql_ignore_search_path: 

4666 # when ignoring search path, we use the actual schema 

4667 # provided it isn't the "default" schema 

4668 if conschema != self.default_schema_name: 

4669 referred_schema = conschema 

4670 else: 

4671 referred_schema = schema 

4672 elif referred_schema: 

4673 # referred_schema is the schema that we regexp'ed from 

4674 # pg_get_constraintdef(). If the schema is in the search 

4675 # path, pg_get_constraintdef() will give us None. 

4676 referred_schema = preparer._unquote_identifier(referred_schema) 

4677 elif schema is not None and schema == conschema: 

4678 # If the actual schema matches the schema of the table 

4679 # we're reflecting, then we will use that. 

4680 referred_schema = schema 

4681 

4682 referred_table = preparer._unquote_identifier(referred_table) 

4683 referred_columns = [ 

4684 preparer._unquote_identifier(x) 

4685 for x in re.split(r"\s*,\s*", referred_columns) 

4686 ] 

4687 options = { 

4688 k: v 

4689 for k, v in [ 

4690 ("onupdate", onupdate), 

4691 ("ondelete", ondelete), 

4692 ("initially", initially), 

4693 ("deferrable", deferrable), 

4694 ("match", match), 

4695 ] 

4696 if v is not None and v != "NO ACTION" 

4697 } 

4698 fkey_d = { 

4699 "name": conname, 

4700 "constrained_columns": constrained_columns, 

4701 "referred_schema": referred_schema, 

4702 "referred_table": referred_table, 

4703 "referred_columns": referred_columns, 

4704 "options": options, 

4705 "comment": comment, 

4706 } 

4707 table_fks.append(fkey_d) 

4708 return fkeys.items() 

4709 
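# Illustrative sketch, not part of base.py: the ``referred_schema`` branching
# in ``get_multi_foreign_keys`` written as a standalone function. The names
# ``resolve_referred_schema`` and ``unquote`` are hypothetical; ``unquote`` is
# a simplified stand-in for ``preparer._unquote_identifier`` and is assumed
# to handle only double-quoted identifiers.

```python
def unquote(identifier):
    # Simplified stand-in: strip surrounding double quotes and collapse
    # doubled quotes inside the identifier.
    if identifier.startswith('"') and identifier.endswith('"'):
        return identifier[1:-1].replace('""', '"')
    return identifier


def resolve_referred_schema(
    schema,               # schema passed to the reflection call, may be None
    conschema,            # actual schema of the referred table
    referred_schema,      # schema token parsed from the constraint text
    default_schema_name,  # the dialect's default schema
    ignore_search_path=False,
):
    if ignore_search_path:
        # Use the actual schema unless it is the default one.
        if conschema != default_schema_name:
            return conschema
        return schema
    if referred_schema:
        # The constraint text named a schema explicitly.
        return unquote(referred_schema)
    if schema is not None and schema == conschema:
        # The referred table lives in the schema being reflected.
        return schema
    return referred_schema
```

# For example, ``resolve_referred_schema("app", "app", None, "public")``
# yields ``"app"``, while passing ``schema=None`` with the same remaining
# arguments yields ``None``.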

4710 @reflection.cache 

4711 def get_indexes(self, connection, table_name, schema=None, **kw): 

4712 data = self.get_multi_indexes( 

4713 connection, 

4714 schema=schema, 

4715 filter_names=[table_name], 

4716 scope=ObjectScope.ANY, 

4717 kind=ObjectKind.ANY, 

4718 **kw, 

4719 ) 

4720 return self._value_or_raise(data, table_name, schema) 

4721 

4722 @util.memoized_property 

4723 def _index_query(self): 

4724 # NOTE: pg_index appears in the FROM clause twice to improve performance; 

4725 # extracting all the index information from `idx_sq` to avoid 

4726 # the second pg_index use leads to a worse-performing query, in 

4727 # particular when querying for a single table (as of pg 17) 

4728 # NOTE: repeating the oids clause improves query performance 

4729 

4730 # subquery to get the columns 

4731 idx_sq = ( 

4732 select( 

4733 pg_catalog.pg_index.c.indexrelid, 

4734 pg_catalog.pg_index.c.indrelid, 

4735 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4736 sql.func.unnest(pg_catalog.pg_index.c.indclass).label( 

4737 "att_opclass" 

4738 ), 

4739 sql.func.generate_subscripts( 

4740 pg_catalog.pg_index.c.indkey, 1 

4741 ).label("ord"), 

4742 ) 

4743 .where( 

4744 ~pg_catalog.pg_index.c.indisprimary, 

4745 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4746 ) 

4747 .subquery("idx") 

4748 ) 

4749 

4750 attr_sq = ( 

4751 select( 

4752 idx_sq.c.indexrelid, 

4753 idx_sq.c.indrelid, 

4754 idx_sq.c.ord, 

4755 # NOTE: always using pg_get_indexdef is too slow so just 

4756 # invoke when the element is an expression 

4757 sql.case( 

4758 ( 

4759 idx_sq.c.attnum == 0, 

4760 pg_catalog.pg_get_indexdef( 

4761 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True 

4762 ), 

4763 ), 

4764 # NOTE: need to cast this since attname is of type "name" 

4765 # that's limited to 63 bytes, while pg_get_indexdef 

4766 # returns "text" so its output may get cut 

4767 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), 

4768 ).label("element"), 

4769 (idx_sq.c.attnum == 0).label("is_expr"), 

4770 pg_catalog.pg_opclass.c.opcname, 

4771 pg_catalog.pg_opclass.c.opcdefault, 

4772 ) 

4773 .select_from(idx_sq) 

4774 .outerjoin( 

4775 # do not remove rows where idx_sq.c.attnum is 0 

4776 pg_catalog.pg_attribute, 

4777 sql.and_( 

4778 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, 

4779 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, 

4780 ), 

4781 ) 

4782 .outerjoin( 

4783 pg_catalog.pg_opclass, 

4784 pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass, 

4785 ) 

4786 .where(idx_sq.c.indrelid.in_(bindparam("oids"))) 

4787 .subquery("idx_attr") 

4788 ) 

4789 

4790 cols_sq = ( 

4791 select( 

4792 attr_sq.c.indexrelid, 

4793 sql.func.min(attr_sq.c.indrelid), 

4794 sql.func.array_agg( 

4795 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) 

4796 ).label("elements"), 

4797 sql.func.array_agg( 

4798 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) 

4799 ).label("elements_is_expr"), 

4800 sql.func.array_agg( 

4801 aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord) 

4802 ).label("elements_opclass"), 

4803 sql.func.array_agg( 

4804 aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord) 

4805 ).label("elements_opdefault"), 

4806 ) 

4807 .group_by(attr_sq.c.indexrelid) 

4808 .subquery("idx_cols") 

4809 ) 

4810 

4811 if self.server_version_info >= (11, 0): 

4812 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4813 else: 

4814 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4815 

4816 if self.server_version_info >= (15,): 

4817 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4818 else: 

4819 nulls_not_distinct = sql.false().label("indnullsnotdistinct") 

4820 

4821 return ( 

4822 select( 

4823 pg_catalog.pg_index.c.indrelid, 

4824 pg_catalog.pg_class.c.relname, 

4825 pg_catalog.pg_index.c.indisunique, 

4826 pg_catalog.pg_constraint.c.conrelid.is_not(None).label( 

4827 "has_constraint" 

4828 ), 

4829 pg_catalog.pg_index.c.indoption, 

4830 pg_catalog.pg_class.c.reloptions, 

4831 pg_catalog.pg_am.c.amname, 

4832 # NOTE: pg_get_expr is very fast so this case has almost no 

4833 # performance impact 

4834 sql.case( 

4835 ( 

4836 pg_catalog.pg_index.c.indpred.is_not(None), 

4837 pg_catalog.pg_get_expr( 

4838 pg_catalog.pg_index.c.indpred, 

4839 pg_catalog.pg_index.c.indrelid, 

4840 ), 

4841 ), 

4842 else_=None, 

4843 ).label("filter_definition"), 

4844 indnkeyatts, 

4845 nulls_not_distinct, 

4846 cols_sq.c.elements, 

4847 cols_sq.c.elements_is_expr, 

4848 cols_sq.c.elements_opclass, 

4849 cols_sq.c.elements_opdefault, 

4850 ) 

4851 .select_from(pg_catalog.pg_index) 

4852 .where( 

4853 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4854 ~pg_catalog.pg_index.c.indisprimary, 

4855 ) 

4856 .join( 

4857 pg_catalog.pg_class, 

4858 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid, 

4859 ) 

4860 .join( 

4861 pg_catalog.pg_am, 

4862 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid, 

4863 ) 

4864 .outerjoin( 

4865 cols_sq, 

4866 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, 

4867 ) 

4868 .outerjoin( 

4869 pg_catalog.pg_constraint, 

4870 sql.and_( 

4871 pg_catalog.pg_index.c.indrelid 

4872 == pg_catalog.pg_constraint.c.conrelid, 

4873 pg_catalog.pg_index.c.indexrelid 

4874 == pg_catalog.pg_constraint.c.conindid, 

4875 pg_catalog.pg_constraint.c.contype 

4876 == sql.any_(_array.array(("p", "u", "x"))), 

4877 ), 

4878 ) 

4879 .order_by( 

4880 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname 

4881 ) 

4882 ) 

4883 

4884 def get_multi_indexes( 

4885 self, connection, schema, filter_names, scope, kind, **kw 

4886 ): 

4887 table_oids = self._get_table_oids( 

4888 connection, schema, filter_names, scope, kind, **kw 

4889 ) 

4890 

4891 indexes = defaultdict(list) 

4892 default = ReflectionDefaults.indexes 

4893 

4894 batches = list(table_oids) 

4895 

4896 while batches: 

4897 batch = batches[0:3000] 

4898 batches[0:3000] = [] 

4899 

4900 result = connection.execute( 

4901 self._index_query, {"oids": [r[0] for r in batch]} 

4902 ).mappings() 

4903 

4904 result_by_oid = defaultdict(list) 

4905 for row_dict in result: 

4906 result_by_oid[row_dict["indrelid"]].append(row_dict) 

4907 

4908 for oid, table_name in batch: 

4909 if oid not in result_by_oid: 

4910 # ensure that each table has an entry, even if reflection 

4911 # is skipped because it is not supported 

4912 indexes[(schema, table_name)] = default() 

4913 continue 

4914 

4915 for row in result_by_oid[oid]: 

4916 index_name = row["relname"] 

4917 

4918 table_indexes = indexes[(schema, table_name)] 

4919 

4920 all_elements = row["elements"] 

4921 all_elements_is_expr = row["elements_is_expr"] 

4922 all_elements_opclass = row["elements_opclass"] 

4923 all_elements_opdefault = row["elements_opdefault"] 

4924 indnkeyatts = row["indnkeyatts"] 

4925 # "The number of key columns in the index, not counting any 

4926 # included columns, which are merely stored and do not 

4927 # participate in the index semantics" 

4928 if len(all_elements) > indnkeyatts: 

4929 # this is a "covering index" which has INCLUDE columns 

4930 # as well as regular index columns 

4931 inc_cols = all_elements[indnkeyatts:] 

4932 idx_elements = all_elements[:indnkeyatts] 

4933 idx_elements_is_expr = all_elements_is_expr[ 

4934 :indnkeyatts 

4935 ] 

4936 # PostgreSQL does not support expressions in included 

4937 # columns as of v14: "ERROR: expressions are not 

4938 # supported in included columns". 

4939 assert all( 

4940 not is_expr 

4941 for is_expr in all_elements_is_expr[indnkeyatts:] 

4942 ) 

4943 idx_elements_opclass = all_elements_opclass[ 

4944 :indnkeyatts 

4945 ] 

4946 idx_elements_opdefault = all_elements_opdefault[ 

4947 :indnkeyatts 

4948 ] 

4949 else: 

4950 idx_elements = all_elements 

4951 idx_elements_is_expr = all_elements_is_expr 

4952 inc_cols = [] 

4953 idx_elements_opclass = all_elements_opclass 

4954 idx_elements_opdefault = all_elements_opdefault 

4955 

4956 index = {"name": index_name, "unique": row["indisunique"]} 

4957 if any(idx_elements_is_expr): 

4958 index["column_names"] = [ 

4959 None if is_expr else expr 

4960 for expr, is_expr in zip( 

4961 idx_elements, idx_elements_is_expr 

4962 ) 

4963 ] 

4964 index["expressions"] = idx_elements 

4965 else: 

4966 index["column_names"] = idx_elements 

4967 

4968 dialect_options = {} 

4969 

4970 if not all(idx_elements_opdefault): 

4971 dialect_options["postgresql_ops"] = { 

4972 name: opclass 

4973 for name, opclass, is_default in zip( 

4974 idx_elements, 

4975 idx_elements_opclass, 

4976 idx_elements_opdefault, 

4977 ) 

4978 if not is_default 

4979 } 

4980 

4981 sorting = {} 

4982 for col_index, col_flags in enumerate(row["indoption"]): 

4983 col_sorting = () 

4984 # try to set flags only if they differ from PG 

4985 # defaults... 

4986 if col_flags & 0x01: 

4987 col_sorting += ("desc",) 

4988 if not (col_flags & 0x02): 

4989 col_sorting += ("nulls_last",) 

4990 else: 

4991 if col_flags & 0x02: 

4992 col_sorting += ("nulls_first",) 

4993 if col_sorting: 

4994 sorting[idx_elements[col_index]] = col_sorting 

4995 if sorting: 

4996 index["column_sorting"] = sorting 

4997 if row["has_constraint"]: 

4998 index["duplicates_constraint"] = index_name 

4999 

5000 if row["reloptions"]: 

5001 dialect_options["postgresql_with"] = dict( 

5002 [ 

5003 option.split("=", 1) 

5004 for option in row["reloptions"] 

5005 ] 

5006 ) 

5007 # it *might* be nice to include that this is 'btree' in the 

5008 # reflection info. But we don't want an Index object 

5009 # to have a ``postgresql_using`` in it that is just the 

5010 # default, so for the moment leaving this out. 

5011 amname = row["amname"] 

5012 if amname != "btree": 

5013 dialect_options["postgresql_using"] = row["amname"] 

5014 if row["filter_definition"]: 

5015 dialect_options["postgresql_where"] = row[ 

5016 "filter_definition" 

5017 ] 

5018 if self.server_version_info >= (11,): 

5019 # NOTE: this is legacy, this is part of 

5020 # dialect_options now as of #7382 

5021 index["include_columns"] = inc_cols 

5022 dialect_options["postgresql_include"] = inc_cols 

5023 if row["indnullsnotdistinct"]: 

5024 # the default is False, so ignore it. 

5025 dialect_options["postgresql_nulls_not_distinct"] = row[ 

5026 "indnullsnotdistinct" 

5027 ] 

5028 

5029 if dialect_options: 

5030 index["dialect_options"] = dialect_options 

5031 

5032 table_indexes.append(index) 

5033 return indexes.items() 

5034 
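# Illustrative sketch, not part of base.py: the per-column ``indoption``
# decoding and the key/INCLUDE column split from ``get_multi_indexes``,
# pulled out as two small functions. The function names and the sample
# values are assumptions made for the example.

```python
def decode_indoption(col_flags):
    """Return sorting flags only where they differ from PostgreSQL defaults."""
    col_sorting = ()
    if col_flags & 0x01:  # descending
        col_sorting += ("desc",)
        # DESC defaults to NULLS FIRST, so NULLS LAST is the non-default.
        if not (col_flags & 0x02):
            col_sorting += ("nulls_last",)
    else:
        # ASC defaults to NULLS LAST, so NULLS FIRST is the non-default.
        if col_flags & 0x02:
            col_sorting += ("nulls_first",)
    return col_sorting


def split_covering(all_elements, indnkeyatts):
    """Split index elements into (key elements, INCLUDE columns)."""
    if len(all_elements) > indnkeyatts:
        return all_elements[:indnkeyatts], all_elements[indnkeyatts:]
    return all_elements, []


flags_table = {flags: decode_indoption(flags) for flags in range(4)}
key_cols, inc_cols = split_covering(["a", "b", "c"], 2)
```

# Here ``flags_table[0]`` is the empty tuple because plain ascending order
# with nulls last is the server default, so nothing is recorded for it.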

5035 @reflection.cache 

5036 def get_unique_constraints( 

5037 self, connection, table_name, schema=None, **kw 

5038 ): 

5039 data = self.get_multi_unique_constraints( 

5040 connection, 

5041 schema=schema, 

5042 filter_names=[table_name], 

5043 scope=ObjectScope.ANY, 

5044 kind=ObjectKind.ANY, 

5045 **kw, 

5046 ) 

5047 return self._value_or_raise(data, table_name, schema) 

5048 

5049 def get_multi_unique_constraints( 

5050 self, 

5051 connection, 

5052 schema, 

5053 filter_names, 

5054 scope, 

5055 kind, 

5056 **kw, 

5057 ): 

5058 result = self._reflect_constraint( 

5059 connection, "u", schema, filter_names, scope, kind, **kw 

5060 ) 

5061 

5062 # each table can have multiple unique constraints 

5063 uniques = defaultdict(list) 

5064 default = ReflectionDefaults.unique_constraints 

5065 for table_name, cols, con_name, comment, options in result: 

5066 # ensure a list is created for each table. leave it empty if 

5067 # the table has no unique constraint 

5068 if con_name is None: 

5069 uniques[(schema, table_name)] = default() 

5070 continue 

5071 

5072 uc_dict = { 

5073 "column_names": cols, 

5074 "name": con_name, 

5075 "comment": comment, 

5076 } 

5077 if options: 

5078 uc_dict["dialect_options"] = options 

5079 

5080 uniques[(schema, table_name)].append(uc_dict) 

5081 return uniques.items() 

5082 

5083 @reflection.cache 

5084 def get_table_comment(self, connection, table_name, schema=None, **kw): 

5085 data = self.get_multi_table_comment( 

5086 connection, 

5087 schema, 

5088 [table_name], 

5089 scope=ObjectScope.ANY, 

5090 kind=ObjectKind.ANY, 

5091 **kw, 

5092 ) 

5093 return self._value_or_raise(data, table_name, schema) 

5094 

5095 @lru_cache() 

5096 def _comment_query(self, schema, has_filter_names, scope, kind): 

5097 relkinds = self._kind_to_relkinds(kind) 

5098 query = ( 

5099 select( 

5100 pg_catalog.pg_class.c.relname, 

5101 pg_catalog.pg_description.c.description, 

5102 ) 

5103 .select_from(pg_catalog.pg_class) 

5104 .outerjoin( 

5105 pg_catalog.pg_description, 

5106 sql.and_( 

5107 pg_catalog.pg_class.c.oid 

5108 == pg_catalog.pg_description.c.objoid, 

5109 pg_catalog.pg_description.c.objsubid == 0, 

5110 pg_catalog.pg_description.c.classoid 

5111 == sql.func.cast("pg_catalog.pg_class", REGCLASS), 

5112 ), 

5113 ) 

5114 .where(self._pg_class_relkind_condition(relkinds)) 

5115 ) 

5116 query = self._pg_class_filter_scope_schema(query, schema, scope) 

5117 if has_filter_names: 

5118 query = query.where( 

5119 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

5120 ) 

5121 return query 

5122 

5123 def get_multi_table_comment( 

5124 self, connection, schema, filter_names, scope, kind, **kw 

5125 ): 

5126 has_filter_names, params = self._prepare_filter_names(filter_names) 

5127 query = self._comment_query(schema, has_filter_names, scope, kind) 

5128 result = connection.execute(query, params) 

5129 

5130 default = ReflectionDefaults.table_comment 

5131 return ( 

5132 ( 

5133 (schema, table), 

5134 {"text": comment} if comment is not None else default(), 

5135 ) 

5136 for table, comment in result 

5137 ) 

5138 

5139 @reflection.cache 

5140 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

5141 data = self.get_multi_check_constraints( 

5142 connection, 

5143 schema, 

5144 [table_name], 

5145 scope=ObjectScope.ANY, 

5146 kind=ObjectKind.ANY, 

5147 **kw, 

5148 ) 

5149 return self._value_or_raise(data, table_name, schema) 

5150 

5151 @lru_cache() 

5152 def _check_constraint_query(self, schema, has_filter_names, scope, kind): 

5153 relkinds = self._kind_to_relkinds(kind) 

5154 query = ( 

5155 select( 

5156 pg_catalog.pg_class.c.relname, 

5157 pg_catalog.pg_constraint.c.conname, 

5158 # NOTE: avoid calling pg_get_constraintdef when not needed 

5159 # to speed up the query 

5160 sql.case( 

5161 ( 

5162 pg_catalog.pg_constraint.c.oid.is_not(None), 

5163 pg_catalog.pg_get_constraintdef( 

5164 pg_catalog.pg_constraint.c.oid, True 

5165 ), 

5166 ), 

5167 else_=None, 

5168 ), 

5169 pg_catalog.pg_description.c.description, 

5170 ) 

5171 .select_from(pg_catalog.pg_class) 

5172 .outerjoin( 

5173 pg_catalog.pg_constraint, 

5174 sql.and_( 

5175 pg_catalog.pg_class.c.oid 

5176 == pg_catalog.pg_constraint.c.conrelid, 

5177 pg_catalog.pg_constraint.c.contype == "c", 

5178 ), 

5179 ) 

5180 .outerjoin( 

5181 pg_catalog.pg_description, 

5182 pg_catalog.pg_description.c.objoid 

5183 == pg_catalog.pg_constraint.c.oid, 

5184 ) 

5185 .order_by( 

5186 pg_catalog.pg_class.c.relname, 

5187 pg_catalog.pg_constraint.c.conname, 

5188 ) 

5189 .where(self._pg_class_relkind_condition(relkinds)) 

5190 ) 

5191 query = self._pg_class_filter_scope_schema(query, schema, scope) 

5192 if has_filter_names: 

5193 query = query.where( 

5194 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

5195 ) 

5196 return query 

5197 

5198 def get_multi_check_constraints( 

5199 self, connection, schema, filter_names, scope, kind, **kw 

5200 ): 

5201 has_filter_names, params = self._prepare_filter_names(filter_names) 

5202 query = self._check_constraint_query( 

5203 schema, has_filter_names, scope, kind 

5204 ) 

5205 result = connection.execute(query, params) 

5206 

5207 check_constraints = defaultdict(list) 

5208 default = ReflectionDefaults.check_constraints 

5209 for table_name, check_name, src, comment in result: 

5210 # only two cases for check_name and src: both null or both defined 

5211 if check_name is None and src is None: 

5212 check_constraints[(schema, table_name)] = default() 

5213 continue 

5214 # samples: 

5215 # "CHECK (((a > 1) AND (a < 5)))" 

5216 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" 

5217 # "CHECK (((a > 1) AND (a < 5))) NOT VALID" 

5218 # "CHECK (some_boolean_function(a))" 

5219 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" 

5220 # "CHECK (a NOT NULL) NO INHERIT" 

5221 # "CHECK (a NOT NULL) NO INHERIT NOT VALID" 

5222 

5223 m = re.match( 

5224 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", 

5225 src, 

5226 flags=re.DOTALL, 

5227 ) 

5228 if not m: 

5229 util.warn("Could not parse CHECK constraint text: %r" % src) 

5230 sqltext = "" 

5231 else: 

5232 sqltext = re.compile( 

5233 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL 

5234 ).sub(r"\1", m.group(1)) 

5235 entry = { 

5236 "name": check_name, 

5237 "sqltext": sqltext, 

5238 "comment": comment, 

5239 } 

5240 if m: 

5241 do = {} 

5242 if " NOT VALID" in m.groups(): 

5243 do["not_valid"] = True 

5244 if " NO INHERIT" in m.groups(): 

5245 do["no_inherit"] = True 

5246 if do: 

5247 entry["dialect_options"] = do 

5248 

5249 check_constraints[(schema, table_name)].append(entry) 

5250 return check_constraints.items() 

5251 
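# Illustrative sketch, not part of base.py: the two regular expressions from
# ``get_multi_check_constraints`` applied to some of the sample strings
# listed in its comments. The function name ``parse_check_sketch`` is an
# assumption; where the listing warns on unparseable text, this sketch
# simply returns an empty ``sqltext``.

```python
import re

CHECK_RE = re.compile(
    r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", flags=re.DOTALL
)
OUTER_PARENS_RE = re.compile(r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL)


def parse_check_sketch(src):
    m = CHECK_RE.match(src)
    if not m:
        return {"sqltext": "", "dialect_options": {}}
    # Drop one layer of redundant outer parentheses, if present.
    sqltext = OUTER_PARENS_RE.sub(r"\1", m.group(1))
    options = {}
    if " NOT VALID" in m.groups():
        options["not_valid"] = True
    if " NO INHERIT" in m.groups():
        options["no_inherit"] = True
    return {"sqltext": sqltext, "dialect_options": options}


not_valid = parse_check_sketch("CHECK (((a > 1) AND (a < 5))) NOT VALID")
func_call = parse_check_sketch("CHECK (some_boolean_function(a))")
no_inherit = parse_check_sketch("CHECK (a NOT NULL) NO INHERIT")
```

# Only one layer of outer parentheses is removed, so the first sample keeps
# the parentheses around each comparison.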

5252 def _pg_type_filter_schema(self, query, schema): 

5253 if schema is None: 

5254 query = query.where( 

5255 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

5256 # ignore pg_catalog schema 

5257 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

5258 ) 

5259 elif schema != "*": 

5260 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

5261 return query 

5262 

5263 @lru_cache() 

5264 def _enum_query(self, schema): 

5265 lbl_agg_sq = ( 

5266 select( 

5267 pg_catalog.pg_enum.c.enumtypid, 

5268 sql.func.array_agg( 

5269 aggregate_order_by( 

5270 # NOTE: cast since some postgresql derivatives may 

5271 # not support array_agg on the name type 

5272 pg_catalog.pg_enum.c.enumlabel.cast(TEXT), 

5273 pg_catalog.pg_enum.c.enumsortorder, 

5274 ) 

5275 ).label("labels"), 

5276 ) 

5277 .group_by(pg_catalog.pg_enum.c.enumtypid) 

5278 .subquery("lbl_agg") 

5279 ) 

5280 

5281 query = ( 

5282 select( 

5283 pg_catalog.pg_type.c.typname.label("name"), 

5284 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5285 "visible" 

5286 ), 

5287 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5288 lbl_agg_sq.c.labels.label("labels"), 

5289 ) 

5290 .join( 

5291 pg_catalog.pg_namespace, 

5292 pg_catalog.pg_namespace.c.oid 

5293 == pg_catalog.pg_type.c.typnamespace, 

5294 ) 

5295 .outerjoin( 

5296 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid 

5297 ) 

5298 .where(pg_catalog.pg_type.c.typtype == "e") 

5299 .order_by( 

5300 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5301 ) 

5302 ) 

5303 

5304 return self._pg_type_filter_schema(query, schema) 

5305 

5306 @reflection.cache 

5307 def _load_enums(self, connection, schema=None, **kw): 

5308 if not self.supports_native_enum: 

5309 return [] 

5310 

5311 result = connection.execute(self._enum_query(schema)) 

5312 

5313 enums = [] 

5314 for name, visible, schema, labels in result: 

5315 enums.append( 

5316 { 

5317 "name": name, 

5318 "schema": schema, 

5319 "visible": visible, 

5320 "labels": [] if labels is None else labels, 

5321 } 

5322 ) 

5323 return enums 

5324 

5325 @lru_cache() 

5326 def _domain_query(self, schema): 

5327 con_sq = ( 

5328 select( 

5329 pg_catalog.pg_constraint.c.contypid, 

5330 sql.func.array_agg( 

5331 pg_catalog.pg_get_constraintdef( 

5332 pg_catalog.pg_constraint.c.oid, True 

5333 ) 

5334 ).label("condefs"), 

5335 sql.func.array_agg( 

5336 # NOTE: cast since some postgresql derivatives may 

5337 # not support array_agg on the name type 

5338 pg_catalog.pg_constraint.c.conname.cast(TEXT) 

5339 ).label("connames"), 

5340 ) 

5341 # The domain this constraint is on; zero if not a domain constraint 

5342 .where(pg_catalog.pg_constraint.c.contypid != 0) 

5343 .group_by(pg_catalog.pg_constraint.c.contypid) 

5344 .subquery("domain_constraints") 

5345 ) 

5346 

5347 query = ( 

5348 select( 

5349 pg_catalog.pg_type.c.typname.label("name"), 

5350 pg_catalog.format_type( 

5351 pg_catalog.pg_type.c.typbasetype, 

5352 pg_catalog.pg_type.c.typtypmod, 

5353 ).label("attype"), 

5354 (~pg_catalog.pg_type.c.typnotnull).label("nullable"), 

5355 pg_catalog.pg_type.c.typdefault.label("default"), 

5356 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5357 "visible" 

5358 ), 

5359 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5360 con_sq.c.condefs, 

5361 con_sq.c.connames, 

5362 pg_catalog.pg_collation.c.collname, 

5363 ) 

5364 .join( 

5365 pg_catalog.pg_namespace, 

5366 pg_catalog.pg_namespace.c.oid 

5367 == pg_catalog.pg_type.c.typnamespace, 

5368 ) 

5369 .outerjoin( 

5370 pg_catalog.pg_collation, 

5371 pg_catalog.pg_type.c.typcollation 

5372 == pg_catalog.pg_collation.c.oid, 

5373 ) 

5374 .outerjoin( 

5375 con_sq, 

5376 pg_catalog.pg_type.c.oid == con_sq.c.contypid, 

5377 ) 

5378 .where(pg_catalog.pg_type.c.typtype == "d") 

5379 .order_by( 

5380 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5381 ) 

5382 ) 

5383 return self._pg_type_filter_schema(query, schema) 

5384 

5385 @reflection.cache 

5386 def _load_domains(self, connection, schema=None, **kw): 

5387 result = connection.execute(self._domain_query(schema)) 

5388 

5389 domains: List[ReflectedDomain] = [] 

5390 for domain in result.mappings(): 

5391 # strip (30) from character varying(30) 

5392 attype = re.search(r"([^\(]+)", domain["attype"]).group(1) 

5393 constraints: List[ReflectedDomainConstraint] = [] 

5394 if domain["connames"]: 

5395 # When a domain has multiple CHECK constraints, they will 

5396 # be tested in alphabetical order by name. 

5397 sorted_constraints = sorted( 

5398 zip(domain["connames"], domain["condefs"]), 

5399 key=lambda t: t[0], 

5400 ) 

5401 for name, def_ in sorted_constraints: 

5402 # constraint is in the form "CHECK (expression)" 

5403 # or "NOT NULL". Ignore the "NOT NULL" and 

5404 # remove "CHECK (" and the trailing ")". 

5405 if def_.casefold().startswith("check"): 

5406 check = def_[7:-1] 

5407 constraints.append({"name": name, "check": check}) 

5408 domain_rec: ReflectedDomain = { 

5409 "name": domain["name"], 

5410 "schema": domain["schema"], 

5411 "visible": domain["visible"], 

5412 "type": attype, 

5413 "nullable": domain["nullable"], 

5414 "default": domain["default"], 

5415 "constraints": constraints, 

5416 "collation": domain["collname"], 

5417 } 

5418 domains.append(domain_rec) 

5419 

5420 return domains 

5421 
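# Illustrative sketch, not part of base.py: the string handling in
# ``_load_domains`` applied to hand-written sample values. The function
# names and the sample constraint names and definitions are assumptions
# made for the example.

```python
import re


def strip_type_length(attype):
    # "character varying(30)" -> "character varying"
    return re.search(r"([^\(]+)", attype).group(1)


def domain_checks(connames, condefs):
    constraints = []
    # Constraints are ordered alphabetically by name, as in the listing.
    for name, def_ in sorted(zip(connames, condefs), key=lambda t: t[0]):
        # Keep "CHECK (expression)" entries and ignore "NOT NULL".
        if def_.casefold().startswith("check"):
            # Remove the leading "CHECK (" (7 characters) and the final ")".
            constraints.append({"name": name, "check": def_[7:-1]})
    return constraints


base_type = strip_type_length("character varying(30)")
checks = domain_checks(
    ["positive", "not_null", "bounded"],
    ["CHECK (VALUE > 0)", "NOT NULL", "CHECK (VALUE < 100)"],
)
```

# The fixed ``[7:-1]`` slice relies on the definition starting with exactly
# ``CHECK (`` and ending with ``)``.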

5422 def _set_backslash_escapes(self, connection): 

5423 # this method is provided as an override hook for descendant 

5424 # dialects (e.g. Redshift), so removing it may break them 

5425 std_string = connection.exec_driver_sql( 

5426 "show standard_conforming_strings" 

5427 ).scalar() 

5428 self._backslash_escapes = std_string == "off"