Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1437 statements  

1# dialects/postgresql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql 

11 :name: PostgreSQL 

12 :normal_support: 9.6+ 

13 :best_effort: 9+ 

14 

15.. _postgresql_sequences: 

16 

17Sequences/SERIAL/IDENTITY 

18------------------------- 

19 

20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means 

21of creating new primary key values for integer-based primary key columns. When 

22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for 

23integer-based primary key columns, which generates a sequence and server side 

24default corresponding to the column. 

25 

26To specify a specific named sequence to be used for primary key generation, 

27use the :func:`~sqlalchemy.schema.Sequence` construct:: 

28 

29 Table( 

30 "sometable", 

31 metadata, 

32 Column( 

33 "id", Integer, Sequence("some_id_seq", start=1), primary_key=True 

34 ), 

35 ) 

36 

37When SQLAlchemy issues a single INSERT statement, to fulfill the contract of 

38having the "last insert identifier" available, a RETURNING clause is added to 

39the INSERT statement which specifies the primary key columns should be 

40returned after the statement completes. The RETURNING functionality only takes 

41place if PostgreSQL 8.2 or later is in use. As a fallback approach, the 

42sequence, whether specified explicitly or implicitly via ``SERIAL``, is 

43executed independently beforehand, the returned value to be used in the 

44subsequent insert. Note that when an 

45:func:`~sqlalchemy.sql.expression.insert()` construct is executed using 

46"executemany" semantics, the "last inserted identifier" functionality does not 

47apply; no RETURNING clause is emitted nor is the sequence pre-executed in this 

48case. 

49 

50 

51PostgreSQL 10 and above IDENTITY columns 

52^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

53 

54PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use 

55of SERIAL. The :class:`_schema.Identity` construct in a 

56:class:`_schema.Column` can be used to control its behavior:: 

57 

58 from sqlalchemy import Table, Column, MetaData, Integer, Computed 

59 

60 metadata = MetaData() 

61 

62 data = Table( 

63 "data", 

64 metadata, 

65 Column( 

66 "id", Integer, Identity(start=42, cycle=True), primary_key=True 

67 ), 

68 Column("data", String), 

69 ) 

70 

71The CREATE TABLE for the above :class:`_schema.Table` object would be: 

72 

73.. sourcecode:: sql 

74 

75 CREATE TABLE data ( 

76 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), 

77 data VARCHAR, 

78 PRIMARY KEY (id) 

79 ) 

80 

81.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

82 in a :class:`_schema.Column` to specify the option of an autoincrementing 

83 column. 

84 

85.. note:: 

86 

87 Previous versions of SQLAlchemy did not have built-in support for rendering 

88 of IDENTITY, and could use the following compilation hook to replace 

89 occurrences of SERIAL with IDENTITY:: 

90 

91 from sqlalchemy.schema import CreateColumn 

92 from sqlalchemy.ext.compiler import compiles 

93 

94 

95 @compiles(CreateColumn, "postgresql") 

96 def use_identity(element, compiler, **kw): 

97 text = compiler.visit_create_column(element, **kw) 

98 text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY") 

99 return text 

100 

101 Using the above, a table such as:: 

102 

103 t = Table( 

104 "t", m, Column("id", Integer, primary_key=True), Column("data", String) 

105 ) 

106 

107 Will generate on the backing database as: 

108 

109 .. sourcecode:: sql 

110 

111 CREATE TABLE t ( 

112 id INT GENERATED BY DEFAULT AS IDENTITY, 

113 data VARCHAR, 

114 PRIMARY KEY (id) 

115 ) 

116 

117.. _postgresql_ss_cursors: 

118 

119Server Side Cursors 

120------------------- 

121 

122Server-side cursor support is available for the psycopg2, asyncpg 

123dialects and may also be available in others. 

124 

125Server side cursors are enabled on a per-statement basis by using the 

126:paramref:`.Connection.execution_options.stream_results` connection execution 

127option:: 

128 

129 with engine.connect() as conn: 

130 result = conn.execution_options(stream_results=True).execute( 

131 text("select * from table") 

132 ) 

133 

134Note that some kinds of SQL statements may not be supported with 

135server side cursors; generally, only SQL statements that return rows should be 

136used with this option. 

137 

138.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated 

139 and will be removed in a future release. Please use the 

140 :paramref:`_engine.Connection.stream_results` execution option for 

141 unbuffered cursor support. 

142 

143.. seealso:: 

144 

145 :ref:`engine_stream_results` 

146 

147.. _postgresql_isolation_level: 

148 

149Transaction Isolation Level 

150--------------------------- 

151 

152Most SQLAlchemy dialects support setting of transaction isolation level 

153using the :paramref:`_sa.create_engine.isolation_level` parameter 

154at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` 

155level via the :paramref:`.Connection.execution_options.isolation_level` 

156parameter. 

157 

158For PostgreSQL dialects, this feature works either by making use of the 

159DBAPI-specific features, such as psycopg2's isolation level flags which will 

160embed the isolation level setting inline with the ``"BEGIN"`` statement, or for 

161DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS 

162TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement 

163emitted by the DBAPI. For the special AUTOCOMMIT isolation level, 

164DBAPI-specific techniques are used which is typically an ``.autocommit`` 

165flag on the DBAPI connection object. 

166 

167To set isolation level using :func:`_sa.create_engine`:: 

168 

169 engine = create_engine( 

170 "postgresql+pg8000://scott:tiger@localhost/test", 

171 isolation_level="REPEATABLE READ", 

172 ) 

173 

174To set using per-connection execution options:: 

175 

176 with engine.connect() as conn: 

177 conn = conn.execution_options(isolation_level="REPEATABLE READ") 

178 with conn.begin(): 

179 ... # work with transaction 

180 

181There are also more options for isolation level configurations, such as 

182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

183different isolation level settings. See the discussion at 

184:ref:`dbapi_autocommit` for background. 

185 

186Valid values for ``isolation_level`` on most PostgreSQL dialects include: 

187 

188* ``READ COMMITTED`` 

189* ``READ UNCOMMITTED`` 

190* ``REPEATABLE READ`` 

191* ``SERIALIZABLE`` 

192* ``AUTOCOMMIT`` 

193 

194.. seealso:: 

195 

196 :ref:`dbapi_autocommit` 

197 

198 :ref:`postgresql_readonly_deferrable` 

199 

200 :ref:`psycopg2_isolation_level` 

201 

202 :ref:`pg8000_isolation_level` 

203 

204.. _postgresql_readonly_deferrable: 

205 

206Setting READ ONLY / DEFERRABLE 

207------------------------------ 

208 

209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" 

210characteristics of the transaction, which is in addition to the isolation level 

211setting. These two attributes can be established either in conjunction with or 

212independently of the isolation level by passing the ``postgresql_readonly`` and 

213``postgresql_deferrable`` flags with 

214:meth:`_engine.Connection.execution_options`. The example below illustrates 

215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting 

216"READ ONLY" and "DEFERRABLE":: 

217 

218 with engine.connect() as conn: 

219 conn = conn.execution_options( 

220 isolation_level="SERIALIZABLE", 

221 postgresql_readonly=True, 

222 postgresql_deferrable=True, 

223 ) 

224 with conn.begin(): 

225 ... # work with transaction 

226 

227Note that some DBAPIs such as asyncpg only support "readonly" with 

228SERIALIZABLE isolation. 

229 

230.. versionadded:: 1.4 added support for the ``postgresql_readonly`` 

231 and ``postgresql_deferrable`` execution options. 

232 

233.. _postgresql_reset_on_return: 

234 

235Temporary Table / Resource Reset for Connection Pooling 

236------------------------------------------------------- 

237 

238The :class:`.QueuePool` connection pool implementation used 

239by the SQLAlchemy :class:`.Engine` object includes 

240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

241the DBAPI ``.rollback()`` method when connections are returned to the pool. 

242While this rollback will clear out the immediate state used by the previous 

243transaction, it does not cover a wider range of session-level state, including 

244temporary tables as well as other server state such as prepared statement 

245handles and statement caches. The PostgreSQL database includes a variety 

246of commands which may be used to reset this state, including 

247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``. 

248 

249 

250To install 

251one or more of these commands as the means of performing reset-on-return, 

252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated 

253in the example below. The implementation 

254will end transactions in progress as well as discard temporary tables 

255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL 

256documentation for background on what each of these statements do. 

257 

258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter 

259is set to ``None`` so that the custom scheme can replace the default behavior 

260completely. The custom hook implementation calls ``.rollback()`` in any case, 

261as it's usually important that the DBAPI's own tracking of commit/rollback 

262will remain consistent with the state of the transaction:: 

263 

264 

265 from sqlalchemy import create_engine 

266 from sqlalchemy import event 

267 

268 postgresql_engine = create_engine( 

269 "postgresql+psycopg2://scott:tiger@hostname/dbname", 

270 # disable default reset-on-return scheme 

271 pool_reset_on_return=None, 

272 ) 

273 

274 

275 @event.listens_for(postgresql_engine, "reset") 

276 def _reset_postgresql(dbapi_connection, connection_record, reset_state): 

277 if not reset_state.terminate_only: 

278 dbapi_connection.execute("CLOSE ALL") 

279 dbapi_connection.execute("RESET ALL") 

280 dbapi_connection.execute("DISCARD TEMP") 

281 

282 # so that the DBAPI itself knows that the connection has been 

283 # reset 

284 dbapi_connection.rollback() 

285 

286.. versionchanged:: 2.0.0b3 Added additional state arguments to 

287 the :meth:`.PoolEvents.reset` event and additionally ensured the event 

288 is invoked for all "reset" occurrences, so that it's appropriate 

289 as a place for custom "reset" handlers. Previous schemes which 

290 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

291 

292.. seealso:: 

293 

294 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

295 

296.. _postgresql_alternate_search_path: 

297 

298Setting Alternate Search Paths on Connect 

299------------------------------------------ 

300 

301The PostgreSQL ``search_path`` variable refers to the list of schema names 

302that will be implicitly referenced when a particular table or other 

303object is referenced in a SQL statement. As detailed in the next section 

304:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around 

305the concept of keeping this variable at its default value of ``public``, 

306however, in order to have it set to any arbitrary name or names when connections 

307are used automatically, the "SET SESSION search_path" command may be invoked 

308for all connections in a pool using the following event handler, as discussed 

309at :ref:`schema_set_default_connections`:: 

310 

311 from sqlalchemy import event 

312 from sqlalchemy import create_engine 

313 

314 engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname") 

315 

316 

317 @event.listens_for(engine, "connect", insert=True) 

318 def set_search_path(dbapi_connection, connection_record): 

319 existing_autocommit = dbapi_connection.autocommit 

320 dbapi_connection.autocommit = True 

321 cursor = dbapi_connection.cursor() 

322 cursor.execute("SET SESSION search_path='%s'" % schema_name) 

323 cursor.close() 

324 dbapi_connection.autocommit = existing_autocommit 

325 

326The reason the recipe is complicated by use of the ``.autocommit`` DBAPI 

327attribute is so that when the ``SET SESSION search_path`` directive is invoked, 

328it is invoked outside of the scope of any transaction and therefore will not 

329be reverted when the DBAPI connection has a rollback. 

330 

331.. seealso:: 

332 

333 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation 

334 

335.. _postgresql_schema_reflection: 

336 

337Remote-Schema Table Introspection and PostgreSQL search_path 

338------------------------------------------------------------ 

339 

340.. admonition:: Section Best Practices Summarized 

341 

342 keep the ``search_path`` variable set to its default of ``public``, without 

343 any other schema names. Ensure the username used to connect **does not** 

344 match remote schemas, or ensure the ``"$user"`` token is **removed** from 

345 ``search_path``. For other schema names, name these explicitly 

346 within :class:`_schema.Table` definitions. Alternatively, the 

347 ``postgresql_ignore_search_path`` option will cause all reflected 

348 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema` 

349 attribute set up. 

350 

351The PostgreSQL dialect can reflect tables from any schema, as outlined in 

352:ref:`metadata_reflection_schemas`. 

353 

354In all cases, the first thing SQLAlchemy does when reflecting tables is 

355to **determine the default schema for the current database connection**. 

356It does this using the PostgreSQL ``current_schema()`` 

357function, illustated below using a PostgreSQL client session (i.e. using 

358the ``psql`` tool): 

359 

360.. sourcecode:: sql 

361 

362 test=> select current_schema(); 

363 current_schema 

364 ---------------- 

365 public 

366 (1 row) 

367 

368Above we see that on a plain install of PostgreSQL, the default schema name 

369is the name ``public``. 

370 

371However, if your database username **matches the name of a schema**, PostgreSQL's 

372default is to then **use that name as the default schema**. Below, we log in 

373using the username ``scott``. When we create a schema named ``scott``, **it 

374implicitly changes the default schema**: 

375 

376.. sourcecode:: sql 

377 

378 test=> select current_schema(); 

379 current_schema 

380 ---------------- 

381 public 

382 (1 row) 

383 

384 test=> create schema scott; 

385 CREATE SCHEMA 

386 test=> select current_schema(); 

387 current_schema 

388 ---------------- 

389 scott 

390 (1 row) 

391 

392The behavior of ``current_schema()`` is derived from the 

393`PostgreSQL search path 

394<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

395variable ``search_path``, which in modern PostgreSQL versions defaults to this: 

396 

397.. sourcecode:: sql 

398 

399 test=> show search_path; 

400 search_path 

401 ----------------- 

402 "$user", public 

403 (1 row) 

404 

405Where above, the ``"$user"`` variable will inject the current username as the 

406default schema, if one exists. Otherwise, ``public`` is used. 

407 

408When a :class:`_schema.Table` object is reflected, if it is present in the 

409schema indicated by the ``current_schema()`` function, **the schema name assigned 

410to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the 

411".schema" attribute will be assigned the string name of that schema. 

412 

413With regards to tables which these :class:`_schema.Table` 

414objects refer to via foreign key constraint, a decision must be made as to how 

415the ``.schema`` is represented in those remote tables, in the case where that 

416remote schema name is also a member of the current ``search_path``. 

417 

418By default, the PostgreSQL dialect mimics the behavior encouraged by 

419PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function 

420returns a sample definition for a particular foreign key constraint, 

421omitting the referenced schema name from that definition when the name is 

422also in the PostgreSQL schema search path. The interaction below 

423illustrates this behavior: 

424 

425.. sourcecode:: sql 

426 

427 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); 

428 CREATE TABLE 

429 test=> CREATE TABLE referring( 

430 test(> id INTEGER PRIMARY KEY, 

431 test(> referred_id INTEGER REFERENCES test_schema.referred(id)); 

432 CREATE TABLE 

433 test=> SET search_path TO public, test_schema; 

434 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

435 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

436 test-> ON n.oid = c.relnamespace 

437 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

438 test-> WHERE c.relname='referring' AND r.contype = 'f' 

439 test-> ; 

440 pg_get_constraintdef 

441 --------------------------------------------------- 

442 FOREIGN KEY (referred_id) REFERENCES referred(id) 

443 (1 row) 

444 

445Above, we created a table ``referred`` as a member of the remote schema 

446``test_schema``, however when we added ``test_schema`` to the 

447PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the 

448``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of 

449the function. 

450 

451On the other hand, if we set the search path back to the typical default 

452of ``public``: 

453 

454.. sourcecode:: sql 

455 

456 test=> SET search_path TO public; 

457 SET 

458 

459The same query against ``pg_get_constraintdef()`` now returns the fully 

460schema-qualified name for us: 

461 

462.. sourcecode:: sql 

463 

464 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

465 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

466 test-> ON n.oid = c.relnamespace 

467 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

468 test-> WHERE c.relname='referring' AND r.contype = 'f'; 

469 pg_get_constraintdef 

470 --------------------------------------------------------------- 

471 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) 

472 (1 row) 

473 

474SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` 

475in order to determine the remote schema name. That is, if our ``search_path`` 

476were set to include ``test_schema``, and we invoked a table 

477reflection process as follows:: 

478 

479 >>> from sqlalchemy import Table, MetaData, create_engine, text 

480 >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

481 >>> with engine.connect() as conn: 

482 ... conn.execute(text("SET search_path TO test_schema, public")) 

483 ... metadata_obj = MetaData() 

484 ... referring = Table("referring", metadata_obj, autoload_with=conn) 

485 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0> 

486 

487The above process would deliver to the :attr:`_schema.MetaData.tables` 

488collection 

489``referred`` table named **without** the schema:: 

490 

491 >>> metadata_obj.tables["referred"].schema is None 

492 True 

493 

494To alter the behavior of reflection such that the referred schema is 

495maintained regardless of the ``search_path`` setting, use the 

496``postgresql_ignore_search_path`` option, which can be specified as a 

497dialect-specific argument to both :class:`_schema.Table` as well as 

498:meth:`_schema.MetaData.reflect`:: 

499 

500 >>> with engine.connect() as conn: 

501 ... conn.execute(text("SET search_path TO test_schema, public")) 

502 ... metadata_obj = MetaData() 

503 ... referring = Table( 

504 ... "referring", 

505 ... metadata_obj, 

506 ... autoload_with=conn, 

507 ... postgresql_ignore_search_path=True, 

508 ... ) 

509 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0> 

510 

511We will now have ``test_schema.referred`` stored as schema-qualified:: 

512 

513 >>> metadata_obj.tables["test_schema.referred"].schema 

514 'test_schema' 

515 

516.. sidebar:: Best Practices for PostgreSQL Schema reflection 

517 

518 The description of PostgreSQL schema reflection behavior is complex, and 

519 is the product of many years of dealing with widely varied use cases and 

520 user preferences. But in fact, there's no need to understand any of it if 

521 you just stick to the simplest use pattern: leave the ``search_path`` set 

522 to its default of ``public`` only, never refer to the name ``public`` as 

523 an explicit schema name otherwise, and refer to all other schema names 

524 explicitly when building up a :class:`_schema.Table` object. The options 

525 described here are only for those users who can't, or prefer not to, stay 

526 within these guidelines. 

527 

528.. seealso:: 

529 

530 :ref:`reflection_schema_qualified_interaction` - discussion of the issue 

531 from a backend-agnostic perspective 

532 

533 `The Schema Search Path 

534 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

535 - on the PostgreSQL website. 

536 

537INSERT/UPDATE...RETURNING 

538------------------------- 

539 

540The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and 

541``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default 

542for single-row INSERT statements in order to fetch newly generated 

543primary key identifiers. To specify an explicit ``RETURNING`` clause, 

544use the :meth:`._UpdateBase.returning` method on a per-statement basis:: 

545 

546 # INSERT..RETURNING 

547 result = ( 

548 table.insert().returning(table.c.col1, table.c.col2).values(name="foo") 

549 ) 

550 print(result.fetchall()) 

551 

552 # UPDATE..RETURNING 

553 result = ( 

554 table.update() 

555 .returning(table.c.col1, table.c.col2) 

556 .where(table.c.name == "foo") 

557 .values(name="bar") 

558 ) 

559 print(result.fetchall()) 

560 

561 # DELETE..RETURNING 

562 result = ( 

563 table.delete() 

564 .returning(table.c.col1, table.c.col2) 

565 .where(table.c.name == "foo") 

566 ) 

567 print(result.fetchall()) 

568 

569.. _postgresql_insert_on_conflict: 

570 

571INSERT...ON CONFLICT (Upsert) 

572------------------------------ 

573 

574Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of 

575rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A 

576candidate row will only be inserted if that row does not violate any unique 

577constraints. In the case of a unique constraint violation, a secondary action 

578can occur which can be either "DO UPDATE", indicating that the data in the 

579target row should be updated, or "DO NOTHING", which indicates to silently skip 

580this row. 

581 

582Conflicts are determined using existing unique constraints and indexes. These 

583constraints may be identified either using their name as stated in DDL, 

584or they may be inferred by stating the columns and conditions that comprise 

585the indexes. 

586 

587SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific 

588:func:`_postgresql.insert()` function, which provides 

589the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` 

590and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: 

591 

592.. sourcecode:: pycon+sql 

593 

594 >>> from sqlalchemy.dialects.postgresql import insert 

595 >>> insert_stmt = insert(my_table).values( 

596 ... id="some_existing_id", data="inserted value" 

597 ... ) 

598 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 

599 >>> print(do_nothing_stmt) 

600 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

601 ON CONFLICT (id) DO NOTHING 

602 {stop} 

603 

604 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

605 ... constraint="pk_my_table", set_=dict(data="updated value") 

606 ... ) 

607 >>> print(do_update_stmt) 

608 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

609 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s 

610 

611.. seealso:: 

612 

613 `INSERT .. ON CONFLICT 

614 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ 

615 - in the PostgreSQL documentation. 

616 

617Specifying the Target 

618^^^^^^^^^^^^^^^^^^^^^ 

619 

620Both methods supply the "target" of the conflict using either the 

621named constraint or by column inference: 

622 

623* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument 

624 specifies a sequence containing string column names, :class:`_schema.Column` 

625 objects, and/or SQL expression elements, which would identify a unique 

626 index: 

627 

628 .. sourcecode:: pycon+sql 

629 

630 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

631 ... index_elements=["id"], set_=dict(data="updated value") 

632 ... ) 

633 >>> print(do_update_stmt) 

634 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

635 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

636 {stop} 

637 

638 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

639 ... index_elements=[my_table.c.id], set_=dict(data="updated value") 

640 ... ) 

641 >>> print(do_update_stmt) 

642 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

643 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

644 

645* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to 

646 infer an index, a partial index can be inferred by also specifying the 

647 use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: 

648 

649 .. sourcecode:: pycon+sql 

650 

651 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 

652 >>> stmt = stmt.on_conflict_do_update( 

653 ... index_elements=[my_table.c.user_email], 

654 ... index_where=my_table.c.user_email.like("%@gmail.com"), 

655 ... set_=dict(data=stmt.excluded.data), 

656 ... ) 

657 >>> print(stmt) 

658 {printsql}INSERT INTO my_table (data, user_email) 

659 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) 

660 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data 

661 

662* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is 

663 used to specify an index directly rather than inferring it. This can be 

664 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: 

665 

666 .. sourcecode:: pycon+sql 

667 

668 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

669 ... constraint="my_table_idx_1", set_=dict(data="updated value") 

670 ... ) 

671 >>> print(do_update_stmt) 

672 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

673 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s 

674 {stop} 

675 

676 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

677 ... constraint="my_table_pk", set_=dict(data="updated value") 

678 ... ) 

679 >>> print(do_update_stmt) 

680 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

681 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s 

682 {stop} 

683 

684* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may 

685 also refer to a SQLAlchemy construct representing a constraint, 

686 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, 

687 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, 

688 if the constraint has a name, it is used directly. Otherwise, if the 

689 constraint is unnamed, then inference will be used, where the expressions 

690 and optional WHERE clause of the constraint will be spelled out in the 

691 construct. This use is especially convenient 

692 to refer to the named or unnamed primary key of a :class:`_schema.Table` 

693 using the 

694 :attr:`_schema.Table.primary_key` attribute: 

695 

696 .. sourcecode:: pycon+sql 

697 

698 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

699 ... constraint=my_table.primary_key, set_=dict(data="updated value") 

700 ... ) 

701 >>> print(do_update_stmt) 

702 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

703 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

704 

705The SET Clause 

706^^^^^^^^^^^^^^^ 

707 

708``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

709existing row, using any combination of new values as well as values 

710from the proposed insertion. These values are specified using the 

711:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This 

712parameter accepts a dictionary which consists of direct values 

713for UPDATE: 

714 

715.. sourcecode:: pycon+sql 

716 

717 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

718 >>> do_update_stmt = stmt.on_conflict_do_update( 

719 ... index_elements=["id"], set_=dict(data="updated value") 

720 ... ) 

721 >>> print(do_update_stmt) 

722 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

723 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

724 

725.. warning:: 

726 

727 The :meth:`_expression.Insert.on_conflict_do_update` 

728 method does **not** take into 

729 account Python-side default UPDATE values or generation functions, e.g. 

730 those specified using :paramref:`_schema.Column.onupdate`. 

731 These values will not be exercised for an ON CONFLICT style of UPDATE, 

732 unless they are manually specified in the 

733 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. 

734 

735Updating using the Excluded INSERT Values 

736^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

737 

738In order to refer to the proposed insertion row, the special alias 

739:attr:`~.postgresql.Insert.excluded` is available as an attribute on 

740the :class:`_postgresql.Insert` object; this object is a 

741:class:`_expression.ColumnCollection` 

742which alias contains all columns of the target 

743table: 

744 

745.. sourcecode:: pycon+sql 

746 

747 >>> stmt = insert(my_table).values( 

748 ... id="some_id", data="inserted value", author="jlh" 

749 ... ) 

750 >>> do_update_stmt = stmt.on_conflict_do_update( 

751 ... index_elements=["id"], 

752 ... set_=dict(data="updated value", author=stmt.excluded.author), 

753 ... ) 

754 >>> print(do_update_stmt) 

755 {printsql}INSERT INTO my_table (id, data, author) 

756 VALUES (%(id)s, %(data)s, %(author)s) 

757 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

758 

759Additional WHERE Criteria 

760^^^^^^^^^^^^^^^^^^^^^^^^^ 

761 

762The :meth:`_expression.Insert.on_conflict_do_update` method also accepts 

763a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` 

764parameter, which will limit those rows which receive an UPDATE: 

765 

766.. sourcecode:: pycon+sql 

767 

768 >>> stmt = insert(my_table).values( 

769 ... id="some_id", data="inserted value", author="jlh" 

770 ... ) 

771 >>> on_update_stmt = stmt.on_conflict_do_update( 

772 ... index_elements=["id"], 

773 ... set_=dict(data="updated value", author=stmt.excluded.author), 

774 ... where=(my_table.c.status == 2), 

775 ... ) 

776 >>> print(on_update_stmt) 

777 {printsql}INSERT INTO my_table (id, data, author) 

778 VALUES (%(id)s, %(data)s, %(author)s) 

779 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

780 WHERE my_table.status = %(status_1)s 

781 

782Skipping Rows with DO NOTHING 

783^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

784 

785``ON CONFLICT`` may be used to skip inserting a row entirely 

786if any conflict with a unique or exclusion constraint occurs; below 

787this is illustrated using the 

788:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: 

789 

790.. sourcecode:: pycon+sql 

791 

792 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

793 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 

794 >>> print(stmt) 

795 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

796 ON CONFLICT (id) DO NOTHING 

797 

798If ``DO NOTHING`` is used without specifying any columns or constraint, 

799it has the effect of skipping the INSERT for any unique or exclusion 

800constraint violation which occurs: 

801 

802.. sourcecode:: pycon+sql 

803 

804 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

805 >>> stmt = stmt.on_conflict_do_nothing() 

806 >>> print(stmt) 

807 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

808 ON CONFLICT DO NOTHING 

809 

810.. _postgresql_match: 

811 

812Full Text Search 

813---------------- 

814 

815PostgreSQL's full text search system is available through the use of the 

816:data:`.func` namespace, combined with the use of custom operators 

817via the :meth:`.Operators.bool_op` method. For simple cases with some 

818degree of cross-backend compatibility, the :meth:`.Operators.match` operator 

819may also be used. 

820 

821.. _postgresql_simple_match: 

822 

823Simple plain text matching with ``match()`` 

824^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

825 

826The :meth:`.Operators.match` operator provides for cross-compatible simple 

827text matching. For the PostgreSQL backend, it's hardcoded to generate 

828an expression using the ``@@`` operator in conjunction with the 

829``plainto_tsquery()`` PostgreSQL function. 

830 

831On the PostgreSQL dialect, an expression like the following:: 

832 

833 select(sometable.c.text.match("search string")) 

834 

835would emit to the database: 

836 

837.. sourcecode:: sql 

838 

839 SELECT text @@ plainto_tsquery('search string') FROM table 

840 

841Above, passing a plain string to :meth:`.Operators.match` will automatically 

842make use of ``plainto_tsquery()`` to specify the type of tsquery. This 

843establishes basic database cross-compatibility for :meth:`.Operators.match` 

844with other backends. 

845 

846.. versionchanged:: 2.0 The default tsquery generation function used by the 

847 PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. 

848 

849 To render exactly what was rendered in 1.4, use the following form:: 

850 

851 from sqlalchemy import func 

852 

853 select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string"))) 

854 

855 Which would emit: 

856 

857 .. sourcecode:: sql 

858 

859 SELECT text @@ to_tsquery('search string') FROM table 

860 

861Using PostgreSQL full text functions and operators directly 

862^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

863 

864Text search operations beyond the simple use of :meth:`.Operators.match` 

865may make use of the :data:`.func` namespace to generate PostgreSQL full-text 

866functions, in combination with :meth:`.Operators.bool_op` to generate 

867any boolean operator. 

868 

869For example, the query:: 

870 

871 select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat"))) 

872 

873would generate: 

874 

875.. sourcecode:: sql 

876 

877 SELECT to_tsquery('cat') @> to_tsquery('cat & rat') 

878 

879 

880The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: 

881 

882 from sqlalchemy.dialects.postgresql import TSVECTOR 

883 from sqlalchemy import select, cast 

884 

885 select(cast("some text", TSVECTOR)) 

886 

887produces a statement equivalent to: 

888 

889.. sourcecode:: sql 

890 

891 SELECT CAST('some text' AS TSVECTOR) AS anon_1 

892 

893The ``func`` namespace is augmented by the PostgreSQL dialect to set up 

894correct argument and return types for most full text search functions. 

895These functions are used automatically by the :attr:`_sql.func` namespace 

896assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, 

897or :func:`_sa.create_engine` has been invoked using a ``postgresql`` 

898dialect. These functions are documented at: 

899 

900* :class:`_postgresql.to_tsvector` 

901* :class:`_postgresql.to_tsquery` 

902* :class:`_postgresql.plainto_tsquery` 

903* :class:`_postgresql.phraseto_tsquery` 

904* :class:`_postgresql.websearch_to_tsquery` 

905* :class:`_postgresql.ts_headline` 

906 

907Specifying the "regconfig" with ``match()`` or custom operators 

908^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

909 

910PostgreSQL's ``plainto_tsquery()`` function accepts an optional 

911"regconfig" argument that is used to instruct PostgreSQL to use a 

912particular pre-computed GIN or GiST index in order to perform the search. 

913When using :meth:`.Operators.match`, this additional parameter may be 

914specified using the ``postgresql_regconfig`` parameter, such as:: 

915 

916 select(mytable.c.id).where( 

917 mytable.c.title.match("somestring", postgresql_regconfig="english") 

918 ) 

919 

920Which would emit: 

921 

922.. sourcecode:: sql 

923 

924 SELECT mytable.id FROM mytable 

925 WHERE mytable.title @@ plainto_tsquery('english', 'somestring') 

926 

927When using other PostgreSQL search functions with :data:`.func`, the 

928"regconfig" parameter may be passed directly as the initial argument:: 

929 

930 select(mytable.c.id).where( 

931 func.to_tsvector("english", mytable.c.title).bool_op("@@")( 

932 func.to_tsquery("english", "somestring") 

933 ) 

934 ) 

935 

936produces a statement equivalent to: 

937 

938.. sourcecode:: sql 

939 

940 SELECT mytable.id FROM mytable 

941 WHERE to_tsvector('english', mytable.title) @@ 

942 to_tsquery('english', 'somestring') 

943 

944It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from 

945PostgreSQL to ensure that you are generating queries with SQLAlchemy that 

946take full advantage of any indexes you may have created for full text search. 

947 

948.. seealso:: 

949 

950 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation 

951 

952 

953FROM ONLY ... 

954------------- 

955 

956The dialect supports PostgreSQL's ONLY keyword for targeting only a particular 

957table in an inheritance hierarchy. This can be used to produce the 

958``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` 

959syntaxes. It uses SQLAlchemy's hints mechanism:: 

960 

961 # SELECT ... FROM ONLY ... 

962 result = table.select().with_hint(table, "ONLY", "postgresql") 

963 print(result.fetchall()) 

964 

965 # UPDATE ONLY ... 

966 table.update(values=dict(foo="bar")).with_hint( 

967 "ONLY", dialect_name="postgresql" 

968 ) 

969 

970 # DELETE FROM ONLY ... 

971 table.delete().with_hint("ONLY", dialect_name="postgresql") 

972 

973.. _postgresql_indexes: 

974 

975PostgreSQL-Specific Index Options 

976--------------------------------- 

977 

978Several extensions to the :class:`.Index` construct are available, specific 

979to the PostgreSQL dialect. 

980 

981.. _postgresql_covering_indexes: 

982 

983Covering Indexes 

984^^^^^^^^^^^^^^^^ 

985 

986A covering index includes additional columns that are not part of the index key 

987but are stored in the index, allowing PostgreSQL to satisfy queries using only 

988the index without accessing the table (an "index-only scan"). This is 

989indicated on the index using the ``INCLUDE`` clause. The 

990``postgresql_include`` option for :class:`.Index` (as well as 

991:class:`.UniqueConstraint`) renders ``INCLUDE(colname)`` for the given string 

992names:: 

993 

994 Index("my_index", table.c.x, postgresql_include=["y"]) 

995 

996would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` 

997 

998Note that this feature requires PostgreSQL 11 or later. 

999 

1000.. seealso:: 

1001 

1002 :ref:`postgresql_constraint_options_include` - the same feature implemented 

1003 for :class:`.UniqueConstraint` 

1004 

1005.. versionadded:: 1.4 - support for covering indexes with :class:`.Index`. 

1006 support for :class:`.UniqueConstraint` was in 2.0.41 

1007 

1008.. _postgresql_partial_indexes: 

1009 

1010Partial Indexes 

1011^^^^^^^^^^^^^^^ 

1012 

1013Partial indexes add criterion to the index definition so that the index is 

1014applied to a subset of rows. These can be specified on :class:`.Index` 

1015using the ``postgresql_where`` keyword argument:: 

1016 

1017 Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10) 

1018 

1019.. _postgresql_operator_classes: 

1020 

1021Operator Classes 

1022^^^^^^^^^^^^^^^^ 

1023 

1024PostgreSQL allows the specification of an *operator class* for each column of 

1025an index (see 

1026https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). 

1027The :class:`.Index` construct allows these to be specified via the 

1028``postgresql_ops`` keyword argument:: 

1029 

1030 Index( 

1031 "my_index", 

1032 my_table.c.id, 

1033 my_table.c.data, 

1034 postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"}, 

1035 ) 

1036 

1037Note that the keys in the ``postgresql_ops`` dictionaries are the 

1038"key" name of the :class:`_schema.Column`, i.e. the name used to access it from 

1039the ``.c`` collection of :class:`_schema.Table`, which can be configured to be 

1040different than the actual name of the column as expressed in the database. 

1041 

1042If ``postgresql_ops`` is to be used against a complex SQL expression such 

1043as a function call, then to apply to the column it must be given a label 

1044that is identified in the dictionary by name, e.g.:: 

1045 

1046 Index( 

1047 "my_index", 

1048 my_table.c.id, 

1049 func.lower(my_table.c.data).label("data_lower"), 

1050 postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"}, 

1051 ) 

1052 

1053Operator classes are also supported by the 

1054:class:`_postgresql.ExcludeConstraint` construct using the 

1055:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for 

1056details. 

1057 

1058.. versionadded:: 1.3.21 added support for operator classes with 

1059 :class:`_postgresql.ExcludeConstraint`. 

1060 

1061 

1062Index Types 

1063^^^^^^^^^^^ 

1064 

1065PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well 

1066as the ability for users to create their own (see 

1067https://www.postgresql.org/docs/current/static/indexes-types.html). These can be 

1068specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: 

1069 

1070 Index("my_index", my_table.c.data, postgresql_using="gin") 

1071 

1072The value passed to the keyword argument will be simply passed through to the 

1073underlying CREATE INDEX command, so it *must* be a valid index type for your 

1074version of PostgreSQL. 

1075 

1076.. _postgresql_index_storage: 

1077 

1078Index Storage Parameters 

1079^^^^^^^^^^^^^^^^^^^^^^^^ 

1080 

1081PostgreSQL allows storage parameters to be set on indexes. The storage 

1082parameters available depend on the index method used by the index. Storage 

1083parameters can be specified on :class:`.Index` using the ``postgresql_with`` 

1084keyword argument:: 

1085 

1086 Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50}) 

1087 

1088PostgreSQL allows to define the tablespace in which to create the index. 

1089The tablespace can be specified on :class:`.Index` using the 

1090``postgresql_tablespace`` keyword argument:: 

1091 

1092 Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace") 

1093 

1094Note that the same option is available on :class:`_schema.Table` as well. 

1095 

1096.. _postgresql_index_concurrently: 

1097 

1098Indexes with CONCURRENTLY 

1099^^^^^^^^^^^^^^^^^^^^^^^^^ 

1100 

1101The PostgreSQL index option CONCURRENTLY is supported by passing the 

1102flag ``postgresql_concurrently`` to the :class:`.Index` construct:: 

1103 

1104 tbl = Table("testtbl", m, Column("data", Integer)) 

1105 

1106 idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True) 

1107 

1108The above index construct will render DDL for CREATE INDEX, assuming 

1109PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as: 

1110 

1111.. sourcecode:: sql 

1112 

1113 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) 

1114 

1115For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for 

1116a connection-less dialect, it will emit: 

1117 

1118.. sourcecode:: sql 

1119 

1120 DROP INDEX CONCURRENTLY test_idx1 

1121 

1122When using CONCURRENTLY, the PostgreSQL database requires that the statement 

1123be invoked outside of a transaction block. The Python DBAPI enforces that 

1124even for a single statement, a transaction is present, so to use this 

1125construct, the DBAPI's "autocommit" mode must be used:: 

1126 

1127 metadata = MetaData() 

1128 table = Table("foo", metadata, Column("id", String)) 

1129 index = Index("foo_idx", table.c.id, postgresql_concurrently=True) 

1130 

1131 with engine.connect() as conn: 

1132 with conn.execution_options(isolation_level="AUTOCOMMIT"): 

1133 table.create(conn) 

1134 

1135.. seealso:: 

1136 

1137 :ref:`postgresql_isolation_level` 

1138 

1139.. _postgresql_index_reflection: 

1140 

1141PostgreSQL Index Reflection 

1142--------------------------- 

1143 

1144The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the 

1145UNIQUE CONSTRAINT construct is used. When inspecting a table using 

1146:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` 

1147and the :meth:`_reflection.Inspector.get_unique_constraints` 

1148will report on these 

1149two constructs distinctly; in the case of the index, the key 

1150``duplicates_constraint`` will be present in the index entry if it is 

1151detected as mirroring a constraint. When performing reflection using 

1152``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned 

1153in :attr:`_schema.Table.indexes` when it is detected as mirroring a 

1154:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection 

1155. 

1156 

1157Special Reflection Options 

1158-------------------------- 

1159 

1160The :class:`_reflection.Inspector` 

1161used for the PostgreSQL backend is an instance 

1162of :class:`.PGInspector`, which offers additional methods:: 

1163 

1164 from sqlalchemy import create_engine, inspect 

1165 

1166 engine = create_engine("postgresql+psycopg2://localhost/test") 

1167 insp = inspect(engine) # will be a PGInspector 

1168 

1169 print(insp.get_enums()) 

1170 

1171.. autoclass:: PGInspector 

1172 :members: 

1173 

1174.. _postgresql_table_options: 

1175 

1176PostgreSQL Table Options 

1177------------------------ 

1178 

1179Several options for CREATE TABLE are supported directly by the PostgreSQL 

1180dialect in conjunction with the :class:`_schema.Table` construct, listed in 

1181the following sections. 

1182 

1183.. seealso:: 

1184 

1185 `PostgreSQL CREATE TABLE options 

1186 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - 

1187 in the PostgreSQL documentation. 

1188 

1189``INHERITS`` 

1190^^^^^^^^^^^^ 

1191 

1192Specifies one or more parent tables from which this table inherits columns and 

1193constraints, enabling table inheritance hierarchies in PostgreSQL. 

1194 

1195:: 

1196 

1197 Table("some_table", metadata, ..., postgresql_inherits="some_supertable") 

1198 

1199 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) 

1200 

1201``ON COMMIT`` 

1202^^^^^^^^^^^^^ 

1203 

1204Controls the behavior of temporary tables at transaction commit, with options 

1205to preserve rows, delete rows, or drop the table. 

1206 

1207:: 

1208 

1209 Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS") 

1210 

1211``PARTITION BY`` 

1212^^^^^^^^^^^^^^^^ 

1213 

1214Declares the table as a partitioned table using the specified partitioning 

1215strategy (RANGE, LIST, or HASH) on the given column(s). 

1216 

1217:: 

1218 

1219 Table( 

1220 "some_table", 

1221 metadata, 

1222 ..., 

1223 postgresql_partition_by="LIST (part_column)", 

1224 ) 

1225 

1226``TABLESPACE`` 

1227^^^^^^^^^^^^^^ 

1228 

1229Specifies the tablespace where the table will be stored, allowing control over 

1230the physical location of table data on disk. 

1231 

1232:: 

1233 

1234 Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace") 

1235 

1236The above option is also available on the :class:`.Index` construct. 

1237 

1238``USING`` 

1239^^^^^^^^^ 

1240 

1241Specifies the table access method to use for storing table data, such as 

1242``heap`` (the default) or other custom access methods. 

1243 

1244:: 

1245 

1246 Table("some_table", metadata, ..., postgresql_using="heap") 

1247 

1248.. versionadded:: 2.0.26 

1249 

1250``WITH OIDS`` 

1251^^^^^^^^^^^^^ 

1252 

1253Enables the legacy OID (object identifier) system column for the table, which 

1254assigns a unique identifier to each row. 

1255 

1256:: 

1257 

1258 Table("some_table", metadata, ..., postgresql_with_oids=True) 

1259 

1260``WITHOUT OIDS`` 

1261^^^^^^^^^^^^^^^^ 

1262 

1263Explicitly disables the OID system column for the table (the default behavior 

1264in modern PostgreSQL versions). 

1265 

1266:: 

1267 

1268 Table("some_table", metadata, ..., postgresql_with_oids=False) 

1269 

1270.. _postgresql_constraint_options: 

1271 

1272PostgreSQL Constraint Options 

1273----------------------------- 

1274 

1275The following sections indicate options which are supported by the PostgreSQL 

1276dialect in conjunction with selected constraint constructs. 

1277 

1278 

1279``NOT VALID`` 

1280^^^^^^^^^^^^^ 

1281 

1282Allows a constraint to be added without validating existing rows, improving 

1283performance when adding constraints to large tables. This option applies 

1284towards CHECK and FOREIGN KEY constraints when the constraint is being added 

1285to an existing table via ALTER TABLE, and has the effect that existing rows 

1286are not scanned during the ALTER operation against the constraint being added. 

1287 

1288When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_ 

1289that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument 

1290may be specified as an additional keyword argument within the operation 

1291that creates the constraint, as in the following Alembic example:: 

1292 

1293 def update(): 

1294 op.create_foreign_key( 

1295 "fk_user_address", 

1296 "address", 

1297 "user", 

1298 ["user_id"], 

1299 ["id"], 

1300 postgresql_not_valid=True, 

1301 ) 

1302 

1303The keyword is ultimately accepted directly by the 

1304:class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` 

1305and :class:`_schema.ForeignKey` constructs; when using a tool like 

1306Alembic, dialect-specific keyword arguments are passed through to 

1307these constructs from the migration operation directives:: 

1308 

1309 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) 

1310 

1311 ForeignKeyConstraint( 

1312 ["some_id"], ["some_table.some_id"], postgresql_not_valid=True 

1313 ) 

1314 

1315.. versionadded:: 1.4.32 

1316 

1317.. seealso:: 

1318 

1319 `PostgreSQL ALTER TABLE options 

1320 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ - 

1321 in the PostgreSQL documentation. 

1322 

1323.. _postgresql_constraint_options_include: 

1324 

1325``INCLUDE`` 

1326^^^^^^^^^^^ 

1327 

1328This keyword is applicable to both a ``UNIQUE`` constraint as well as an 

1329``INDEX``. The ``postgresql_include`` option available for 

1330:class:`.UniqueConstraint` as well as :class:`.Index` creates a covering index 

1331by including additional columns in the underlying index without making them 

1332part of the key constraint. This option adds one or more columns as a "payload" 

1333to the index created automatically by PostgreSQL for the constraint. For 

1334example, the following table definition:: 

1335 

1336 Table( 

1337 "mytable", 

1338 metadata, 

1339 Column("id", Integer, nullable=False), 

1340 Column("value", Integer, nullable=False), 

1341 UniqueConstraint("id", postgresql_include=["value"]), 

1342 ) 

1343 

1344would produce the DDL statement 

1345 

1346.. sourcecode:: sql 

1347 

1348 CREATE TABLE mytable ( 

1349 id INTEGER NOT NULL, 

1350 value INTEGER NOT NULL, 

1351 UNIQUE (id) INCLUDE (value) 

1352 ) 

1353 

1354Note that this feature requires PostgreSQL 11 or later. 

1355 

1356.. versionadded:: 2.0.41 - added support for ``postgresql_include`` to 

1357 :class:`.UniqueConstraint`, to complement the existing feature in 

1358 :class:`.Index`. 

1359 

1360.. seealso:: 

1361 

1362 :ref:`postgresql_covering_indexes` - background on ``postgresql_include`` 

1363 for the :class:`.Index` construct. 

1364 

1365 

1366Column list with foreign key ``ON DELETE SET`` actions 

1367^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1368 

1369Allows selective column updates when a foreign key action is triggered, limiting 

1370which columns are set to NULL or DEFAULT upon deletion of a referenced row. 

1371This applies to :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`, the 

1372:paramref:`.ForeignKey.ondelete` parameter will accept on the PostgreSQL 

1373backend only a string list of column names inside parenthesis, following the 

1374``SET NULL`` or ``SET DEFAULT`` phrases, which will limit the set of columns 

1375that are subject to the action:: 

1376 

1377 fktable = Table( 

1378 "fktable", 

1379 metadata, 

1380 Column("tid", Integer), 

1381 Column("id", Integer), 

1382 Column("fk_id_del_set_null", Integer), 

1383 ForeignKeyConstraint( 

1384 columns=["tid", "fk_id_del_set_null"], 

1385 refcolumns=[pktable.c.tid, pktable.c.id], 

1386 ondelete="SET NULL (fk_id_del_set_null)", 

1387 ), 

1388 ) 

1389 

1390.. versionadded:: 2.0.40 

1391 

1392 

1393.. _postgresql_table_valued_overview: 

1394 

1395Table values, Table and Column valued functions, Row and Tuple objects 

1396----------------------------------------------------------------------- 

1397 

1398PostgreSQL makes great use of modern SQL forms such as table-valued functions, 

1399tables and rows as values. These constructs are commonly used as part 

1400of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other 

1401datatypes. SQLAlchemy's SQL expression language has native support for 

1402most table-valued and row-valued forms. 

1403 

1404.. _postgresql_table_valued: 

1405 

1406Table-Valued Functions 

1407^^^^^^^^^^^^^^^^^^^^^^^ 

1408 

1409Many PostgreSQL built-in functions are intended to be used in the FROM clause 

1410of a SELECT statement, and are capable of returning table rows or sets of table 

1411rows. A large portion of PostgreSQL's JSON functions for example such as 

1412``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, 

1413``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such 

1414forms. These classes of SQL function calling forms in SQLAlchemy are available 

1415using the :meth:`_functions.FunctionElement.table_valued` method in conjunction 

1416with :class:`_functions.Function` objects generated from the :data:`_sql.func` 

1417namespace. 

1418 

1419Examples from PostgreSQL's reference documentation follow below: 

1420 

1421* ``json_each()``: 

1422 

1423 .. sourcecode:: pycon+sql 

1424 

1425 >>> from sqlalchemy import select, func 

1426 >>> stmt = select( 

1427 ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value") 

1428 ... ) 

1429 >>> print(stmt) 

1430 {printsql}SELECT anon_1.key, anon_1.value 

1431 FROM json_each(:json_each_1) AS anon_1 

1432 

1433* ``json_populate_record()``: 

1434 

1435 .. sourcecode:: pycon+sql 

1436 

1437 >>> from sqlalchemy import select, func, literal_column 

1438 >>> stmt = select( 

1439 ... func.json_populate_record( 

1440 ... literal_column("null::myrowtype"), '{"a":1,"b":2}' 

1441 ... ).table_valued("a", "b", name="x") 

1442 ... ) 

1443 >>> print(stmt) 

1444 {printsql}SELECT x.a, x.b 

1445 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x 

1446 

1447* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived 

1448 columns in the alias, where we may make use of :func:`_sql.column` elements with 

1449 types to produce them. The :meth:`_functions.FunctionElement.table_valued` 

1450 method produces a :class:`_sql.TableValuedAlias` construct, and the method 

1451 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived 

1452 columns specification: 

1453 

1454 .. sourcecode:: pycon+sql 

1455 

1456 >>> from sqlalchemy import select, func, column, Integer, Text 

1457 >>> stmt = select( 

1458 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}') 

1459 ... .table_valued( 

1460 ... column("a", Integer), 

1461 ... column("b", Text), 

1462 ... column("d", Text), 

1463 ... ) 

1464 ... .render_derived(name="x", with_types=True) 

1465 ... ) 

1466 >>> print(stmt) 

1467 {printsql}SELECT x.a, x.b, x.d 

1468 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) 

1469 

1470* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an 

1471 ordinal counter to the output of a function and is accepted by a limited set 

1472 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The 

1473 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword 

1474 parameter ``with_ordinality`` for this purpose, which accepts the string name 

1475 that will be applied to the "ordinality" column: 

1476 

1477 .. sourcecode:: pycon+sql 

1478 

1479 >>> from sqlalchemy import select, func 

1480 >>> stmt = select( 

1481 ... func.generate_series(4, 1, -1) 

1482 ... .table_valued("value", with_ordinality="ordinality") 

1483 ... .render_derived() 

1484 ... ) 

1485 >>> print(stmt) 

1486 {printsql}SELECT anon_1.value, anon_1.ordinality 

1487 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) 

1488 WITH ORDINALITY AS anon_1(value, ordinality) 

1489 

1490.. versionadded:: 1.4.0b2 

1491 

1492.. seealso:: 

1493 

1494 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` 

1495 

1496.. _postgresql_column_valued: 

1497 

1498Column Valued Functions 

1499^^^^^^^^^^^^^^^^^^^^^^^ 

1500 

1501Similar to the table valued function, a column valued function is present 

1502in the FROM clause, but delivers itself to the columns clause as a single 

1503scalar value. PostgreSQL functions such as ``json_array_elements()``, 

1504``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the 

1505:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: 

1506 

1507* ``json_array_elements()``: 

1508 

1509 .. sourcecode:: pycon+sql 

1510 

1511 >>> from sqlalchemy import select, func 

1512 >>> stmt = select( 

1513 ... func.json_array_elements('["one", "two"]').column_valued("x") 

1514 ... ) 

1515 >>> print(stmt) 

1516 {printsql}SELECT x 

1517 FROM json_array_elements(:json_array_elements_1) AS x 

1518 

1519* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the 

1520 :func:`_postgresql.array` construct may be used: 

1521 

1522 .. sourcecode:: pycon+sql 

1523 

1524 >>> from sqlalchemy.dialects.postgresql import array 

1525 >>> from sqlalchemy import select, func 

1526 >>> stmt = select(func.unnest(array([1, 2])).column_valued()) 

1527 >>> print(stmt) 

1528 {printsql}SELECT anon_1 

1529 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 

1530 

1531 The function can of course be used against an existing table-bound column 

1532 that's of type :class:`_types.ARRAY`: 

1533 

1534 .. sourcecode:: pycon+sql 

1535 

1536 >>> from sqlalchemy import table, column, ARRAY, Integer 

1537 >>> from sqlalchemy import select, func 

1538 >>> t = table("t", column("value", ARRAY(Integer))) 

1539 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) 

1540 >>> print(stmt) 

1541 {printsql}SELECT unnested_value 

1542 FROM unnest(t.value) AS unnested_value 

1543 

1544.. seealso:: 

1545 

1546 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` 

1547 

1548 

1549Row Types 

1550^^^^^^^^^ 

1551 

1552Built-in support for rendering a ``ROW`` may be approximated using 

1553``func.ROW`` with the :attr:`_sa.func` namespace, or by using the 

1554:func:`_sql.tuple_` construct: 

1555 

1556.. sourcecode:: pycon+sql 

1557 

1558 >>> from sqlalchemy import table, column, func, tuple_ 

1559 >>> t = table("t", column("id"), column("fk")) 

1560 >>> stmt = ( 

1561 ... t.select() 

1562 ... .where(tuple_(t.c.id, t.c.fk) > (1, 2)) 

1563 ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)) 

1564 ... ) 

1565 >>> print(stmt) 

1566 {printsql}SELECT t.id, t.fk 

1567 FROM t 

1568 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) 

1569 

1570.. seealso:: 

1571 

1572 `PostgreSQL Row Constructors 

1573 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_ 

1574 

1575 `PostgreSQL Row Constructor Comparison 

1576 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_ 

1577 

1578Table Types passed to Functions 

1579^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1580 

1581PostgreSQL supports passing a table as an argument to a function, which is 

1582known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects 

1583such as :class:`_schema.Table` support this special form using the 

1584:meth:`_sql.FromClause.table_valued` method, which is comparable to the 

1585:meth:`_functions.FunctionElement.table_valued` method except that the collection 

1586of columns is already established by that of the :class:`_sql.FromClause` 

1587itself: 

1588 

1589.. sourcecode:: pycon+sql 

1590 

1591 >>> from sqlalchemy import table, column, func, select 

1592 >>> a = table("a", column("id"), column("x"), column("y")) 

1593 >>> stmt = select(func.row_to_json(a.table_valued())) 

1594 >>> print(stmt) 

1595 {printsql}SELECT row_to_json(a) AS row_to_json_1 

1596 FROM a 

1597 

1598.. versionadded:: 1.4.0b2 

1599 

1600 

1601 

1602""" # noqa: E501 

1603 

1604from __future__ import annotations 

1605 

1606from collections import defaultdict 

1607from functools import lru_cache 

1608import re 

1609from typing import Any 

1610from typing import cast 

1611from typing import Dict 

1612from typing import List 

1613from typing import Optional 

1614from typing import Tuple 

1615from typing import TYPE_CHECKING 

1616from typing import Union 

1617 

1618from . import arraylib as _array 

1619from . import json as _json 

1620from . import pg_catalog 

1621from . import ranges as _ranges 

1622from .ext import _regconfig_fn 

1623from .ext import aggregate_order_by 

1624from .hstore import HSTORE 

1625from .named_types import CreateDomainType as CreateDomainType # noqa: F401 

1626from .named_types import CreateEnumType as CreateEnumType # noqa: F401 

1627from .named_types import DOMAIN as DOMAIN # noqa: F401 

1628from .named_types import DropDomainType as DropDomainType # noqa: F401 

1629from .named_types import DropEnumType as DropEnumType # noqa: F401 

1630from .named_types import ENUM as ENUM # noqa: F401 

1631from .named_types import NamedType as NamedType # noqa: F401 

1632from .types import _DECIMAL_TYPES # noqa: F401 

1633from .types import _FLOAT_TYPES # noqa: F401 

1634from .types import _INT_TYPES # noqa: F401 

1635from .types import BIT as BIT 

1636from .types import BYTEA as BYTEA 

1637from .types import CIDR as CIDR 

1638from .types import CITEXT as CITEXT 

1639from .types import INET as INET 

1640from .types import INTERVAL as INTERVAL 

1641from .types import MACADDR as MACADDR 

1642from .types import MACADDR8 as MACADDR8 

1643from .types import MONEY as MONEY 

1644from .types import OID as OID 

1645from .types import PGBit as PGBit # noqa: F401 

1646from .types import PGCidr as PGCidr # noqa: F401 

1647from .types import PGInet as PGInet # noqa: F401 

1648from .types import PGInterval as PGInterval # noqa: F401 

1649from .types import PGMacAddr as PGMacAddr # noqa: F401 

1650from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401 

1651from .types import PGUuid as PGUuid 

1652from .types import REGCLASS as REGCLASS 

1653from .types import REGCONFIG as REGCONFIG # noqa: F401 

1654from .types import TIME as TIME 

1655from .types import TIMESTAMP as TIMESTAMP 

1656from .types import TSVECTOR as TSVECTOR 

1657from ... import exc 

1658from ... import schema 

1659from ... import select 

1660from ... import sql 

1661from ... import util 

1662from ...engine import characteristics 

1663from ...engine import default 

1664from ...engine import interfaces 

1665from ...engine import ObjectKind 

1666from ...engine import ObjectScope 

1667from ...engine import reflection 

1668from ...engine import URL 

1669from ...engine.reflection import ReflectionDefaults 

1670from ...sql import bindparam 

1671from ...sql import coercions 

1672from ...sql import compiler 

1673from ...sql import elements 

1674from ...sql import expression 

1675from ...sql import roles 

1676from ...sql import sqltypes 

1677from ...sql import util as sql_util 

1678from ...sql.compiler import InsertmanyvaluesSentinelOpts 

1679from ...sql.visitors import InternalTraversal 

1680from ...types import BIGINT 

1681from ...types import BOOLEAN 

1682from ...types import CHAR 

1683from ...types import DATE 

1684from ...types import DOUBLE_PRECISION 

1685from ...types import FLOAT 

1686from ...types import INTEGER 

1687from ...types import NUMERIC 

1688from ...types import REAL 

1689from ...types import SMALLINT 

1690from ...types import TEXT 

1691from ...types import UUID as UUID 

1692from ...types import VARCHAR 

1693from ...util.typing import TypedDict 

1694 

1695IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) 

1696 

1697RESERVED_WORDS = { 

1698 "all", 

1699 "analyse", 

1700 "analyze", 

1701 "and", 

1702 "any", 

1703 "array", 

1704 "as", 

1705 "asc", 

1706 "asymmetric", 

1707 "both", 

1708 "case", 

1709 "cast", 

1710 "check", 

1711 "collate", 

1712 "column", 

1713 "constraint", 

1714 "create", 

1715 "current_catalog", 

1716 "current_date", 

1717 "current_role", 

1718 "current_time", 

1719 "current_timestamp", 

1720 "current_user", 

1721 "default", 

1722 "deferrable", 

1723 "desc", 

1724 "distinct", 

1725 "do", 

1726 "else", 

1727 "end", 

1728 "except", 

1729 "false", 

1730 "fetch", 

1731 "for", 

1732 "foreign", 

1733 "from", 

1734 "grant", 

1735 "group", 

1736 "having", 

1737 "in", 

1738 "initially", 

1739 "intersect", 

1740 "into", 

1741 "leading", 

1742 "limit", 

1743 "localtime", 

1744 "localtimestamp", 

1745 "new", 

1746 "not", 

1747 "null", 

1748 "of", 

1749 "off", 

1750 "offset", 

1751 "old", 

1752 "on", 

1753 "only", 

1754 "or", 

1755 "order", 

1756 "placing", 

1757 "primary", 

1758 "references", 

1759 "returning", 

1760 "select", 

1761 "session_user", 

1762 "some", 

1763 "symmetric", 

1764 "table", 

1765 "then", 

1766 "to", 

1767 "trailing", 

1768 "true", 

1769 "union", 

1770 "unique", 

1771 "user", 

1772 "using", 

1773 "variadic", 

1774 "when", 

1775 "where", 

1776 "window", 

1777 "with", 

1778 "authorization", 

1779 "between", 

1780 "binary", 

1781 "cross", 

1782 "current_schema", 

1783 "freeze", 

1784 "full", 

1785 "ilike", 

1786 "inner", 

1787 "is", 

1788 "isnull", 

1789 "join", 

1790 "left", 

1791 "like", 

1792 "natural", 

1793 "notnull", 

1794 "outer", 

1795 "over", 

1796 "overlaps", 

1797 "right", 

1798 "similar", 

1799 "verbose", 

1800} 

1801 

1802 

1803colspecs = { 

1804 sqltypes.ARRAY: _array.ARRAY, 

1805 sqltypes.Interval: INTERVAL, 

1806 sqltypes.Enum: ENUM, 

1807 sqltypes.JSON.JSONPathType: _json.JSONPATH, 

1808 sqltypes.JSON: _json.JSON, 

1809 sqltypes.Uuid: PGUuid, 

1810} 

1811 

1812 

1813ischema_names = { 

1814 "_array": _array.ARRAY, 

1815 "hstore": HSTORE, 

1816 "json": _json.JSON, 

1817 "jsonb": _json.JSONB, 

1818 "int4range": _ranges.INT4RANGE, 

1819 "int8range": _ranges.INT8RANGE, 

1820 "numrange": _ranges.NUMRANGE, 

1821 "daterange": _ranges.DATERANGE, 

1822 "tsrange": _ranges.TSRANGE, 

1823 "tstzrange": _ranges.TSTZRANGE, 

1824 "int4multirange": _ranges.INT4MULTIRANGE, 

1825 "int8multirange": _ranges.INT8MULTIRANGE, 

1826 "nummultirange": _ranges.NUMMULTIRANGE, 

1827 "datemultirange": _ranges.DATEMULTIRANGE, 

1828 "tsmultirange": _ranges.TSMULTIRANGE, 

1829 "tstzmultirange": _ranges.TSTZMULTIRANGE, 

1830 "integer": INTEGER, 

1831 "bigint": BIGINT, 

1832 "smallint": SMALLINT, 

1833 "character varying": VARCHAR, 

1834 "character": CHAR, 

1835 '"char"': sqltypes.String, 

1836 "name": sqltypes.String, 

1837 "text": TEXT, 

1838 "numeric": NUMERIC, 

1839 "float": FLOAT, 

1840 "real": REAL, 

1841 "inet": INET, 

1842 "cidr": CIDR, 

1843 "citext": CITEXT, 

1844 "uuid": UUID, 

1845 "bit": BIT, 

1846 "bit varying": BIT, 

1847 "macaddr": MACADDR, 

1848 "macaddr8": MACADDR8, 

1849 "money": MONEY, 

1850 "oid": OID, 

1851 "regclass": REGCLASS, 

1852 "double precision": DOUBLE_PRECISION, 

1853 "timestamp": TIMESTAMP, 

1854 "timestamp with time zone": TIMESTAMP, 

1855 "timestamp without time zone": TIMESTAMP, 

1856 "time with time zone": TIME, 

1857 "time without time zone": TIME, 

1858 "date": DATE, 

1859 "time": TIME, 

1860 "bytea": BYTEA, 

1861 "boolean": BOOLEAN, 

1862 "interval": INTERVAL, 

1863 "tsvector": TSVECTOR, 

1864} 

1865 

1866 

1867class PGCompiler(compiler.SQLCompiler): 

1868 def visit_to_tsvector_func(self, element, **kw): 

1869 return self._assert_pg_ts_ext(element, **kw) 

1870 

1871 def visit_to_tsquery_func(self, element, **kw): 

1872 return self._assert_pg_ts_ext(element, **kw) 

1873 

1874 def visit_plainto_tsquery_func(self, element, **kw): 

1875 return self._assert_pg_ts_ext(element, **kw) 

1876 

1877 def visit_phraseto_tsquery_func(self, element, **kw): 

1878 return self._assert_pg_ts_ext(element, **kw) 

1879 

1880 def visit_websearch_to_tsquery_func(self, element, **kw): 

1881 return self._assert_pg_ts_ext(element, **kw) 

1882 

1883 def visit_ts_headline_func(self, element, **kw): 

1884 return self._assert_pg_ts_ext(element, **kw) 

1885 

1886 def _assert_pg_ts_ext(self, element, **kw): 

1887 if not isinstance(element, _regconfig_fn): 

1888 # other options here include trying to rewrite the function 

1889 # with the correct types. however, that means we have to 

1890 # "un-SQL-ize" the first argument, which can't work in a 

1891 # generalized way. Also, parent compiler class has already added 

1892 # the incorrect return type to the result map. So let's just 

1893 # make sure the function we want is used up front. 

1894 

1895 raise exc.CompileError( 

1896 f'Can\'t compile "{element.name}()" full text search ' 

1897 f"function construct that does not originate from the " 

1898 f'"sqlalchemy.dialects.postgresql" package. ' 

1899 f'Please ensure "import sqlalchemy.dialects.postgresql" is ' 

1900 f"called before constructing " 

1901 f'"sqlalchemy.func.{element.name}()" to ensure registration ' 

1902 f"of the correct argument and return types." 

1903 ) 

1904 

1905 return f"{element.name}{self.function_argspec(element, **kw)}" 

1906 

1907 def render_bind_cast(self, type_, dbapi_type, sqltext): 

1908 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: 

1909 # use VARCHAR with no length for VARCHAR cast. 

1910 # see #9511 

1911 dbapi_type = sqltypes.STRINGTYPE 

1912 return f"""{sqltext}::{ 

1913 self.dialect.type_compiler_instance.process( 

1914 dbapi_type, identifier_preparer=self.preparer 

1915 ) 

1916 }""" 

1917 

1918 def visit_array(self, element, **kw): 

1919 if not element.clauses and not element.type.item_type._isnull: 

1920 return "ARRAY[]::%s" % element.type.compile(self.dialect) 

1921 return "ARRAY[%s]" % self.visit_clauselist(element, **kw) 

1922 

1923 def visit_slice(self, element, **kw): 

1924 return "%s:%s" % ( 

1925 self.process(element.start, **kw), 

1926 self.process(element.stop, **kw), 

1927 ) 

1928 

1929 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1930 return self._generate_generic_binary(binary, " # ", **kw) 

1931 

1932 def visit_json_getitem_op_binary( 

1933 self, binary, operator, _cast_applied=False, **kw 

1934 ): 

1935 if ( 

1936 not _cast_applied 

1937 and binary.type._type_affinity is not sqltypes.JSON 

1938 ): 

1939 kw["_cast_applied"] = True 

1940 return self.process(sql.cast(binary, binary.type), **kw) 

1941 

1942 kw["eager_grouping"] = True 

1943 

1944 if ( 

1945 not _cast_applied 

1946 and isinstance(binary.left.type, _json.JSONB) 

1947 and self.dialect._supports_jsonb_subscripting 

1948 ): 

1949 # for pg14+JSONB use subscript notation: col['key'] instead 

1950 # of col -> 'key' 

1951 return "%s[%s]" % ( 

1952 self.process(binary.left, **kw), 

1953 self.process(binary.right, **kw), 

1954 ) 

1955 else: 

1956 # Fall back to arrow notation for older versions or when cast 

1957 # is applied 

1958 return self._generate_generic_binary( 

1959 binary, " -> " if not _cast_applied else " ->> ", **kw 

1960 ) 

1961 

1962 def visit_json_path_getitem_op_binary( 

1963 self, binary, operator, _cast_applied=False, **kw 

1964 ): 

1965 if ( 

1966 not _cast_applied 

1967 and binary.type._type_affinity is not sqltypes.JSON 

1968 ): 

1969 kw["_cast_applied"] = True 

1970 return self.process(sql.cast(binary, binary.type), **kw) 

1971 

1972 kw["eager_grouping"] = True 

1973 return self._generate_generic_binary( 

1974 binary, " #> " if not _cast_applied else " #>> ", **kw 

1975 ) 

1976 

1977 def visit_getitem_binary(self, binary, operator, **kw): 

1978 return "%s[%s]" % ( 

1979 self.process(binary.left, **kw), 

1980 self.process(binary.right, **kw), 

1981 ) 

1982 

1983 def visit_aggregate_order_by(self, element, **kw): 

1984 return "%s ORDER BY %s" % ( 

1985 self.process(element.target, **kw), 

1986 self.process(element.order_by, **kw), 

1987 ) 

1988 

1989 def visit_match_op_binary(self, binary, operator, **kw): 

1990 if "postgresql_regconfig" in binary.modifiers: 

1991 regconfig = self.render_literal_value( 

1992 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE 

1993 ) 

1994 if regconfig: 

1995 return "%s @@ plainto_tsquery(%s, %s)" % ( 

1996 self.process(binary.left, **kw), 

1997 regconfig, 

1998 self.process(binary.right, **kw), 

1999 ) 

2000 return "%s @@ plainto_tsquery(%s)" % ( 

2001 self.process(binary.left, **kw), 

2002 self.process(binary.right, **kw), 

2003 ) 

2004 

2005 def visit_ilike_case_insensitive_operand(self, element, **kw): 

2006 return element.element._compiler_dispatch(self, **kw) 

2007 

2008 def visit_ilike_op_binary(self, binary, operator, **kw): 

2009 escape = binary.modifiers.get("escape", None) 

2010 

2011 return "%s ILIKE %s" % ( 

2012 self.process(binary.left, **kw), 

2013 self.process(binary.right, **kw), 

2014 ) + ( 

2015 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

2016 if escape is not None 

2017 else "" 

2018 ) 

2019 

2020 def visit_not_ilike_op_binary(self, binary, operator, **kw): 

2021 escape = binary.modifiers.get("escape", None) 

2022 return "%s NOT ILIKE %s" % ( 

2023 self.process(binary.left, **kw), 

2024 self.process(binary.right, **kw), 

2025 ) + ( 

2026 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

2027 if escape is not None 

2028 else "" 

2029 ) 

2030 

2031 def _regexp_match(self, base_op, binary, operator, kw): 

2032 flags = binary.modifiers["flags"] 

2033 if flags is None: 

2034 return self._generate_generic_binary( 

2035 binary, " %s " % base_op, **kw 

2036 ) 

2037 if flags == "i": 

2038 return self._generate_generic_binary( 

2039 binary, " %s* " % base_op, **kw 

2040 ) 

2041 return "%s %s CONCAT('(?', %s, ')', %s)" % ( 

2042 self.process(binary.left, **kw), 

2043 base_op, 

2044 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

2045 self.process(binary.right, **kw), 

2046 ) 

2047 

2048 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

2049 return self._regexp_match("~", binary, operator, kw) 

2050 

2051 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

2052 return self._regexp_match("!~", binary, operator, kw) 

2053 

2054 def visit_regexp_replace_op_binary(self, binary, operator, **kw): 

2055 string = self.process(binary.left, **kw) 

2056 pattern_replace = self.process(binary.right, **kw) 

2057 flags = binary.modifiers["flags"] 

2058 if flags is None: 

2059 return "REGEXP_REPLACE(%s, %s)" % ( 

2060 string, 

2061 pattern_replace, 

2062 ) 

2063 else: 

2064 return "REGEXP_REPLACE(%s, %s, %s)" % ( 

2065 string, 

2066 pattern_replace, 

2067 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

2068 ) 

2069 

2070 def visit_empty_set_expr(self, element_types, **kw): 

2071 # cast the empty set to the type we are comparing against. if 

2072 # we are comparing against the null type, pick an arbitrary 

2073 # datatype for the empty set 

2074 return "SELECT %s WHERE 1!=1" % ( 

2075 ", ".join( 

2076 "CAST(NULL AS %s)" 

2077 % self.dialect.type_compiler_instance.process( 

2078 INTEGER() if type_._isnull else type_ 

2079 ) 

2080 for type_ in element_types or [INTEGER()] 

2081 ), 

2082 ) 

2083 

2084 def render_literal_value(self, value, type_): 

2085 value = super().render_literal_value(value, type_) 

2086 

2087 if self.dialect._backslash_escapes: 

2088 value = value.replace("\\", "\\\\") 

2089 return value 

2090 

2091 def visit_aggregate_strings_func(self, fn, **kw): 

2092 return "string_agg%s" % self.function_argspec(fn) 

2093 

2094 def visit_sequence(self, seq, **kw): 

2095 return "nextval('%s')" % self.preparer.format_sequence(seq) 

2096 

2097 def limit_clause(self, select, **kw): 

2098 text = "" 

2099 if select._limit_clause is not None: 

2100 text += " \n LIMIT " + self.process(select._limit_clause, **kw) 

2101 if select._offset_clause is not None: 

2102 if select._limit_clause is None: 

2103 text += "\n LIMIT ALL" 

2104 text += " OFFSET " + self.process(select._offset_clause, **kw) 

2105 return text 

2106 

2107 def format_from_hint_text(self, sqltext, table, hint, iscrud): 

2108 if hint.upper() != "ONLY": 

2109 raise exc.CompileError("Unrecognized hint: %r" % hint) 

2110 return "ONLY " + sqltext 

2111 

2112 def get_select_precolumns(self, select, **kw): 

2113 # Do not call super().get_select_precolumns because 

2114 # it will warn/raise when distinct on is present 

2115 if select._distinct or select._distinct_on: 

2116 if select._distinct_on: 

2117 return ( 

2118 "DISTINCT ON (" 

2119 + ", ".join( 

2120 [ 

2121 self.process(col, **kw) 

2122 for col in select._distinct_on 

2123 ] 

2124 ) 

2125 + ") " 

2126 ) 

2127 else: 

2128 return "DISTINCT " 

2129 else: 

2130 return "" 

2131 

2132 def for_update_clause(self, select, **kw): 

2133 if select._for_update_arg.read: 

2134 if select._for_update_arg.key_share: 

2135 tmp = " FOR KEY SHARE" 

2136 else: 

2137 tmp = " FOR SHARE" 

2138 elif select._for_update_arg.key_share: 

2139 tmp = " FOR NO KEY UPDATE" 

2140 else: 

2141 tmp = " FOR UPDATE" 

2142 

2143 if select._for_update_arg.of: 

2144 tables = util.OrderedSet() 

2145 for c in select._for_update_arg.of: 

2146 tables.update(sql_util.surface_selectables_only(c)) 

2147 

2148 of_kw = dict(kw) 

2149 of_kw.update(ashint=True, use_schema=False) 

2150 tmp += " OF " + ", ".join( 

2151 self.process(table, **of_kw) for table in tables 

2152 ) 

2153 

2154 if select._for_update_arg.nowait: 

2155 tmp += " NOWAIT" 

2156 if select._for_update_arg.skip_locked: 

2157 tmp += " SKIP LOCKED" 

2158 

2159 return tmp 

2160 

2161 def visit_substring_func(self, func, **kw): 

2162 s = self.process(func.clauses.clauses[0], **kw) 

2163 start = self.process(func.clauses.clauses[1], **kw) 

2164 if len(func.clauses.clauses) > 2: 

2165 length = self.process(func.clauses.clauses[2], **kw) 

2166 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) 

2167 else: 

2168 return "SUBSTRING(%s FROM %s)" % (s, start) 

2169 

2170 def _on_conflict_target(self, clause, **kw): 

2171 if clause.constraint_target is not None: 

2172 # target may be a name of an Index, UniqueConstraint or 

2173 # ExcludeConstraint. While there is a separate 

2174 # "max_identifier_length" for indexes, PostgreSQL uses the same 

2175 # length for all objects so we can use 

2176 # truncate_and_render_constraint_name 

2177 target_text = ( 

2178 "ON CONSTRAINT %s" 

2179 % self.preparer.truncate_and_render_constraint_name( 

2180 clause.constraint_target 

2181 ) 

2182 ) 

2183 elif clause.inferred_target_elements is not None: 

2184 target_text = "(%s)" % ", ".join( 

2185 ( 

2186 self.preparer.quote(c) 

2187 if isinstance(c, str) 

2188 else self.process(c, include_table=False, use_schema=False) 

2189 ) 

2190 for c in clause.inferred_target_elements 

2191 ) 

2192 if clause.inferred_target_whereclause is not None: 

2193 target_text += " WHERE %s" % self.process( 

2194 clause.inferred_target_whereclause, 

2195 include_table=False, 

2196 use_schema=False, 

2197 ) 

2198 else: 

2199 target_text = "" 

2200 

2201 return target_text 

2202 

2203 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

2204 target_text = self._on_conflict_target(on_conflict, **kw) 

2205 

2206 if target_text: 

2207 return "ON CONFLICT %s DO NOTHING" % target_text 

2208 else: 

2209 return "ON CONFLICT DO NOTHING" 

2210 

2211 def visit_on_conflict_do_update(self, on_conflict, **kw): 

2212 clause = on_conflict 

2213 

2214 target_text = self._on_conflict_target(on_conflict, **kw) 

2215 

2216 action_set_ops = [] 

2217 

2218 set_parameters = dict(clause.update_values_to_set) 

2219 # create a list of column assignment clauses as tuples 

2220 

2221 insert_statement = self.stack[-1]["selectable"] 

2222 cols = insert_statement.table.c 

2223 for c in cols: 

2224 col_key = c.key 

2225 

2226 if col_key in set_parameters: 

2227 value = set_parameters.pop(col_key) 

2228 elif c in set_parameters: 

2229 value = set_parameters.pop(c) 

2230 else: 

2231 continue 

2232 

2233 # TODO: this coercion should be up front. we can't cache 

2234 # SQL constructs with non-bound literals buried in them 

2235 if coercions._is_literal(value): 

2236 value = elements.BindParameter(None, value, type_=c.type) 

2237 

2238 else: 

2239 if ( 

2240 isinstance(value, elements.BindParameter) 

2241 and value.type._isnull 

2242 ): 

2243 value = value._clone() 

2244 value.type = c.type 

2245 value_text = self.process(value.self_group(), use_schema=False) 

2246 

2247 key_text = self.preparer.quote(c.name) 

2248 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2249 

2250 # check for names that don't match columns 

2251 if set_parameters: 

2252 util.warn( 

2253 "Additional column names not matching " 

2254 "any column keys in table '%s': %s" 

2255 % ( 

2256 self.current_executable.table.name, 

2257 (", ".join("'%s'" % c for c in set_parameters)), 

2258 ) 

2259 ) 

2260 for k, v in set_parameters.items(): 

2261 key_text = ( 

2262 self.preparer.quote(k) 

2263 if isinstance(k, str) 

2264 else self.process(k, use_schema=False) 

2265 ) 

2266 value_text = self.process( 

2267 coercions.expect(roles.ExpressionElementRole, v), 

2268 use_schema=False, 

2269 ) 

2270 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2271 

2272 action_text = ", ".join(action_set_ops) 

2273 if clause.update_whereclause is not None: 

2274 action_text += " WHERE %s" % self.process( 

2275 clause.update_whereclause, include_table=True, use_schema=False 

2276 ) 

2277 

2278 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

2279 

2280 def update_from_clause( 

2281 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2282 ): 

2283 kw["asfrom"] = True 

2284 return "FROM " + ", ".join( 

2285 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2286 for t in extra_froms 

2287 ) 

2288 

2289 def delete_extra_from_clause( 

2290 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2291 ): 

2292 """Render the DELETE .. USING clause specific to PostgreSQL.""" 

2293 kw["asfrom"] = True 

2294 return "USING " + ", ".join( 

2295 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2296 for t in extra_froms 

2297 ) 

2298 

2299 def fetch_clause(self, select, **kw): 

2300 # pg requires parens for non literal clauses. It's also required for 

2301 # bind parameters if a ::type casts is used by the driver (asyncpg), 

2302 # so it's easiest to just always add it 

2303 text = "" 

2304 if select._offset_clause is not None: 

2305 text += "\n OFFSET (%s) ROWS" % self.process( 

2306 select._offset_clause, **kw 

2307 ) 

2308 if select._fetch_clause is not None: 

2309 text += "\n FETCH FIRST (%s)%s ROWS %s" % ( 

2310 self.process(select._fetch_clause, **kw), 

2311 " PERCENT" if select._fetch_clause_options["percent"] else "", 

2312 ( 

2313 "WITH TIES" 

2314 if select._fetch_clause_options["with_ties"] 

2315 else "ONLY" 

2316 ), 

2317 ) 

2318 return text 

2319 

2320 

2321class PGDDLCompiler(compiler.DDLCompiler): 

2322 def get_column_specification(self, column, **kwargs): 

2323 colspec = self.preparer.format_column(column) 

2324 impl_type = column.type.dialect_impl(self.dialect) 

2325 if isinstance(impl_type, sqltypes.TypeDecorator): 

2326 impl_type = impl_type.impl 

2327 

2328 has_identity = ( 

2329 column.identity is not None 

2330 and self.dialect.supports_identity_columns 

2331 ) 

2332 

2333 if ( 

2334 column.primary_key 

2335 and column is column.table._autoincrement_column 

2336 and ( 

2337 self.dialect.supports_smallserial 

2338 or not isinstance(impl_type, sqltypes.SmallInteger) 

2339 ) 

2340 and not has_identity 

2341 and ( 

2342 column.default is None 

2343 or ( 

2344 isinstance(column.default, schema.Sequence) 

2345 and column.default.optional 

2346 ) 

2347 ) 

2348 ): 

2349 if isinstance(impl_type, sqltypes.BigInteger): 

2350 colspec += " BIGSERIAL" 

2351 elif isinstance(impl_type, sqltypes.SmallInteger): 

2352 colspec += " SMALLSERIAL" 

2353 else: 

2354 colspec += " SERIAL" 

2355 else: 

2356 colspec += " " + self.dialect.type_compiler_instance.process( 

2357 column.type, 

2358 type_expression=column, 

2359 identifier_preparer=self.preparer, 

2360 ) 

2361 default = self.get_column_default_string(column) 

2362 if default is not None: 

2363 colspec += " DEFAULT " + default 

2364 

2365 if column.computed is not None: 

2366 colspec += " " + self.process(column.computed) 

2367 if has_identity: 

2368 colspec += " " + self.process(column.identity) 

2369 

2370 if not column.nullable and not has_identity: 

2371 colspec += " NOT NULL" 

2372 elif column.nullable and has_identity: 

2373 colspec += " NULL" 

2374 return colspec 

2375 

2376 def _define_constraint_validity(self, constraint): 

2377 not_valid = constraint.dialect_options["postgresql"]["not_valid"] 

2378 return " NOT VALID" if not_valid else "" 

2379 

2380 def _define_include(self, obj): 

2381 includeclause = obj.dialect_options["postgresql"]["include"] 

2382 if not includeclause: 

2383 return "" 

2384 inclusions = [ 

2385 obj.table.c[col] if isinstance(col, str) else col 

2386 for col in includeclause 

2387 ] 

2388 return " INCLUDE (%s)" % ", ".join( 

2389 [self.preparer.quote(c.name) for c in inclusions] 

2390 ) 

2391 

2392 def visit_check_constraint(self, constraint, **kw): 

2393 if constraint._type_bound: 

2394 typ = list(constraint.columns)[0].type 

2395 if ( 

2396 isinstance(typ, sqltypes.ARRAY) 

2397 and isinstance(typ.item_type, sqltypes.Enum) 

2398 and not typ.item_type.native_enum 

2399 ): 

2400 raise exc.CompileError( 

2401 "PostgreSQL dialect cannot produce the CHECK constraint " 

2402 "for ARRAY of non-native ENUM; please specify " 

2403 "create_constraint=False on this Enum datatype." 

2404 ) 

2405 

2406 text = super().visit_check_constraint(constraint) 

2407 text += self._define_constraint_validity(constraint) 

2408 return text 

2409 

2410 def visit_foreign_key_constraint(self, constraint, **kw): 

2411 text = super().visit_foreign_key_constraint(constraint) 

2412 text += self._define_constraint_validity(constraint) 

2413 return text 

2414 

2415 def visit_primary_key_constraint(self, constraint, **kw): 

2416 text = self.define_constraint_preamble(constraint, **kw) 

2417 text += self.define_primary_key_body(constraint, **kw) 

2418 text += self._define_include(constraint) 

2419 text += self.define_constraint_deferrability(constraint) 

2420 return text 

2421 

2422 def visit_unique_constraint(self, constraint, **kw): 

2423 if len(constraint) == 0: 

2424 return "" 

2425 text = self.define_constraint_preamble(constraint, **kw) 

2426 text += self.define_unique_body(constraint, **kw) 

2427 text += self._define_include(constraint) 

2428 text += self.define_constraint_deferrability(constraint) 

2429 return text 

2430 

2431 @util.memoized_property 

2432 def _fk_ondelete_pattern(self): 

2433 return re.compile( 

2434 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?" 

2435 r"|NO ACTION)$", 

2436 re.I, 

2437 ) 

2438 

2439 def define_constraint_ondelete_cascade(self, constraint): 

2440 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

2441 constraint.ondelete, self._fk_ondelete_pattern 

2442 ) 

2443 

2444 def visit_create_enum_type(self, create, **kw): 

2445 type_ = create.element 

2446 

2447 return "CREATE TYPE %s AS ENUM (%s)" % ( 

2448 self.preparer.format_type(type_), 

2449 ", ".join( 

2450 self.sql_compiler.process(sql.literal(e), literal_binds=True) 

2451 for e in type_.enums 

2452 ), 

2453 ) 

2454 

2455 def visit_drop_enum_type(self, drop, **kw): 

2456 type_ = drop.element 

2457 

2458 return "DROP TYPE %s" % (self.preparer.format_type(type_)) 

2459 

2460 def visit_create_domain_type(self, create, **kw): 

2461 domain: DOMAIN = create.element 

2462 

2463 options = [] 

2464 if domain.collation is not None: 

2465 options.append(f"COLLATE {self.preparer.quote(domain.collation)}") 

2466 if domain.default is not None: 

2467 default = self.render_default_string(domain.default) 

2468 options.append(f"DEFAULT {default}") 

2469 if domain.constraint_name is not None: 

2470 name = self.preparer.truncate_and_render_constraint_name( 

2471 domain.constraint_name 

2472 ) 

2473 options.append(f"CONSTRAINT {name}") 

2474 if domain.not_null: 

2475 options.append("NOT NULL") 

2476 if domain.check is not None: 

2477 check = self.sql_compiler.process( 

2478 domain.check, include_table=False, literal_binds=True 

2479 ) 

2480 options.append(f"CHECK ({check})") 

2481 

2482 return ( 

2483 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " 

2484 f"{self.type_compiler.process(domain.data_type)} " 

2485 f"{' '.join(options)}" 

2486 ) 

2487 

2488 def visit_drop_domain_type(self, drop, **kw): 

2489 domain = drop.element 

2490 return f"DROP DOMAIN {self.preparer.format_type(domain)}" 

2491 

2492 def visit_create_index(self, create, **kw): 

2493 preparer = self.preparer 

2494 index = create.element 

2495 self._verify_index_table(index) 

2496 text = "CREATE " 

2497 if index.unique: 

2498 text += "UNIQUE " 

2499 

2500 text += "INDEX " 

2501 

2502 if self.dialect._supports_create_index_concurrently: 

2503 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2504 if concurrently: 

2505 text += "CONCURRENTLY " 

2506 

2507 if create.if_not_exists: 

2508 text += "IF NOT EXISTS " 

2509 

2510 text += "%s ON %s " % ( 

2511 self._prepared_index_name(index, include_schema=False), 

2512 preparer.format_table(index.table), 

2513 ) 

2514 

2515 using = index.dialect_options["postgresql"]["using"] 

2516 if using: 

2517 text += ( 

2518 "USING %s " 

2519 % self.preparer.validate_sql_phrase(using, IDX_USING).lower() 

2520 ) 

2521 

2522 ops = index.dialect_options["postgresql"]["ops"] 

2523 text += "(%s)" % ( 

2524 ", ".join( 

2525 [ 

2526 self.sql_compiler.process( 

2527 ( 

2528 expr.self_group() 

2529 if not isinstance(expr, expression.ColumnClause) 

2530 else expr 

2531 ), 

2532 include_table=False, 

2533 literal_binds=True, 

2534 ) 

2535 + ( 

2536 (" " + ops[expr.key]) 

2537 if hasattr(expr, "key") and expr.key in ops 

2538 else "" 

2539 ) 

2540 for expr in index.expressions 

2541 ] 

2542 ) 

2543 ) 

2544 

2545 text += self._define_include(index) 

2546 

2547 nulls_not_distinct = index.dialect_options["postgresql"][ 

2548 "nulls_not_distinct" 

2549 ] 

2550 if nulls_not_distinct is True: 

2551 text += " NULLS NOT DISTINCT" 

2552 elif nulls_not_distinct is False: 

2553 text += " NULLS DISTINCT" 

2554 

2555 withclause = index.dialect_options["postgresql"]["with"] 

2556 if withclause: 

2557 text += " WITH (%s)" % ( 

2558 ", ".join( 

2559 [ 

2560 "%s = %s" % storage_parameter 

2561 for storage_parameter in withclause.items() 

2562 ] 

2563 ) 

2564 ) 

2565 

2566 tablespace_name = index.dialect_options["postgresql"]["tablespace"] 

2567 if tablespace_name: 

2568 text += " TABLESPACE %s" % preparer.quote(tablespace_name) 

2569 

2570 whereclause = index.dialect_options["postgresql"]["where"] 

2571 if whereclause is not None: 

2572 whereclause = coercions.expect( 

2573 roles.DDLExpressionRole, whereclause 

2574 ) 

2575 

2576 where_compiled = self.sql_compiler.process( 

2577 whereclause, include_table=False, literal_binds=True 

2578 ) 

2579 text += " WHERE " + where_compiled 

2580 

2581 return text 

2582 

2583 def define_unique_constraint_distinct(self, constraint, **kw): 

2584 nulls_not_distinct = constraint.dialect_options["postgresql"][ 

2585 "nulls_not_distinct" 

2586 ] 

2587 if nulls_not_distinct is True: 

2588 nulls_not_distinct_param = "NULLS NOT DISTINCT " 

2589 elif nulls_not_distinct is False: 

2590 nulls_not_distinct_param = "NULLS DISTINCT " 

2591 else: 

2592 nulls_not_distinct_param = "" 

2593 return nulls_not_distinct_param 

2594 

2595 def visit_drop_index(self, drop, **kw): 

2596 index = drop.element 

2597 

2598 text = "\nDROP INDEX " 

2599 

2600 if self.dialect._supports_drop_index_concurrently: 

2601 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2602 if concurrently: 

2603 text += "CONCURRENTLY " 

2604 

2605 if drop.if_exists: 

2606 text += "IF EXISTS " 

2607 

2608 text += self._prepared_index_name(index, include_schema=True) 

2609 return text 

2610 

2611 def visit_exclude_constraint(self, constraint, **kw): 

2612 text = "" 

2613 if constraint.name is not None: 

2614 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2615 constraint 

2616 ) 

2617 elements = [] 

2618 kw["include_table"] = False 

2619 kw["literal_binds"] = True 

2620 for expr, name, op in constraint._render_exprs: 

2621 exclude_element = self.sql_compiler.process(expr, **kw) + ( 

2622 (" " + constraint.ops[expr.key]) 

2623 if hasattr(expr, "key") and expr.key in constraint.ops 

2624 else "" 

2625 ) 

2626 

2627 elements.append("%s WITH %s" % (exclude_element, op)) 

2628 text += "EXCLUDE USING %s (%s)" % ( 

2629 self.preparer.validate_sql_phrase( 

2630 constraint.using, IDX_USING 

2631 ).lower(), 

2632 ", ".join(elements), 

2633 ) 

2634 if constraint.where is not None: 

2635 text += " WHERE (%s)" % self.sql_compiler.process( 

2636 constraint.where, literal_binds=True 

2637 ) 

2638 text += self.define_constraint_deferrability(constraint) 

2639 return text 

2640 

2641 def post_create_table(self, table): 

2642 table_opts = [] 

2643 pg_opts = table.dialect_options["postgresql"] 

2644 

2645 inherits = pg_opts.get("inherits") 

2646 if inherits is not None: 

2647 if not isinstance(inherits, (list, tuple)): 

2648 inherits = (inherits,) 

2649 table_opts.append( 

2650 "\n INHERITS ( " 

2651 + ", ".join(self.preparer.quote(name) for name in inherits) 

2652 + " )" 

2653 ) 

2654 

2655 if pg_opts["partition_by"]: 

2656 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) 

2657 

2658 if pg_opts["using"]: 

2659 table_opts.append("\n USING %s" % pg_opts["using"]) 

2660 

2661 if pg_opts["with_oids"] is True: 

2662 table_opts.append("\n WITH OIDS") 

2663 elif pg_opts["with_oids"] is False: 

2664 table_opts.append("\n WITHOUT OIDS") 

2665 

2666 if pg_opts["on_commit"]: 

2667 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() 

2668 table_opts.append("\n ON COMMIT %s" % on_commit_options) 

2669 

2670 if pg_opts["tablespace"]: 

2671 tablespace_name = pg_opts["tablespace"] 

2672 table_opts.append( 

2673 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) 

2674 ) 

2675 

2676 return "".join(table_opts) 

2677 

2678 def visit_computed_column(self, generated, **kw): 

2679 if generated.persisted is False: 

2680 raise exc.CompileError( 

2681 "PostrgreSQL computed columns do not support 'virtual' " 

2682 "persistence; set the 'persisted' flag to None or True for " 

2683 "PostgreSQL support." 

2684 ) 

2685 

2686 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( 

2687 generated.sqltext, include_table=False, literal_binds=True 

2688 ) 

2689 

2690 def visit_create_sequence(self, create, **kw): 

2691 prefix = None 

2692 if create.element.data_type is not None: 

2693 prefix = " AS %s" % self.type_compiler.process( 

2694 create.element.data_type 

2695 ) 

2696 

2697 return super().visit_create_sequence(create, prefix=prefix, **kw) 

2698 

2699 def _can_comment_on_constraint(self, ddl_instance): 

2700 constraint = ddl_instance.element 

2701 if constraint.name is None: 

2702 raise exc.CompileError( 

2703 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2704 "it has no name" 

2705 ) 

2706 if constraint.table is None: 

2707 raise exc.CompileError( 

2708 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2709 "it has no associated table" 

2710 ) 

2711 

2712 def visit_set_constraint_comment(self, create, **kw): 

2713 self._can_comment_on_constraint(create) 

2714 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % ( 

2715 self.preparer.format_constraint(create.element), 

2716 self.preparer.format_table(create.element.table), 

2717 self.sql_compiler.render_literal_value( 

2718 create.element.comment, sqltypes.String() 

2719 ), 

2720 ) 

2721 

2722 def visit_drop_constraint_comment(self, drop, **kw): 

2723 self._can_comment_on_constraint(drop) 

2724 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % ( 

2725 self.preparer.format_constraint(drop.element), 

2726 self.preparer.format_table(drop.element.table), 

2727 ) 

2728 

2729 

2730class PGTypeCompiler(compiler.GenericTypeCompiler): 

2731 def visit_TSVECTOR(self, type_, **kw): 

2732 return "TSVECTOR" 

2733 

2734 def visit_TSQUERY(self, type_, **kw): 

2735 return "TSQUERY" 

2736 

2737 def visit_INET(self, type_, **kw): 

2738 return "INET" 

2739 

2740 def visit_CIDR(self, type_, **kw): 

2741 return "CIDR" 

2742 

2743 def visit_CITEXT(self, type_, **kw): 

2744 return "CITEXT" 

2745 

2746 def visit_MACADDR(self, type_, **kw): 

2747 return "MACADDR" 

2748 

2749 def visit_MACADDR8(self, type_, **kw): 

2750 return "MACADDR8" 

2751 

2752 def visit_MONEY(self, type_, **kw): 

2753 return "MONEY" 

2754 

2755 def visit_OID(self, type_, **kw): 

2756 return "OID" 

2757 

2758 def visit_REGCONFIG(self, type_, **kw): 

2759 return "REGCONFIG" 

2760 

2761 def visit_REGCLASS(self, type_, **kw): 

2762 return "REGCLASS" 

2763 

2764 def visit_FLOAT(self, type_, **kw): 

2765 if not type_.precision: 

2766 return "FLOAT" 

2767 else: 

2768 return "FLOAT(%(precision)s)" % {"precision": type_.precision} 

2769 

2770 def visit_double(self, type_, **kw): 

2771 return self.visit_DOUBLE_PRECISION(type, **kw) 

2772 

2773 def visit_BIGINT(self, type_, **kw): 

2774 return "BIGINT" 

2775 

2776 def visit_HSTORE(self, type_, **kw): 

2777 return "HSTORE" 

2778 

2779 def visit_JSON(self, type_, **kw): 

2780 return "JSON" 

2781 

2782 def visit_JSONB(self, type_, **kw): 

2783 return "JSONB" 

2784 

2785 def visit_INT4MULTIRANGE(self, type_, **kw): 

2786 return "INT4MULTIRANGE" 

2787 

2788 def visit_INT8MULTIRANGE(self, type_, **kw): 

2789 return "INT8MULTIRANGE" 

2790 

2791 def visit_NUMMULTIRANGE(self, type_, **kw): 

2792 return "NUMMULTIRANGE" 

2793 

2794 def visit_DATEMULTIRANGE(self, type_, **kw): 

2795 return "DATEMULTIRANGE" 

2796 

2797 def visit_TSMULTIRANGE(self, type_, **kw): 

2798 return "TSMULTIRANGE" 

2799 

2800 def visit_TSTZMULTIRANGE(self, type_, **kw): 

2801 return "TSTZMULTIRANGE" 

2802 

2803 def visit_INT4RANGE(self, type_, **kw): 

2804 return "INT4RANGE" 

2805 

2806 def visit_INT8RANGE(self, type_, **kw): 

2807 return "INT8RANGE" 

2808 

2809 def visit_NUMRANGE(self, type_, **kw): 

2810 return "NUMRANGE" 

2811 

2812 def visit_DATERANGE(self, type_, **kw): 

2813 return "DATERANGE" 

2814 

2815 def visit_TSRANGE(self, type_, **kw): 

2816 return "TSRANGE" 

2817 

2818 def visit_TSTZRANGE(self, type_, **kw): 

2819 return "TSTZRANGE" 

2820 

2821 def visit_json_int_index(self, type_, **kw): 

2822 return "INT" 

2823 

2824 def visit_json_str_index(self, type_, **kw): 

2825 return "TEXT" 

2826 

2827 def visit_datetime(self, type_, **kw): 

2828 return self.visit_TIMESTAMP(type_, **kw) 

2829 

2830 def visit_enum(self, type_, **kw): 

2831 if not type_.native_enum or not self.dialect.supports_native_enum: 

2832 return super().visit_enum(type_, **kw) 

2833 else: 

2834 return self.visit_ENUM(type_, **kw) 

2835 

2836 def visit_ENUM(self, type_, identifier_preparer=None, **kw): 

2837 if identifier_preparer is None: 

2838 identifier_preparer = self.dialect.identifier_preparer 

2839 return identifier_preparer.format_type(type_) 

2840 

2841 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw): 

2842 if identifier_preparer is None: 

2843 identifier_preparer = self.dialect.identifier_preparer 

2844 return identifier_preparer.format_type(type_) 

2845 

2846 def visit_TIMESTAMP(self, type_, **kw): 

2847 return "TIMESTAMP%s %s" % ( 

2848 ( 

2849 "(%d)" % type_.precision 

2850 if getattr(type_, "precision", None) is not None 

2851 else "" 

2852 ), 

2853 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2854 ) 

2855 

2856 def visit_TIME(self, type_, **kw): 

2857 return "TIME%s %s" % ( 

2858 ( 

2859 "(%d)" % type_.precision 

2860 if getattr(type_, "precision", None) is not None 

2861 else "" 

2862 ), 

2863 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2864 ) 

2865 

2866 def visit_INTERVAL(self, type_, **kw): 

2867 text = "INTERVAL" 

2868 if type_.fields is not None: 

2869 text += " " + type_.fields 

2870 if type_.precision is not None: 

2871 text += " (%d)" % type_.precision 

2872 return text 

2873 

2874 def visit_BIT(self, type_, **kw): 

2875 if type_.varying: 

2876 compiled = "BIT VARYING" 

2877 if type_.length is not None: 

2878 compiled += "(%d)" % type_.length 

2879 else: 

2880 compiled = "BIT(%d)" % type_.length 

2881 return compiled 

2882 

2883 def visit_uuid(self, type_, **kw): 

2884 if type_.native_uuid: 

2885 return self.visit_UUID(type_, **kw) 

2886 else: 

2887 return super().visit_uuid(type_, **kw) 

2888 

2889 def visit_UUID(self, type_, **kw): 

2890 return "UUID" 

2891 

2892 def visit_large_binary(self, type_, **kw): 

2893 return self.visit_BYTEA(type_, **kw) 

2894 

2895 def visit_BYTEA(self, type_, **kw): 

2896 return "BYTEA" 

2897 

2898 def visit_ARRAY(self, type_, **kw): 

2899 inner = self.process(type_.item_type, **kw) 

2900 return re.sub( 

2901 r"((?: COLLATE.*)?)$", 

2902 ( 

2903 r"%s\1" 

2904 % ( 

2905 "[]" 

2906 * (type_.dimensions if type_.dimensions is not None else 1) 

2907 ) 

2908 ), 

2909 inner, 

2910 count=1, 

2911 ) 

2912 

2913 def visit_json_path(self, type_, **kw): 

2914 return self.visit_JSONPATH(type_, **kw) 

2915 

2916 def visit_JSONPATH(self, type_, **kw): 

2917 return "JSONPATH" 

2918 

2919 

2920class PGIdentifierPreparer(compiler.IdentifierPreparer): 

2921 reserved_words = RESERVED_WORDS 

2922 

2923 def _unquote_identifier(self, value): 

2924 if value[0] == self.initial_quote: 

2925 value = value[1:-1].replace( 

2926 self.escape_to_quote, self.escape_quote 

2927 ) 

2928 return value 

2929 

2930 def format_type(self, type_, use_schema=True): 

2931 if not type_.name: 

2932 raise exc.CompileError( 

2933 f"PostgreSQL {type_.__class__.__name__} type requires a name." 

2934 ) 

2935 

2936 name = self.quote(type_.name) 

2937 effective_schema = self.schema_for_object(type_) 

2938 

2939 if ( 

2940 not self.omit_schema 

2941 and use_schema 

2942 and effective_schema is not None 

2943 ): 

2944 name = f"{self.quote_schema(effective_schema)}.{name}" 

2945 return name 

2946 

2947 

2948class ReflectedNamedType(TypedDict): 

2949 """Represents a reflected named type.""" 

2950 

2951 name: str 

2952 """Name of the type.""" 

2953 schema: str 

2954 """The schema of the type.""" 

2955 visible: bool 

2956 """Indicates if this type is in the current search path.""" 

2957 

2958 

2959class ReflectedDomainConstraint(TypedDict): 

2960 """Represents a reflect check constraint of a domain.""" 

2961 

2962 name: str 

2963 """Name of the constraint.""" 

2964 check: str 

2965 """The check constraint text.""" 

2966 

2967 

2968class ReflectedDomain(ReflectedNamedType): 

2969 """Represents a reflected enum.""" 

2970 

2971 type: str 

2972 """The string name of the underlying data type of the domain.""" 

2973 nullable: bool 

2974 """Indicates if the domain allows null or not.""" 

2975 default: Optional[str] 

2976 """The string representation of the default value of this domain 

2977 or ``None`` if none present. 

2978 """ 

2979 constraints: List[ReflectedDomainConstraint] 

2980 """The constraints defined in the domain, if any. 

2981 The constraint are in order of evaluation by postgresql. 

2982 """ 

2983 collation: Optional[str] 

2984 """The collation for the domain.""" 

2985 

2986 

2987class ReflectedEnum(ReflectedNamedType): 

2988 """Represents a reflected enum.""" 

2989 

2990 labels: List[str] 

2991 """The labels that compose the enum.""" 

2992 

2993 

2994class PGInspector(reflection.Inspector): 

2995 dialect: PGDialect 

2996 

2997 def get_table_oid( 

2998 self, table_name: str, schema: Optional[str] = None 

2999 ) -> int: 

3000 """Return the OID for the given table name. 

3001 

3002 :param table_name: string name of the table. For special quoting, 

3003 use :class:`.quoted_name`. 

3004 

3005 :param schema: string schema name; if omitted, uses the default schema 

3006 of the database connection. For special quoting, 

3007 use :class:`.quoted_name`. 

3008 

3009 """ 

3010 

3011 with self._operation_context() as conn: 

3012 return self.dialect.get_table_oid( 

3013 conn, table_name, schema, info_cache=self.info_cache 

3014 ) 

3015 

3016 def get_domains( 

3017 self, schema: Optional[str] = None 

3018 ) -> List[ReflectedDomain]: 

3019 """Return a list of DOMAIN objects. 

3020 

3021 Each member is a dictionary containing these fields: 

3022 

3023 * name - name of the domain 

3024 * schema - the schema name for the domain. 

3025 * visible - boolean, whether or not this domain is visible 

3026 in the default search path. 

3027 * type - the type defined by this domain. 

3028 * nullable - Indicates if this domain can be ``NULL``. 

3029 * default - The default value of the domain or ``None`` if the 

3030 domain has no default. 

3031 * constraints - A list of dict wit the constraint defined by this 

3032 domain. Each element constaints two keys: ``name`` of the 

3033 constraint and ``check`` with the constraint text. 

3034 

3035 :param schema: schema name. If None, the default schema 

3036 (typically 'public') is used. May also be set to ``'*'`` to 

3037 indicate load domains for all schemas. 

3038 

3039 .. versionadded:: 2.0 

3040 

3041 """ 

3042 with self._operation_context() as conn: 

3043 return self.dialect._load_domains( 

3044 conn, schema, info_cache=self.info_cache 

3045 ) 

3046 

3047 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]: 

3048 """Return a list of ENUM objects. 

3049 

3050 Each member is a dictionary containing these fields: 

3051 

3052 * name - name of the enum 

3053 * schema - the schema name for the enum. 

3054 * visible - boolean, whether or not this enum is visible 

3055 in the default search path. 

3056 * labels - a list of string labels that apply to the enum. 

3057 

3058 :param schema: schema name. If None, the default schema 

3059 (typically 'public') is used. May also be set to ``'*'`` to 

3060 indicate load enums for all schemas. 

3061 

3062 """ 

3063 with self._operation_context() as conn: 

3064 return self.dialect._load_enums( 

3065 conn, schema, info_cache=self.info_cache 

3066 ) 

3067 

3068 def get_foreign_table_names( 

3069 self, schema: Optional[str] = None 

3070 ) -> List[str]: 

3071 """Return a list of FOREIGN TABLE names. 

3072 

3073 Behavior is similar to that of 

3074 :meth:`_reflection.Inspector.get_table_names`, 

3075 except that the list is limited to those tables that report a 

3076 ``relkind`` value of ``f``. 

3077 

3078 """ 

3079 with self._operation_context() as conn: 

3080 return self.dialect._get_foreign_table_names( 

3081 conn, schema, info_cache=self.info_cache 

3082 ) 

3083 

3084 def has_type( 

3085 self, type_name: str, schema: Optional[str] = None, **kw: Any 

3086 ) -> bool: 

3087 """Return if the database has the specified type in the provided 

3088 schema. 

3089 

3090 :param type_name: the type to check. 

3091 :param schema: schema name. If None, the default schema 

3092 (typically 'public') is used. May also be set to ``'*'`` to 

3093 check in all schemas. 

3094 

3095 .. versionadded:: 2.0 

3096 

3097 """ 

3098 with self._operation_context() as conn: 

3099 return self.dialect.has_type( 

3100 conn, type_name, schema, info_cache=self.info_cache 

3101 ) 

3102 

3103 

3104class PGExecutionContext(default.DefaultExecutionContext): 

3105 def fire_sequence(self, seq, type_): 

3106 return self._execute_scalar( 

3107 ( 

3108 "select nextval('%s')" 

3109 % self.identifier_preparer.format_sequence(seq) 

3110 ), 

3111 type_, 

3112 ) 

3113 

3114 def get_insert_default(self, column): 

3115 if column.primary_key and column is column.table._autoincrement_column: 

3116 if column.server_default and column.server_default.has_argument: 

3117 # pre-execute passive defaults on primary key columns 

3118 return self._execute_scalar( 

3119 "select %s" % column.server_default.arg, column.type 

3120 ) 

3121 

3122 elif column.default is None or ( 

3123 column.default.is_sequence and column.default.optional 

3124 ): 

3125 # execute the sequence associated with a SERIAL primary 

3126 # key column. for non-primary-key SERIAL, the ID just 

3127 # generates server side. 

3128 

3129 try: 

3130 seq_name = column._postgresql_seq_name 

3131 except AttributeError: 

3132 tab = column.table.name 

3133 col = column.name 

3134 tab = tab[0 : 29 + max(0, (29 - len(col)))] 

3135 col = col[0 : 29 + max(0, (29 - len(tab)))] 

3136 name = "%s_%s_seq" % (tab, col) 

3137 column._postgresql_seq_name = seq_name = name 

3138 

3139 if column.table is not None: 

3140 effective_schema = self.connection.schema_for_object( 

3141 column.table 

3142 ) 

3143 else: 

3144 effective_schema = None 

3145 

3146 if effective_schema is not None: 

3147 exc = 'select nextval(\'"%s"."%s"\')' % ( 

3148 effective_schema, 

3149 seq_name, 

3150 ) 

3151 else: 

3152 exc = "select nextval('\"%s\"')" % (seq_name,) 

3153 

3154 return self._execute_scalar(exc, column.type) 

3155 

3156 return super().get_insert_default(column) 

3157 

3158 

3159class PGReadOnlyConnectionCharacteristic( 

3160 characteristics.ConnectionCharacteristic 

3161): 

3162 transactional = True 

3163 

3164 def reset_characteristic(self, dialect, dbapi_conn): 

3165 dialect.set_readonly(dbapi_conn, False) 

3166 

3167 def set_characteristic(self, dialect, dbapi_conn, value): 

3168 dialect.set_readonly(dbapi_conn, value) 

3169 

3170 def get_characteristic(self, dialect, dbapi_conn): 

3171 return dialect.get_readonly(dbapi_conn) 

3172 

3173 

3174class PGDeferrableConnectionCharacteristic( 

3175 characteristics.ConnectionCharacteristic 

3176): 

3177 transactional = True 

3178 

3179 def reset_characteristic(self, dialect, dbapi_conn): 

3180 dialect.set_deferrable(dbapi_conn, False) 

3181 

3182 def set_characteristic(self, dialect, dbapi_conn, value): 

3183 dialect.set_deferrable(dbapi_conn, value) 

3184 

3185 def get_characteristic(self, dialect, dbapi_conn): 

3186 return dialect.get_deferrable(dbapi_conn) 

3187 

3188 

3189class PGDialect(default.DefaultDialect): 

3190 name = "postgresql" 

3191 supports_statement_cache = True 

3192 supports_alter = True 

3193 max_identifier_length = 63 

3194 supports_sane_rowcount = True 

3195 

3196 bind_typing = interfaces.BindTyping.RENDER_CASTS 

3197 

3198 supports_native_enum = True 

3199 supports_native_boolean = True 

3200 supports_native_uuid = True 

3201 supports_smallserial = True 

3202 

3203 supports_sequences = True 

3204 sequences_optional = True 

3205 preexecute_autoincrement_sequences = True 

3206 postfetch_lastrowid = False 

3207 use_insertmanyvalues = True 

3208 

3209 returns_native_bytes = True 

3210 

3211 insertmanyvalues_implicit_sentinel = ( 

3212 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT 

3213 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

3214 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS 

3215 ) 

3216 

3217 supports_comments = True 

3218 supports_constraint_comments = True 

3219 supports_default_values = True 

3220 

3221 supports_default_metavalue = True 

3222 

3223 supports_empty_insert = False 

3224 supports_multivalues_insert = True 

3225 

3226 supports_identity_columns = True 

3227 

3228 default_paramstyle = "pyformat" 

3229 ischema_names = ischema_names 

3230 colspecs = colspecs 

3231 

3232 statement_compiler = PGCompiler 

3233 ddl_compiler = PGDDLCompiler 

3234 type_compiler_cls = PGTypeCompiler 

3235 preparer = PGIdentifierPreparer 

3236 execution_ctx_cls = PGExecutionContext 

3237 inspector = PGInspector 

3238 

3239 update_returning = True 

3240 delete_returning = True 

3241 insert_returning = True 

3242 update_returning_multifrom = True 

3243 delete_returning_multifrom = True 

3244 

3245 connection_characteristics = ( 

3246 default.DefaultDialect.connection_characteristics 

3247 ) 

3248 connection_characteristics = connection_characteristics.union( 

3249 { 

3250 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), 

3251 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), 

3252 } 

3253 ) 

3254 

3255 construct_arguments = [ 

3256 ( 

3257 schema.Index, 

3258 { 

3259 "using": False, 

3260 "include": None, 

3261 "where": None, 

3262 "ops": {}, 

3263 "concurrently": False, 

3264 "with": {}, 

3265 "tablespace": None, 

3266 "nulls_not_distinct": None, 

3267 }, 

3268 ), 

3269 ( 

3270 schema.Table, 

3271 { 

3272 "ignore_search_path": False, 

3273 "tablespace": None, 

3274 "partition_by": None, 

3275 "with_oids": None, 

3276 "on_commit": None, 

3277 "inherits": None, 

3278 "using": None, 

3279 }, 

3280 ), 

3281 ( 

3282 schema.CheckConstraint, 

3283 { 

3284 "not_valid": False, 

3285 }, 

3286 ), 

3287 ( 

3288 schema.ForeignKeyConstraint, 

3289 { 

3290 "not_valid": False, 

3291 }, 

3292 ), 

3293 ( 

3294 schema.PrimaryKeyConstraint, 

3295 {"include": None}, 

3296 ), 

3297 ( 

3298 schema.UniqueConstraint, 

3299 { 

3300 "include": None, 

3301 "nulls_not_distinct": None, 

3302 }, 

3303 ), 

3304 ] 

3305 

3306 reflection_options = ("postgresql_ignore_search_path",) 

3307 

3308 _backslash_escapes = True 

3309 _supports_create_index_concurrently = True 

3310 _supports_drop_index_concurrently = True 

3311 _supports_jsonb_subscripting = True 

3312 

3313 def __init__( 

3314 self, 

3315 native_inet_types=None, 

3316 json_serializer=None, 

3317 json_deserializer=None, 

3318 **kwargs, 

3319 ): 

3320 default.DefaultDialect.__init__(self, **kwargs) 

3321 

3322 self._native_inet_types = native_inet_types 

3323 self._json_deserializer = json_deserializer 

3324 self._json_serializer = json_serializer 

3325 

3326 def initialize(self, connection): 

3327 super().initialize(connection) 

3328 

3329 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 

3330 self.supports_smallserial = self.server_version_info >= (9, 2) 

3331 

3332 self._set_backslash_escapes(connection) 

3333 

3334 self._supports_drop_index_concurrently = self.server_version_info >= ( 

3335 9, 

3336 2, 

3337 ) 

3338 self.supports_identity_columns = self.server_version_info >= (10,) 

3339 

3340 self._supports_jsonb_subscripting = self.server_version_info >= (14,) 

3341 

3342 def get_isolation_level_values(self, dbapi_conn): 

3343 # note the generic dialect doesn't have AUTOCOMMIT, however 

3344 # all postgresql dialects should include AUTOCOMMIT. 

3345 return ( 

3346 "SERIALIZABLE", 

3347 "READ UNCOMMITTED", 

3348 "READ COMMITTED", 

3349 "REPEATABLE READ", 

3350 ) 

3351 

3352 def set_isolation_level(self, dbapi_connection, level): 

3353 cursor = dbapi_connection.cursor() 

3354 cursor.execute( 

3355 "SET SESSION CHARACTERISTICS AS TRANSACTION " 

3356 f"ISOLATION LEVEL {level}" 

3357 ) 

3358 cursor.execute("COMMIT") 

3359 cursor.close() 

3360 

3361 def get_isolation_level(self, dbapi_connection): 

3362 cursor = dbapi_connection.cursor() 

3363 cursor.execute("show transaction isolation level") 

3364 val = cursor.fetchone()[0] 

3365 cursor.close() 

3366 return val.upper() 

3367 

3368 def set_readonly(self, connection, value): 

3369 raise NotImplementedError() 

3370 

3371 def get_readonly(self, connection): 

3372 raise NotImplementedError() 

3373 

3374 def set_deferrable(self, connection, value): 

3375 raise NotImplementedError() 

3376 

3377 def get_deferrable(self, connection): 

3378 raise NotImplementedError() 

3379 

3380 def _split_multihost_from_url(self, url: URL) -> Union[ 

3381 Tuple[None, None], 

3382 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], 

3383 ]: 

3384 hosts: Optional[Tuple[Optional[str], ...]] = None 

3385 ports_str: Union[str, Tuple[Optional[str], ...], None] = None 

3386 

3387 integrated_multihost = False 

3388 

3389 if "host" in url.query: 

3390 if isinstance(url.query["host"], (list, tuple)): 

3391 integrated_multihost = True 

3392 hosts, ports_str = zip( 

3393 *[ 

3394 token.split(":") if ":" in token else (token, None) 

3395 for token in url.query["host"] 

3396 ] 

3397 ) 

3398 

3399 elif isinstance(url.query["host"], str): 

3400 hosts = tuple(url.query["host"].split(",")) 

3401 

3402 if ( 

3403 "port" not in url.query 

3404 and len(hosts) == 1 

3405 and ":" in hosts[0] 

3406 ): 

3407 # internet host is alphanumeric plus dots or hyphens. 

3408 # this is essentially rfc1123, which refers to rfc952. 

3409 # https://stackoverflow.com/questions/3523028/ 

3410 # valid-characters-of-a-hostname 

3411 host_port_match = re.match( 

3412 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0] 

3413 ) 

3414 if host_port_match: 

3415 integrated_multihost = True 

3416 h, p = host_port_match.group(1, 2) 

3417 if TYPE_CHECKING: 

3418 assert isinstance(h, str) 

3419 assert isinstance(p, str) 

3420 hosts = (h,) 

3421 ports_str = cast( 

3422 "Tuple[Optional[str], ...]", (p,) if p else (None,) 

3423 ) 

3424 

3425 if "port" in url.query: 

3426 if integrated_multihost: 

3427 raise exc.ArgumentError( 

3428 "Can't mix 'multihost' formats together; use " 

3429 '"host=h1,h2,h3&port=p1,p2,p3" or ' 

3430 '"host=h1:p1&host=h2:p2&host=h3:p3" separately' 

3431 ) 

3432 if isinstance(url.query["port"], (list, tuple)): 

3433 ports_str = url.query["port"] 

3434 elif isinstance(url.query["port"], str): 

3435 ports_str = tuple(url.query["port"].split(",")) 

3436 

3437 ports: Optional[Tuple[Optional[int], ...]] = None 

3438 

3439 if ports_str: 

3440 try: 

3441 ports = tuple(int(x) if x else None for x in ports_str) 

3442 except ValueError: 

3443 raise exc.ArgumentError( 

3444 f"Received non-integer port arguments: {ports_str}" 

3445 ) from None 

3446 

3447 if ports and ( 

3448 (not hosts and len(ports) > 1) 

3449 or ( 

3450 hosts 

3451 and ports 

3452 and len(hosts) != len(ports) 

3453 and (len(hosts) > 1 or len(ports) > 1) 

3454 ) 

3455 ): 

3456 raise exc.ArgumentError("number of hosts and ports don't match") 

3457 

3458 if hosts is not None: 

3459 if ports is None: 

3460 ports = tuple(None for _ in hosts) 

3461 

3462 return hosts, ports # type: ignore 

3463 

3464 def do_begin_twophase(self, connection, xid): 

3465 self.do_begin(connection.connection) 

3466 

3467 def do_prepare_twophase(self, connection, xid): 

3468 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) 

3469 

3470 def do_rollback_twophase( 

3471 self, connection, xid, is_prepared=True, recover=False 

3472 ): 

3473 if is_prepared: 

3474 if recover: 

3475 # FIXME: ugly hack to get out of transaction 

3476 # context when committing recoverable transactions 

3477 # Must find out a way how to make the dbapi not 

3478 # open a transaction. 

3479 connection.exec_driver_sql("ROLLBACK") 

3480 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) 

3481 connection.exec_driver_sql("BEGIN") 

3482 self.do_rollback(connection.connection) 

3483 else: 

3484 self.do_rollback(connection.connection) 

3485 

3486 def do_commit_twophase( 

3487 self, connection, xid, is_prepared=True, recover=False 

3488 ): 

3489 if is_prepared: 

3490 if recover: 

3491 connection.exec_driver_sql("ROLLBACK") 

3492 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) 

3493 connection.exec_driver_sql("BEGIN") 

3494 self.do_rollback(connection.connection) 

3495 else: 

3496 self.do_commit(connection.connection) 

3497 

3498 def do_recover_twophase(self, connection): 

3499 return connection.scalars( 

3500 sql.text("SELECT gid FROM pg_prepared_xacts") 

3501 ).all() 

3502 

3503 def _get_default_schema_name(self, connection): 

3504 return connection.exec_driver_sql("select current_schema()").scalar() 

3505 

3506 @reflection.cache 

3507 def has_schema(self, connection, schema, **kw): 

3508 query = select(pg_catalog.pg_namespace.c.nspname).where( 

3509 pg_catalog.pg_namespace.c.nspname == schema 

3510 ) 

3511 return bool(connection.scalar(query)) 

3512 

3513 def _pg_class_filter_scope_schema( 

3514 self, query, schema, scope, pg_class_table=None 

3515 ): 

3516 if pg_class_table is None: 

3517 pg_class_table = pg_catalog.pg_class 

3518 query = query.join( 

3519 pg_catalog.pg_namespace, 

3520 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, 

3521 ) 

3522 

3523 if scope is ObjectScope.DEFAULT: 

3524 query = query.where(pg_class_table.c.relpersistence != "t") 

3525 elif scope is ObjectScope.TEMPORARY: 

3526 query = query.where(pg_class_table.c.relpersistence == "t") 

3527 

3528 if schema is None: 

3529 query = query.where( 

3530 pg_catalog.pg_table_is_visible(pg_class_table.c.oid), 

3531 # ignore pg_catalog schema 

3532 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3533 ) 

3534 else: 

3535 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3536 return query 

3537 

3538 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None): 

3539 if pg_class_table is None: 

3540 pg_class_table = pg_catalog.pg_class 

3541 # uses the any form instead of in otherwise postgresql complaings 

3542 # that 'IN could not convert type character to "char"' 

3543 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds)) 

3544 

3545 @lru_cache() 

3546 def _has_table_query(self, schema): 

3547 query = select(pg_catalog.pg_class.c.relname).where( 

3548 pg_catalog.pg_class.c.relname == bindparam("table_name"), 

3549 self._pg_class_relkind_condition( 

3550 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3551 ), 

3552 ) 

3553 return self._pg_class_filter_scope_schema( 

3554 query, schema, scope=ObjectScope.ANY 

3555 ) 

3556 

3557 @reflection.cache 

3558 def has_table(self, connection, table_name, schema=None, **kw): 

3559 self._ensure_has_table_connection(connection) 

3560 query = self._has_table_query(schema) 

3561 return bool(connection.scalar(query, {"table_name": table_name})) 

3562 

3563 @reflection.cache 

3564 def has_sequence(self, connection, sequence_name, schema=None, **kw): 

3565 query = select(pg_catalog.pg_class.c.relname).where( 

3566 pg_catalog.pg_class.c.relkind == "S", 

3567 pg_catalog.pg_class.c.relname == sequence_name, 

3568 ) 

3569 query = self._pg_class_filter_scope_schema( 

3570 query, schema, scope=ObjectScope.ANY 

3571 ) 

3572 return bool(connection.scalar(query)) 

3573 

3574 @reflection.cache 

3575 def has_type(self, connection, type_name, schema=None, **kw): 

3576 query = ( 

3577 select(pg_catalog.pg_type.c.typname) 

3578 .join( 

3579 pg_catalog.pg_namespace, 

3580 pg_catalog.pg_namespace.c.oid 

3581 == pg_catalog.pg_type.c.typnamespace, 

3582 ) 

3583 .where(pg_catalog.pg_type.c.typname == type_name) 

3584 ) 

3585 if schema is None: 

3586 query = query.where( 

3587 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

3588 # ignore pg_catalog schema 

3589 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3590 ) 

3591 elif schema != "*": 

3592 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3593 

3594 return bool(connection.scalar(query)) 

3595 

3596 def _get_server_version_info(self, connection): 

3597 v = connection.exec_driver_sql("select pg_catalog.version()").scalar() 

3598 m = re.match( 

3599 r".*(?:PostgreSQL|EnterpriseDB) " 

3600 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", 

3601 v, 

3602 ) 

3603 if not m: 

3604 raise AssertionError( 

3605 "Could not determine version from string '%s'" % v 

3606 ) 

3607 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) 

3608 

3609 @reflection.cache 

3610 def get_table_oid(self, connection, table_name, schema=None, **kw): 

3611 """Fetch the oid for schema.table_name.""" 

3612 query = select(pg_catalog.pg_class.c.oid).where( 

3613 pg_catalog.pg_class.c.relname == table_name, 

3614 self._pg_class_relkind_condition( 

3615 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3616 ), 

3617 ) 

3618 query = self._pg_class_filter_scope_schema( 

3619 query, schema, scope=ObjectScope.ANY 

3620 ) 

3621 table_oid = connection.scalar(query) 

3622 if table_oid is None: 

3623 raise exc.NoSuchTableError( 

3624 f"{schema}.{table_name}" if schema else table_name 

3625 ) 

3626 return table_oid 

3627 

3628 @reflection.cache 

3629 def get_schema_names(self, connection, **kw): 

3630 query = ( 

3631 select(pg_catalog.pg_namespace.c.nspname) 

3632 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%")) 

3633 .order_by(pg_catalog.pg_namespace.c.nspname) 

3634 ) 

3635 return connection.scalars(query).all() 

3636 

3637 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope): 

3638 query = select(pg_catalog.pg_class.c.relname).where( 

3639 self._pg_class_relkind_condition(relkinds) 

3640 ) 

3641 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3642 return connection.scalars(query).all() 

3643 

3644 @reflection.cache 

3645 def get_table_names(self, connection, schema=None, **kw): 

3646 return self._get_relnames_for_relkinds( 

3647 connection, 

3648 schema, 

3649 pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3650 scope=ObjectScope.DEFAULT, 

3651 ) 

3652 

3653 @reflection.cache 

3654 def get_temp_table_names(self, connection, **kw): 

3655 return self._get_relnames_for_relkinds( 

3656 connection, 

3657 schema=None, 

3658 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3659 scope=ObjectScope.TEMPORARY, 

3660 ) 

3661 

3662 @reflection.cache 

3663 def _get_foreign_table_names(self, connection, schema=None, **kw): 

3664 return self._get_relnames_for_relkinds( 

3665 connection, schema, relkinds=("f",), scope=ObjectScope.ANY 

3666 ) 

3667 

3668 @reflection.cache 

3669 def get_view_names(self, connection, schema=None, **kw): 

3670 return self._get_relnames_for_relkinds( 

3671 connection, 

3672 schema, 

3673 pg_catalog.RELKINDS_VIEW, 

3674 scope=ObjectScope.DEFAULT, 

3675 ) 

3676 

3677 @reflection.cache 

3678 def get_materialized_view_names(self, connection, schema=None, **kw): 

3679 return self._get_relnames_for_relkinds( 

3680 connection, 

3681 schema, 

3682 pg_catalog.RELKINDS_MAT_VIEW, 

3683 scope=ObjectScope.DEFAULT, 

3684 ) 

3685 

3686 @reflection.cache 

3687 def get_temp_view_names(self, connection, schema=None, **kw): 

3688 return self._get_relnames_for_relkinds( 

3689 connection, 

3690 schema, 

3691 # NOTE: do not include temp materialzied views (that do not 

3692 # seem to be a thing at least up to version 14) 

3693 pg_catalog.RELKINDS_VIEW, 

3694 scope=ObjectScope.TEMPORARY, 

3695 ) 

3696 

3697 @reflection.cache 

3698 def get_sequence_names(self, connection, schema=None, **kw): 

3699 return self._get_relnames_for_relkinds( 

3700 connection, schema, relkinds=("S",), scope=ObjectScope.ANY 

3701 ) 

3702 

3703 @reflection.cache 

3704 def get_view_definition(self, connection, view_name, schema=None, **kw): 

3705 query = ( 

3706 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid)) 

3707 .select_from(pg_catalog.pg_class) 

3708 .where( 

3709 pg_catalog.pg_class.c.relname == view_name, 

3710 self._pg_class_relkind_condition( 

3711 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW 

3712 ), 

3713 ) 

3714 ) 

3715 query = self._pg_class_filter_scope_schema( 

3716 query, schema, scope=ObjectScope.ANY 

3717 ) 

3718 res = connection.scalar(query) 

3719 if res is None: 

3720 raise exc.NoSuchTableError( 

3721 f"{schema}.{view_name}" if schema else view_name 

3722 ) 

3723 else: 

3724 return res 

3725 

3726 def _value_or_raise(self, data, table, schema): 

3727 try: 

3728 return dict(data)[(schema, table)] 

3729 except KeyError: 

3730 raise exc.NoSuchTableError( 

3731 f"{schema}.{table}" if schema else table 

3732 ) from None 

3733 

3734 def _prepare_filter_names(self, filter_names): 

3735 if filter_names: 

3736 return True, {"filter_names": filter_names} 

3737 else: 

3738 return False, {} 

3739 

3740 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: 

3741 if kind is ObjectKind.ANY: 

3742 return pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3743 relkinds = () 

3744 if ObjectKind.TABLE in kind: 

3745 relkinds += pg_catalog.RELKINDS_TABLE 

3746 if ObjectKind.VIEW in kind: 

3747 relkinds += pg_catalog.RELKINDS_VIEW 

3748 if ObjectKind.MATERIALIZED_VIEW in kind: 

3749 relkinds += pg_catalog.RELKINDS_MAT_VIEW 

3750 return relkinds 

3751 

3752 @reflection.cache 

3753 def get_columns(self, connection, table_name, schema=None, **kw): 

3754 data = self.get_multi_columns( 

3755 connection, 

3756 schema=schema, 

3757 filter_names=[table_name], 

3758 scope=ObjectScope.ANY, 

3759 kind=ObjectKind.ANY, 

3760 **kw, 

3761 ) 

3762 return self._value_or_raise(data, table_name, schema) 

3763 

3764 @lru_cache() 

3765 def _columns_query(self, schema, has_filter_names, scope, kind): 

3766 # NOTE: the query with the default and identity options scalar 

3767 # subquery is faster than trying to use outer joins for them 

3768 generated = ( 

3769 pg_catalog.pg_attribute.c.attgenerated.label("generated") 

3770 if self.server_version_info >= (12,) 

3771 else sql.null().label("generated") 

3772 ) 

3773 if self.server_version_info >= (10,): 

3774 # join lateral performs worse (~2x slower) than a scalar_subquery 

3775 identity = ( 

3776 select( 

3777 sql.func.json_build_object( 

3778 "always", 

3779 pg_catalog.pg_attribute.c.attidentity == "a", 

3780 "start", 

3781 pg_catalog.pg_sequence.c.seqstart, 

3782 "increment", 

3783 pg_catalog.pg_sequence.c.seqincrement, 

3784 "minvalue", 

3785 pg_catalog.pg_sequence.c.seqmin, 

3786 "maxvalue", 

3787 pg_catalog.pg_sequence.c.seqmax, 

3788 "cache", 

3789 pg_catalog.pg_sequence.c.seqcache, 

3790 "cycle", 

3791 pg_catalog.pg_sequence.c.seqcycle, 

3792 type_=sqltypes.JSON(), 

3793 ) 

3794 ) 

3795 .select_from(pg_catalog.pg_sequence) 

3796 .where( 

3797 # attidentity != '' is required or it will reflect also 

3798 # serial columns as identity. 

3799 pg_catalog.pg_attribute.c.attidentity != "", 

3800 pg_catalog.pg_sequence.c.seqrelid 

3801 == sql.cast( 

3802 sql.cast( 

3803 pg_catalog.pg_get_serial_sequence( 

3804 sql.cast( 

3805 sql.cast( 

3806 pg_catalog.pg_attribute.c.attrelid, 

3807 REGCLASS, 

3808 ), 

3809 TEXT, 

3810 ), 

3811 pg_catalog.pg_attribute.c.attname, 

3812 ), 

3813 REGCLASS, 

3814 ), 

3815 OID, 

3816 ), 

3817 ) 

3818 .correlate(pg_catalog.pg_attribute) 

3819 .scalar_subquery() 

3820 .label("identity_options") 

3821 ) 

3822 else: 

3823 identity = sql.null().label("identity_options") 

3824 

3825 # join lateral performs the same as scalar_subquery here 

3826 default = ( 

3827 select( 

3828 pg_catalog.pg_get_expr( 

3829 pg_catalog.pg_attrdef.c.adbin, 

3830 pg_catalog.pg_attrdef.c.adrelid, 

3831 ) 

3832 ) 

3833 .select_from(pg_catalog.pg_attrdef) 

3834 .where( 

3835 pg_catalog.pg_attrdef.c.adrelid 

3836 == pg_catalog.pg_attribute.c.attrelid, 

3837 pg_catalog.pg_attrdef.c.adnum 

3838 == pg_catalog.pg_attribute.c.attnum, 

3839 pg_catalog.pg_attribute.c.atthasdef, 

3840 ) 

3841 .correlate(pg_catalog.pg_attribute) 

3842 .scalar_subquery() 

3843 .label("default") 

3844 ) 

3845 

3846 # get the name of the collate when it's different from the default one 

3847 collate = sql.case( 

3848 ( 

3849 sql.and_( 

3850 pg_catalog.pg_attribute.c.attcollation != 0, 

3851 select(pg_catalog.pg_type.c.typcollation) 

3852 .where( 

3853 pg_catalog.pg_type.c.oid 

3854 == pg_catalog.pg_attribute.c.atttypid, 

3855 ) 

3856 .correlate(pg_catalog.pg_attribute) 

3857 .scalar_subquery() 

3858 != pg_catalog.pg_attribute.c.attcollation, 

3859 ), 

3860 select(pg_catalog.pg_collation.c.collname) 

3861 .where( 

3862 pg_catalog.pg_collation.c.oid 

3863 == pg_catalog.pg_attribute.c.attcollation 

3864 ) 

3865 .correlate(pg_catalog.pg_attribute) 

3866 .scalar_subquery(), 

3867 ), 

3868 else_=sql.null(), 

3869 ).label("collation") 

3870 

3871 relkinds = self._kind_to_relkinds(kind) 

3872 query = ( 

3873 select( 

3874 pg_catalog.pg_attribute.c.attname.label("name"), 

3875 pg_catalog.format_type( 

3876 pg_catalog.pg_attribute.c.atttypid, 

3877 pg_catalog.pg_attribute.c.atttypmod, 

3878 ).label("format_type"), 

3879 default, 

3880 pg_catalog.pg_attribute.c.attnotnull.label("not_null"), 

3881 pg_catalog.pg_class.c.relname.label("table_name"), 

3882 pg_catalog.pg_description.c.description.label("comment"), 

3883 generated, 

3884 identity, 

3885 collate, 

3886 ) 

3887 .select_from(pg_catalog.pg_class) 

3888 # NOTE: postgresql support table with no user column, meaning 

3889 # there is no row with pg_attribute.attnum > 0. use a left outer 

3890 # join to avoid filtering these tables. 

3891 .outerjoin( 

3892 pg_catalog.pg_attribute, 

3893 sql.and_( 

3894 pg_catalog.pg_class.c.oid 

3895 == pg_catalog.pg_attribute.c.attrelid, 

3896 pg_catalog.pg_attribute.c.attnum > 0, 

3897 ~pg_catalog.pg_attribute.c.attisdropped, 

3898 ), 

3899 ) 

3900 .outerjoin( 

3901 pg_catalog.pg_description, 

3902 sql.and_( 

3903 pg_catalog.pg_description.c.objoid 

3904 == pg_catalog.pg_attribute.c.attrelid, 

3905 pg_catalog.pg_description.c.objsubid 

3906 == pg_catalog.pg_attribute.c.attnum, 

3907 ), 

3908 ) 

3909 .where(self._pg_class_relkind_condition(relkinds)) 

3910 .order_by( 

3911 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum 

3912 ) 

3913 ) 

3914 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3915 if has_filter_names: 

3916 query = query.where( 

3917 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

3918 ) 

3919 return query 

3920 

3921 def get_multi_columns( 

3922 self, connection, schema, filter_names, scope, kind, **kw 

3923 ): 

3924 has_filter_names, params = self._prepare_filter_names(filter_names) 

3925 query = self._columns_query(schema, has_filter_names, scope, kind) 

3926 rows = connection.execute(query, params).mappings() 

3927 

3928 # dictionary with (name, ) if default search path or (schema, name) 

3929 # as keys 

3930 domains = { 

3931 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d 

3932 for d in self._load_domains( 

3933 connection, schema="*", info_cache=kw.get("info_cache") 

3934 ) 

3935 } 

3936 

3937 # dictionary with (name, ) if default search path or (schema, name) 

3938 # as keys 

3939 enums = dict( 

3940 ( 

3941 ((rec["name"],), rec) 

3942 if rec["visible"] 

3943 else ((rec["schema"], rec["name"]), rec) 

3944 ) 

3945 for rec in self._load_enums( 

3946 connection, schema="*", info_cache=kw.get("info_cache") 

3947 ) 

3948 ) 

3949 

3950 columns = self._get_columns_info(rows, domains, enums, schema) 

3951 

3952 return columns.items() 

3953 

3954 _format_type_args_pattern = re.compile(r"\((.*)\)") 

3955 _format_type_args_delim = re.compile(r"\s*,\s*") 

3956 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$") 

3957 

3958 def _reflect_type( 

3959 self, 

3960 format_type: Optional[str], 

3961 domains: Dict[str, ReflectedDomain], 

3962 enums: Dict[str, ReflectedEnum], 

3963 type_description: str, 

3964 collation: Optional[str], 

3965 ) -> sqltypes.TypeEngine[Any]: 

3966 """ 

3967 Attempts to reconstruct a column type defined in ischema_names based 

3968 on the information available in the format_type. 

3969 

3970 If the `format_type` cannot be associated with a known `ischema_names`, 

3971 it is treated as a reference to a known PostgreSQL named `ENUM` or 

3972 `DOMAIN` type. 

3973 """ 

3974 type_description = type_description or "unknown type" 

3975 if format_type is None: 

3976 util.warn( 

3977 "PostgreSQL format_type() returned NULL for %s" 

3978 % type_description 

3979 ) 

3980 return sqltypes.NULLTYPE 

3981 

3982 attype_args_match = self._format_type_args_pattern.search(format_type) 

3983 if attype_args_match and attype_args_match.group(1): 

3984 attype_args = self._format_type_args_delim.split( 

3985 attype_args_match.group(1) 

3986 ) 

3987 else: 

3988 attype_args = () 

3989 

3990 match_array_dim = self._format_array_spec_pattern.search(format_type) 

3991 # Each "[]" in array specs corresponds to an array dimension 

3992 array_dim = len(match_array_dim.group(1) or "") // 2 

3993 

3994 # Remove all parameters and array specs from format_type to obtain an 

3995 # ischema_name candidate 

3996 attype = self._format_type_args_pattern.sub("", format_type) 

3997 attype = self._format_array_spec_pattern.sub("", attype) 

3998 

3999 schema_type = self.ischema_names.get(attype.lower(), None) 

4000 args, kwargs = (), {} 

4001 

4002 if attype == "numeric": 

4003 if len(attype_args) == 2: 

4004 precision, scale = map(int, attype_args) 

4005 args = (precision, scale) 

4006 

4007 elif attype == "double precision": 

4008 args = (53,) 

4009 

4010 elif attype == "integer": 

4011 args = () 

4012 

4013 elif attype in ("timestamp with time zone", "time with time zone"): 

4014 kwargs["timezone"] = True 

4015 if len(attype_args) == 1: 

4016 kwargs["precision"] = int(attype_args[0]) 

4017 

4018 elif attype in ( 

4019 "timestamp without time zone", 

4020 "time without time zone", 

4021 "time", 

4022 ): 

4023 kwargs["timezone"] = False 

4024 if len(attype_args) == 1: 

4025 kwargs["precision"] = int(attype_args[0]) 

4026 

4027 elif attype == "bit varying": 

4028 kwargs["varying"] = True 

4029 if len(attype_args) == 1: 

4030 charlen = int(attype_args[0]) 

4031 args = (charlen,) 

4032 

4033 # a domain or enum can start with interval, so be mindful of that. 

4034 elif attype == "interval" or attype.startswith("interval "): 

4035 schema_type = INTERVAL 

4036 

4037 field_match = re.match(r"interval (.+)", attype) 

4038 if field_match: 

4039 kwargs["fields"] = field_match.group(1) 

4040 

4041 if len(attype_args) == 1: 

4042 kwargs["precision"] = int(attype_args[0]) 

4043 

4044 else: 

4045 enum_or_domain_key = tuple(util.quoted_token_parser(attype)) 

4046 

4047 if enum_or_domain_key in enums: 

4048 schema_type = ENUM 

4049 enum = enums[enum_or_domain_key] 

4050 

4051 kwargs["name"] = enum["name"] 

4052 

4053 if not enum["visible"]: 

4054 kwargs["schema"] = enum["schema"] 

4055 args = tuple(enum["labels"]) 

4056 elif enum_or_domain_key in domains: 

4057 schema_type = DOMAIN 

4058 domain = domains[enum_or_domain_key] 

4059 

4060 data_type = self._reflect_type( 

4061 domain["type"], 

4062 domains, 

4063 enums, 

4064 type_description="DOMAIN '%s'" % domain["name"], 

4065 collation=domain["collation"], 

4066 ) 

4067 args = (domain["name"], data_type) 

4068 

4069 kwargs["collation"] = domain["collation"] 

4070 kwargs["default"] = domain["default"] 

4071 kwargs["not_null"] = not domain["nullable"] 

4072 kwargs["create_type"] = False 

4073 

4074 if domain["constraints"]: 

4075 # We only support a single constraint 

4076 check_constraint = domain["constraints"][0] 

4077 

4078 kwargs["constraint_name"] = check_constraint["name"] 

4079 kwargs["check"] = check_constraint["check"] 

4080 

4081 if not domain["visible"]: 

4082 kwargs["schema"] = domain["schema"] 

4083 

4084 else: 

4085 try: 

4086 charlen = int(attype_args[0]) 

4087 args = (charlen, *attype_args[1:]) 

4088 except (ValueError, IndexError): 

4089 args = attype_args 

4090 

4091 if not schema_type: 

4092 util.warn( 

4093 "Did not recognize type '%s' of %s" 

4094 % (attype, type_description) 

4095 ) 

4096 return sqltypes.NULLTYPE 

4097 

4098 if collation is not None: 

4099 kwargs["collation"] = collation 

4100 

4101 data_type = schema_type(*args, **kwargs) 

4102 if array_dim >= 1: 

4103 # postgres does not preserve dimensionality or size of array types. 

4104 data_type = _array.ARRAY(data_type) 

4105 

4106 return data_type 

4107 

4108 def _get_columns_info(self, rows, domains, enums, schema): 

4109 columns = defaultdict(list) 

4110 for row_dict in rows: 

4111 # ensure that each table has an entry, even if it has no columns 

4112 if row_dict["name"] is None: 

4113 columns[(schema, row_dict["table_name"])] = ( 

4114 ReflectionDefaults.columns() 

4115 ) 

4116 continue 

4117 table_cols = columns[(schema, row_dict["table_name"])] 

4118 

4119 collation = row_dict["collation"] 

4120 

4121 coltype = self._reflect_type( 

4122 row_dict["format_type"], 

4123 domains, 

4124 enums, 

4125 type_description="column '%s'" % row_dict["name"], 

4126 collation=collation, 

4127 ) 

4128 

4129 default = row_dict["default"] 

4130 name = row_dict["name"] 

4131 generated = row_dict["generated"] 

4132 nullable = not row_dict["not_null"] 

4133 

4134 if isinstance(coltype, DOMAIN): 

4135 if not default: 

4136 # domain can override the default value but 

4137 # cant set it to None 

4138 if coltype.default is not None: 

4139 default = coltype.default 

4140 

4141 nullable = nullable and not coltype.not_null 

4142 

4143 identity = row_dict["identity_options"] 

4144 

4145 # If a zero byte or blank string depending on driver (is also 

4146 # absent for older PG versions), then not a generated column. 

4147 # Otherwise, s = stored. (Other values might be added in the 

4148 # future.) 

4149 if generated not in (None, "", b"\x00"): 

4150 computed = dict( 

4151 sqltext=default, persisted=generated in ("s", b"s") 

4152 ) 

4153 default = None 

4154 else: 

4155 computed = None 

4156 

4157 # adjust the default value 

4158 autoincrement = False 

4159 if default is not None: 

4160 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) 

4161 if match is not None: 

4162 if issubclass(coltype._type_affinity, sqltypes.Integer): 

4163 autoincrement = True 

4164 # the default is related to a Sequence 

4165 if "." not in match.group(2) and schema is not None: 

4166 # unconditionally quote the schema name. this could 

4167 # later be enhanced to obey quoting rules / 

4168 # "quote schema" 

4169 default = ( 

4170 match.group(1) 

4171 + ('"%s"' % schema) 

4172 + "." 

4173 + match.group(2) 

4174 + match.group(3) 

4175 ) 

4176 

4177 column_info = { 

4178 "name": name, 

4179 "type": coltype, 

4180 "nullable": nullable, 

4181 "default": default, 

4182 "autoincrement": autoincrement or identity is not None, 

4183 "comment": row_dict["comment"], 

4184 } 

4185 if computed is not None: 

4186 column_info["computed"] = computed 

4187 if identity is not None: 

4188 column_info["identity"] = identity 

4189 

4190 table_cols.append(column_info) 

4191 

4192 return columns 

4193 

4194 @lru_cache() 

4195 def _table_oids_query(self, schema, has_filter_names, scope, kind): 

4196 relkinds = self._kind_to_relkinds(kind) 

4197 oid_q = select( 

4198 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname 

4199 ).where(self._pg_class_relkind_condition(relkinds)) 

4200 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) 

4201 

4202 if has_filter_names: 

4203 oid_q = oid_q.where( 

4204 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4205 ) 

4206 return oid_q 

4207 

4208 @reflection.flexi_cache( 

4209 ("schema", InternalTraversal.dp_string), 

4210 ("filter_names", InternalTraversal.dp_string_list), 

4211 ("kind", InternalTraversal.dp_plain_obj), 

4212 ("scope", InternalTraversal.dp_plain_obj), 

4213 ) 

4214 def _get_table_oids( 

4215 self, connection, schema, filter_names, scope, kind, **kw 

4216 ): 

4217 has_filter_names, params = self._prepare_filter_names(filter_names) 

4218 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) 

4219 result = connection.execute(oid_q, params) 

4220 return result.all() 

4221 

4222 @util.memoized_property 

4223 def _constraint_query(self): 

4224 if self.server_version_info >= (11, 0): 

4225 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4226 else: 

4227 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4228 

4229 if self.server_version_info >= (15,): 

4230 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4231 else: 

4232 indnullsnotdistinct = sql.false().label("indnullsnotdistinct") 

4233 

4234 con_sq = ( 

4235 select( 

4236 pg_catalog.pg_constraint.c.conrelid, 

4237 pg_catalog.pg_constraint.c.conname, 

4238 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4239 sql.func.generate_subscripts( 

4240 pg_catalog.pg_index.c.indkey, 1 

4241 ).label("ord"), 

4242 indnkeyatts, 

4243 indnullsnotdistinct, 

4244 pg_catalog.pg_description.c.description, 

4245 ) 

4246 .join( 

4247 pg_catalog.pg_index, 

4248 pg_catalog.pg_constraint.c.conindid 

4249 == pg_catalog.pg_index.c.indexrelid, 

4250 ) 

4251 .outerjoin( 

4252 pg_catalog.pg_description, 

4253 pg_catalog.pg_description.c.objoid 

4254 == pg_catalog.pg_constraint.c.oid, 

4255 ) 

4256 .where( 

4257 pg_catalog.pg_constraint.c.contype == bindparam("contype"), 

4258 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), 

4259 # NOTE: filtering also on pg_index.indrelid for oids does 

4260 # not seem to have a performance effect, but it may be an 

4261 # option if perf problems are reported 

4262 ) 

4263 .subquery("con") 

4264 ) 

4265 

4266 attr_sq = ( 

4267 select( 

4268 con_sq.c.conrelid, 

4269 con_sq.c.conname, 

4270 con_sq.c.description, 

4271 con_sq.c.ord, 

4272 con_sq.c.indnkeyatts, 

4273 con_sq.c.indnullsnotdistinct, 

4274 pg_catalog.pg_attribute.c.attname, 

4275 ) 

4276 .select_from(pg_catalog.pg_attribute) 

4277 .join( 

4278 con_sq, 

4279 sql.and_( 

4280 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, 

4281 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, 

4282 ), 

4283 ) 

4284 .where( 

4285 # NOTE: restate the condition here, since pg15 otherwise 

4286 # seems to get confused on pscopg2 sometimes, doing 

4287 # a sequential scan of pg_attribute. 

4288 # The condition in the con_sq subquery is not actually needed 

4289 # in pg15, but it may be needed in older versions. Keeping it 

4290 # does not seems to have any inpact in any case. 

4291 con_sq.c.conrelid.in_(bindparam("oids")) 

4292 ) 

4293 .subquery("attr") 

4294 ) 

4295 

4296 return ( 

4297 select( 

4298 attr_sq.c.conrelid, 

4299 sql.func.array_agg( 

4300 # NOTE: cast since some postgresql derivatives may 

4301 # not support array_agg on the name type 

4302 aggregate_order_by( 

4303 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord 

4304 ) 

4305 ).label("cols"), 

4306 attr_sq.c.conname, 

4307 sql.func.min(attr_sq.c.description).label("description"), 

4308 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"), 

4309 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label( 

4310 "indnullsnotdistinct" 

4311 ), 

4312 ) 

4313 .group_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4314 .order_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4315 ) 

4316 

4317 def _reflect_constraint( 

4318 self, connection, contype, schema, filter_names, scope, kind, **kw 

4319 ): 

4320 # used to reflect primary and unique constraint 

4321 table_oids = self._get_table_oids( 

4322 connection, schema, filter_names, scope, kind, **kw 

4323 ) 

4324 batches = list(table_oids) 

4325 is_unique = contype == "u" 

4326 

4327 while batches: 

4328 batch = batches[0:3000] 

4329 batches[0:3000] = [] 

4330 

4331 result = connection.execute( 

4332 self._constraint_query, 

4333 {"oids": [r[0] for r in batch], "contype": contype}, 

4334 ).mappings() 

4335 

4336 result_by_oid = defaultdict(list) 

4337 for row_dict in result: 

4338 result_by_oid[row_dict["conrelid"]].append(row_dict) 

4339 

4340 for oid, tablename in batch: 

4341 for_oid = result_by_oid.get(oid, ()) 

4342 if for_oid: 

4343 for row in for_oid: 

4344 # See note in get_multi_indexes 

4345 all_cols = row["cols"] 

4346 indnkeyatts = row["indnkeyatts"] 

4347 if len(all_cols) > indnkeyatts: 

4348 inc_cols = all_cols[indnkeyatts:] 

4349 cst_cols = all_cols[:indnkeyatts] 

4350 else: 

4351 inc_cols = [] 

4352 cst_cols = all_cols 

4353 

4354 opts = {} 

4355 if self.server_version_info >= (11,): 

4356 opts["postgresql_include"] = inc_cols 

4357 if is_unique: 

4358 opts["postgresql_nulls_not_distinct"] = row[ 

4359 "indnullsnotdistinct" 

4360 ] 

4361 yield ( 

4362 tablename, 

4363 cst_cols, 

4364 row["conname"], 

4365 row["description"], 

4366 opts, 

4367 ) 

4368 else: 

4369 yield tablename, None, None, None, None 

4370 

4371 @reflection.cache 

4372 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

4373 data = self.get_multi_pk_constraint( 

4374 connection, 

4375 schema=schema, 

4376 filter_names=[table_name], 

4377 scope=ObjectScope.ANY, 

4378 kind=ObjectKind.ANY, 

4379 **kw, 

4380 ) 

4381 return self._value_or_raise(data, table_name, schema) 

4382 

4383 def get_multi_pk_constraint( 

4384 self, connection, schema, filter_names, scope, kind, **kw 

4385 ): 

4386 result = self._reflect_constraint( 

4387 connection, "p", schema, filter_names, scope, kind, **kw 

4388 ) 

4389 

4390 # only a single pk can be present for each table. Return an entry 

4391 # even if a table has no primary key 

4392 default = ReflectionDefaults.pk_constraint 

4393 

4394 def pk_constraint(pk_name, cols, comment, opts): 

4395 info = { 

4396 "constrained_columns": cols, 

4397 "name": pk_name, 

4398 "comment": comment, 

4399 } 

4400 if opts: 

4401 info["dialect_options"] = opts 

4402 return info 

4403 

4404 return ( 

4405 ( 

4406 (schema, table_name), 

4407 ( 

4408 pk_constraint(pk_name, cols, comment, opts) 

4409 if pk_name is not None 

4410 else default() 

4411 ), 

4412 ) 

4413 for table_name, cols, pk_name, comment, opts in result 

4414 ) 

4415 

4416 @reflection.cache 

4417 def get_foreign_keys( 

4418 self, 

4419 connection, 

4420 table_name, 

4421 schema=None, 

4422 postgresql_ignore_search_path=False, 

4423 **kw, 

4424 ): 

4425 data = self.get_multi_foreign_keys( 

4426 connection, 

4427 schema=schema, 

4428 filter_names=[table_name], 

4429 postgresql_ignore_search_path=postgresql_ignore_search_path, 

4430 scope=ObjectScope.ANY, 

4431 kind=ObjectKind.ANY, 

4432 **kw, 

4433 ) 

4434 return self._value_or_raise(data, table_name, schema) 

4435 

4436 @lru_cache() 

4437 def _foreing_key_query(self, schema, has_filter_names, scope, kind): 

4438 pg_class_ref = pg_catalog.pg_class.alias("cls_ref") 

4439 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") 

4440 relkinds = self._kind_to_relkinds(kind) 

4441 query = ( 

4442 select( 

4443 pg_catalog.pg_class.c.relname, 

4444 pg_catalog.pg_constraint.c.conname, 

4445 # NOTE: avoid calling pg_get_constraintdef when not needed 

4446 # to speed up the query 

4447 sql.case( 

4448 ( 

4449 pg_catalog.pg_constraint.c.oid.is_not(None), 

4450 pg_catalog.pg_get_constraintdef( 

4451 pg_catalog.pg_constraint.c.oid, True 

4452 ), 

4453 ), 

4454 else_=None, 

4455 ), 

4456 pg_namespace_ref.c.nspname, 

4457 pg_catalog.pg_description.c.description, 

4458 ) 

4459 .select_from(pg_catalog.pg_class) 

4460 .outerjoin( 

4461 pg_catalog.pg_constraint, 

4462 sql.and_( 

4463 pg_catalog.pg_class.c.oid 

4464 == pg_catalog.pg_constraint.c.conrelid, 

4465 pg_catalog.pg_constraint.c.contype == "f", 

4466 ), 

4467 ) 

4468 .outerjoin( 

4469 pg_class_ref, 

4470 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, 

4471 ) 

4472 .outerjoin( 

4473 pg_namespace_ref, 

4474 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, 

4475 ) 

4476 .outerjoin( 

4477 pg_catalog.pg_description, 

4478 pg_catalog.pg_description.c.objoid 

4479 == pg_catalog.pg_constraint.c.oid, 

4480 ) 

4481 .order_by( 

4482 pg_catalog.pg_class.c.relname, 

4483 pg_catalog.pg_constraint.c.conname, 

4484 ) 

4485 .where(self._pg_class_relkind_condition(relkinds)) 

4486 ) 

4487 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4488 if has_filter_names: 

4489 query = query.where( 

4490 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4491 ) 

4492 return query 

4493 

4494 @util.memoized_property 

4495 def _fk_regex_pattern(self): 

4496 # optionally quoted token 

4497 qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)' 

4498 

4499 # https://www.postgresql.org/docs/current/static/sql-createtable.html 

4500 return re.compile( 

4501 r"FOREIGN KEY \((.*?)\) " 

4502 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501 

4503 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" 

4504 r"[\s]?(ON UPDATE " 

4505 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" 

4506 r"[\s]?(ON DELETE " 

4507 r"(CASCADE|RESTRICT|NO ACTION|" 

4508 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?" 

4509 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" 

4510 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" 

4511 ) 

4512 

4513 def get_multi_foreign_keys( 

4514 self, 

4515 connection, 

4516 schema, 

4517 filter_names, 

4518 scope, 

4519 kind, 

4520 postgresql_ignore_search_path=False, 

4521 **kw, 

4522 ): 

4523 preparer = self.identifier_preparer 

4524 

4525 has_filter_names, params = self._prepare_filter_names(filter_names) 

4526 query = self._foreing_key_query(schema, has_filter_names, scope, kind) 

4527 result = connection.execute(query, params) 

4528 

4529 FK_REGEX = self._fk_regex_pattern 

4530 

4531 fkeys = defaultdict(list) 

4532 default = ReflectionDefaults.foreign_keys 

4533 for table_name, conname, condef, conschema, comment in result: 

4534 # ensure that each table has an entry, even if it has 

4535 # no foreign keys 

4536 if conname is None: 

4537 fkeys[(schema, table_name)] = default() 

4538 continue 

4539 table_fks = fkeys[(schema, table_name)] 

4540 m = re.search(FK_REGEX, condef).groups() 

4541 

4542 ( 

4543 constrained_columns, 

4544 referred_schema, 

4545 referred_table, 

4546 referred_columns, 

4547 _, 

4548 match, 

4549 _, 

4550 onupdate, 

4551 _, 

4552 ondelete, 

4553 deferrable, 

4554 _, 

4555 initially, 

4556 ) = m 

4557 

4558 if deferrable is not None: 

4559 deferrable = True if deferrable == "DEFERRABLE" else False 

4560 constrained_columns = [ 

4561 preparer._unquote_identifier(x) 

4562 for x in re.split(r"\s*,\s*", constrained_columns) 

4563 ] 

4564 

4565 if postgresql_ignore_search_path: 

4566 # when ignoring search path, we use the actual schema 

4567 # provided it isn't the "default" schema 

4568 if conschema != self.default_schema_name: 

4569 referred_schema = conschema 

4570 else: 

4571 referred_schema = schema 

4572 elif referred_schema: 

4573 # referred_schema is the schema that we regexp'ed from 

4574 # pg_get_constraintdef(). If the schema is in the search 

4575 # path, pg_get_constraintdef() will give us None. 

4576 referred_schema = preparer._unquote_identifier(referred_schema) 

4577 elif schema is not None and schema == conschema: 

4578 # If the actual schema matches the schema of the table 

4579 # we're reflecting, then we will use that. 

4580 referred_schema = schema 

4581 

4582 referred_table = preparer._unquote_identifier(referred_table) 

4583 referred_columns = [ 

4584 preparer._unquote_identifier(x) 

4585 for x in re.split(r"\s*,\s", referred_columns) 

4586 ] 

4587 options = { 

4588 k: v 

4589 for k, v in [ 

4590 ("onupdate", onupdate), 

4591 ("ondelete", ondelete), 

4592 ("initially", initially), 

4593 ("deferrable", deferrable), 

4594 ("match", match), 

4595 ] 

4596 if v is not None and v != "NO ACTION" 

4597 } 

4598 fkey_d = { 

4599 "name": conname, 

4600 "constrained_columns": constrained_columns, 

4601 "referred_schema": referred_schema, 

4602 "referred_table": referred_table, 

4603 "referred_columns": referred_columns, 

4604 "options": options, 

4605 "comment": comment, 

4606 } 

4607 table_fks.append(fkey_d) 

4608 return fkeys.items() 

4609 

4610 @reflection.cache 

4611 def get_indexes(self, connection, table_name, schema=None, **kw): 

4612 data = self.get_multi_indexes( 

4613 connection, 

4614 schema=schema, 

4615 filter_names=[table_name], 

4616 scope=ObjectScope.ANY, 

4617 kind=ObjectKind.ANY, 

4618 **kw, 

4619 ) 

4620 return self._value_or_raise(data, table_name, schema) 

4621 

4622 @util.memoized_property 

4623 def _index_query(self): 

4624 # NOTE: pg_index is used as from two times to improve performance, 

4625 # since extraing all the index information from `idx_sq` to avoid 

4626 # the second pg_index use leads to a worse performing query in 

4627 # particular when querying for a single table (as of pg 17) 

4628 # NOTE: repeating oids clause improve query performance 

4629 

4630 # subquery to get the columns 

4631 idx_sq = ( 

4632 select( 

4633 pg_catalog.pg_index.c.indexrelid, 

4634 pg_catalog.pg_index.c.indrelid, 

4635 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4636 sql.func.unnest(pg_catalog.pg_index.c.indclass).label( 

4637 "att_opclass" 

4638 ), 

4639 sql.func.generate_subscripts( 

4640 pg_catalog.pg_index.c.indkey, 1 

4641 ).label("ord"), 

4642 ) 

4643 .where( 

4644 ~pg_catalog.pg_index.c.indisprimary, 

4645 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4646 ) 

4647 .subquery("idx") 

4648 ) 

4649 

4650 attr_sq = ( 

4651 select( 

4652 idx_sq.c.indexrelid, 

4653 idx_sq.c.indrelid, 

4654 idx_sq.c.ord, 

4655 # NOTE: always using pg_get_indexdef is too slow so just 

4656 # invoke when the element is an expression 

4657 sql.case( 

4658 ( 

4659 idx_sq.c.attnum == 0, 

4660 pg_catalog.pg_get_indexdef( 

4661 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True 

4662 ), 

4663 ), 

4664 # NOTE: need to cast this since attname is of type "name" 

4665 # that's limited to 63 bytes, while pg_get_indexdef 

4666 # returns "text" so its output may get cut 

4667 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), 

4668 ).label("element"), 

4669 (idx_sq.c.attnum == 0).label("is_expr"), 

4670 pg_catalog.pg_opclass.c.opcname, 

4671 pg_catalog.pg_opclass.c.opcdefault, 

4672 ) 

4673 .select_from(idx_sq) 

4674 .outerjoin( 

4675 # do not remove rows where idx_sq.c.attnum is 0 

4676 pg_catalog.pg_attribute, 

4677 sql.and_( 

4678 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, 

4679 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, 

4680 ), 

4681 ) 

4682 .outerjoin( 

4683 pg_catalog.pg_opclass, 

4684 pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass, 

4685 ) 

4686 .where(idx_sq.c.indrelid.in_(bindparam("oids"))) 

4687 .subquery("idx_attr") 

4688 ) 

4689 

4690 cols_sq = ( 

4691 select( 

4692 attr_sq.c.indexrelid, 

4693 sql.func.min(attr_sq.c.indrelid), 

4694 sql.func.array_agg( 

4695 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) 

4696 ).label("elements"), 

4697 sql.func.array_agg( 

4698 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) 

4699 ).label("elements_is_expr"), 

4700 sql.func.array_agg( 

4701 aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord) 

4702 ).label("elements_opclass"), 

4703 sql.func.array_agg( 

4704 aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord) 

4705 ).label("elements_opdefault"), 

4706 ) 

4707 .group_by(attr_sq.c.indexrelid) 

4708 .subquery("idx_cols") 

4709 ) 

4710 

4711 if self.server_version_info >= (11, 0): 

4712 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4713 else: 

4714 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4715 

4716 if self.server_version_info >= (15,): 

4717 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4718 else: 

4719 nulls_not_distinct = sql.false().label("indnullsnotdistinct") 

4720 

4721 return ( 

4722 select( 

4723 pg_catalog.pg_index.c.indrelid, 

4724 pg_catalog.pg_class.c.relname, 

4725 pg_catalog.pg_index.c.indisunique, 

4726 pg_catalog.pg_constraint.c.conrelid.is_not(None).label( 

4727 "has_constraint" 

4728 ), 

4729 pg_catalog.pg_index.c.indoption, 

4730 pg_catalog.pg_class.c.reloptions, 

4731 pg_catalog.pg_am.c.amname, 

4732 # NOTE: pg_get_expr is very fast so this case has almost no 

4733 # performance impact 

4734 sql.case( 

4735 ( 

4736 pg_catalog.pg_index.c.indpred.is_not(None), 

4737 pg_catalog.pg_get_expr( 

4738 pg_catalog.pg_index.c.indpred, 

4739 pg_catalog.pg_index.c.indrelid, 

4740 ), 

4741 ), 

4742 else_=None, 

4743 ).label("filter_definition"), 

4744 indnkeyatts, 

4745 nulls_not_distinct, 

4746 cols_sq.c.elements, 

4747 cols_sq.c.elements_is_expr, 

4748 cols_sq.c.elements_opclass, 

4749 cols_sq.c.elements_opdefault, 

4750 ) 

4751 .select_from(pg_catalog.pg_index) 

4752 .where( 

4753 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4754 ~pg_catalog.pg_index.c.indisprimary, 

4755 ) 

4756 .join( 

4757 pg_catalog.pg_class, 

4758 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid, 

4759 ) 

4760 .join( 

4761 pg_catalog.pg_am, 

4762 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid, 

4763 ) 

4764 .outerjoin( 

4765 cols_sq, 

4766 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, 

4767 ) 

4768 .outerjoin( 

4769 pg_catalog.pg_constraint, 

4770 sql.and_( 

4771 pg_catalog.pg_index.c.indrelid 

4772 == pg_catalog.pg_constraint.c.conrelid, 

4773 pg_catalog.pg_index.c.indexrelid 

4774 == pg_catalog.pg_constraint.c.conindid, 

4775 pg_catalog.pg_constraint.c.contype 

4776 == sql.any_(_array.array(("p", "u", "x"))), 

4777 ), 

4778 ) 

4779 .order_by( 

4780 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname 

4781 ) 

4782 ) 

4783 

4784 def get_multi_indexes( 

4785 self, connection, schema, filter_names, scope, kind, **kw 

4786 ): 

4787 table_oids = self._get_table_oids( 

4788 connection, schema, filter_names, scope, kind, **kw 

4789 ) 

4790 

4791 indexes = defaultdict(list) 

4792 default = ReflectionDefaults.indexes 

4793 

4794 batches = list(table_oids) 

4795 

4796 while batches: 

4797 batch = batches[0:3000] 

4798 batches[0:3000] = [] 

4799 

4800 result = connection.execute( 

4801 self._index_query, {"oids": [r[0] for r in batch]} 

4802 ).mappings() 

4803 

4804 result_by_oid = defaultdict(list) 

4805 for row_dict in result: 

4806 result_by_oid[row_dict["indrelid"]].append(row_dict) 

4807 

4808 for oid, table_name in batch: 

4809 if oid not in result_by_oid: 

4810 # ensure that each table has an entry, even if reflection 

4811 # is skipped because not supported 

4812 indexes[(schema, table_name)] = default() 

4813 continue 

4814 

4815 for row in result_by_oid[oid]: 

4816 index_name = row["relname"] 

4817 

4818 table_indexes = indexes[(schema, table_name)] 

4819 

4820 all_elements = row["elements"] 

4821 all_elements_is_expr = row["elements_is_expr"] 

4822 all_elements_opclass = row["elements_opclass"] 

4823 all_elements_opdefault = row["elements_opdefault"] 

4824 indnkeyatts = row["indnkeyatts"] 

4825 # "The number of key columns in the index, not counting any 

4826 # included columns, which are merely stored and do not 

4827 # participate in the index semantics" 

4828 if len(all_elements) > indnkeyatts: 

4829 # this is a "covering index" which has INCLUDE columns 

4830 # as well as regular index columns 

4831 inc_cols = all_elements[indnkeyatts:] 

4832 idx_elements = all_elements[:indnkeyatts] 

4833 idx_elements_is_expr = all_elements_is_expr[ 

4834 :indnkeyatts 

4835 ] 

4836 # postgresql does not support expression on included 

4837 # columns as of v14: "ERROR: expressions are not 

4838 # supported in included columns". 

4839 assert all( 

4840 not is_expr 

4841 for is_expr in all_elements_is_expr[indnkeyatts:] 

4842 ) 

4843 idx_elements_opclass = all_elements_opclass[ 

4844 :indnkeyatts 

4845 ] 

4846 idx_elements_opdefault = all_elements_opdefault[ 

4847 :indnkeyatts 

4848 ] 

4849 else: 

4850 idx_elements = all_elements 

4851 idx_elements_is_expr = all_elements_is_expr 

4852 inc_cols = [] 

4853 idx_elements_opclass = all_elements_opclass 

4854 idx_elements_opdefault = all_elements_opdefault 

4855 

4856 index = {"name": index_name, "unique": row["indisunique"]} 

4857 if any(idx_elements_is_expr): 

4858 index["column_names"] = [ 

4859 None if is_expr else expr 

4860 for expr, is_expr in zip( 

4861 idx_elements, idx_elements_is_expr 

4862 ) 

4863 ] 

4864 index["expressions"] = idx_elements 

4865 else: 

4866 index["column_names"] = idx_elements 

4867 

4868 dialect_options = {} 

4869 

4870 if not all(idx_elements_opdefault): 

4871 dialect_options["postgresql_ops"] = { 

4872 name: opclass 

4873 for name, opclass, is_default in zip( 

4874 idx_elements, 

4875 idx_elements_opclass, 

4876 idx_elements_opdefault, 

4877 ) 

4878 if not is_default 

4879 } 

4880 

4881 sorting = {} 

4882 for col_index, col_flags in enumerate(row["indoption"]): 

4883 col_sorting = () 

4884 # try to set flags only if they differ from PG 

4885 # defaults... 

4886 if col_flags & 0x01: 

4887 col_sorting += ("desc",) 

4888 if not (col_flags & 0x02): 

4889 col_sorting += ("nulls_last",) 

4890 else: 

4891 if col_flags & 0x02: 

4892 col_sorting += ("nulls_first",) 

4893 if col_sorting: 

4894 sorting[idx_elements[col_index]] = col_sorting 

4895 if sorting: 

4896 index["column_sorting"] = sorting 

4897 if row["has_constraint"]: 

4898 index["duplicates_constraint"] = index_name 

4899 

4900 if row["reloptions"]: 

4901 dialect_options["postgresql_with"] = dict( 

4902 [ 

4903 option.split("=", 1) 

4904 for option in row["reloptions"] 

4905 ] 

4906 ) 

4907 # it *might* be nice to include that this is 'btree' in the 

4908 # reflection info. But we don't want an Index object 

4909 # to have a ``postgresql_using`` in it that is just the 

4910 # default, so for the moment leaving this out. 

4911 amname = row["amname"] 

4912 if amname != "btree": 

4913 dialect_options["postgresql_using"] = row["amname"] 

4914 if row["filter_definition"]: 

4915 dialect_options["postgresql_where"] = row[ 

4916 "filter_definition" 

4917 ] 

4918 if self.server_version_info >= (11,): 

4919 # NOTE: this is legacy, this is part of 

4920 # dialect_options now as of #7382 

4921 index["include_columns"] = inc_cols 

4922 dialect_options["postgresql_include"] = inc_cols 

4923 if row["indnullsnotdistinct"]: 

4924 # the default is False, so ignore it. 

4925 dialect_options["postgresql_nulls_not_distinct"] = row[ 

4926 "indnullsnotdistinct" 

4927 ] 

4928 

4929 if dialect_options: 

4930 index["dialect_options"] = dialect_options 

4931 

4932 table_indexes.append(index) 

4933 return indexes.items() 

4934 

4935 @reflection.cache 

4936 def get_unique_constraints( 

4937 self, connection, table_name, schema=None, **kw 

4938 ): 

4939 data = self.get_multi_unique_constraints( 

4940 connection, 

4941 schema=schema, 

4942 filter_names=[table_name], 

4943 scope=ObjectScope.ANY, 

4944 kind=ObjectKind.ANY, 

4945 **kw, 

4946 ) 

4947 return self._value_or_raise(data, table_name, schema) 

4948 

4949 def get_multi_unique_constraints( 

4950 self, 

4951 connection, 

4952 schema, 

4953 filter_names, 

4954 scope, 

4955 kind, 

4956 **kw, 

4957 ): 

4958 result = self._reflect_constraint( 

4959 connection, "u", schema, filter_names, scope, kind, **kw 

4960 ) 

4961 

4962 # each table can have multiple unique constraints 

4963 uniques = defaultdict(list) 

4964 default = ReflectionDefaults.unique_constraints 

4965 for table_name, cols, con_name, comment, options in result: 

4966 # ensure a list is created for each table. leave it empty if 

4967 # the table has no unique cosntraint 

4968 if con_name is None: 

4969 uniques[(schema, table_name)] = default() 

4970 continue 

4971 

4972 uc_dict = { 

4973 "column_names": cols, 

4974 "name": con_name, 

4975 "comment": comment, 

4976 } 

4977 if options: 

4978 uc_dict["dialect_options"] = options 

4979 

4980 uniques[(schema, table_name)].append(uc_dict) 

4981 return uniques.items() 

4982 

4983 @reflection.cache 

4984 def get_table_comment(self, connection, table_name, schema=None, **kw): 

4985 data = self.get_multi_table_comment( 

4986 connection, 

4987 schema, 

4988 [table_name], 

4989 scope=ObjectScope.ANY, 

4990 kind=ObjectKind.ANY, 

4991 **kw, 

4992 ) 

4993 return self._value_or_raise(data, table_name, schema) 

4994 

4995 @lru_cache() 

4996 def _comment_query(self, schema, has_filter_names, scope, kind): 

4997 relkinds = self._kind_to_relkinds(kind) 

4998 query = ( 

4999 select( 

5000 pg_catalog.pg_class.c.relname, 

5001 pg_catalog.pg_description.c.description, 

5002 ) 

5003 .select_from(pg_catalog.pg_class) 

5004 .outerjoin( 

5005 pg_catalog.pg_description, 

5006 sql.and_( 

5007 pg_catalog.pg_class.c.oid 

5008 == pg_catalog.pg_description.c.objoid, 

5009 pg_catalog.pg_description.c.objsubid == 0, 

5010 pg_catalog.pg_description.c.classoid 

5011 == sql.func.cast("pg_catalog.pg_class", REGCLASS), 

5012 ), 

5013 ) 

5014 .where(self._pg_class_relkind_condition(relkinds)) 

5015 ) 

5016 query = self._pg_class_filter_scope_schema(query, schema, scope) 

5017 if has_filter_names: 

5018 query = query.where( 

5019 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

5020 ) 

5021 return query 

5022 

5023 def get_multi_table_comment( 

5024 self, connection, schema, filter_names, scope, kind, **kw 

5025 ): 

5026 has_filter_names, params = self._prepare_filter_names(filter_names) 

5027 query = self._comment_query(schema, has_filter_names, scope, kind) 

5028 result = connection.execute(query, params) 

5029 

5030 default = ReflectionDefaults.table_comment 

5031 return ( 

5032 ( 

5033 (schema, table), 

5034 {"text": comment} if comment is not None else default(), 

5035 ) 

5036 for table, comment in result 

5037 ) 

5038 

5039 @reflection.cache 

5040 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

5041 data = self.get_multi_check_constraints( 

5042 connection, 

5043 schema, 

5044 [table_name], 

5045 scope=ObjectScope.ANY, 

5046 kind=ObjectKind.ANY, 

5047 **kw, 

5048 ) 

5049 return self._value_or_raise(data, table_name, schema) 

5050 

5051 @lru_cache() 

5052 def _check_constraint_query(self, schema, has_filter_names, scope, kind): 

5053 relkinds = self._kind_to_relkinds(kind) 

5054 query = ( 

5055 select( 

5056 pg_catalog.pg_class.c.relname, 

5057 pg_catalog.pg_constraint.c.conname, 

5058 # NOTE: avoid calling pg_get_constraintdef when not needed 

5059 # to speed up the query 

5060 sql.case( 

5061 ( 

5062 pg_catalog.pg_constraint.c.oid.is_not(None), 

5063 pg_catalog.pg_get_constraintdef( 

5064 pg_catalog.pg_constraint.c.oid, True 

5065 ), 

5066 ), 

5067 else_=None, 

5068 ), 

5069 pg_catalog.pg_description.c.description, 

5070 ) 

5071 .select_from(pg_catalog.pg_class) 

5072 .outerjoin( 

5073 pg_catalog.pg_constraint, 

5074 sql.and_( 

5075 pg_catalog.pg_class.c.oid 

5076 == pg_catalog.pg_constraint.c.conrelid, 

5077 pg_catalog.pg_constraint.c.contype == "c", 

5078 ), 

5079 ) 

5080 .outerjoin( 

5081 pg_catalog.pg_description, 

5082 pg_catalog.pg_description.c.objoid 

5083 == pg_catalog.pg_constraint.c.oid, 

5084 ) 

5085 .order_by( 

5086 pg_catalog.pg_class.c.relname, 

5087 pg_catalog.pg_constraint.c.conname, 

5088 ) 

5089 .where(self._pg_class_relkind_condition(relkinds)) 

5090 ) 

5091 query = self._pg_class_filter_scope_schema(query, schema, scope) 

5092 if has_filter_names: 

5093 query = query.where( 

5094 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

5095 ) 

5096 return query 

5097 

5098 def get_multi_check_constraints( 

5099 self, connection, schema, filter_names, scope, kind, **kw 

5100 ): 

5101 has_filter_names, params = self._prepare_filter_names(filter_names) 

5102 query = self._check_constraint_query( 

5103 schema, has_filter_names, scope, kind 

5104 ) 

5105 result = connection.execute(query, params) 

5106 

5107 check_constraints = defaultdict(list) 

5108 default = ReflectionDefaults.check_constraints 

5109 for table_name, check_name, src, comment in result: 

5110 # only two cases for check_name and src: both null or both defined 

5111 if check_name is None and src is None: 

5112 check_constraints[(schema, table_name)] = default() 

5113 continue 

5114 # samples: 

5115 # "CHECK (((a > 1) AND (a < 5)))" 

5116 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" 

5117 # "CHECK (((a > 1) AND (a < 5))) NOT VALID" 

5118 # "CHECK (some_boolean_function(a))" 

5119 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" 

5120 # "CHECK (a NOT NULL) NO INHERIT" 

5121 # "CHECK (a NOT NULL) NO INHERIT NOT VALID" 

5122 

5123 m = re.match( 

5124 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", 

5125 src, 

5126 flags=re.DOTALL, 

5127 ) 

5128 if not m: 

5129 util.warn("Could not parse CHECK constraint text: %r" % src) 

5130 sqltext = "" 

5131 else: 

5132 sqltext = re.compile( 

5133 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL 

5134 ).sub(r"\1", m.group(1)) 

5135 entry = { 

5136 "name": check_name, 

5137 "sqltext": sqltext, 

5138 "comment": comment, 

5139 } 

5140 if m: 

5141 do = {} 

5142 if " NOT VALID" in m.groups(): 

5143 do["not_valid"] = True 

5144 if " NO INHERIT" in m.groups(): 

5145 do["no_inherit"] = True 

5146 if do: 

5147 entry["dialect_options"] = do 

5148 

5149 check_constraints[(schema, table_name)].append(entry) 

5150 return check_constraints.items() 

5151 

5152 def _pg_type_filter_schema(self, query, schema): 

5153 if schema is None: 

5154 query = query.where( 

5155 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

5156 # ignore pg_catalog schema 

5157 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

5158 ) 

5159 elif schema != "*": 

5160 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

5161 return query 

5162 

5163 @lru_cache() 

5164 def _enum_query(self, schema): 

5165 lbl_agg_sq = ( 

5166 select( 

5167 pg_catalog.pg_enum.c.enumtypid, 

5168 sql.func.array_agg( 

5169 aggregate_order_by( 

5170 # NOTE: cast since some postgresql derivatives may 

5171 # not support array_agg on the name type 

5172 pg_catalog.pg_enum.c.enumlabel.cast(TEXT), 

5173 pg_catalog.pg_enum.c.enumsortorder, 

5174 ) 

5175 ).label("labels"), 

5176 ) 

5177 .group_by(pg_catalog.pg_enum.c.enumtypid) 

5178 .subquery("lbl_agg") 

5179 ) 

5180 

5181 query = ( 

5182 select( 

5183 pg_catalog.pg_type.c.typname.label("name"), 

5184 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5185 "visible" 

5186 ), 

5187 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5188 lbl_agg_sq.c.labels.label("labels"), 

5189 ) 

5190 .join( 

5191 pg_catalog.pg_namespace, 

5192 pg_catalog.pg_namespace.c.oid 

5193 == pg_catalog.pg_type.c.typnamespace, 

5194 ) 

5195 .outerjoin( 

5196 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid 

5197 ) 

5198 .where(pg_catalog.pg_type.c.typtype == "e") 

5199 .order_by( 

5200 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5201 ) 

5202 ) 

5203 

5204 return self._pg_type_filter_schema(query, schema) 

5205 

5206 @reflection.cache 

5207 def _load_enums(self, connection, schema=None, **kw): 

5208 if not self.supports_native_enum: 

5209 return [] 

5210 

5211 result = connection.execute(self._enum_query(schema)) 

5212 

5213 enums = [] 

5214 for name, visible, schema, labels in result: 

5215 enums.append( 

5216 { 

5217 "name": name, 

5218 "schema": schema, 

5219 "visible": visible, 

5220 "labels": [] if labels is None else labels, 

5221 } 

5222 ) 

5223 return enums 

5224 

5225 @lru_cache() 

5226 def _domain_query(self, schema): 

5227 con_sq = ( 

5228 select( 

5229 pg_catalog.pg_constraint.c.contypid, 

5230 sql.func.array_agg( 

5231 pg_catalog.pg_get_constraintdef( 

5232 pg_catalog.pg_constraint.c.oid, True 

5233 ) 

5234 ).label("condefs"), 

5235 sql.func.array_agg( 

5236 # NOTE: cast since some postgresql derivatives may 

5237 # not support array_agg on the name type 

5238 pg_catalog.pg_constraint.c.conname.cast(TEXT) 

5239 ).label("connames"), 

5240 ) 

5241 # The domain this constraint is on; zero if not a domain constraint 

5242 .where(pg_catalog.pg_constraint.c.contypid != 0) 

5243 .group_by(pg_catalog.pg_constraint.c.contypid) 

5244 .subquery("domain_constraints") 

5245 ) 

5246 

5247 query = ( 

5248 select( 

5249 pg_catalog.pg_type.c.typname.label("name"), 

5250 pg_catalog.format_type( 

5251 pg_catalog.pg_type.c.typbasetype, 

5252 pg_catalog.pg_type.c.typtypmod, 

5253 ).label("attype"), 

5254 (~pg_catalog.pg_type.c.typnotnull).label("nullable"), 

5255 pg_catalog.pg_type.c.typdefault.label("default"), 

5256 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5257 "visible" 

5258 ), 

5259 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5260 con_sq.c.condefs, 

5261 con_sq.c.connames, 

5262 pg_catalog.pg_collation.c.collname, 

5263 ) 

5264 .join( 

5265 pg_catalog.pg_namespace, 

5266 pg_catalog.pg_namespace.c.oid 

5267 == pg_catalog.pg_type.c.typnamespace, 

5268 ) 

5269 .outerjoin( 

5270 pg_catalog.pg_collation, 

5271 pg_catalog.pg_type.c.typcollation 

5272 == pg_catalog.pg_collation.c.oid, 

5273 ) 

5274 .outerjoin( 

5275 con_sq, 

5276 pg_catalog.pg_type.c.oid == con_sq.c.contypid, 

5277 ) 

5278 .where(pg_catalog.pg_type.c.typtype == "d") 

5279 .order_by( 

5280 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5281 ) 

5282 ) 

5283 return self._pg_type_filter_schema(query, schema) 

5284 

5285 @reflection.cache 

5286 def _load_domains(self, connection, schema=None, **kw): 

5287 result = connection.execute(self._domain_query(schema)) 

5288 

5289 domains: List[ReflectedDomain] = [] 

5290 for domain in result.mappings(): 

5291 # strip (30) from character varying(30) 

5292 attype = re.search(r"([^\(]+)", domain["attype"]).group(1) 

5293 constraints: List[ReflectedDomainConstraint] = [] 

5294 if domain["connames"]: 

5295 # When a domain has multiple CHECK constraints, they will 

5296 # be tested in alphabetical order by name. 

5297 sorted_constraints = sorted( 

5298 zip(domain["connames"], domain["condefs"]), 

5299 key=lambda t: t[0], 

5300 ) 

5301 for name, def_ in sorted_constraints: 

5302 # constraint is in the form "CHECK (expression)" 

5303 # or "NOT NULL". Ignore the "NOT NULL" and 

5304 # remove "CHECK (" and the tailing ")". 

5305 if def_.casefold().startswith("check"): 

5306 check = def_[7:-1] 

5307 constraints.append({"name": name, "check": check}) 

5308 domain_rec: ReflectedDomain = { 

5309 "name": domain["name"], 

5310 "schema": domain["schema"], 

5311 "visible": domain["visible"], 

5312 "type": attype, 

5313 "nullable": domain["nullable"], 

5314 "default": domain["default"], 

5315 "constraints": constraints, 

5316 "collation": domain["collname"], 

5317 } 

5318 domains.append(domain_rec) 

5319 

5320 return domains 

5321 

5322 def _set_backslash_escapes(self, connection): 

5323 # this method is provided as an override hook for descendant 

5324 # dialects (e.g. Redshift), so removing it may break them 

5325 std_string = connection.exec_driver_sql( 

5326 "show standard_conforming_strings" 

5327 ).scalar() 

5328 self._backslash_escapes = std_string == "off"