Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1452 statements  

1# dialects/postgresql/base.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql 

11 :name: PostgreSQL 

12 :normal_support: 9.6+ 

13 :best_effort: 9+ 

14 

15.. _postgresql_sequences: 

16 

17Sequences/SERIAL/IDENTITY 

18------------------------- 

19 

20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means 

21of creating new primary key values for integer-based primary key columns. When 

22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for 

23integer-based primary key columns, which generates a sequence and server side 

24default corresponding to the column. 

25 

26To specify a specific named sequence to be used for primary key generation, 

27use the :func:`~sqlalchemy.schema.Sequence` construct:: 

28 

29 Table( 

30 "sometable", 

31 metadata, 

32 Column( 

33 "id", Integer, Sequence("some_id_seq", start=1), primary_key=True 

34 ), 

35 ) 

36 

37When SQLAlchemy issues a single INSERT statement, to fulfill the contract of 

38having the "last insert identifier" available, a RETURNING clause is added to 

39the INSERT statement which specifies the primary key columns should be 

40returned after the statement completes. The RETURNING functionality only takes 

41place if PostgreSQL 8.2 or later is in use. As a fallback approach, the 

42sequence, whether specified explicitly or implicitly via ``SERIAL``, is 

43executed independently beforehand, the returned value to be used in the 

44subsequent insert. Note that when an 

45:func:`~sqlalchemy.sql.expression.insert()` construct is executed using 

46"executemany" semantics, the "last inserted identifier" functionality does not 

47apply; no RETURNING clause is emitted nor is the sequence pre-executed in this 

48case. 

49 

50 

51PostgreSQL 10 and above IDENTITY columns 

52^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

53 

54PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use 

55of SERIAL. The :class:`_schema.Identity` construct in a 

56:class:`_schema.Column` can be used to control its behavior:: 

57 

58 from sqlalchemy import Table, Column, MetaData, Integer, Computed 

59 

60 metadata = MetaData() 

61 

62 data = Table( 

63 "data", 

64 metadata, 

65 Column( 

66 "id", Integer, Identity(start=42, cycle=True), primary_key=True 

67 ), 

68 Column("data", String), 

69 ) 

70 

71The CREATE TABLE for the above :class:`_schema.Table` object would be: 

72 

73.. sourcecode:: sql 

74 

75 CREATE TABLE data ( 

76 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), 

77 data VARCHAR, 

78 PRIMARY KEY (id) 

79 ) 

80 

81.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

82 in a :class:`_schema.Column` to specify the option of an autoincrementing 

83 column. 

84 

85.. note:: 

86 

87 Previous versions of SQLAlchemy did not have built-in support for rendering 

88 of IDENTITY, and could use the following compilation hook to replace 

89 occurrences of SERIAL with IDENTITY:: 

90 

91 from sqlalchemy.schema import CreateColumn 

92 from sqlalchemy.ext.compiler import compiles 

93 

94 

95 @compiles(CreateColumn, "postgresql") 

96 def use_identity(element, compiler, **kw): 

97 text = compiler.visit_create_column(element, **kw) 

98 text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY") 

99 return text 

100 

101 Using the above, a table such as:: 

102 

103 t = Table( 

104 "t", m, Column("id", Integer, primary_key=True), Column("data", String) 

105 ) 

106 

107 Will generate on the backing database as: 

108 

109 .. sourcecode:: sql 

110 

111 CREATE TABLE t ( 

112 id INT GENERATED BY DEFAULT AS IDENTITY, 

113 data VARCHAR, 

114 PRIMARY KEY (id) 

115 ) 

116 

117.. _postgresql_ss_cursors: 

118 

119Server Side Cursors 

120------------------- 

121 

122Server-side cursor support is available for the psycopg2, asyncpg 

123dialects and may also be available in others. 

124 

125Server side cursors are enabled on a per-statement basis by using the 

126:paramref:`.Connection.execution_options.stream_results` connection execution 

127option:: 

128 

129 with engine.connect() as conn: 

130 result = conn.execution_options(stream_results=True).execute( 

131 text("select * from table") 

132 ) 

133 

134Note that some kinds of SQL statements may not be supported with 

135server side cursors; generally, only SQL statements that return rows should be 

136used with this option. 

137 

138.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated 

139 and will be removed in a future release. Please use the 

140 :paramref:`_engine.Connection.stream_results` execution option for 

141 unbuffered cursor support. 

142 

143.. seealso:: 

144 

145 :ref:`engine_stream_results` 

146 

147.. _postgresql_isolation_level: 

148 

149Transaction Isolation Level 

150--------------------------- 

151 

152Most SQLAlchemy dialects support setting of transaction isolation level 

153using the :paramref:`_sa.create_engine.isolation_level` parameter 

154at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` 

155level via the :paramref:`.Connection.execution_options.isolation_level` 

156parameter. 

157 

158For PostgreSQL dialects, this feature works either by making use of the 

159DBAPI-specific features, such as psycopg2's isolation level flags which will 

160embed the isolation level setting inline with the ``"BEGIN"`` statement, or for 

161DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS 

162TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement 

163emitted by the DBAPI. For the special AUTOCOMMIT isolation level, 

164DBAPI-specific techniques are used which is typically an ``.autocommit`` 

165flag on the DBAPI connection object. 

166 

167To set isolation level using :func:`_sa.create_engine`:: 

168 

169 engine = create_engine( 

170 "postgresql+pg8000://scott:tiger@localhost/test", 

171 isolation_level="REPEATABLE READ", 

172 ) 

173 

174To set using per-connection execution options:: 

175 

176 with engine.connect() as conn: 

177 conn = conn.execution_options(isolation_level="REPEATABLE READ") 

178 with conn.begin(): 

179 ... # work with transaction 

180 

181There are also more options for isolation level configurations, such as 

182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

183different isolation level settings. See the discussion at 

184:ref:`dbapi_autocommit` for background. 

185 

186Valid values for ``isolation_level`` on most PostgreSQL dialects include: 

187 

188* ``READ COMMITTED`` 

189* ``READ UNCOMMITTED`` 

190* ``REPEATABLE READ`` 

191* ``SERIALIZABLE`` 

192* ``AUTOCOMMIT`` 

193 

194.. seealso:: 

195 

196 :ref:`dbapi_autocommit` 

197 

198 :ref:`postgresql_readonly_deferrable` 

199 

200 :ref:`psycopg2_isolation_level` 

201 

202 :ref:`pg8000_isolation_level` 

203 

204.. _postgresql_readonly_deferrable: 

205 

206Setting READ ONLY / DEFERRABLE 

207------------------------------ 

208 

209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" 

210characteristics of the transaction, which is in addition to the isolation level 

211setting. These two attributes can be established either in conjunction with or 

212independently of the isolation level by passing the ``postgresql_readonly`` and 

213``postgresql_deferrable`` flags with 

214:meth:`_engine.Connection.execution_options`. The example below illustrates 

215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting 

216"READ ONLY" and "DEFERRABLE":: 

217 

218 with engine.connect() as conn: 

219 conn = conn.execution_options( 

220 isolation_level="SERIALIZABLE", 

221 postgresql_readonly=True, 

222 postgresql_deferrable=True, 

223 ) 

224 with conn.begin(): 

225 ... # work with transaction 

226 

227Note that some DBAPIs such as asyncpg only support "readonly" with 

228SERIALIZABLE isolation. 

229 

230.. versionadded:: 1.4 added support for the ``postgresql_readonly`` 

231 and ``postgresql_deferrable`` execution options. 

232 

233.. _postgresql_reset_on_return: 

234 

235Temporary Table / Resource Reset for Connection Pooling 

236------------------------------------------------------- 

237 

238The :class:`.QueuePool` connection pool implementation used 

239by the SQLAlchemy :class:`.Engine` object includes 

240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

241the DBAPI ``.rollback()`` method when connections are returned to the pool. 

242While this rollback will clear out the immediate state used by the previous 

243transaction, it does not cover a wider range of session-level state, including 

244temporary tables as well as other server state such as prepared statement 

245handles and statement caches. The PostgreSQL database includes a variety 

246of commands which may be used to reset this state, including 

247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``. 

248 

249 

250To install 

251one or more of these commands as the means of performing reset-on-return, 

252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated 

253in the example below. The implementation 

254will end transactions in progress as well as discard temporary tables 

255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL 

256documentation for background on what each of these statements do. 

257 

258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter 

259is set to ``None`` so that the custom scheme can replace the default behavior 

260completely. The custom hook implementation calls ``.rollback()`` in any case, 

261as it's usually important that the DBAPI's own tracking of commit/rollback 

262will remain consistent with the state of the transaction:: 

263 

264 

265 from sqlalchemy import create_engine 

266 from sqlalchemy import event 

267 

268 postgresql_engine = create_engine( 

269 "postgresql+psycopg2://scott:tiger@hostname/dbname", 

270 # disable default reset-on-return scheme 

271 pool_reset_on_return=None, 

272 ) 

273 

274 

275 @event.listens_for(postgresql_engine, "reset") 

276 def _reset_postgresql(dbapi_connection, connection_record, reset_state): 

277 if not reset_state.terminate_only: 

278 dbapi_connection.execute("CLOSE ALL") 

279 dbapi_connection.execute("RESET ALL") 

280 dbapi_connection.execute("DISCARD TEMP") 

281 

282 # so that the DBAPI itself knows that the connection has been 

283 # reset 

284 dbapi_connection.rollback() 

285 

286.. versionchanged:: 2.0.0b3 Added additional state arguments to 

287 the :meth:`.PoolEvents.reset` event and additionally ensured the event 

288 is invoked for all "reset" occurrences, so that it's appropriate 

289 as a place for custom "reset" handlers. Previous schemes which 

290 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

291 

292.. seealso:: 

293 

294 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

295 

296.. _postgresql_alternate_search_path: 

297 

298Setting Alternate Search Paths on Connect 

299------------------------------------------ 

300 

301The PostgreSQL ``search_path`` variable refers to the list of schema names 

302that will be implicitly referenced when a particular table or other 

303object is referenced in a SQL statement. As detailed in the next section 

304:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around 

305the concept of keeping this variable at its default value of ``public``, 

306however, in order to have it set to any arbitrary name or names when connections 

307are used automatically, the "SET SESSION search_path" command may be invoked 

308for all connections in a pool using the following event handler, as discussed 

309at :ref:`schema_set_default_connections`:: 

310 

311 from sqlalchemy import event 

312 from sqlalchemy import create_engine 

313 

314 engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname") 

315 

316 

317 @event.listens_for(engine, "connect", insert=True) 

318 def set_search_path(dbapi_connection, connection_record): 

319 existing_autocommit = dbapi_connection.autocommit 

320 dbapi_connection.autocommit = True 

321 cursor = dbapi_connection.cursor() 

322 cursor.execute("SET SESSION search_path='%s'" % schema_name) 

323 cursor.close() 

324 dbapi_connection.autocommit = existing_autocommit 

325 

326The reason the recipe is complicated by use of the ``.autocommit`` DBAPI 

327attribute is so that when the ``SET SESSION search_path`` directive is invoked, 

328it is invoked outside of the scope of any transaction and therefore will not 

329be reverted when the DBAPI connection has a rollback. 

330 

331.. seealso:: 

332 

333 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation 

334 

335.. _postgresql_schema_reflection: 

336 

337Remote-Schema Table Introspection and PostgreSQL search_path 

338------------------------------------------------------------ 

339 

340.. admonition:: Section Best Practices Summarized 

341 

342 keep the ``search_path`` variable set to its default of ``public``, without 

343 any other schema names. Ensure the username used to connect **does not** 

344 match remote schemas, or ensure the ``"$user"`` token is **removed** from 

345 ``search_path``. For other schema names, name these explicitly 

346 within :class:`_schema.Table` definitions. Alternatively, the 

347 ``postgresql_ignore_search_path`` option will cause all reflected 

348 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema` 

349 attribute set up. 

350 

351The PostgreSQL dialect can reflect tables from any schema, as outlined in 

352:ref:`metadata_reflection_schemas`. 

353 

354In all cases, the first thing SQLAlchemy does when reflecting tables is 

355to **determine the default schema for the current database connection**. 

356It does this using the PostgreSQL ``current_schema()`` 

357function, illustated below using a PostgreSQL client session (i.e. using 

358the ``psql`` tool): 

359 

360.. sourcecode:: sql 

361 

362 test=> select current_schema(); 

363 current_schema 

364 ---------------- 

365 public 

366 (1 row) 

367 

368Above we see that on a plain install of PostgreSQL, the default schema name 

369is the name ``public``. 

370 

371However, if your database username **matches the name of a schema**, PostgreSQL's 

372default is to then **use that name as the default schema**. Below, we log in 

373using the username ``scott``. When we create a schema named ``scott``, **it 

374implicitly changes the default schema**: 

375 

376.. sourcecode:: sql 

377 

378 test=> select current_schema(); 

379 current_schema 

380 ---------------- 

381 public 

382 (1 row) 

383 

384 test=> create schema scott; 

385 CREATE SCHEMA 

386 test=> select current_schema(); 

387 current_schema 

388 ---------------- 

389 scott 

390 (1 row) 

391 

392The behavior of ``current_schema()`` is derived from the 

393`PostgreSQL search path 

394<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

395variable ``search_path``, which in modern PostgreSQL versions defaults to this: 

396 

397.. sourcecode:: sql 

398 

399 test=> show search_path; 

400 search_path 

401 ----------------- 

402 "$user", public 

403 (1 row) 

404 

405Where above, the ``"$user"`` variable will inject the current username as the 

406default schema, if one exists. Otherwise, ``public`` is used. 

407 

408When a :class:`_schema.Table` object is reflected, if it is present in the 

409schema indicated by the ``current_schema()`` function, **the schema name assigned 

410to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the 

411".schema" attribute will be assigned the string name of that schema. 

412 

413With regards to tables which these :class:`_schema.Table` 

414objects refer to via foreign key constraint, a decision must be made as to how 

415the ``.schema`` is represented in those remote tables, in the case where that 

416remote schema name is also a member of the current ``search_path``. 

417 

418By default, the PostgreSQL dialect mimics the behavior encouraged by 

419PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function 

420returns a sample definition for a particular foreign key constraint, 

421omitting the referenced schema name from that definition when the name is 

422also in the PostgreSQL schema search path. The interaction below 

423illustrates this behavior: 

424 

425.. sourcecode:: sql 

426 

427 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); 

428 CREATE TABLE 

429 test=> CREATE TABLE referring( 

430 test(> id INTEGER PRIMARY KEY, 

431 test(> referred_id INTEGER REFERENCES test_schema.referred(id)); 

432 CREATE TABLE 

433 test=> SET search_path TO public, test_schema; 

434 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

435 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

436 test-> ON n.oid = c.relnamespace 

437 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

438 test-> WHERE c.relname='referring' AND r.contype = 'f' 

439 test-> ; 

440 pg_get_constraintdef 

441 --------------------------------------------------- 

442 FOREIGN KEY (referred_id) REFERENCES referred(id) 

443 (1 row) 

444 

445Above, we created a table ``referred`` as a member of the remote schema 

446``test_schema``, however when we added ``test_schema`` to the 

447PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the 

448``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of 

449the function. 

450 

451On the other hand, if we set the search path back to the typical default 

452of ``public``: 

453 

454.. sourcecode:: sql 

455 

456 test=> SET search_path TO public; 

457 SET 

458 

459The same query against ``pg_get_constraintdef()`` now returns the fully 

460schema-qualified name for us: 

461 

462.. sourcecode:: sql 

463 

464 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

465 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

466 test-> ON n.oid = c.relnamespace 

467 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

468 test-> WHERE c.relname='referring' AND r.contype = 'f'; 

469 pg_get_constraintdef 

470 --------------------------------------------------------------- 

471 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) 

472 (1 row) 

473 

474SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` 

475in order to determine the remote schema name. That is, if our ``search_path`` 

476were set to include ``test_schema``, and we invoked a table 

477reflection process as follows:: 

478 

479 >>> from sqlalchemy import Table, MetaData, create_engine, text 

480 >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

481 >>> with engine.connect() as conn: 

482 ... conn.execute(text("SET search_path TO test_schema, public")) 

483 ... metadata_obj = MetaData() 

484 ... referring = Table("referring", metadata_obj, autoload_with=conn) 

485 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0> 

486 

487The above process would deliver to the :attr:`_schema.MetaData.tables` 

488collection 

489``referred`` table named **without** the schema:: 

490 

491 >>> metadata_obj.tables["referred"].schema is None 

492 True 

493 

494To alter the behavior of reflection such that the referred schema is 

495maintained regardless of the ``search_path`` setting, use the 

496``postgresql_ignore_search_path`` option, which can be specified as a 

497dialect-specific argument to both :class:`_schema.Table` as well as 

498:meth:`_schema.MetaData.reflect`:: 

499 

500 >>> with engine.connect() as conn: 

501 ... conn.execute(text("SET search_path TO test_schema, public")) 

502 ... metadata_obj = MetaData() 

503 ... referring = Table( 

504 ... "referring", 

505 ... metadata_obj, 

506 ... autoload_with=conn, 

507 ... postgresql_ignore_search_path=True, 

508 ... ) 

509 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0> 

510 

511We will now have ``test_schema.referred`` stored as schema-qualified:: 

512 

513 >>> metadata_obj.tables["test_schema.referred"].schema 

514 'test_schema' 

515 

516.. sidebar:: Best Practices for PostgreSQL Schema reflection 

517 

518 The description of PostgreSQL schema reflection behavior is complex, and 

519 is the product of many years of dealing with widely varied use cases and 

520 user preferences. But in fact, there's no need to understand any of it if 

521 you just stick to the simplest use pattern: leave the ``search_path`` set 

522 to its default of ``public`` only, never refer to the name ``public`` as 

523 an explicit schema name otherwise, and refer to all other schema names 

524 explicitly when building up a :class:`_schema.Table` object. The options 

525 described here are only for those users who can't, or prefer not to, stay 

526 within these guidelines. 

527 

528.. seealso:: 

529 

530 :ref:`reflection_schema_qualified_interaction` - discussion of the issue 

531 from a backend-agnostic perspective 

532 

533 `The Schema Search Path 

534 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

535 - on the PostgreSQL website. 

536 

537INSERT/UPDATE...RETURNING 

538------------------------- 

539 

540The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and 

541``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default 

542for single-row INSERT statements in order to fetch newly generated 

543primary key identifiers. To specify an explicit ``RETURNING`` clause, 

544use the :meth:`._UpdateBase.returning` method on a per-statement basis:: 

545 

546 # INSERT..RETURNING 

547 result = ( 

548 table.insert().returning(table.c.col1, table.c.col2).values(name="foo") 

549 ) 

550 print(result.fetchall()) 

551 

552 # UPDATE..RETURNING 

553 result = ( 

554 table.update() 

555 .returning(table.c.col1, table.c.col2) 

556 .where(table.c.name == "foo") 

557 .values(name="bar") 

558 ) 

559 print(result.fetchall()) 

560 

561 # DELETE..RETURNING 

562 result = ( 

563 table.delete() 

564 .returning(table.c.col1, table.c.col2) 

565 .where(table.c.name == "foo") 

566 ) 

567 print(result.fetchall()) 

568 

569.. _postgresql_insert_on_conflict: 

570 

571INSERT...ON CONFLICT (Upsert) 

572------------------------------ 

573 

574Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of 

575rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A 

576candidate row will only be inserted if that row does not violate any unique 

577constraints. In the case of a unique constraint violation, a secondary action 

578can occur which can be either "DO UPDATE", indicating that the data in the 

579target row should be updated, or "DO NOTHING", which indicates to silently skip 

580this row. 

581 

582Conflicts are determined using existing unique constraints and indexes. These 

583constraints may be identified either using their name as stated in DDL, 

584or they may be inferred by stating the columns and conditions that comprise 

585the indexes. 

586 

587SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific 

588:func:`_postgresql.insert()` function, which provides 

589the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` 

590and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: 

591 

592.. sourcecode:: pycon+sql 

593 

594 >>> from sqlalchemy.dialects.postgresql import insert 

595 >>> insert_stmt = insert(my_table).values( 

596 ... id="some_existing_id", data="inserted value" 

597 ... ) 

598 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 

599 >>> print(do_nothing_stmt) 

600 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

601 ON CONFLICT (id) DO NOTHING 

602 {stop} 

603 

604 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

605 ... constraint="pk_my_table", set_=dict(data="updated value") 

606 ... ) 

607 >>> print(do_update_stmt) 

608 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

609 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s 

610 

611.. seealso:: 

612 

613 `INSERT .. ON CONFLICT 

614 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ 

615 - in the PostgreSQL documentation. 

616 

617Specifying the Target 

618^^^^^^^^^^^^^^^^^^^^^ 

619 

620Both methods supply the "target" of the conflict using either the 

621named constraint or by column inference: 

622 

623* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument 

624 specifies a sequence containing string column names, :class:`_schema.Column` 

625 objects, and/or SQL expression elements, which would identify a unique 

626 index: 

627 

628 .. sourcecode:: pycon+sql 

629 

630 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

631 ... index_elements=["id"], set_=dict(data="updated value") 

632 ... ) 

633 >>> print(do_update_stmt) 

634 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

635 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

636 {stop} 

637 

638 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

639 ... index_elements=[my_table.c.id], set_=dict(data="updated value") 

640 ... ) 

641 >>> print(do_update_stmt) 

642 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

643 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

644 

645* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to 

646 infer an index, a partial index can be inferred by also specifying the 

647 use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: 

648 

649 .. sourcecode:: pycon+sql 

650 

651 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 

652 >>> stmt = stmt.on_conflict_do_update( 

653 ... index_elements=[my_table.c.user_email], 

654 ... index_where=my_table.c.user_email.like("%@gmail.com"), 

655 ... set_=dict(data=stmt.excluded.data), 

656 ... ) 

657 >>> print(stmt) 

658 {printsql}INSERT INTO my_table (data, user_email) 

659 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) 

660 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data 

661 

662* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is 

663 used to specify an index directly rather than inferring it. This can be 

664 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: 

665 

666 .. sourcecode:: pycon+sql 

667 

668 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

669 ... constraint="my_table_idx_1", set_=dict(data="updated value") 

670 ... ) 

671 >>> print(do_update_stmt) 

672 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

673 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s 

674 {stop} 

675 

676 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

677 ... constraint="my_table_pk", set_=dict(data="updated value") 

678 ... ) 

679 >>> print(do_update_stmt) 

680 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

681 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s 

682 {stop} 

683 

684* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may 

685 also refer to a SQLAlchemy construct representing a constraint, 

686 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, 

687 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, 

688 if the constraint has a name, it is used directly. Otherwise, if the 

689 constraint is unnamed, then inference will be used, where the expressions 

690 and optional WHERE clause of the constraint will be spelled out in the 

691 construct. This use is especially convenient 

692 to refer to the named or unnamed primary key of a :class:`_schema.Table` 

693 using the 

694 :attr:`_schema.Table.primary_key` attribute: 

695 

696 .. sourcecode:: pycon+sql 

697 

698 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

699 ... constraint=my_table.primary_key, set_=dict(data="updated value") 

700 ... ) 

701 >>> print(do_update_stmt) 

702 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

703 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

704 

705The SET Clause 

706^^^^^^^^^^^^^^^ 

707 

708``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

709existing row, using any combination of new values as well as values 

710from the proposed insertion. These values are specified using the 

711:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This 

712parameter accepts a dictionary which consists of direct values 

713for UPDATE: 

714 

715.. sourcecode:: pycon+sql 

716 

717 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

718 >>> do_update_stmt = stmt.on_conflict_do_update( 

719 ... index_elements=["id"], set_=dict(data="updated value") 

720 ... ) 

721 >>> print(do_update_stmt) 

722 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

723 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

724 

725.. warning:: 

726 

727 The :meth:`_expression.Insert.on_conflict_do_update` 

728 method does **not** take into 

729 account Python-side default UPDATE values or generation functions, e.g. 

730 those specified using :paramref:`_schema.Column.onupdate`. 

731 These values will not be exercised for an ON CONFLICT style of UPDATE, 

732 unless they are manually specified in the 

733 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. 

734 

735Updating using the Excluded INSERT Values 

736^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

737 

738In order to refer to the proposed insertion row, the special alias 

739:attr:`~.postgresql.Insert.excluded` is available as an attribute on 

740the :class:`_postgresql.Insert` object; this object is a 

741:class:`_expression.ColumnCollection` 

742which alias contains all columns of the target 

743table: 

744 

745.. sourcecode:: pycon+sql 

746 

747 >>> stmt = insert(my_table).values( 

748 ... id="some_id", data="inserted value", author="jlh" 

749 ... ) 

750 >>> do_update_stmt = stmt.on_conflict_do_update( 

751 ... index_elements=["id"], 

752 ... set_=dict(data="updated value", author=stmt.excluded.author), 

753 ... ) 

754 >>> print(do_update_stmt) 

755 {printsql}INSERT INTO my_table (id, data, author) 

756 VALUES (%(id)s, %(data)s, %(author)s) 

757 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

758 

759Additional WHERE Criteria 

760^^^^^^^^^^^^^^^^^^^^^^^^^ 

761 

762The :meth:`_expression.Insert.on_conflict_do_update` method also accepts 

763a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` 

764parameter, which will limit those rows which receive an UPDATE: 

765 

766.. sourcecode:: pycon+sql 

767 

768 >>> stmt = insert(my_table).values( 

769 ... id="some_id", data="inserted value", author="jlh" 

770 ... ) 

771 >>> on_update_stmt = stmt.on_conflict_do_update( 

772 ... index_elements=["id"], 

773 ... set_=dict(data="updated value", author=stmt.excluded.author), 

774 ... where=(my_table.c.status == 2), 

775 ... ) 

776 >>> print(on_update_stmt) 

777 {printsql}INSERT INTO my_table (id, data, author) 

778 VALUES (%(id)s, %(data)s, %(author)s) 

779 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

780 WHERE my_table.status = %(status_1)s 

781 

782Skipping Rows with DO NOTHING 

783^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

784 

785``ON CONFLICT`` may be used to skip inserting a row entirely 

786if any conflict with a unique or exclusion constraint occurs; below 

787this is illustrated using the 

788:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: 

789 

790.. sourcecode:: pycon+sql 

791 

792 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

793 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 

794 >>> print(stmt) 

795 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

796 ON CONFLICT (id) DO NOTHING 

797 

798If ``DO NOTHING`` is used without specifying any columns or constraint, 

799it has the effect of skipping the INSERT for any unique or exclusion 

800constraint violation which occurs: 

801 

802.. sourcecode:: pycon+sql 

803 

804 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

805 >>> stmt = stmt.on_conflict_do_nothing() 

806 >>> print(stmt) 

807 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

808 ON CONFLICT DO NOTHING 

809 

810.. _postgresql_match: 

811 

812Full Text Search 

813---------------- 

814 

815PostgreSQL's full text search system is available through the use of the 

816:data:`.func` namespace, combined with the use of custom operators 

817via the :meth:`.Operators.bool_op` method. For simple cases with some 

818degree of cross-backend compatibility, the :meth:`.Operators.match` operator 

819may also be used. 

820 

821.. _postgresql_simple_match: 

822 

823Simple plain text matching with ``match()`` 

824^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

825 

826The :meth:`.Operators.match` operator provides for cross-compatible simple 

827text matching. For the PostgreSQL backend, it's hardcoded to generate 

828an expression using the ``@@`` operator in conjunction with the 

829``plainto_tsquery()`` PostgreSQL function. 

830 

831On the PostgreSQL dialect, an expression like the following:: 

832 

833 select(sometable.c.text.match("search string")) 

834 

835would emit to the database: 

836 

837.. sourcecode:: sql 

838 

839 SELECT text @@ plainto_tsquery('search string') FROM table 

840 

841Above, passing a plain string to :meth:`.Operators.match` will automatically 

842make use of ``plainto_tsquery()`` to specify the type of tsquery. This 

843establishes basic database cross-compatibility for :meth:`.Operators.match` 

844with other backends. 

845 

846.. versionchanged:: 2.0 The default tsquery generation function used by the 

847 PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. 

848 

849 To render exactly what was rendered in 1.4, use the following form:: 

850 

851 from sqlalchemy import func 

852 

853 select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string"))) 

854 

855 Which would emit: 

856 

857 .. sourcecode:: sql 

858 

859 SELECT text @@ to_tsquery('search string') FROM table 

860 

861Using PostgreSQL full text functions and operators directly 

862^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

863 

864Text search operations beyond the simple use of :meth:`.Operators.match` 

865may make use of the :data:`.func` namespace to generate PostgreSQL full-text 

866functions, in combination with :meth:`.Operators.bool_op` to generate 

867any boolean operator. 

868 

869For example, the query:: 

870 

871 select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat"))) 

872 

873would generate: 

874 

875.. sourcecode:: sql 

876 

877 SELECT to_tsquery('cat') @> to_tsquery('cat & rat') 

878 

879 

880The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: 

881 

882 from sqlalchemy.dialects.postgresql import TSVECTOR 

883 from sqlalchemy import select, cast 

884 

885 select(cast("some text", TSVECTOR)) 

886 

887produces a statement equivalent to: 

888 

889.. sourcecode:: sql 

890 

891 SELECT CAST('some text' AS TSVECTOR) AS anon_1 

892 

893The ``func`` namespace is augmented by the PostgreSQL dialect to set up 

894correct argument and return types for most full text search functions. 

895These functions are used automatically by the :attr:`_sql.func` namespace 

896assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, 

897or :func:`_sa.create_engine` has been invoked using a ``postgresql`` 

898dialect. These functions are documented at: 

899 

900* :class:`_postgresql.to_tsvector` 

901* :class:`_postgresql.to_tsquery` 

902* :class:`_postgresql.plainto_tsquery` 

903* :class:`_postgresql.phraseto_tsquery` 

904* :class:`_postgresql.websearch_to_tsquery` 

905* :class:`_postgresql.ts_headline` 

906 

907Specifying the "regconfig" with ``match()`` or custom operators 

908^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

909 

910PostgreSQL's ``plainto_tsquery()`` function accepts an optional 

911"regconfig" argument that is used to instruct PostgreSQL to use a 

912particular pre-computed GIN or GiST index in order to perform the search. 

913When using :meth:`.Operators.match`, this additional parameter may be 

914specified using the ``postgresql_regconfig`` parameter, such as:: 

915 

916 select(mytable.c.id).where( 

917 mytable.c.title.match("somestring", postgresql_regconfig="english") 

918 ) 

919 

920Which would emit: 

921 

922.. sourcecode:: sql 

923 

924 SELECT mytable.id FROM mytable 

925 WHERE mytable.title @@ plainto_tsquery('english', 'somestring') 

926 

927When using other PostgreSQL search functions with :data:`.func`, the 

928"regconfig" parameter may be passed directly as the initial argument:: 

929 

930 select(mytable.c.id).where( 

931 func.to_tsvector("english", mytable.c.title).bool_op("@@")( 

932 func.to_tsquery("english", "somestring") 

933 ) 

934 ) 

935 

936produces a statement equivalent to: 

937 

938.. sourcecode:: sql 

939 

940 SELECT mytable.id FROM mytable 

941 WHERE to_tsvector('english', mytable.title) @@ 

942 to_tsquery('english', 'somestring') 

943 

944It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from 

945PostgreSQL to ensure that you are generating queries with SQLAlchemy that 

946take full advantage of any indexes you may have created for full text search. 

947 

948.. seealso:: 

949 

950 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation 

951 

952 

953FROM ONLY ... 

954------------- 

955 

956The dialect supports PostgreSQL's ONLY keyword for targeting only a particular 

957table in an inheritance hierarchy. This can be used to produce the 

958``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` 

959syntaxes. It uses SQLAlchemy's hints mechanism:: 

960 

961 # SELECT ... FROM ONLY ... 

962 result = table.select().with_hint(table, "ONLY", "postgresql") 

963 print(result.fetchall()) 

964 

965 # UPDATE ONLY ... 

966 table.update(values=dict(foo="bar")).with_hint( 

967 "ONLY", dialect_name="postgresql" 

968 ) 

969 

970 # DELETE FROM ONLY ... 

971 table.delete().with_hint("ONLY", dialect_name="postgresql") 

972 

973.. _postgresql_indexes: 

974 

975PostgreSQL-Specific Index Options 

976--------------------------------- 

977 

978Several extensions to the :class:`.Index` construct are available, specific 

979to the PostgreSQL dialect. 

980 

981.. _postgresql_covering_indexes: 

982 

983Covering Indexes 

984^^^^^^^^^^^^^^^^ 

985 

986A covering index includes additional columns that are not part of the index key 

987but are stored in the index, allowing PostgreSQL to satisfy queries using only 

988the index without accessing the table (an "index-only scan"). This is 

989indicated on the index using the ``INCLUDE`` clause. The 

990``postgresql_include`` option for :class:`.Index` (as well as 

991:class:`.UniqueConstraint`) renders ``INCLUDE(colname)`` for the given string 

992names:: 

993 

994 Index("my_index", table.c.x, postgresql_include=["y"]) 

995 

996would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` 

997 

998Note that this feature requires PostgreSQL 11 or later. 

999 

1000.. seealso:: 

1001 

1002 :ref:`postgresql_constraint_options_include` - the same feature implemented 

1003 for :class:`.UniqueConstraint` 

1004 

1005.. versionadded:: 1.4 - support for covering indexes with :class:`.Index`. 

1006 support for :class:`.UniqueConstraint` was in 2.0.41 

1007 

1008.. _postgresql_partial_indexes: 

1009 

1010Partial Indexes 

1011^^^^^^^^^^^^^^^ 

1012 

1013Partial indexes add criterion to the index definition so that the index is 

1014applied to a subset of rows. These can be specified on :class:`.Index` 

1015using the ``postgresql_where`` keyword argument:: 

1016 

1017 Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10) 

1018 

1019.. _postgresql_operator_classes: 

1020 

1021Operator Classes 

1022^^^^^^^^^^^^^^^^ 

1023 

1024PostgreSQL allows the specification of an *operator class* for each column of 

1025an index (see 

1026https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). 

1027The :class:`.Index` construct allows these to be specified via the 

1028``postgresql_ops`` keyword argument:: 

1029 

1030 Index( 

1031 "my_index", 

1032 my_table.c.id, 

1033 my_table.c.data, 

1034 postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"}, 

1035 ) 

1036 

1037Note that the keys in the ``postgresql_ops`` dictionaries are the 

1038"key" name of the :class:`_schema.Column`, i.e. the name used to access it from 

1039the ``.c`` collection of :class:`_schema.Table`, which can be configured to be 

1040different than the actual name of the column as expressed in the database. 

1041 

1042If ``postgresql_ops`` is to be used against a complex SQL expression such 

1043as a function call, then to apply to the column it must be given a label 

1044that is identified in the dictionary by name, e.g.:: 

1045 

1046 Index( 

1047 "my_index", 

1048 my_table.c.id, 

1049 func.lower(my_table.c.data).label("data_lower"), 

1050 postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"}, 

1051 ) 

1052 

1053Operator classes are also supported by the 

1054:class:`_postgresql.ExcludeConstraint` construct using the 

1055:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for 

1056details. 

1057 

1058.. versionadded:: 1.3.21 added support for operator classes with 

1059 :class:`_postgresql.ExcludeConstraint`. 

1060 

1061 

1062Index Types 

1063^^^^^^^^^^^ 

1064 

1065PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well 

1066as the ability for users to create their own (see 

1067https://www.postgresql.org/docs/current/static/indexes-types.html). These can be 

1068specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: 

1069 

1070 Index("my_index", my_table.c.data, postgresql_using="gin") 

1071 

1072The value passed to the keyword argument will be simply passed through to the 

1073underlying CREATE INDEX command, so it *must* be a valid index type for your 

1074version of PostgreSQL. 

1075 

1076.. _postgresql_index_storage: 

1077 

1078Index Storage Parameters 

1079^^^^^^^^^^^^^^^^^^^^^^^^ 

1080 

1081PostgreSQL allows storage parameters to be set on indexes. The storage 

1082parameters available depend on the index method used by the index. Storage 

1083parameters can be specified on :class:`.Index` using the ``postgresql_with`` 

1084keyword argument:: 

1085 

1086 Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50}) 

1087 

1088PostgreSQL allows to define the tablespace in which to create the index. 

1089The tablespace can be specified on :class:`.Index` using the 

1090``postgresql_tablespace`` keyword argument:: 

1091 

1092 Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace") 

1093 

1094Note that the same option is available on :class:`_schema.Table` as well. 

1095 

1096.. _postgresql_index_concurrently: 

1097 

1098Indexes with CONCURRENTLY 

1099^^^^^^^^^^^^^^^^^^^^^^^^^ 

1100 

1101The PostgreSQL index option CONCURRENTLY is supported by passing the 

1102flag ``postgresql_concurrently`` to the :class:`.Index` construct:: 

1103 

1104 tbl = Table("testtbl", m, Column("data", Integer)) 

1105 

1106 idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True) 

1107 

1108The above index construct will render DDL for CREATE INDEX, assuming 

1109PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as: 

1110 

1111.. sourcecode:: sql 

1112 

1113 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) 

1114 

1115For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for 

1116a connection-less dialect, it will emit: 

1117 

1118.. sourcecode:: sql 

1119 

1120 DROP INDEX CONCURRENTLY test_idx1 

1121 

1122When using CONCURRENTLY, the PostgreSQL database requires that the statement 

1123be invoked outside of a transaction block. The Python DBAPI enforces that 

1124even for a single statement, a transaction is present, so to use this 

1125construct, the DBAPI's "autocommit" mode must be used:: 

1126 

1127 metadata = MetaData() 

1128 table = Table("foo", metadata, Column("id", String)) 

1129 index = Index("foo_idx", table.c.id, postgresql_concurrently=True) 

1130 

1131 with engine.connect() as conn: 

1132 with conn.execution_options(isolation_level="AUTOCOMMIT"): 

1133 table.create(conn) 

1134 

1135.. seealso:: 

1136 

1137 :ref:`postgresql_isolation_level` 

1138 

1139.. _postgresql_index_reflection: 

1140 

1141PostgreSQL Index Reflection 

1142--------------------------- 

1143 

1144The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the 

1145UNIQUE CONSTRAINT construct is used. When inspecting a table using 

1146:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` 

1147and the :meth:`_reflection.Inspector.get_unique_constraints` 

1148will report on these 

1149two constructs distinctly; in the case of the index, the key 

1150``duplicates_constraint`` will be present in the index entry if it is 

1151detected as mirroring a constraint. When performing reflection using 

1152``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned 

1153in :attr:`_schema.Table.indexes` when it is detected as mirroring a 

1154:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection 

1155. 

1156 

1157Special Reflection Options 

1158-------------------------- 

1159 

1160The :class:`_reflection.Inspector` 

1161used for the PostgreSQL backend is an instance 

1162of :class:`.PGInspector`, which offers additional methods:: 

1163 

1164 from sqlalchemy import create_engine, inspect 

1165 

1166 engine = create_engine("postgresql+psycopg2://localhost/test") 

1167 insp = inspect(engine) # will be a PGInspector 

1168 

1169 print(insp.get_enums()) 

1170 

1171.. autoclass:: PGInspector 

1172 :members: 

1173 

1174.. _postgresql_table_options: 

1175 

1176PostgreSQL Table Options 

1177------------------------ 

1178 

1179Several options for CREATE TABLE are supported directly by the PostgreSQL 

1180dialect in conjunction with the :class:`_schema.Table` construct, listed in 

1181the following sections. 

1182 

1183.. seealso:: 

1184 

1185 `PostgreSQL CREATE TABLE options 

1186 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - 

1187 in the PostgreSQL documentation. 

1188 

1189``INHERITS`` 

1190^^^^^^^^^^^^ 

1191 

1192Specifies one or more parent tables from which this table inherits columns and 

1193constraints, enabling table inheritance hierarchies in PostgreSQL. 

1194 

1195:: 

1196 

1197 Table("some_table", metadata, ..., postgresql_inherits="some_supertable") 

1198 

1199 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) 

1200 

1201``ON COMMIT`` 

1202^^^^^^^^^^^^^ 

1203 

1204Controls the behavior of temporary tables at transaction commit, with options 

1205to preserve rows, delete rows, or drop the table. 

1206 

1207:: 

1208 

1209 Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS") 

1210 

1211``PARTITION BY`` 

1212^^^^^^^^^^^^^^^^ 

1213 

1214Declares the table as a partitioned table using the specified partitioning 

1215strategy (RANGE, LIST, or HASH) on the given column(s). 

1216 

1217:: 

1218 

1219 Table( 

1220 "some_table", 

1221 metadata, 

1222 ..., 

1223 postgresql_partition_by="LIST (part_column)", 

1224 ) 

1225 

1226``TABLESPACE`` 

1227^^^^^^^^^^^^^^ 

1228 

1229Specifies the tablespace where the table will be stored, allowing control over 

1230the physical location of table data on disk. 

1231 

1232:: 

1233 

1234 Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace") 

1235 

1236The above option is also available on the :class:`.Index` construct. 

1237 

1238``USING`` 

1239^^^^^^^^^ 

1240 

1241Specifies the table access method to use for storing table data, such as 

1242``heap`` (the default) or other custom access methods. 

1243 

1244:: 

1245 

1246 Table("some_table", metadata, ..., postgresql_using="heap") 

1247 

1248.. versionadded:: 2.0.26 

1249 

1250``WITH OIDS`` 

1251^^^^^^^^^^^^^ 

1252 

1253Enables the legacy OID (object identifier) system column for the table, which 

1254assigns a unique identifier to each row. 

1255 

1256:: 

1257 

1258 Table("some_table", metadata, ..., postgresql_with_oids=True) 

1259 

1260``WITHOUT OIDS`` 

1261^^^^^^^^^^^^^^^^ 

1262 

1263Explicitly disables the OID system column for the table (the default behavior 

1264in modern PostgreSQL versions). 

1265 

1266:: 

1267 

1268 Table("some_table", metadata, ..., postgresql_with_oids=False) 

1269 

1270.. _postgresql_constraint_options: 

1271 

1272PostgreSQL Constraint Options 

1273----------------------------- 

1274 

1275The following sections indicate options which are supported by the PostgreSQL 

1276dialect in conjunction with selected constraint constructs. 

1277 

1278 

1279``NOT VALID`` 

1280^^^^^^^^^^^^^ 

1281 

1282Allows a constraint to be added without validating existing rows, improving 

1283performance when adding constraints to large tables. This option applies 

1284towards CHECK and FOREIGN KEY constraints when the constraint is being added 

1285to an existing table via ALTER TABLE, and has the effect that existing rows 

1286are not scanned during the ALTER operation against the constraint being added. 

1287 

1288When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_ 

1289that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument 

1290may be specified as an additional keyword argument within the operation 

1291that creates the constraint, as in the following Alembic example:: 

1292 

1293 def update(): 

1294 op.create_foreign_key( 

1295 "fk_user_address", 

1296 "address", 

1297 "user", 

1298 ["user_id"], 

1299 ["id"], 

1300 postgresql_not_valid=True, 

1301 ) 

1302 

1303The keyword is ultimately accepted directly by the 

1304:class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` 

1305and :class:`_schema.ForeignKey` constructs; when using a tool like 

1306Alembic, dialect-specific keyword arguments are passed through to 

1307these constructs from the migration operation directives:: 

1308 

1309 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) 

1310 

1311 ForeignKeyConstraint( 

1312 ["some_id"], ["some_table.some_id"], postgresql_not_valid=True 

1313 ) 

1314 

1315.. versionadded:: 1.4.32 

1316 

1317.. seealso:: 

1318 

1319 `PostgreSQL ALTER TABLE options 

1320 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ - 

1321 in the PostgreSQL documentation. 

1322 

1323.. _postgresql_constraint_options_include: 

1324 

1325``INCLUDE`` 

1326^^^^^^^^^^^ 

1327 

1328This keyword is applicable to both a ``UNIQUE`` constraint as well as an 

1329``INDEX``. The ``postgresql_include`` option available for 

1330:class:`.UniqueConstraint` as well as :class:`.Index` creates a covering index 

1331by including additional columns in the underlying index without making them 

1332part of the key constraint. This option adds one or more columns as a "payload" 

1333to the index created automatically by PostgreSQL for the constraint. For 

1334example, the following table definition:: 

1335 

1336 Table( 

1337 "mytable", 

1338 metadata, 

1339 Column("id", Integer, nullable=False), 

1340 Column("value", Integer, nullable=False), 

1341 UniqueConstraint("id", postgresql_include=["value"]), 

1342 ) 

1343 

1344would produce the DDL statement 

1345 

1346.. sourcecode:: sql 

1347 

1348 CREATE TABLE mytable ( 

1349 id INTEGER NOT NULL, 

1350 value INTEGER NOT NULL, 

1351 UNIQUE (id) INCLUDE (value) 

1352 ) 

1353 

1354Note that this feature requires PostgreSQL 11 or later. 

1355 

1356.. versionadded:: 2.0.41 - added support for ``postgresql_include`` to 

1357 :class:`.UniqueConstraint`, to complement the existing feature in 

1358 :class:`.Index`. 

1359 

1360.. seealso:: 

1361 

1362 :ref:`postgresql_covering_indexes` - background on ``postgresql_include`` 

1363 for the :class:`.Index` construct. 

1364 

1365 

1366Column list with foreign key ``ON DELETE SET`` actions 

1367^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1368 

1369Allows selective column updates when a foreign key action is triggered, limiting 

1370which columns are set to NULL or DEFAULT upon deletion of a referenced row. 

1371This applies to :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`, the 

1372:paramref:`.ForeignKey.ondelete` parameter will accept on the PostgreSQL 

1373backend only a string list of column names inside parenthesis, following the 

1374``SET NULL`` or ``SET DEFAULT`` phrases, which will limit the set of columns 

1375that are subject to the action:: 

1376 

1377 fktable = Table( 

1378 "fktable", 

1379 metadata, 

1380 Column("tid", Integer), 

1381 Column("id", Integer), 

1382 Column("fk_id_del_set_null", Integer), 

1383 ForeignKeyConstraint( 

1384 columns=["tid", "fk_id_del_set_null"], 

1385 refcolumns=[pktable.c.tid, pktable.c.id], 

1386 ondelete="SET NULL (fk_id_del_set_null)", 

1387 ), 

1388 ) 

1389 

1390.. versionadded:: 2.0.40 

1391 

1392 

1393.. _postgresql_table_valued_overview: 

1394 

1395Table values, Table and Column valued functions, Row and Tuple objects 

1396----------------------------------------------------------------------- 

1397 

1398PostgreSQL makes great use of modern SQL forms such as table-valued functions, 

1399tables and rows as values. These constructs are commonly used as part 

1400of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other 

1401datatypes. SQLAlchemy's SQL expression language has native support for 

1402most table-valued and row-valued forms. 

1403 

1404.. _postgresql_table_valued: 

1405 

1406Table-Valued Functions 

1407^^^^^^^^^^^^^^^^^^^^^^^ 

1408 

1409Many PostgreSQL built-in functions are intended to be used in the FROM clause 

1410of a SELECT statement, and are capable of returning table rows or sets of table 

1411rows. A large portion of PostgreSQL's JSON functions for example such as 

1412``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, 

1413``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such 

1414forms. These classes of SQL function calling forms in SQLAlchemy are available 

1415using the :meth:`_functions.FunctionElement.table_valued` method in conjunction 

1416with :class:`_functions.Function` objects generated from the :data:`_sql.func` 

1417namespace. 

1418 

1419Examples from PostgreSQL's reference documentation follow below: 

1420 

1421* ``json_each()``: 

1422 

1423 .. sourcecode:: pycon+sql 

1424 

1425 >>> from sqlalchemy import select, func 

1426 >>> stmt = select( 

1427 ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value") 

1428 ... ) 

1429 >>> print(stmt) 

1430 {printsql}SELECT anon_1.key, anon_1.value 

1431 FROM json_each(:json_each_1) AS anon_1 

1432 

1433* ``json_populate_record()``: 

1434 

1435 .. sourcecode:: pycon+sql 

1436 

1437 >>> from sqlalchemy import select, func, literal_column 

1438 >>> stmt = select( 

1439 ... func.json_populate_record( 

1440 ... literal_column("null::myrowtype"), '{"a":1,"b":2}' 

1441 ... ).table_valued("a", "b", name="x") 

1442 ... ) 

1443 >>> print(stmt) 

1444 {printsql}SELECT x.a, x.b 

1445 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x 

1446 

1447* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived 

1448 columns in the alias, where we may make use of :func:`_sql.column` elements with 

1449 types to produce them. The :meth:`_functions.FunctionElement.table_valued` 

1450 method produces a :class:`_sql.TableValuedAlias` construct, and the method 

1451 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived 

1452 columns specification: 

1453 

1454 .. sourcecode:: pycon+sql 

1455 

1456 >>> from sqlalchemy import select, func, column, Integer, Text 

1457 >>> stmt = select( 

1458 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}') 

1459 ... .table_valued( 

1460 ... column("a", Integer), 

1461 ... column("b", Text), 

1462 ... column("d", Text), 

1463 ... ) 

1464 ... .render_derived(name="x", with_types=True) 

1465 ... ) 

1466 >>> print(stmt) 

1467 {printsql}SELECT x.a, x.b, x.d 

1468 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) 

1469 

1470* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an 

1471 ordinal counter to the output of a function and is accepted by a limited set 

1472 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The 

1473 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword 

1474 parameter ``with_ordinality`` for this purpose, which accepts the string name 

1475 that will be applied to the "ordinality" column: 

1476 

1477 .. sourcecode:: pycon+sql 

1478 

1479 >>> from sqlalchemy import select, func 

1480 >>> stmt = select( 

1481 ... func.generate_series(4, 1, -1) 

1482 ... .table_valued("value", with_ordinality="ordinality") 

1483 ... .render_derived() 

1484 ... ) 

1485 >>> print(stmt) 

1486 {printsql}SELECT anon_1.value, anon_1.ordinality 

1487 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) 

1488 WITH ORDINALITY AS anon_1(value, ordinality) 

1489 

1490.. versionadded:: 1.4.0b2 

1491 

1492.. seealso:: 

1493 

1494 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` 

1495 

1496.. _postgresql_column_valued: 

1497 

1498Column Valued Functions 

1499^^^^^^^^^^^^^^^^^^^^^^^ 

1500 

1501Similar to the table valued function, a column valued function is present 

1502in the FROM clause, but delivers itself to the columns clause as a single 

1503scalar value. PostgreSQL functions such as ``json_array_elements()``, 

1504``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the 

1505:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: 

1506 

1507* ``json_array_elements()``: 

1508 

1509 .. sourcecode:: pycon+sql 

1510 

1511 >>> from sqlalchemy import select, func 

1512 >>> stmt = select( 

1513 ... func.json_array_elements('["one", "two"]').column_valued("x") 

1514 ... ) 

1515 >>> print(stmt) 

1516 {printsql}SELECT x 

1517 FROM json_array_elements(:json_array_elements_1) AS x 

1518 

1519* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the 

1520 :func:`_postgresql.array` construct may be used: 

1521 

1522 .. sourcecode:: pycon+sql 

1523 

1524 >>> from sqlalchemy.dialects.postgresql import array 

1525 >>> from sqlalchemy import select, func 

1526 >>> stmt = select(func.unnest(array([1, 2])).column_valued()) 

1527 >>> print(stmt) 

1528 {printsql}SELECT anon_1 

1529 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 

1530 

1531 The function can of course be used against an existing table-bound column 

1532 that's of type :class:`_types.ARRAY`: 

1533 

1534 .. sourcecode:: pycon+sql 

1535 

1536 >>> from sqlalchemy import table, column, ARRAY, Integer 

1537 >>> from sqlalchemy import select, func 

1538 >>> t = table("t", column("value", ARRAY(Integer))) 

1539 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) 

1540 >>> print(stmt) 

1541 {printsql}SELECT unnested_value 

1542 FROM unnest(t.value) AS unnested_value 

1543 

1544.. seealso:: 

1545 

1546 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` 

1547 

1548 

1549Row Types 

1550^^^^^^^^^ 

1551 

1552Built-in support for rendering a ``ROW`` may be approximated using 

1553``func.ROW`` with the :attr:`_sa.func` namespace, or by using the 

1554:func:`_sql.tuple_` construct: 

1555 

1556.. sourcecode:: pycon+sql 

1557 

1558 >>> from sqlalchemy import table, column, func, tuple_ 

1559 >>> t = table("t", column("id"), column("fk")) 

1560 >>> stmt = ( 

1561 ... t.select() 

1562 ... .where(tuple_(t.c.id, t.c.fk) > (1, 2)) 

1563 ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)) 

1564 ... ) 

1565 >>> print(stmt) 

1566 {printsql}SELECT t.id, t.fk 

1567 FROM t 

1568 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) 

1569 

1570.. seealso:: 

1571 

1572 `PostgreSQL Row Constructors 

1573 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_ 

1574 

1575 `PostgreSQL Row Constructor Comparison 

1576 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_ 

1577 

1578Table Types passed to Functions 

1579^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1580 

1581PostgreSQL supports passing a table as an argument to a function, which is 

1582known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects 

1583such as :class:`_schema.Table` support this special form using the 

1584:meth:`_sql.FromClause.table_valued` method, which is comparable to the 

1585:meth:`_functions.FunctionElement.table_valued` method except that the collection 

1586of columns is already established by that of the :class:`_sql.FromClause` 

1587itself: 

1588 

1589.. sourcecode:: pycon+sql 

1590 

1591 >>> from sqlalchemy import table, column, func, select 

1592 >>> a = table("a", column("id"), column("x"), column("y")) 

1593 >>> stmt = select(func.row_to_json(a.table_valued())) 

1594 >>> print(stmt) 

1595 {printsql}SELECT row_to_json(a) AS row_to_json_1 

1596 FROM a 

1597 

1598.. versionadded:: 1.4.0b2 

1599 

1600 

1601 

1602""" # noqa: E501 

1603 

1604from __future__ import annotations 

1605 

1606from collections import defaultdict 

1607from functools import lru_cache 

1608import re 

1609from typing import Any 

1610from typing import cast 

1611from typing import Dict 

1612from typing import List 

1613from typing import Optional 

1614from typing import Tuple 

1615from typing import TYPE_CHECKING 

1616from typing import Union 

1617 

1618from . import arraylib as _array 

1619from . import json as _json 

1620from . import pg_catalog 

1621from . import ranges as _ranges 

1622from .ext import _regconfig_fn 

1623from .ext import aggregate_order_by 

1624from .hstore import HSTORE 

1625from .named_types import CreateDomainType as CreateDomainType # noqa: F401 

1626from .named_types import CreateEnumType as CreateEnumType # noqa: F401 

1627from .named_types import DOMAIN as DOMAIN # noqa: F401 

1628from .named_types import DropDomainType as DropDomainType # noqa: F401 

1629from .named_types import DropEnumType as DropEnumType # noqa: F401 

1630from .named_types import ENUM as ENUM # noqa: F401 

1631from .named_types import NamedType as NamedType # noqa: F401 

1632from .types import _DECIMAL_TYPES # noqa: F401 

1633from .types import _FLOAT_TYPES # noqa: F401 

1634from .types import _INT_TYPES # noqa: F401 

1635from .types import BIT as BIT 

1636from .types import BYTEA as BYTEA 

1637from .types import CIDR as CIDR 

1638from .types import CITEXT as CITEXT 

1639from .types import INET as INET 

1640from .types import INTERVAL as INTERVAL 

1641from .types import MACADDR as MACADDR 

1642from .types import MACADDR8 as MACADDR8 

1643from .types import MONEY as MONEY 

1644from .types import OID as OID 

1645from .types import PGBit as PGBit # noqa: F401 

1646from .types import PGCidr as PGCidr # noqa: F401 

1647from .types import PGInet as PGInet # noqa: F401 

1648from .types import PGInterval as PGInterval # noqa: F401 

1649from .types import PGMacAddr as PGMacAddr # noqa: F401 

1650from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401 

1651from .types import PGUuid as PGUuid 

1652from .types import REGCLASS as REGCLASS 

1653from .types import REGCONFIG as REGCONFIG # noqa: F401 

1654from .types import TIME as TIME 

1655from .types import TIMESTAMP as TIMESTAMP 

1656from .types import TSVECTOR as TSVECTOR 

1657from ... import exc 

1658from ... import schema 

1659from ... import select 

1660from ... import sql 

1661from ... import util 

1662from ...engine import characteristics 

1663from ...engine import default 

1664from ...engine import interfaces 

1665from ...engine import ObjectKind 

1666from ...engine import ObjectScope 

1667from ...engine import reflection 

1668from ...engine import URL 

1669from ...engine.reflection import ReflectionDefaults 

1670from ...sql import bindparam 

1671from ...sql import coercions 

1672from ...sql import compiler 

1673from ...sql import elements 

1674from ...sql import expression 

1675from ...sql import functions 

1676from ...sql import roles 

1677from ...sql import sqltypes 

1678from ...sql import util as sql_util 

1679from ...sql.compiler import InsertmanyvaluesSentinelOpts 

1680from ...sql.visitors import InternalTraversal 

1681from ...types import BIGINT 

1682from ...types import BOOLEAN 

1683from ...types import CHAR 

1684from ...types import DATE 

1685from ...types import DOUBLE_PRECISION 

1686from ...types import FLOAT 

1687from ...types import INTEGER 

1688from ...types import NUMERIC 

1689from ...types import REAL 

1690from ...types import SMALLINT 

1691from ...types import TEXT 

1692from ...types import UUID as UUID 

1693from ...types import VARCHAR 

1694from ...util.typing import TypedDict 

1695 

1696IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) 

1697 

1698RESERVED_WORDS = { 

1699 "all", 

1700 "analyse", 

1701 "analyze", 

1702 "and", 

1703 "any", 

1704 "array", 

1705 "as", 

1706 "asc", 

1707 "asymmetric", 

1708 "both", 

1709 "case", 

1710 "cast", 

1711 "check", 

1712 "collate", 

1713 "column", 

1714 "constraint", 

1715 "create", 

1716 "current_catalog", 

1717 "current_date", 

1718 "current_role", 

1719 "current_time", 

1720 "current_timestamp", 

1721 "current_user", 

1722 "default", 

1723 "deferrable", 

1724 "desc", 

1725 "distinct", 

1726 "do", 

1727 "else", 

1728 "end", 

1729 "except", 

1730 "false", 

1731 "fetch", 

1732 "for", 

1733 "foreign", 

1734 "from", 

1735 "grant", 

1736 "group", 

1737 "having", 

1738 "in", 

1739 "initially", 

1740 "intersect", 

1741 "into", 

1742 "leading", 

1743 "limit", 

1744 "localtime", 

1745 "localtimestamp", 

1746 "new", 

1747 "not", 

1748 "null", 

1749 "of", 

1750 "off", 

1751 "offset", 

1752 "old", 

1753 "on", 

1754 "only", 

1755 "or", 

1756 "order", 

1757 "placing", 

1758 "primary", 

1759 "references", 

1760 "returning", 

1761 "select", 

1762 "session_user", 

1763 "some", 

1764 "symmetric", 

1765 "table", 

1766 "then", 

1767 "to", 

1768 "trailing", 

1769 "true", 

1770 "union", 

1771 "unique", 

1772 "user", 

1773 "using", 

1774 "variadic", 

1775 "when", 

1776 "where", 

1777 "window", 

1778 "with", 

1779 "authorization", 

1780 "between", 

1781 "binary", 

1782 "cross", 

1783 "current_schema", 

1784 "freeze", 

1785 "full", 

1786 "ilike", 

1787 "inner", 

1788 "is", 

1789 "isnull", 

1790 "join", 

1791 "left", 

1792 "like", 

1793 "natural", 

1794 "notnull", 

1795 "outer", 

1796 "over", 

1797 "overlaps", 

1798 "right", 

1799 "similar", 

1800 "verbose", 

1801} 

1802 

1803 

1804colspecs = { 

1805 sqltypes.ARRAY: _array.ARRAY, 

1806 sqltypes.Interval: INTERVAL, 

1807 sqltypes.Enum: ENUM, 

1808 sqltypes.JSON.JSONPathType: _json.JSONPATH, 

1809 sqltypes.JSON: _json.JSON, 

1810 sqltypes.Uuid: PGUuid, 

1811} 

1812 

1813 

1814ischema_names = { 

1815 "_array": _array.ARRAY, 

1816 "hstore": HSTORE, 

1817 "json": _json.JSON, 

1818 "jsonb": _json.JSONB, 

1819 "int4range": _ranges.INT4RANGE, 

1820 "int8range": _ranges.INT8RANGE, 

1821 "numrange": _ranges.NUMRANGE, 

1822 "daterange": _ranges.DATERANGE, 

1823 "tsrange": _ranges.TSRANGE, 

1824 "tstzrange": _ranges.TSTZRANGE, 

1825 "int4multirange": _ranges.INT4MULTIRANGE, 

1826 "int8multirange": _ranges.INT8MULTIRANGE, 

1827 "nummultirange": _ranges.NUMMULTIRANGE, 

1828 "datemultirange": _ranges.DATEMULTIRANGE, 

1829 "tsmultirange": _ranges.TSMULTIRANGE, 

1830 "tstzmultirange": _ranges.TSTZMULTIRANGE, 

1831 "integer": INTEGER, 

1832 "bigint": BIGINT, 

1833 "smallint": SMALLINT, 

1834 "character varying": VARCHAR, 

1835 "character": CHAR, 

1836 '"char"': sqltypes.String, 

1837 "name": sqltypes.String, 

1838 "text": TEXT, 

1839 "numeric": NUMERIC, 

1840 "float": FLOAT, 

1841 "real": REAL, 

1842 "inet": INET, 

1843 "cidr": CIDR, 

1844 "citext": CITEXT, 

1845 "uuid": UUID, 

1846 "bit": BIT, 

1847 "bit varying": BIT, 

1848 "macaddr": MACADDR, 

1849 "macaddr8": MACADDR8, 

1850 "money": MONEY, 

1851 "oid": OID, 

1852 "regclass": REGCLASS, 

1853 "double precision": DOUBLE_PRECISION, 

1854 "timestamp": TIMESTAMP, 

1855 "timestamp with time zone": TIMESTAMP, 

1856 "timestamp without time zone": TIMESTAMP, 

1857 "time with time zone": TIME, 

1858 "time without time zone": TIME, 

1859 "date": DATE, 

1860 "time": TIME, 

1861 "bytea": BYTEA, 

1862 "boolean": BOOLEAN, 

1863 "interval": INTERVAL, 

1864 "tsvector": TSVECTOR, 

1865} 

1866 

1867 

1868class PGCompiler(compiler.SQLCompiler): 

1869 def visit_to_tsvector_func(self, element, **kw): 

1870 return self._assert_pg_ts_ext(element, **kw) 

1871 

1872 def visit_to_tsquery_func(self, element, **kw): 

1873 return self._assert_pg_ts_ext(element, **kw) 

1874 

1875 def visit_plainto_tsquery_func(self, element, **kw): 

1876 return self._assert_pg_ts_ext(element, **kw) 

1877 

1878 def visit_phraseto_tsquery_func(self, element, **kw): 

1879 return self._assert_pg_ts_ext(element, **kw) 

1880 

1881 def visit_websearch_to_tsquery_func(self, element, **kw): 

1882 return self._assert_pg_ts_ext(element, **kw) 

1883 

1884 def visit_ts_headline_func(self, element, **kw): 

1885 return self._assert_pg_ts_ext(element, **kw) 

1886 

1887 def _assert_pg_ts_ext(self, element, **kw): 

1888 if not isinstance(element, _regconfig_fn): 

1889 # other options here include trying to rewrite the function 

1890 # with the correct types. however, that means we have to 

1891 # "un-SQL-ize" the first argument, which can't work in a 

1892 # generalized way. Also, parent compiler class has already added 

1893 # the incorrect return type to the result map. So let's just 

1894 # make sure the function we want is used up front. 

1895 

1896 raise exc.CompileError( 

1897 f'Can\'t compile "{element.name}()" full text search ' 

1898 f"function construct that does not originate from the " 

1899 f'"sqlalchemy.dialects.postgresql" package. ' 

1900 f'Please ensure "import sqlalchemy.dialects.postgresql" is ' 

1901 f"called before constructing " 

1902 f'"sqlalchemy.func.{element.name}()" to ensure registration ' 

1903 f"of the correct argument and return types." 

1904 ) 

1905 

1906 return f"{element.name}{self.function_argspec(element, **kw)}" 

1907 

1908 def render_bind_cast(self, type_, dbapi_type, sqltext): 

1909 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: 

1910 # use VARCHAR with no length for VARCHAR cast. 

1911 # see #9511 

1912 dbapi_type = sqltypes.STRINGTYPE 

1913 return f"""{sqltext}::{ 

1914 self.dialect.type_compiler_instance.process( 

1915 dbapi_type, identifier_preparer=self.preparer 

1916 ) 

1917 }""" 

1918 

1919 def visit_array(self, element, **kw): 

1920 if not element.clauses and not element.type.item_type._isnull: 

1921 return "ARRAY[]::%s" % element.type.compile(self.dialect) 

1922 return "ARRAY[%s]" % self.visit_clauselist(element, **kw) 

1923 

1924 def visit_slice(self, element, **kw): 

1925 return "%s:%s" % ( 

1926 self.process(element.start, **kw), 

1927 self.process(element.stop, **kw), 

1928 ) 

1929 

1930 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1931 return self._generate_generic_binary(binary, " # ", **kw) 

1932 

1933 def visit_json_getitem_op_binary( 

1934 self, binary, operator, _cast_applied=False, **kw 

1935 ): 

1936 if ( 

1937 not _cast_applied 

1938 and binary.type._type_affinity is not sqltypes.JSON 

1939 ): 

1940 kw["_cast_applied"] = True 

1941 return self.process(sql.cast(binary, binary.type), **kw) 

1942 

1943 kw["eager_grouping"] = True 

1944 

1945 if ( 

1946 not _cast_applied 

1947 and isinstance(binary.left.type, _json.JSONB) 

1948 and self.dialect._supports_jsonb_subscripting 

1949 ): 

1950 left = binary.left 

1951 if isinstance(left, (functions.FunctionElement, elements.Cast)): 

1952 left = elements.Grouping(left) 

1953 

1954 # for pg14+JSONB use subscript notation: col['key'] instead 

1955 # of col -> 'key' 

1956 return "%s[%s]" % ( 

1957 self.process(left, **kw), 

1958 self.process(binary.right, **kw), 

1959 ) 

1960 else: 

1961 # Fall back to arrow notation for older versions or when cast 

1962 # is applied 

1963 return self._generate_generic_binary( 

1964 binary, " -> " if not _cast_applied else " ->> ", **kw 

1965 ) 

1966 

1967 def visit_json_path_getitem_op_binary( 

1968 self, binary, operator, _cast_applied=False, **kw 

1969 ): 

1970 if ( 

1971 not _cast_applied 

1972 and binary.type._type_affinity is not sqltypes.JSON 

1973 ): 

1974 kw["_cast_applied"] = True 

1975 return self.process(sql.cast(binary, binary.type), **kw) 

1976 

1977 kw["eager_grouping"] = True 

1978 return self._generate_generic_binary( 

1979 binary, " #> " if not _cast_applied else " #>> ", **kw 

1980 ) 

1981 

1982 def visit_getitem_binary(self, binary, operator, **kw): 

1983 return "%s[%s]" % ( 

1984 self.process(binary.left, **kw), 

1985 self.process(binary.right, **kw), 

1986 ) 

1987 

1988 def visit_aggregate_order_by(self, element, **kw): 

1989 return "%s ORDER BY %s" % ( 

1990 self.process(element.target, **kw), 

1991 self.process(element.order_by, **kw), 

1992 ) 

1993 

1994 def visit_match_op_binary(self, binary, operator, **kw): 

1995 if "postgresql_regconfig" in binary.modifiers: 

1996 regconfig = self.render_literal_value( 

1997 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE 

1998 ) 

1999 if regconfig: 

2000 return "%s @@ plainto_tsquery(%s, %s)" % ( 

2001 self.process(binary.left, **kw), 

2002 regconfig, 

2003 self.process(binary.right, **kw), 

2004 ) 

2005 return "%s @@ plainto_tsquery(%s)" % ( 

2006 self.process(binary.left, **kw), 

2007 self.process(binary.right, **kw), 

2008 ) 

2009 

2010 def visit_ilike_case_insensitive_operand(self, element, **kw): 

2011 return element.element._compiler_dispatch(self, **kw) 

2012 

2013 def visit_ilike_op_binary(self, binary, operator, **kw): 

2014 escape = binary.modifiers.get("escape", None) 

2015 

2016 return "%s ILIKE %s" % ( 

2017 self.process(binary.left, **kw), 

2018 self.process(binary.right, **kw), 

2019 ) + ( 

2020 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

2021 if escape is not None 

2022 else "" 

2023 ) 

2024 

2025 def visit_not_ilike_op_binary(self, binary, operator, **kw): 

2026 escape = binary.modifiers.get("escape", None) 

2027 return "%s NOT ILIKE %s" % ( 

2028 self.process(binary.left, **kw), 

2029 self.process(binary.right, **kw), 

2030 ) + ( 

2031 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

2032 if escape is not None 

2033 else "" 

2034 ) 

2035 

2036 def _regexp_match(self, base_op, binary, operator, kw): 

2037 flags = binary.modifiers["flags"] 

2038 if flags is None: 

2039 return self._generate_generic_binary( 

2040 binary, " %s " % base_op, **kw 

2041 ) 

2042 if flags == "i": 

2043 return self._generate_generic_binary( 

2044 binary, " %s* " % base_op, **kw 

2045 ) 

2046 return "%s %s CONCAT('(?', %s, ')', %s)" % ( 

2047 self.process(binary.left, **kw), 

2048 base_op, 

2049 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

2050 self.process(binary.right, **kw), 

2051 ) 

2052 

2053 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

2054 return self._regexp_match("~", binary, operator, kw) 

2055 

2056 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

2057 return self._regexp_match("!~", binary, operator, kw) 

2058 

2059 def visit_regexp_replace_op_binary(self, binary, operator, **kw): 

2060 string = self.process(binary.left, **kw) 

2061 pattern_replace = self.process(binary.right, **kw) 

2062 flags = binary.modifiers["flags"] 

2063 if flags is None: 

2064 return "REGEXP_REPLACE(%s, %s)" % ( 

2065 string, 

2066 pattern_replace, 

2067 ) 

2068 else: 

2069 return "REGEXP_REPLACE(%s, %s, %s)" % ( 

2070 string, 

2071 pattern_replace, 

2072 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

2073 ) 

2074 

2075 def visit_empty_set_expr(self, element_types, **kw): 

2076 # cast the empty set to the type we are comparing against. if 

2077 # we are comparing against the null type, pick an arbitrary 

2078 # datatype for the empty set 

2079 return "SELECT %s WHERE 1!=1" % ( 

2080 ", ".join( 

2081 "CAST(NULL AS %s)" 

2082 % self.dialect.type_compiler_instance.process( 

2083 INTEGER() if type_._isnull else type_ 

2084 ) 

2085 for type_ in element_types or [INTEGER()] 

2086 ), 

2087 ) 

2088 

2089 def render_literal_value(self, value, type_): 

2090 value = super().render_literal_value(value, type_) 

2091 

2092 if self.dialect._backslash_escapes: 

2093 value = value.replace("\\", "\\\\") 

2094 return value 

2095 

2096 def visit_aggregate_strings_func(self, fn, **kw): 

2097 return "string_agg%s" % self.function_argspec(fn) 

2098 

2099 def visit_sequence(self, seq, **kw): 

2100 return "nextval('%s')" % self.preparer.format_sequence(seq) 

2101 

2102 def limit_clause(self, select, **kw): 

2103 text = "" 

2104 if select._limit_clause is not None: 

2105 text += " \n LIMIT " + self.process(select._limit_clause, **kw) 

2106 if select._offset_clause is not None: 

2107 if select._limit_clause is None: 

2108 text += "\n LIMIT ALL" 

2109 text += " OFFSET " + self.process(select._offset_clause, **kw) 

2110 return text 

2111 

2112 def format_from_hint_text(self, sqltext, table, hint, iscrud): 

2113 if hint.upper() != "ONLY": 

2114 raise exc.CompileError("Unrecognized hint: %r" % hint) 

2115 return "ONLY " + sqltext 

2116 

2117 def get_select_precolumns(self, select, **kw): 

2118 # Do not call super().get_select_precolumns because 

2119 # it will warn/raise when distinct on is present 

2120 if select._distinct or select._distinct_on: 

2121 if select._distinct_on: 

2122 return ( 

2123 "DISTINCT ON (" 

2124 + ", ".join( 

2125 [ 

2126 self.process(col, **kw) 

2127 for col in select._distinct_on 

2128 ] 

2129 ) 

2130 + ") " 

2131 ) 

2132 else: 

2133 return "DISTINCT " 

2134 else: 

2135 return "" 

2136 

2137 def for_update_clause(self, select, **kw): 

2138 if select._for_update_arg.read: 

2139 if select._for_update_arg.key_share: 

2140 tmp = " FOR KEY SHARE" 

2141 else: 

2142 tmp = " FOR SHARE" 

2143 elif select._for_update_arg.key_share: 

2144 tmp = " FOR NO KEY UPDATE" 

2145 else: 

2146 tmp = " FOR UPDATE" 

2147 

2148 if select._for_update_arg.of: 

2149 tables = util.OrderedSet() 

2150 for c in select._for_update_arg.of: 

2151 tables.update(sql_util.surface_selectables_only(c)) 

2152 

2153 of_kw = dict(kw) 

2154 of_kw.update(ashint=True, use_schema=False) 

2155 tmp += " OF " + ", ".join( 

2156 self.process(table, **of_kw) for table in tables 

2157 ) 

2158 

2159 if select._for_update_arg.nowait: 

2160 tmp += " NOWAIT" 

2161 if select._for_update_arg.skip_locked: 

2162 tmp += " SKIP LOCKED" 

2163 

2164 return tmp 

2165 

2166 def visit_substring_func(self, func, **kw): 

2167 s = self.process(func.clauses.clauses[0], **kw) 

2168 start = self.process(func.clauses.clauses[1], **kw) 

2169 if len(func.clauses.clauses) > 2: 

2170 length = self.process(func.clauses.clauses[2], **kw) 

2171 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) 

2172 else: 

2173 return "SUBSTRING(%s FROM %s)" % (s, start) 

2174 

2175 def _on_conflict_target(self, clause, **kw): 

2176 if clause.constraint_target is not None: 

2177 # target may be a name of an Index, UniqueConstraint or 

2178 # ExcludeConstraint. While there is a separate 

2179 # "max_identifier_length" for indexes, PostgreSQL uses the same 

2180 # length for all objects so we can use 

2181 # truncate_and_render_constraint_name 

2182 target_text = ( 

2183 "ON CONSTRAINT %s" 

2184 % self.preparer.truncate_and_render_constraint_name( 

2185 clause.constraint_target 

2186 ) 

2187 ) 

2188 elif clause.inferred_target_elements is not None: 

2189 target_text = "(%s)" % ", ".join( 

2190 ( 

2191 self.preparer.quote(c) 

2192 if isinstance(c, str) 

2193 else self.process(c, include_table=False, use_schema=False) 

2194 ) 

2195 for c in clause.inferred_target_elements 

2196 ) 

2197 if clause.inferred_target_whereclause is not None: 

2198 whereclause_kw = dict(kw) 

2199 whereclause_kw.update(include_table=False, use_schema=False) 

2200 target_text += " WHERE %s" % self.process( 

2201 clause.inferred_target_whereclause, 

2202 **whereclause_kw, 

2203 ) 

2204 else: 

2205 target_text = "" 

2206 

2207 return target_text 

2208 

2209 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

2210 target_text = self._on_conflict_target(on_conflict, **kw) 

2211 

2212 if target_text: 

2213 return "ON CONFLICT %s DO NOTHING" % target_text 

2214 else: 

2215 return "ON CONFLICT DO NOTHING" 

2216 

2217 def visit_on_conflict_do_update(self, on_conflict, **kw): 

2218 clause = on_conflict 

2219 

2220 target_text = self._on_conflict_target(on_conflict, **kw) 

2221 

2222 action_set_ops = [] 

2223 

2224 set_parameters = dict(clause.update_values_to_set) 

2225 # create a list of column assignment clauses as tuples 

2226 

2227 insert_statement = self.stack[-1]["selectable"] 

2228 cols = insert_statement.table.c 

2229 set_kw = dict(kw) 

2230 set_kw.update(use_schema=False) 

2231 for c in cols: 

2232 col_key = c.key 

2233 

2234 if col_key in set_parameters: 

2235 value = set_parameters.pop(col_key) 

2236 elif c in set_parameters: 

2237 value = set_parameters.pop(c) 

2238 else: 

2239 continue 

2240 

2241 # TODO: this coercion should be up front. we can't cache 

2242 # SQL constructs with non-bound literals buried in them 

2243 if coercions._is_literal(value): 

2244 value = elements.BindParameter(None, value, type_=c.type) 

2245 

2246 else: 

2247 if ( 

2248 isinstance(value, elements.BindParameter) 

2249 and value.type._isnull 

2250 ): 

2251 value = value._clone() 

2252 value.type = c.type 

2253 value_text = self.process( 

2254 value.self_group(), is_upsert_set=True, **set_kw 

2255 ) 

2256 

2257 key_text = self.preparer.quote(c.name) 

2258 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2259 

2260 # check for names that don't match columns 

2261 if set_parameters: 

2262 util.warn( 

2263 "Additional column names not matching " 

2264 "any column keys in table '%s': %s" 

2265 % ( 

2266 self.current_executable.table.name, 

2267 (", ".join("'%s'" % c for c in set_parameters)), 

2268 ) 

2269 ) 

2270 for k, v in set_parameters.items(): 

2271 key_text = ( 

2272 self.preparer.quote(k) 

2273 if isinstance(k, str) 

2274 else self.process(k, use_schema=False) 

2275 ) 

2276 value_text = self.process( 

2277 coercions.expect(roles.ExpressionElementRole, v), 

2278 is_upsert_set=True, 

2279 **set_kw, 

2280 ) 

2281 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2282 

2283 action_text = ", ".join(action_set_ops) 

2284 if clause.update_whereclause is not None: 

2285 where_kw = dict(kw) 

2286 where_kw.update(include_table=True, use_schema=False) 

2287 action_text += " WHERE %s" % self.process( 

2288 clause.update_whereclause, **where_kw 

2289 ) 

2290 

2291 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

2292 

2293 def update_from_clause( 

2294 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2295 ): 

2296 kw["asfrom"] = True 

2297 return "FROM " + ", ".join( 

2298 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2299 for t in extra_froms 

2300 ) 

2301 

2302 def delete_extra_from_clause( 

2303 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2304 ): 

2305 """Render the DELETE .. USING clause specific to PostgreSQL.""" 

2306 kw["asfrom"] = True 

2307 return "USING " + ", ".join( 

2308 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2309 for t in extra_froms 

2310 ) 

2311 

2312 def fetch_clause(self, select, **kw): 

2313 # pg requires parens for non literal clauses. It's also required for 

2314 # bind parameters if a ::type casts is used by the driver (asyncpg), 

2315 # so it's easiest to just always add it 

2316 text = "" 

2317 if select._offset_clause is not None: 

2318 text += "\n OFFSET (%s) ROWS" % self.process( 

2319 select._offset_clause, **kw 

2320 ) 

2321 if select._fetch_clause is not None: 

2322 text += "\n FETCH FIRST (%s)%s ROWS %s" % ( 

2323 self.process(select._fetch_clause, **kw), 

2324 " PERCENT" if select._fetch_clause_options["percent"] else "", 

2325 ( 

2326 "WITH TIES" 

2327 if select._fetch_clause_options["with_ties"] 

2328 else "ONLY" 

2329 ), 

2330 ) 

2331 return text 

2332 

2333 

2334class PGDDLCompiler(compiler.DDLCompiler): 

2335 def get_column_specification(self, column, **kwargs): 

2336 colspec = self.preparer.format_column(column) 

2337 impl_type = column.type.dialect_impl(self.dialect) 

2338 if isinstance(impl_type, sqltypes.TypeDecorator): 

2339 impl_type = impl_type.impl 

2340 

2341 has_identity = ( 

2342 column.identity is not None 

2343 and self.dialect.supports_identity_columns 

2344 ) 

2345 

2346 if ( 

2347 column.primary_key 

2348 and column is column.table._autoincrement_column 

2349 and ( 

2350 self.dialect.supports_smallserial 

2351 or not isinstance(impl_type, sqltypes.SmallInteger) 

2352 ) 

2353 and not has_identity 

2354 and ( 

2355 column.default is None 

2356 or ( 

2357 isinstance(column.default, schema.Sequence) 

2358 and column.default.optional 

2359 ) 

2360 ) 

2361 ): 

2362 if isinstance(impl_type, sqltypes.BigInteger): 

2363 colspec += " BIGSERIAL" 

2364 elif isinstance(impl_type, sqltypes.SmallInteger): 

2365 colspec += " SMALLSERIAL" 

2366 else: 

2367 colspec += " SERIAL" 

2368 else: 

2369 colspec += " " + self.dialect.type_compiler_instance.process( 

2370 column.type, 

2371 type_expression=column, 

2372 identifier_preparer=self.preparer, 

2373 ) 

2374 default = self.get_column_default_string(column) 

2375 if default is not None: 

2376 colspec += " DEFAULT " + default 

2377 

2378 if column.computed is not None: 

2379 colspec += " " + self.process(column.computed) 

2380 if has_identity: 

2381 colspec += " " + self.process(column.identity) 

2382 

2383 if not column.nullable and not has_identity: 

2384 colspec += " NOT NULL" 

2385 elif column.nullable and has_identity: 

2386 colspec += " NULL" 

2387 return colspec 

2388 

2389 def _define_constraint_validity(self, constraint): 

2390 not_valid = constraint.dialect_options["postgresql"]["not_valid"] 

2391 return " NOT VALID" if not_valid else "" 

2392 

2393 def _define_include(self, obj): 

2394 includeclause = obj.dialect_options["postgresql"]["include"] 

2395 if not includeclause: 

2396 return "" 

2397 inclusions = [ 

2398 obj.table.c[col] if isinstance(col, str) else col 

2399 for col in includeclause 

2400 ] 

2401 return " INCLUDE (%s)" % ", ".join( 

2402 [self.preparer.quote(c.name) for c in inclusions] 

2403 ) 

2404 

2405 def visit_check_constraint(self, constraint, **kw): 

2406 if constraint._type_bound: 

2407 typ = list(constraint.columns)[0].type 

2408 if ( 

2409 isinstance(typ, sqltypes.ARRAY) 

2410 and isinstance(typ.item_type, sqltypes.Enum) 

2411 and not typ.item_type.native_enum 

2412 ): 

2413 raise exc.CompileError( 

2414 "PostgreSQL dialect cannot produce the CHECK constraint " 

2415 "for ARRAY of non-native ENUM; please specify " 

2416 "create_constraint=False on this Enum datatype." 

2417 ) 

2418 

2419 text = super().visit_check_constraint(constraint) 

2420 text += self._define_constraint_validity(constraint) 

2421 return text 

2422 

2423 def visit_foreign_key_constraint(self, constraint, **kw): 

2424 text = super().visit_foreign_key_constraint(constraint) 

2425 text += self._define_constraint_validity(constraint) 

2426 return text 

2427 

2428 def visit_primary_key_constraint(self, constraint, **kw): 

2429 text = self.define_constraint_preamble(constraint, **kw) 

2430 text += self.define_primary_key_body(constraint, **kw) 

2431 text += self._define_include(constraint) 

2432 text += self.define_constraint_deferrability(constraint) 

2433 return text 

2434 

2435 def visit_unique_constraint(self, constraint, **kw): 

2436 if len(constraint) == 0: 

2437 return "" 

2438 text = self.define_constraint_preamble(constraint, **kw) 

2439 text += self.define_unique_body(constraint, **kw) 

2440 text += self._define_include(constraint) 

2441 text += self.define_constraint_deferrability(constraint) 

2442 return text 

2443 

2444 @util.memoized_property 

2445 def _fk_ondelete_pattern(self): 

2446 return re.compile( 

2447 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?" 

2448 r"|NO ACTION)$", 

2449 re.I, 

2450 ) 

2451 

2452 def define_constraint_ondelete_cascade(self, constraint): 

2453 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

2454 constraint.ondelete, self._fk_ondelete_pattern 

2455 ) 

2456 

2457 def visit_create_enum_type(self, create, **kw): 

2458 type_ = create.element 

2459 

2460 return "CREATE TYPE %s AS ENUM (%s)" % ( 

2461 self.preparer.format_type(type_), 

2462 ", ".join( 

2463 self.sql_compiler.process(sql.literal(e), literal_binds=True) 

2464 for e in type_.enums 

2465 ), 

2466 ) 

2467 

2468 def visit_drop_enum_type(self, drop, **kw): 

2469 type_ = drop.element 

2470 

2471 return "DROP TYPE %s" % (self.preparer.format_type(type_)) 

2472 

2473 def visit_create_domain_type(self, create, **kw): 

2474 domain: DOMAIN = create.element 

2475 

2476 options = [] 

2477 if domain.collation is not None: 

2478 options.append(f"COLLATE {self.preparer.quote(domain.collation)}") 

2479 if domain.default is not None: 

2480 default = self.render_default_string(domain.default) 

2481 options.append(f"DEFAULT {default}") 

2482 if domain.constraint_name is not None: 

2483 name = self.preparer.truncate_and_render_constraint_name( 

2484 domain.constraint_name 

2485 ) 

2486 options.append(f"CONSTRAINT {name}") 

2487 if domain.not_null: 

2488 options.append("NOT NULL") 

2489 if domain.check is not None: 

2490 check = self.sql_compiler.process( 

2491 domain.check, include_table=False, literal_binds=True 

2492 ) 

2493 options.append(f"CHECK ({check})") 

2494 

2495 return ( 

2496 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " 

2497 f"{self.type_compiler.process(domain.data_type)} " 

2498 f"{' '.join(options)}" 

2499 ) 

2500 

2501 def visit_drop_domain_type(self, drop, **kw): 

2502 domain = drop.element 

2503 return f"DROP DOMAIN {self.preparer.format_type(domain)}" 

2504 

2505 def visit_create_index(self, create, **kw): 

2506 preparer = self.preparer 

2507 index = create.element 

2508 self._verify_index_table(index) 

2509 text = "CREATE " 

2510 if index.unique: 

2511 text += "UNIQUE " 

2512 

2513 text += "INDEX " 

2514 

2515 if self.dialect._supports_create_index_concurrently: 

2516 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2517 if concurrently: 

2518 text += "CONCURRENTLY " 

2519 

2520 if create.if_not_exists: 

2521 text += "IF NOT EXISTS " 

2522 

2523 text += "%s ON %s " % ( 

2524 self._prepared_index_name(index, include_schema=False), 

2525 preparer.format_table(index.table), 

2526 ) 

2527 

2528 using = index.dialect_options["postgresql"]["using"] 

2529 if using: 

2530 text += ( 

2531 "USING %s " 

2532 % self.preparer.validate_sql_phrase(using, IDX_USING).lower() 

2533 ) 

2534 

2535 ops = index.dialect_options["postgresql"]["ops"] 

2536 text += "(%s)" % ( 

2537 ", ".join( 

2538 [ 

2539 self.sql_compiler.process( 

2540 ( 

2541 expr.self_group() 

2542 if not isinstance(expr, expression.ColumnClause) 

2543 else expr 

2544 ), 

2545 include_table=False, 

2546 literal_binds=True, 

2547 ) 

2548 + ( 

2549 (" " + ops[expr.key]) 

2550 if hasattr(expr, "key") and expr.key in ops 

2551 else "" 

2552 ) 

2553 for expr in index.expressions 

2554 ] 

2555 ) 

2556 ) 

2557 

2558 text += self._define_include(index) 

2559 

2560 nulls_not_distinct = index.dialect_options["postgresql"][ 

2561 "nulls_not_distinct" 

2562 ] 

2563 if nulls_not_distinct is True: 

2564 text += " NULLS NOT DISTINCT" 

2565 elif nulls_not_distinct is False: 

2566 text += " NULLS DISTINCT" 

2567 

2568 withclause = index.dialect_options["postgresql"]["with"] 

2569 if withclause: 

2570 text += " WITH (%s)" % ( 

2571 ", ".join( 

2572 [ 

2573 "%s = %s" % storage_parameter 

2574 for storage_parameter in withclause.items() 

2575 ] 

2576 ) 

2577 ) 

2578 

2579 tablespace_name = index.dialect_options["postgresql"]["tablespace"] 

2580 if tablespace_name: 

2581 text += " TABLESPACE %s" % preparer.quote(tablespace_name) 

2582 

2583 whereclause = index.dialect_options["postgresql"]["where"] 

2584 if whereclause is not None: 

2585 whereclause = coercions.expect( 

2586 roles.DDLExpressionRole, whereclause 

2587 ) 

2588 

2589 where_compiled = self.sql_compiler.process( 

2590 whereclause, include_table=False, literal_binds=True 

2591 ) 

2592 text += " WHERE " + where_compiled 

2593 

2594 return text 

2595 

2596 def define_unique_constraint_distinct(self, constraint, **kw): 

2597 nulls_not_distinct = constraint.dialect_options["postgresql"][ 

2598 "nulls_not_distinct" 

2599 ] 

2600 if nulls_not_distinct is True: 

2601 nulls_not_distinct_param = "NULLS NOT DISTINCT " 

2602 elif nulls_not_distinct is False: 

2603 nulls_not_distinct_param = "NULLS DISTINCT " 

2604 else: 

2605 nulls_not_distinct_param = "" 

2606 return nulls_not_distinct_param 

2607 

2608 def visit_drop_index(self, drop, **kw): 

2609 index = drop.element 

2610 

2611 text = "\nDROP INDEX " 

2612 

2613 if self.dialect._supports_drop_index_concurrently: 

2614 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2615 if concurrently: 

2616 text += "CONCURRENTLY " 

2617 

2618 if drop.if_exists: 

2619 text += "IF EXISTS " 

2620 

2621 text += self._prepared_index_name(index, include_schema=True) 

2622 return text 

2623 

2624 def visit_exclude_constraint(self, constraint, **kw): 

2625 text = "" 

2626 if constraint.name is not None: 

2627 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2628 constraint 

2629 ) 

2630 elements = [] 

2631 kw["include_table"] = False 

2632 kw["literal_binds"] = True 

2633 for expr, name, op in constraint._render_exprs: 

2634 exclude_element = self.sql_compiler.process(expr, **kw) + ( 

2635 (" " + constraint.ops[expr.key]) 

2636 if hasattr(expr, "key") and expr.key in constraint.ops 

2637 else "" 

2638 ) 

2639 

2640 elements.append("%s WITH %s" % (exclude_element, op)) 

2641 text += "EXCLUDE USING %s (%s)" % ( 

2642 self.preparer.validate_sql_phrase( 

2643 constraint.using, IDX_USING 

2644 ).lower(), 

2645 ", ".join(elements), 

2646 ) 

2647 if constraint.where is not None: 

2648 text += " WHERE (%s)" % self.sql_compiler.process( 

2649 constraint.where, literal_binds=True 

2650 ) 

2651 text += self.define_constraint_deferrability(constraint) 

2652 return text 

2653 

2654 def post_create_table(self, table): 

2655 table_opts = [] 

2656 pg_opts = table.dialect_options["postgresql"] 

2657 

2658 inherits = pg_opts.get("inherits") 

2659 if inherits is not None: 

2660 if not isinstance(inherits, (list, tuple)): 

2661 inherits = (inherits,) 

2662 table_opts.append( 

2663 "\n INHERITS ( " 

2664 + ", ".join(self.preparer.quote(name) for name in inherits) 

2665 + " )" 

2666 ) 

2667 

2668 if pg_opts["partition_by"]: 

2669 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) 

2670 

2671 if pg_opts["using"]: 

2672 table_opts.append("\n USING %s" % pg_opts["using"]) 

2673 

2674 if pg_opts["with_oids"] is True: 

2675 table_opts.append("\n WITH OIDS") 

2676 elif pg_opts["with_oids"] is False: 

2677 table_opts.append("\n WITHOUT OIDS") 

2678 

2679 if pg_opts["on_commit"]: 

2680 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() 

2681 table_opts.append("\n ON COMMIT %s" % on_commit_options) 

2682 

2683 if pg_opts["tablespace"]: 

2684 tablespace_name = pg_opts["tablespace"] 

2685 table_opts.append( 

2686 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) 

2687 ) 

2688 

2689 return "".join(table_opts) 

2690 

2691 def visit_computed_column(self, generated, **kw): 

2692 if generated.persisted is False: 

2693 raise exc.CompileError( 

2694 "PostrgreSQL computed columns do not support 'virtual' " 

2695 "persistence; set the 'persisted' flag to None or True for " 

2696 "PostgreSQL support." 

2697 ) 

2698 

2699 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( 

2700 generated.sqltext, include_table=False, literal_binds=True 

2701 ) 

2702 

2703 def visit_create_sequence(self, create, **kw): 

2704 prefix = None 

2705 if create.element.data_type is not None: 

2706 prefix = " AS %s" % self.type_compiler.process( 

2707 create.element.data_type 

2708 ) 

2709 

2710 return super().visit_create_sequence(create, prefix=prefix, **kw) 

2711 

2712 def _can_comment_on_constraint(self, ddl_instance): 

2713 constraint = ddl_instance.element 

2714 if constraint.name is None: 

2715 raise exc.CompileError( 

2716 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2717 "it has no name" 

2718 ) 

2719 if constraint.table is None: 

2720 raise exc.CompileError( 

2721 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2722 "it has no associated table" 

2723 ) 

2724 

2725 def visit_set_constraint_comment(self, create, **kw): 

2726 self._can_comment_on_constraint(create) 

2727 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % ( 

2728 self.preparer.format_constraint(create.element), 

2729 self.preparer.format_table(create.element.table), 

2730 self.sql_compiler.render_literal_value( 

2731 create.element.comment, sqltypes.String() 

2732 ), 

2733 ) 

2734 

2735 def visit_drop_constraint_comment(self, drop, **kw): 

2736 self._can_comment_on_constraint(drop) 

2737 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % ( 

2738 self.preparer.format_constraint(drop.element), 

2739 self.preparer.format_table(drop.element.table), 

2740 ) 

2741 

2742 

2743class PGTypeCompiler(compiler.GenericTypeCompiler): 

2744 def visit_TSVECTOR(self, type_, **kw): 

2745 return "TSVECTOR" 

2746 

2747 def visit_TSQUERY(self, type_, **kw): 

2748 return "TSQUERY" 

2749 

2750 def visit_INET(self, type_, **kw): 

2751 return "INET" 

2752 

2753 def visit_CIDR(self, type_, **kw): 

2754 return "CIDR" 

2755 

2756 def visit_CITEXT(self, type_, **kw): 

2757 return "CITEXT" 

2758 

2759 def visit_MACADDR(self, type_, **kw): 

2760 return "MACADDR" 

2761 

2762 def visit_MACADDR8(self, type_, **kw): 

2763 return "MACADDR8" 

2764 

2765 def visit_MONEY(self, type_, **kw): 

2766 return "MONEY" 

2767 

2768 def visit_OID(self, type_, **kw): 

2769 return "OID" 

2770 

2771 def visit_REGCONFIG(self, type_, **kw): 

2772 return "REGCONFIG" 

2773 

2774 def visit_REGCLASS(self, type_, **kw): 

2775 return "REGCLASS" 

2776 

2777 def visit_FLOAT(self, type_, **kw): 

2778 if not type_.precision: 

2779 return "FLOAT" 

2780 else: 

2781 return "FLOAT(%(precision)s)" % {"precision": type_.precision} 

2782 

2783 def visit_double(self, type_, **kw): 

2784 return self.visit_DOUBLE_PRECISION(type, **kw) 

2785 

2786 def visit_BIGINT(self, type_, **kw): 

2787 return "BIGINT" 

2788 

2789 def visit_HSTORE(self, type_, **kw): 

2790 return "HSTORE" 

2791 

2792 def visit_JSON(self, type_, **kw): 

2793 return "JSON" 

2794 

2795 def visit_JSONB(self, type_, **kw): 

2796 return "JSONB" 

2797 

2798 def visit_INT4MULTIRANGE(self, type_, **kw): 

2799 return "INT4MULTIRANGE" 

2800 

2801 def visit_INT8MULTIRANGE(self, type_, **kw): 

2802 return "INT8MULTIRANGE" 

2803 

2804 def visit_NUMMULTIRANGE(self, type_, **kw): 

2805 return "NUMMULTIRANGE" 

2806 

2807 def visit_DATEMULTIRANGE(self, type_, **kw): 

2808 return "DATEMULTIRANGE" 

2809 

2810 def visit_TSMULTIRANGE(self, type_, **kw): 

2811 return "TSMULTIRANGE" 

2812 

2813 def visit_TSTZMULTIRANGE(self, type_, **kw): 

2814 return "TSTZMULTIRANGE" 

2815 

2816 def visit_INT4RANGE(self, type_, **kw): 

2817 return "INT4RANGE" 

2818 

2819 def visit_INT8RANGE(self, type_, **kw): 

2820 return "INT8RANGE" 

2821 

2822 def visit_NUMRANGE(self, type_, **kw): 

2823 return "NUMRANGE" 

2824 

2825 def visit_DATERANGE(self, type_, **kw): 

2826 return "DATERANGE" 

2827 

2828 def visit_TSRANGE(self, type_, **kw): 

2829 return "TSRANGE" 

2830 

2831 def visit_TSTZRANGE(self, type_, **kw): 

2832 return "TSTZRANGE" 

2833 

2834 def visit_json_int_index(self, type_, **kw): 

2835 return "INT" 

2836 

2837 def visit_json_str_index(self, type_, **kw): 

2838 return "TEXT" 

2839 

2840 def visit_datetime(self, type_, **kw): 

2841 return self.visit_TIMESTAMP(type_, **kw) 

2842 

2843 def visit_enum(self, type_, **kw): 

2844 if not type_.native_enum or not self.dialect.supports_native_enum: 

2845 return super().visit_enum(type_, **kw) 

2846 else: 

2847 return self.visit_ENUM(type_, **kw) 

2848 

2849 def visit_ENUM(self, type_, identifier_preparer=None, **kw): 

2850 if identifier_preparer is None: 

2851 identifier_preparer = self.dialect.identifier_preparer 

2852 return identifier_preparer.format_type(type_) 

2853 

2854 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw): 

2855 if identifier_preparer is None: 

2856 identifier_preparer = self.dialect.identifier_preparer 

2857 return identifier_preparer.format_type(type_) 

2858 

2859 def visit_TIMESTAMP(self, type_, **kw): 

2860 return "TIMESTAMP%s %s" % ( 

2861 ( 

2862 "(%d)" % type_.precision 

2863 if getattr(type_, "precision", None) is not None 

2864 else "" 

2865 ), 

2866 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2867 ) 

2868 

2869 def visit_TIME(self, type_, **kw): 

2870 return "TIME%s %s" % ( 

2871 ( 

2872 "(%d)" % type_.precision 

2873 if getattr(type_, "precision", None) is not None 

2874 else "" 

2875 ), 

2876 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2877 ) 

2878 

2879 def visit_INTERVAL(self, type_, **kw): 

2880 text = "INTERVAL" 

2881 if type_.fields is not None: 

2882 text += " " + type_.fields 

2883 if type_.precision is not None: 

2884 text += " (%d)" % type_.precision 

2885 return text 

2886 

2887 def visit_BIT(self, type_, **kw): 

2888 if type_.varying: 

2889 compiled = "BIT VARYING" 

2890 if type_.length is not None: 

2891 compiled += "(%d)" % type_.length 

2892 else: 

2893 compiled = "BIT(%d)" % type_.length 

2894 return compiled 

2895 

2896 def visit_uuid(self, type_, **kw): 

2897 if type_.native_uuid: 

2898 return self.visit_UUID(type_, **kw) 

2899 else: 

2900 return super().visit_uuid(type_, **kw) 

2901 

2902 def visit_UUID(self, type_, **kw): 

2903 return "UUID" 

2904 

2905 def visit_large_binary(self, type_, **kw): 

2906 return self.visit_BYTEA(type_, **kw) 

2907 

2908 def visit_BYTEA(self, type_, **kw): 

2909 return "BYTEA" 

2910 

2911 def visit_ARRAY(self, type_, **kw): 

2912 inner = self.process(type_.item_type, **kw) 

2913 return re.sub( 

2914 r"((?: COLLATE.*)?)$", 

2915 ( 

2916 r"%s\1" 

2917 % ( 

2918 "[]" 

2919 * (type_.dimensions if type_.dimensions is not None else 1) 

2920 ) 

2921 ), 

2922 inner, 

2923 count=1, 

2924 ) 

2925 

2926 def visit_json_path(self, type_, **kw): 

2927 return self.visit_JSONPATH(type_, **kw) 

2928 

2929 def visit_JSONPATH(self, type_, **kw): 

2930 return "JSONPATH" 

2931 

2932 

2933class PGIdentifierPreparer(compiler.IdentifierPreparer): 

2934 reserved_words = RESERVED_WORDS 

2935 

2936 def _unquote_identifier(self, value): 

2937 if value[0] == self.initial_quote: 

2938 value = value[1:-1].replace( 

2939 self.escape_to_quote, self.escape_quote 

2940 ) 

2941 return value 

2942 

2943 def format_type(self, type_, use_schema=True): 

2944 if not type_.name: 

2945 raise exc.CompileError( 

2946 f"PostgreSQL {type_.__class__.__name__} type requires a name." 

2947 ) 

2948 

2949 name = self.quote(type_.name) 

2950 effective_schema = self.schema_for_object(type_) 

2951 

2952 if ( 

2953 not self.omit_schema 

2954 and use_schema 

2955 and effective_schema is not None 

2956 ): 

2957 name = f"{self.quote_schema(effective_schema)}.{name}" 

2958 return name 

2959 

2960 

2961class ReflectedNamedType(TypedDict): 

2962 """Represents a reflected named type.""" 

2963 

2964 name: str 

2965 """Name of the type.""" 

2966 schema: str 

2967 """The schema of the type.""" 

2968 visible: bool 

2969 """Indicates if this type is in the current search path.""" 

2970 

2971 

2972class ReflectedDomainConstraint(TypedDict): 

2973 """Represents a reflect check constraint of a domain.""" 

2974 

2975 name: str 

2976 """Name of the constraint.""" 

2977 check: str 

2978 """The check constraint text.""" 

2979 

2980 

2981class ReflectedDomain(ReflectedNamedType): 

2982 """Represents a reflected enum.""" 

2983 

2984 type: str 

2985 """The string name of the underlying data type of the domain.""" 

2986 nullable: bool 

2987 """Indicates if the domain allows null or not.""" 

2988 default: Optional[str] 

2989 """The string representation of the default value of this domain 

2990 or ``None`` if none present. 

2991 """ 

2992 constraints: List[ReflectedDomainConstraint] 

2993 """The constraints defined in the domain, if any. 

2994 The constraint are in order of evaluation by postgresql. 

2995 """ 

2996 collation: Optional[str] 

2997 """The collation for the domain.""" 

2998 

2999 

3000class ReflectedEnum(ReflectedNamedType): 

3001 """Represents a reflected enum.""" 

3002 

3003 labels: List[str] 

3004 """The labels that compose the enum.""" 

3005 

3006 

3007class PGInspector(reflection.Inspector): 

3008 dialect: PGDialect 

3009 

3010 def get_table_oid( 

3011 self, table_name: str, schema: Optional[str] = None 

3012 ) -> int: 

3013 """Return the OID for the given table name. 

3014 

3015 :param table_name: string name of the table. For special quoting, 

3016 use :class:`.quoted_name`. 

3017 

3018 :param schema: string schema name; if omitted, uses the default schema 

3019 of the database connection. For special quoting, 

3020 use :class:`.quoted_name`. 

3021 

3022 """ 

3023 

3024 with self._operation_context() as conn: 

3025 return self.dialect.get_table_oid( 

3026 conn, table_name, schema, info_cache=self.info_cache 

3027 ) 

3028 

3029 def get_domains( 

3030 self, schema: Optional[str] = None 

3031 ) -> List[ReflectedDomain]: 

3032 """Return a list of DOMAIN objects. 

3033 

3034 Each member is a dictionary containing these fields: 

3035 

3036 * name - name of the domain 

3037 * schema - the schema name for the domain. 

3038 * visible - boolean, whether or not this domain is visible 

3039 in the default search path. 

3040 * type - the type defined by this domain. 

3041 * nullable - Indicates if this domain can be ``NULL``. 

3042 * default - The default value of the domain or ``None`` if the 

3043 domain has no default. 

3044 * constraints - A list of dict with the constraint defined by this 

3045 domain. Each element contains two keys: ``name`` of the 

3046 constraint and ``check`` with the constraint text. 

3047 

3048 :param schema: schema name. If None, the default schema 

3049 (typically 'public') is used. May also be set to ``'*'`` to 

3050 indicate load domains for all schemas. 

3051 

3052 .. versionadded:: 2.0 

3053 

3054 """ 

3055 with self._operation_context() as conn: 

3056 return self.dialect._load_domains( 

3057 conn, schema, info_cache=self.info_cache 

3058 ) 

3059 

3060 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]: 

3061 """Return a list of ENUM objects. 

3062 

3063 Each member is a dictionary containing these fields: 

3064 

3065 * name - name of the enum 

3066 * schema - the schema name for the enum. 

3067 * visible - boolean, whether or not this enum is visible 

3068 in the default search path. 

3069 * labels - a list of string labels that apply to the enum. 

3070 

3071 :param schema: schema name. If None, the default schema 

3072 (typically 'public') is used. May also be set to ``'*'`` to 

3073 indicate load enums for all schemas. 

3074 

3075 """ 

3076 with self._operation_context() as conn: 

3077 return self.dialect._load_enums( 

3078 conn, schema, info_cache=self.info_cache 

3079 ) 

3080 

3081 def get_foreign_table_names( 

3082 self, schema: Optional[str] = None 

3083 ) -> List[str]: 

3084 """Return a list of FOREIGN TABLE names. 

3085 

3086 Behavior is similar to that of 

3087 :meth:`_reflection.Inspector.get_table_names`, 

3088 except that the list is limited to those tables that report a 

3089 ``relkind`` value of ``f``. 

3090 

3091 """ 

3092 with self._operation_context() as conn: 

3093 return self.dialect._get_foreign_table_names( 

3094 conn, schema, info_cache=self.info_cache 

3095 ) 

3096 

3097 def has_type( 

3098 self, type_name: str, schema: Optional[str] = None, **kw: Any 

3099 ) -> bool: 

3100 """Return if the database has the specified type in the provided 

3101 schema. 

3102 

3103 :param type_name: the type to check. 

3104 :param schema: schema name. If None, the default schema 

3105 (typically 'public') is used. May also be set to ``'*'`` to 

3106 check in all schemas. 

3107 

3108 .. versionadded:: 2.0 

3109 

3110 """ 

3111 with self._operation_context() as conn: 

3112 return self.dialect.has_type( 

3113 conn, type_name, schema, info_cache=self.info_cache 

3114 ) 

3115 

3116 

3117class PGExecutionContext(default.DefaultExecutionContext): 

3118 def fire_sequence(self, seq, type_): 

3119 return self._execute_scalar( 

3120 ( 

3121 "select nextval('%s')" 

3122 % self.identifier_preparer.format_sequence(seq) 

3123 ), 

3124 type_, 

3125 ) 

3126 

3127 def get_insert_default(self, column): 

3128 if column.primary_key and column is column.table._autoincrement_column: 

3129 if column.server_default and column.server_default.has_argument: 

3130 # pre-execute passive defaults on primary key columns 

3131 return self._execute_scalar( 

3132 "select %s" % column.server_default.arg, column.type 

3133 ) 

3134 

3135 elif column.default is None or ( 

3136 column.default.is_sequence and column.default.optional 

3137 ): 

3138 # execute the sequence associated with a SERIAL primary 

3139 # key column. for non-primary-key SERIAL, the ID just 

3140 # generates server side. 

3141 

3142 try: 

3143 seq_name = column._postgresql_seq_name 

3144 except AttributeError: 

3145 tab = column.table.name 

3146 col = column.name 

3147 tab = tab[0 : 29 + max(0, (29 - len(col)))] 

3148 col = col[0 : 29 + max(0, (29 - len(tab)))] 

3149 name = "%s_%s_seq" % (tab, col) 

3150 column._postgresql_seq_name = seq_name = name 

3151 

3152 if column.table is not None: 

3153 effective_schema = self.connection.schema_for_object( 

3154 column.table 

3155 ) 

3156 else: 

3157 effective_schema = None 

3158 

3159 if effective_schema is not None: 

3160 exc = 'select nextval(\'"%s"."%s"\')' % ( 

3161 effective_schema, 

3162 seq_name, 

3163 ) 

3164 else: 

3165 exc = "select nextval('\"%s\"')" % (seq_name,) 

3166 

3167 return self._execute_scalar(exc, column.type) 

3168 

3169 return super().get_insert_default(column) 

3170 

3171 

3172class PGReadOnlyConnectionCharacteristic( 

3173 characteristics.ConnectionCharacteristic 

3174): 

3175 transactional = True 

3176 

3177 def reset_characteristic(self, dialect, dbapi_conn): 

3178 dialect.set_readonly(dbapi_conn, False) 

3179 

3180 def set_characteristic(self, dialect, dbapi_conn, value): 

3181 dialect.set_readonly(dbapi_conn, value) 

3182 

3183 def get_characteristic(self, dialect, dbapi_conn): 

3184 return dialect.get_readonly(dbapi_conn) 

3185 

3186 

3187class PGDeferrableConnectionCharacteristic( 

3188 characteristics.ConnectionCharacteristic 

3189): 

3190 transactional = True 

3191 

3192 def reset_characteristic(self, dialect, dbapi_conn): 

3193 dialect.set_deferrable(dbapi_conn, False) 

3194 

3195 def set_characteristic(self, dialect, dbapi_conn, value): 

3196 dialect.set_deferrable(dbapi_conn, value) 

3197 

3198 def get_characteristic(self, dialect, dbapi_conn): 

3199 return dialect.get_deferrable(dbapi_conn) 

3200 

3201 

3202class PGDialect(default.DefaultDialect): 

3203 name = "postgresql" 

3204 supports_statement_cache = True 

3205 supports_alter = True 

3206 max_identifier_length = 63 

3207 supports_sane_rowcount = True 

3208 

3209 bind_typing = interfaces.BindTyping.RENDER_CASTS 

3210 

3211 supports_native_enum = True 

3212 supports_native_boolean = True 

3213 supports_native_uuid = True 

3214 supports_smallserial = True 

3215 

3216 supports_sequences = True 

3217 sequences_optional = True 

3218 preexecute_autoincrement_sequences = True 

3219 postfetch_lastrowid = False 

3220 use_insertmanyvalues = True 

3221 

3222 returns_native_bytes = True 

3223 

3224 insertmanyvalues_implicit_sentinel = ( 

3225 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT 

3226 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

3227 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS 

3228 ) 

3229 

3230 supports_comments = True 

3231 supports_constraint_comments = True 

3232 supports_default_values = True 

3233 

3234 supports_default_metavalue = True 

3235 

3236 supports_empty_insert = False 

3237 supports_multivalues_insert = True 

3238 

3239 supports_identity_columns = True 

3240 

3241 default_paramstyle = "pyformat" 

3242 ischema_names = ischema_names 

3243 colspecs = colspecs 

3244 

3245 statement_compiler = PGCompiler 

3246 ddl_compiler = PGDDLCompiler 

3247 type_compiler_cls = PGTypeCompiler 

3248 preparer = PGIdentifierPreparer 

3249 execution_ctx_cls = PGExecutionContext 

3250 inspector = PGInspector 

3251 

3252 update_returning = True 

3253 delete_returning = True 

3254 insert_returning = True 

3255 update_returning_multifrom = True 

3256 delete_returning_multifrom = True 

3257 

3258 connection_characteristics = ( 

3259 default.DefaultDialect.connection_characteristics 

3260 ) 

3261 connection_characteristics = connection_characteristics.union( 

3262 { 

3263 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), 

3264 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), 

3265 } 

3266 ) 

3267 

3268 construct_arguments = [ 

3269 ( 

3270 schema.Index, 

3271 { 

3272 "using": False, 

3273 "include": None, 

3274 "where": None, 

3275 "ops": {}, 

3276 "concurrently": False, 

3277 "with": {}, 

3278 "tablespace": None, 

3279 "nulls_not_distinct": None, 

3280 }, 

3281 ), 

3282 ( 

3283 schema.Table, 

3284 { 

3285 "ignore_search_path": False, 

3286 "tablespace": None, 

3287 "partition_by": None, 

3288 "with_oids": None, 

3289 "on_commit": None, 

3290 "inherits": None, 

3291 "using": None, 

3292 }, 

3293 ), 

3294 ( 

3295 schema.CheckConstraint, 

3296 { 

3297 "not_valid": False, 

3298 }, 

3299 ), 

3300 ( 

3301 schema.ForeignKeyConstraint, 

3302 { 

3303 "not_valid": False, 

3304 }, 

3305 ), 

3306 ( 

3307 schema.PrimaryKeyConstraint, 

3308 {"include": None}, 

3309 ), 

3310 ( 

3311 schema.UniqueConstraint, 

3312 { 

3313 "include": None, 

3314 "nulls_not_distinct": None, 

3315 }, 

3316 ), 

3317 ] 

3318 

3319 reflection_options = ("postgresql_ignore_search_path",) 

3320 

3321 _backslash_escapes = True 

3322 _supports_create_index_concurrently = True 

3323 _supports_drop_index_concurrently = True 

3324 _supports_jsonb_subscripting = True 

3325 

3326 def __init__( 

3327 self, 

3328 native_inet_types=None, 

3329 json_serializer=None, 

3330 json_deserializer=None, 

3331 **kwargs, 

3332 ): 

3333 default.DefaultDialect.__init__(self, **kwargs) 

3334 

3335 self._native_inet_types = native_inet_types 

3336 self._json_deserializer = json_deserializer 

3337 self._json_serializer = json_serializer 

3338 

3339 def initialize(self, connection): 

3340 super().initialize(connection) 

3341 

3342 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 

3343 self.supports_smallserial = self.server_version_info >= (9, 2) 

3344 

3345 self._set_backslash_escapes(connection) 

3346 

3347 self._supports_drop_index_concurrently = self.server_version_info >= ( 

3348 9, 

3349 2, 

3350 ) 

3351 self.supports_identity_columns = self.server_version_info >= (10,) 

3352 

3353 self._supports_jsonb_subscripting = self.server_version_info >= (14,) 

3354 

3355 def get_isolation_level_values(self, dbapi_conn): 

3356 # note the generic dialect doesn't have AUTOCOMMIT, however 

3357 # all postgresql dialects should include AUTOCOMMIT. 

3358 return ( 

3359 "SERIALIZABLE", 

3360 "READ UNCOMMITTED", 

3361 "READ COMMITTED", 

3362 "REPEATABLE READ", 

3363 ) 

3364 

3365 def set_isolation_level(self, dbapi_connection, level): 

3366 cursor = dbapi_connection.cursor() 

3367 cursor.execute( 

3368 "SET SESSION CHARACTERISTICS AS TRANSACTION " 

3369 f"ISOLATION LEVEL {level}" 

3370 ) 

3371 cursor.execute("COMMIT") 

3372 cursor.close() 

3373 

3374 def get_isolation_level(self, dbapi_connection): 

3375 cursor = dbapi_connection.cursor() 

3376 cursor.execute("show transaction isolation level") 

3377 val = cursor.fetchone()[0] 

3378 cursor.close() 

3379 return val.upper() 

3380 

3381 def set_readonly(self, connection, value): 

3382 raise NotImplementedError() 

3383 

3384 def get_readonly(self, connection): 

3385 raise NotImplementedError() 

3386 

3387 def set_deferrable(self, connection, value): 

3388 raise NotImplementedError() 

3389 

3390 def get_deferrable(self, connection): 

3391 raise NotImplementedError() 

3392 

3393 def _split_multihost_from_url(self, url: URL) -> Union[ 

3394 Tuple[None, None], 

3395 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], 

3396 ]: 

3397 hosts: Optional[Tuple[Optional[str], ...]] = None 

3398 ports_str: Union[str, Tuple[Optional[str], ...], None] = None 

3399 

3400 integrated_multihost = False 

3401 

3402 if "host" in url.query: 

3403 if isinstance(url.query["host"], (list, tuple)): 

3404 integrated_multihost = True 

3405 hosts, ports_str = zip( 

3406 *[ 

3407 token.split(":") if ":" in token else (token, None) 

3408 for token in url.query["host"] 

3409 ] 

3410 ) 

3411 

3412 elif isinstance(url.query["host"], str): 

3413 hosts = tuple(url.query["host"].split(",")) 

3414 

3415 if ( 

3416 "port" not in url.query 

3417 and len(hosts) == 1 

3418 and ":" in hosts[0] 

3419 ): 

3420 # internet host is alphanumeric plus dots or hyphens. 

3421 # this is essentially rfc1123, which refers to rfc952. 

3422 # https://stackoverflow.com/questions/3523028/ 

3423 # valid-characters-of-a-hostname 

3424 host_port_match = re.match( 

3425 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0] 

3426 ) 

3427 if host_port_match: 

3428 integrated_multihost = True 

3429 h, p = host_port_match.group(1, 2) 

3430 if TYPE_CHECKING: 

3431 assert isinstance(h, str) 

3432 assert isinstance(p, str) 

3433 hosts = (h,) 

3434 ports_str = cast( 

3435 "Tuple[Optional[str], ...]", (p,) if p else (None,) 

3436 ) 

3437 

3438 if "port" in url.query: 

3439 if integrated_multihost: 

3440 raise exc.ArgumentError( 

3441 "Can't mix 'multihost' formats together; use " 

3442 '"host=h1,h2,h3&port=p1,p2,p3" or ' 

3443 '"host=h1:p1&host=h2:p2&host=h3:p3" separately' 

3444 ) 

3445 if isinstance(url.query["port"], (list, tuple)): 

3446 ports_str = url.query["port"] 

3447 elif isinstance(url.query["port"], str): 

3448 ports_str = tuple(url.query["port"].split(",")) 

3449 

3450 ports: Optional[Tuple[Optional[int], ...]] = None 

3451 

3452 if ports_str: 

3453 try: 

3454 ports = tuple(int(x) if x else None for x in ports_str) 

3455 except ValueError: 

3456 raise exc.ArgumentError( 

3457 f"Received non-integer port arguments: {ports_str}" 

3458 ) from None 

3459 

3460 if ports and ( 

3461 (not hosts and len(ports) > 1) 

3462 or ( 

3463 hosts 

3464 and ports 

3465 and len(hosts) != len(ports) 

3466 and (len(hosts) > 1 or len(ports) > 1) 

3467 ) 

3468 ): 

3469 raise exc.ArgumentError("number of hosts and ports don't match") 

3470 

3471 if hosts is not None: 

3472 if ports is None: 

3473 ports = tuple(None for _ in hosts) 

3474 

3475 return hosts, ports # type: ignore 

3476 

3477 def do_begin_twophase(self, connection, xid): 

3478 self.do_begin(connection.connection) 

3479 

3480 def do_prepare_twophase(self, connection, xid): 

3481 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) 

3482 

3483 def do_rollback_twophase( 

3484 self, connection, xid, is_prepared=True, recover=False 

3485 ): 

3486 if is_prepared: 

3487 if recover: 

3488 # FIXME: ugly hack to get out of transaction 

3489 # context when committing recoverable transactions 

3490 # Must find out a way how to make the dbapi not 

3491 # open a transaction. 

3492 connection.exec_driver_sql("ROLLBACK") 

3493 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) 

3494 connection.exec_driver_sql("BEGIN") 

3495 self.do_rollback(connection.connection) 

3496 else: 

3497 self.do_rollback(connection.connection) 

3498 

3499 def do_commit_twophase( 

3500 self, connection, xid, is_prepared=True, recover=False 

3501 ): 

3502 if is_prepared: 

3503 if recover: 

3504 connection.exec_driver_sql("ROLLBACK") 

3505 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) 

3506 connection.exec_driver_sql("BEGIN") 

3507 self.do_rollback(connection.connection) 

3508 else: 

3509 self.do_commit(connection.connection) 

3510 

3511 def do_recover_twophase(self, connection): 

3512 return connection.scalars( 

3513 sql.text("SELECT gid FROM pg_prepared_xacts") 

3514 ).all() 

3515 

3516 def _get_default_schema_name(self, connection): 

3517 return connection.exec_driver_sql("select current_schema()").scalar() 

3518 

3519 @reflection.cache 

3520 def has_schema(self, connection, schema, **kw): 

3521 query = select(pg_catalog.pg_namespace.c.nspname).where( 

3522 pg_catalog.pg_namespace.c.nspname == schema 

3523 ) 

3524 return bool(connection.scalar(query)) 

3525 

3526 def _pg_class_filter_scope_schema( 

3527 self, query, schema, scope, pg_class_table=None 

3528 ): 

3529 if pg_class_table is None: 

3530 pg_class_table = pg_catalog.pg_class 

3531 query = query.join( 

3532 pg_catalog.pg_namespace, 

3533 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, 

3534 ) 

3535 

3536 if scope is ObjectScope.DEFAULT: 

3537 query = query.where(pg_class_table.c.relpersistence != "t") 

3538 elif scope is ObjectScope.TEMPORARY: 

3539 query = query.where(pg_class_table.c.relpersistence == "t") 

3540 

3541 if schema is None: 

3542 query = query.where( 

3543 pg_catalog.pg_table_is_visible(pg_class_table.c.oid), 

3544 # ignore pg_catalog schema 

3545 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3546 ) 

3547 else: 

3548 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3549 return query 

3550 

3551 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None): 

3552 if pg_class_table is None: 

3553 pg_class_table = pg_catalog.pg_class 

3554 # uses the any form instead of in otherwise postgresql complaings 

3555 # that 'IN could not convert type character to "char"' 

3556 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds)) 

3557 

3558 @lru_cache() 

3559 def _has_table_query(self, schema): 

3560 query = select(pg_catalog.pg_class.c.relname).where( 

3561 pg_catalog.pg_class.c.relname == bindparam("table_name"), 

3562 self._pg_class_relkind_condition( 

3563 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3564 ), 

3565 ) 

3566 return self._pg_class_filter_scope_schema( 

3567 query, schema, scope=ObjectScope.ANY 

3568 ) 

3569 

3570 @reflection.cache 

3571 def has_table(self, connection, table_name, schema=None, **kw): 

3572 self._ensure_has_table_connection(connection) 

3573 query = self._has_table_query(schema) 

3574 return bool(connection.scalar(query, {"table_name": table_name})) 

3575 

3576 @reflection.cache 

3577 def has_sequence(self, connection, sequence_name, schema=None, **kw): 

3578 query = select(pg_catalog.pg_class.c.relname).where( 

3579 pg_catalog.pg_class.c.relkind == "S", 

3580 pg_catalog.pg_class.c.relname == sequence_name, 

3581 ) 

3582 query = self._pg_class_filter_scope_schema( 

3583 query, schema, scope=ObjectScope.ANY 

3584 ) 

3585 return bool(connection.scalar(query)) 

3586 

3587 @reflection.cache 

3588 def has_type(self, connection, type_name, schema=None, **kw): 

3589 query = ( 

3590 select(pg_catalog.pg_type.c.typname) 

3591 .join( 

3592 pg_catalog.pg_namespace, 

3593 pg_catalog.pg_namespace.c.oid 

3594 == pg_catalog.pg_type.c.typnamespace, 

3595 ) 

3596 .where(pg_catalog.pg_type.c.typname == type_name) 

3597 ) 

3598 if schema is None: 

3599 query = query.where( 

3600 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

3601 # ignore pg_catalog schema 

3602 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3603 ) 

3604 elif schema != "*": 

3605 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3606 

3607 return bool(connection.scalar(query)) 

3608 

3609 def _get_server_version_info(self, connection): 

3610 v = connection.exec_driver_sql("select pg_catalog.version()").scalar() 

3611 m = re.match( 

3612 r".*(?:PostgreSQL|EnterpriseDB) " 

3613 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", 

3614 v, 

3615 ) 

3616 if not m: 

3617 raise AssertionError( 

3618 "Could not determine version from string '%s'" % v 

3619 ) 

3620 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) 

3621 

3622 @reflection.cache 

3623 def get_table_oid(self, connection, table_name, schema=None, **kw): 

3624 """Fetch the oid for schema.table_name.""" 

3625 query = select(pg_catalog.pg_class.c.oid).where( 

3626 pg_catalog.pg_class.c.relname == table_name, 

3627 self._pg_class_relkind_condition( 

3628 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3629 ), 

3630 ) 

3631 query = self._pg_class_filter_scope_schema( 

3632 query, schema, scope=ObjectScope.ANY 

3633 ) 

3634 table_oid = connection.scalar(query) 

3635 if table_oid is None: 

3636 raise exc.NoSuchTableError( 

3637 f"{schema}.{table_name}" if schema else table_name 

3638 ) 

3639 return table_oid 

3640 

3641 @reflection.cache 

3642 def get_schema_names(self, connection, **kw): 

3643 query = ( 

3644 select(pg_catalog.pg_namespace.c.nspname) 

3645 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%")) 

3646 .order_by(pg_catalog.pg_namespace.c.nspname) 

3647 ) 

3648 return connection.scalars(query).all() 

3649 

3650 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope): 

3651 query = select(pg_catalog.pg_class.c.relname).where( 

3652 self._pg_class_relkind_condition(relkinds) 

3653 ) 

3654 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3655 return connection.scalars(query).all() 

3656 

3657 @reflection.cache 

3658 def get_table_names(self, connection, schema=None, **kw): 

3659 return self._get_relnames_for_relkinds( 

3660 connection, 

3661 schema, 

3662 pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3663 scope=ObjectScope.DEFAULT, 

3664 ) 

3665 

3666 @reflection.cache 

3667 def get_temp_table_names(self, connection, **kw): 

3668 return self._get_relnames_for_relkinds( 

3669 connection, 

3670 schema=None, 

3671 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3672 scope=ObjectScope.TEMPORARY, 

3673 ) 

3674 

3675 @reflection.cache 

3676 def _get_foreign_table_names(self, connection, schema=None, **kw): 

3677 return self._get_relnames_for_relkinds( 

3678 connection, schema, relkinds=("f",), scope=ObjectScope.ANY 

3679 ) 

3680 

3681 @reflection.cache 

3682 def get_view_names(self, connection, schema=None, **kw): 

3683 return self._get_relnames_for_relkinds( 

3684 connection, 

3685 schema, 

3686 pg_catalog.RELKINDS_VIEW, 

3687 scope=ObjectScope.DEFAULT, 

3688 ) 

3689 

3690 @reflection.cache 

3691 def get_materialized_view_names(self, connection, schema=None, **kw): 

3692 return self._get_relnames_for_relkinds( 

3693 connection, 

3694 schema, 

3695 pg_catalog.RELKINDS_MAT_VIEW, 

3696 scope=ObjectScope.DEFAULT, 

3697 ) 

3698 

3699 @reflection.cache 

3700 def get_temp_view_names(self, connection, schema=None, **kw): 

3701 return self._get_relnames_for_relkinds( 

3702 connection, 

3703 schema, 

3704 # NOTE: do not include temp materialzied views (that do not 

3705 # seem to be a thing at least up to version 14) 

3706 pg_catalog.RELKINDS_VIEW, 

3707 scope=ObjectScope.TEMPORARY, 

3708 ) 

3709 

3710 @reflection.cache 

3711 def get_sequence_names(self, connection, schema=None, **kw): 

3712 return self._get_relnames_for_relkinds( 

3713 connection, schema, relkinds=("S",), scope=ObjectScope.ANY 

3714 ) 

3715 

3716 @reflection.cache 

3717 def get_view_definition(self, connection, view_name, schema=None, **kw): 

3718 query = ( 

3719 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid)) 

3720 .select_from(pg_catalog.pg_class) 

3721 .where( 

3722 pg_catalog.pg_class.c.relname == view_name, 

3723 self._pg_class_relkind_condition( 

3724 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW 

3725 ), 

3726 ) 

3727 ) 

3728 query = self._pg_class_filter_scope_schema( 

3729 query, schema, scope=ObjectScope.ANY 

3730 ) 

3731 res = connection.scalar(query) 

3732 if res is None: 

3733 raise exc.NoSuchTableError( 

3734 f"{schema}.{view_name}" if schema else view_name 

3735 ) 

3736 else: 

3737 return res 

3738 

3739 def _value_or_raise(self, data, table, schema): 

3740 try: 

3741 return dict(data)[(schema, table)] 

3742 except KeyError: 

3743 raise exc.NoSuchTableError( 

3744 f"{schema}.{table}" if schema else table 

3745 ) from None 

3746 

3747 def _prepare_filter_names(self, filter_names): 

3748 if filter_names: 

3749 return True, {"filter_names": filter_names} 

3750 else: 

3751 return False, {} 

3752 

3753 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: 

3754 if kind is ObjectKind.ANY: 

3755 return pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3756 relkinds = () 

3757 if ObjectKind.TABLE in kind: 

3758 relkinds += pg_catalog.RELKINDS_TABLE 

3759 if ObjectKind.VIEW in kind: 

3760 relkinds += pg_catalog.RELKINDS_VIEW 

3761 if ObjectKind.MATERIALIZED_VIEW in kind: 

3762 relkinds += pg_catalog.RELKINDS_MAT_VIEW 

3763 return relkinds 

3764 

3765 @reflection.cache 

3766 def get_columns(self, connection, table_name, schema=None, **kw): 

3767 data = self.get_multi_columns( 

3768 connection, 

3769 schema=schema, 

3770 filter_names=[table_name], 

3771 scope=ObjectScope.ANY, 

3772 kind=ObjectKind.ANY, 

3773 **kw, 

3774 ) 

3775 return self._value_or_raise(data, table_name, schema) 

3776 

3777 @lru_cache() 

3778 def _columns_query(self, schema, has_filter_names, scope, kind): 

3779 # NOTE: the query with the default and identity options scalar 

3780 # subquery is faster than trying to use outer joins for them 

3781 generated = ( 

3782 pg_catalog.pg_attribute.c.attgenerated.label("generated") 

3783 if self.server_version_info >= (12,) 

3784 else sql.null().label("generated") 

3785 ) 

3786 if self.server_version_info >= (10,): 

3787 # join lateral performs worse (~2x slower) than a scalar_subquery 

3788 identity = ( 

3789 select( 

3790 sql.func.json_build_object( 

3791 "always", 

3792 pg_catalog.pg_attribute.c.attidentity == "a", 

3793 "start", 

3794 pg_catalog.pg_sequence.c.seqstart, 

3795 "increment", 

3796 pg_catalog.pg_sequence.c.seqincrement, 

3797 "minvalue", 

3798 pg_catalog.pg_sequence.c.seqmin, 

3799 "maxvalue", 

3800 pg_catalog.pg_sequence.c.seqmax, 

3801 "cache", 

3802 pg_catalog.pg_sequence.c.seqcache, 

3803 "cycle", 

3804 pg_catalog.pg_sequence.c.seqcycle, 

3805 type_=sqltypes.JSON(), 

3806 ) 

3807 ) 

3808 .select_from(pg_catalog.pg_sequence) 

3809 .where( 

3810 # attidentity != '' is required or it will reflect also 

3811 # serial columns as identity. 

3812 pg_catalog.pg_attribute.c.attidentity != "", 

3813 pg_catalog.pg_sequence.c.seqrelid 

3814 == sql.cast( 

3815 sql.cast( 

3816 pg_catalog.pg_get_serial_sequence( 

3817 sql.cast( 

3818 sql.cast( 

3819 pg_catalog.pg_attribute.c.attrelid, 

3820 REGCLASS, 

3821 ), 

3822 TEXT, 

3823 ), 

3824 pg_catalog.pg_attribute.c.attname, 

3825 ), 

3826 REGCLASS, 

3827 ), 

3828 OID, 

3829 ), 

3830 ) 

3831 .correlate(pg_catalog.pg_attribute) 

3832 .scalar_subquery() 

3833 .label("identity_options") 

3834 ) 

3835 else: 

3836 identity = sql.null().label("identity_options") 

3837 

3838 # join lateral performs the same as scalar_subquery here 

3839 default = ( 

3840 select( 

3841 pg_catalog.pg_get_expr( 

3842 pg_catalog.pg_attrdef.c.adbin, 

3843 pg_catalog.pg_attrdef.c.adrelid, 

3844 ) 

3845 ) 

3846 .select_from(pg_catalog.pg_attrdef) 

3847 .where( 

3848 pg_catalog.pg_attrdef.c.adrelid 

3849 == pg_catalog.pg_attribute.c.attrelid, 

3850 pg_catalog.pg_attrdef.c.adnum 

3851 == pg_catalog.pg_attribute.c.attnum, 

3852 pg_catalog.pg_attribute.c.atthasdef, 

3853 ) 

3854 .correlate(pg_catalog.pg_attribute) 

3855 .scalar_subquery() 

3856 .label("default") 

3857 ) 

3858 

3859 # get the name of the collate when it's different from the default one 

3860 collate = sql.case( 

3861 ( 

3862 sql.and_( 

3863 pg_catalog.pg_attribute.c.attcollation != 0, 

3864 select(pg_catalog.pg_type.c.typcollation) 

3865 .where( 

3866 pg_catalog.pg_type.c.oid 

3867 == pg_catalog.pg_attribute.c.atttypid, 

3868 ) 

3869 .correlate(pg_catalog.pg_attribute) 

3870 .scalar_subquery() 

3871 != pg_catalog.pg_attribute.c.attcollation, 

3872 ), 

3873 select(pg_catalog.pg_collation.c.collname) 

3874 .where( 

3875 pg_catalog.pg_collation.c.oid 

3876 == pg_catalog.pg_attribute.c.attcollation 

3877 ) 

3878 .correlate(pg_catalog.pg_attribute) 

3879 .scalar_subquery(), 

3880 ), 

3881 else_=sql.null(), 

3882 ).label("collation") 

3883 

3884 relkinds = self._kind_to_relkinds(kind) 

3885 query = ( 

3886 select( 

3887 pg_catalog.pg_attribute.c.attname.label("name"), 

3888 pg_catalog.format_type( 

3889 pg_catalog.pg_attribute.c.atttypid, 

3890 pg_catalog.pg_attribute.c.atttypmod, 

3891 ).label("format_type"), 

3892 default, 

3893 pg_catalog.pg_attribute.c.attnotnull.label("not_null"), 

3894 pg_catalog.pg_class.c.relname.label("table_name"), 

3895 pg_catalog.pg_description.c.description.label("comment"), 

3896 generated, 

3897 identity, 

3898 collate, 

3899 ) 

3900 .select_from(pg_catalog.pg_class) 

3901 # NOTE: postgresql support table with no user column, meaning 

3902 # there is no row with pg_attribute.attnum > 0. use a left outer 

3903 # join to avoid filtering these tables. 

3904 .outerjoin( 

3905 pg_catalog.pg_attribute, 

3906 sql.and_( 

3907 pg_catalog.pg_class.c.oid 

3908 == pg_catalog.pg_attribute.c.attrelid, 

3909 pg_catalog.pg_attribute.c.attnum > 0, 

3910 ~pg_catalog.pg_attribute.c.attisdropped, 

3911 ), 

3912 ) 

3913 .outerjoin( 

3914 pg_catalog.pg_description, 

3915 sql.and_( 

3916 pg_catalog.pg_description.c.objoid 

3917 == pg_catalog.pg_attribute.c.attrelid, 

3918 pg_catalog.pg_description.c.objsubid 

3919 == pg_catalog.pg_attribute.c.attnum, 

3920 ), 

3921 ) 

3922 .where(self._pg_class_relkind_condition(relkinds)) 

3923 .order_by( 

3924 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum 

3925 ) 

3926 ) 

3927 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3928 if has_filter_names: 

3929 query = query.where( 

3930 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

3931 ) 

3932 return query 

3933 

3934 def get_multi_columns( 

3935 self, connection, schema, filter_names, scope, kind, **kw 

3936 ): 

3937 has_filter_names, params = self._prepare_filter_names(filter_names) 

3938 query = self._columns_query(schema, has_filter_names, scope, kind) 

3939 rows = connection.execute(query, params).mappings() 

3940 

3941 # dictionary with (name, ) if default search path or (schema, name) 

3942 # as keys 

3943 domains = { 

3944 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d 

3945 for d in self._load_domains( 

3946 connection, schema="*", info_cache=kw.get("info_cache") 

3947 ) 

3948 } 

3949 

3950 # dictionary with (name, ) if default search path or (schema, name) 

3951 # as keys 

3952 enums = dict( 

3953 ( 

3954 ((rec["name"],), rec) 

3955 if rec["visible"] 

3956 else ((rec["schema"], rec["name"]), rec) 

3957 ) 

3958 for rec in self._load_enums( 

3959 connection, schema="*", info_cache=kw.get("info_cache") 

3960 ) 

3961 ) 

3962 

3963 columns = self._get_columns_info(rows, domains, enums, schema) 

3964 

3965 return columns.items() 

3966 

3967 _format_type_args_pattern = re.compile(r"\((.*)\)") 

3968 _format_type_args_delim = re.compile(r"\s*,\s*") 

3969 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$") 

3970 

3971 def _reflect_type( 

3972 self, 

3973 format_type: Optional[str], 

3974 domains: Dict[str, ReflectedDomain], 

3975 enums: Dict[str, ReflectedEnum], 

3976 type_description: str, 

3977 collation: Optional[str], 

3978 ) -> sqltypes.TypeEngine[Any]: 

3979 """ 

3980 Attempts to reconstruct a column type defined in ischema_names based 

3981 on the information available in the format_type. 

3982 

3983 If the `format_type` cannot be associated with a known `ischema_names`, 

3984 it is treated as a reference to a known PostgreSQL named `ENUM` or 

3985 `DOMAIN` type. 

3986 """ 

3987 type_description = type_description or "unknown type" 

3988 if format_type is None: 

3989 util.warn( 

3990 "PostgreSQL format_type() returned NULL for %s" 

3991 % type_description 

3992 ) 

3993 return sqltypes.NULLTYPE 

3994 

3995 attype_args_match = self._format_type_args_pattern.search(format_type) 

3996 if attype_args_match and attype_args_match.group(1): 

3997 attype_args = self._format_type_args_delim.split( 

3998 attype_args_match.group(1) 

3999 ) 

4000 else: 

4001 attype_args = () 

4002 

4003 match_array_dim = self._format_array_spec_pattern.search(format_type) 

4004 # Each "[]" in array specs corresponds to an array dimension 

4005 array_dim = len(match_array_dim.group(1) or "") // 2 

4006 

4007 # Remove all parameters and array specs from format_type to obtain an 

4008 # ischema_name candidate 

4009 attype = self._format_type_args_pattern.sub("", format_type) 

4010 attype = self._format_array_spec_pattern.sub("", attype) 

4011 

4012 schema_type = self.ischema_names.get(attype.lower(), None) 

4013 args, kwargs = (), {} 

4014 

4015 if attype == "numeric": 

4016 if len(attype_args) == 2: 

4017 precision, scale = map(int, attype_args) 

4018 args = (precision, scale) 

4019 

4020 elif attype == "double precision": 

4021 args = (53,) 

4022 

4023 elif attype == "integer": 

4024 args = () 

4025 

4026 elif attype in ("timestamp with time zone", "time with time zone"): 

4027 kwargs["timezone"] = True 

4028 if len(attype_args) == 1: 

4029 kwargs["precision"] = int(attype_args[0]) 

4030 

4031 elif attype in ( 

4032 "timestamp without time zone", 

4033 "time without time zone", 

4034 "time", 

4035 ): 

4036 kwargs["timezone"] = False 

4037 if len(attype_args) == 1: 

4038 kwargs["precision"] = int(attype_args[0]) 

4039 

4040 elif attype == "bit varying": 

4041 kwargs["varying"] = True 

4042 if len(attype_args) == 1: 

4043 charlen = int(attype_args[0]) 

4044 args = (charlen,) 

4045 

4046 # a domain or enum can start with interval, so be mindful of that. 

4047 elif attype == "interval" or attype.startswith("interval "): 

4048 schema_type = INTERVAL 

4049 

4050 field_match = re.match(r"interval (.+)", attype) 

4051 if field_match: 

4052 kwargs["fields"] = field_match.group(1) 

4053 

4054 if len(attype_args) == 1: 

4055 kwargs["precision"] = int(attype_args[0]) 

4056 

4057 else: 

4058 enum_or_domain_key = tuple(util.quoted_token_parser(attype)) 

4059 

4060 if enum_or_domain_key in enums: 

4061 schema_type = ENUM 

4062 enum = enums[enum_or_domain_key] 

4063 

4064 kwargs["name"] = enum["name"] 

4065 

4066 if not enum["visible"]: 

4067 kwargs["schema"] = enum["schema"] 

4068 args = tuple(enum["labels"]) 

4069 elif enum_or_domain_key in domains: 

4070 schema_type = DOMAIN 

4071 domain = domains[enum_or_domain_key] 

4072 

4073 data_type = self._reflect_type( 

4074 domain["type"], 

4075 domains, 

4076 enums, 

4077 type_description="DOMAIN '%s'" % domain["name"], 

4078 collation=domain["collation"], 

4079 ) 

4080 args = (domain["name"], data_type) 

4081 

4082 kwargs["collation"] = domain["collation"] 

4083 kwargs["default"] = domain["default"] 

4084 kwargs["not_null"] = not domain["nullable"] 

4085 kwargs["create_type"] = False 

4086 

4087 if domain["constraints"]: 

4088 # We only support a single constraint 

4089 check_constraint = domain["constraints"][0] 

4090 

4091 kwargs["constraint_name"] = check_constraint["name"] 

4092 kwargs["check"] = check_constraint["check"] 

4093 

4094 if not domain["visible"]: 

4095 kwargs["schema"] = domain["schema"] 

4096 

4097 else: 

4098 try: 

4099 charlen = int(attype_args[0]) 

4100 args = (charlen, *attype_args[1:]) 

4101 except (ValueError, IndexError): 

4102 args = attype_args 

4103 

4104 if not schema_type: 

4105 util.warn( 

4106 "Did not recognize type '%s' of %s" 

4107 % (attype, type_description) 

4108 ) 

4109 return sqltypes.NULLTYPE 

4110 

4111 if collation is not None: 

4112 kwargs["collation"] = collation 

4113 

4114 data_type = schema_type(*args, **kwargs) 

4115 if array_dim >= 1: 

4116 # postgres does not preserve dimensionality or size of array types. 

4117 data_type = _array.ARRAY(data_type) 

4118 

4119 return data_type 

4120 

4121 def _get_columns_info(self, rows, domains, enums, schema): 

4122 columns = defaultdict(list) 

4123 for row_dict in rows: 

4124 # ensure that each table has an entry, even if it has no columns 

4125 if row_dict["name"] is None: 

4126 columns[(schema, row_dict["table_name"])] = ( 

4127 ReflectionDefaults.columns() 

4128 ) 

4129 continue 

4130 table_cols = columns[(schema, row_dict["table_name"])] 

4131 

4132 collation = row_dict["collation"] 

4133 

4134 coltype = self._reflect_type( 

4135 row_dict["format_type"], 

4136 domains, 

4137 enums, 

4138 type_description="column '%s'" % row_dict["name"], 

4139 collation=collation, 

4140 ) 

4141 

4142 default = row_dict["default"] 

4143 name = row_dict["name"] 

4144 generated = row_dict["generated"] 

4145 nullable = not row_dict["not_null"] 

4146 

4147 if isinstance(coltype, DOMAIN): 

4148 if not default: 

4149 # domain can override the default value but 

4150 # can't set it to None 

4151 if coltype.default is not None: 

4152 default = coltype.default 

4153 

4154 nullable = nullable and not coltype.not_null 

4155 

4156 identity = row_dict["identity_options"] 

4157 

4158 # If a zero byte or blank string depending on driver (is also 

4159 # absent for older PG versions), then not a generated column. 

4160 # Otherwise, s = stored. (Other values might be added in the 

4161 # future.) 

4162 if generated not in (None, "", b"\x00"): 

4163 computed = dict( 

4164 sqltext=default, persisted=generated in ("s", b"s") 

4165 ) 

4166 default = None 

4167 else: 

4168 computed = None 

4169 

4170 # adjust the default value 

4171 autoincrement = False 

4172 if default is not None: 

4173 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) 

4174 if match is not None: 

4175 if issubclass(coltype._type_affinity, sqltypes.Integer): 

4176 autoincrement = True 

4177 # the default is related to a Sequence 

4178 if "." not in match.group(2) and schema is not None: 

4179 # unconditionally quote the schema name. this could 

4180 # later be enhanced to obey quoting rules / 

4181 # "quote schema" 

4182 default = ( 

4183 match.group(1) 

4184 + ('"%s"' % schema) 

4185 + "." 

4186 + match.group(2) 

4187 + match.group(3) 

4188 ) 

4189 

4190 column_info = { 

4191 "name": name, 

4192 "type": coltype, 

4193 "nullable": nullable, 

4194 "default": default, 

4195 "autoincrement": autoincrement or identity is not None, 

4196 "comment": row_dict["comment"], 

4197 } 

4198 if computed is not None: 

4199 column_info["computed"] = computed 

4200 if identity is not None: 

4201 column_info["identity"] = identity 

4202 

4203 table_cols.append(column_info) 

4204 

4205 return columns 

4206 

4207 @lru_cache() 

4208 def _table_oids_query(self, schema, has_filter_names, scope, kind): 

4209 relkinds = self._kind_to_relkinds(kind) 

4210 oid_q = select( 

4211 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname 

4212 ).where(self._pg_class_relkind_condition(relkinds)) 

4213 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) 

4214 

4215 if has_filter_names: 

4216 oid_q = oid_q.where( 

4217 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4218 ) 

4219 return oid_q 

4220 

4221 @reflection.flexi_cache( 

4222 ("schema", InternalTraversal.dp_string), 

4223 ("filter_names", InternalTraversal.dp_string_list), 

4224 ("kind", InternalTraversal.dp_plain_obj), 

4225 ("scope", InternalTraversal.dp_plain_obj), 

4226 ) 

4227 def _get_table_oids( 

4228 self, connection, schema, filter_names, scope, kind, **kw 

4229 ): 

4230 has_filter_names, params = self._prepare_filter_names(filter_names) 

4231 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) 

4232 result = connection.execute(oid_q, params) 

4233 return result.all() 

4234 

4235 @util.memoized_property 

4236 def _constraint_query(self): 

4237 if self.server_version_info >= (11, 0): 

4238 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4239 else: 

4240 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4241 

4242 if self.server_version_info >= (15,): 

4243 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4244 else: 

4245 indnullsnotdistinct = sql.false().label("indnullsnotdistinct") 

4246 

4247 con_sq = ( 

4248 select( 

4249 pg_catalog.pg_constraint.c.conrelid, 

4250 pg_catalog.pg_constraint.c.conname, 

4251 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4252 sql.func.generate_subscripts( 

4253 pg_catalog.pg_index.c.indkey, 1 

4254 ).label("ord"), 

4255 indnkeyatts, 

4256 indnullsnotdistinct, 

4257 pg_catalog.pg_description.c.description, 

4258 ) 

4259 .join( 

4260 pg_catalog.pg_index, 

4261 pg_catalog.pg_constraint.c.conindid 

4262 == pg_catalog.pg_index.c.indexrelid, 

4263 ) 

4264 .outerjoin( 

4265 pg_catalog.pg_description, 

4266 pg_catalog.pg_description.c.objoid 

4267 == pg_catalog.pg_constraint.c.oid, 

4268 ) 

4269 .where( 

4270 pg_catalog.pg_constraint.c.contype == bindparam("contype"), 

4271 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), 

4272 # NOTE: filtering also on pg_index.indrelid for oids does 

4273 # not seem to have a performance effect, but it may be an 

4274 # option if perf problems are reported 

4275 ) 

4276 .subquery("con") 

4277 ) 

4278 

4279 attr_sq = ( 

4280 select( 

4281 con_sq.c.conrelid, 

4282 con_sq.c.conname, 

4283 con_sq.c.description, 

4284 con_sq.c.ord, 

4285 con_sq.c.indnkeyatts, 

4286 con_sq.c.indnullsnotdistinct, 

4287 pg_catalog.pg_attribute.c.attname, 

4288 ) 

4289 .select_from(pg_catalog.pg_attribute) 

4290 .join( 

4291 con_sq, 

4292 sql.and_( 

4293 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, 

4294 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, 

4295 ), 

4296 ) 

4297 .where( 

4298 # NOTE: restate the condition here, since pg15 otherwise 

4299 # seems to get confused on pscopg2 sometimes, doing 

4300 # a sequential scan of pg_attribute. 

4301 # The condition in the con_sq subquery is not actually needed 

4302 # in pg15, but it may be needed in older versions. Keeping it 

4303 # does not seems to have any impact in any case. 

4304 con_sq.c.conrelid.in_(bindparam("oids")) 

4305 ) 

4306 .subquery("attr") 

4307 ) 

4308 

4309 return ( 

4310 select( 

4311 attr_sq.c.conrelid, 

4312 sql.func.array_agg( 

4313 # NOTE: cast since some postgresql derivatives may 

4314 # not support array_agg on the name type 

4315 aggregate_order_by( 

4316 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord 

4317 ) 

4318 ).label("cols"), 

4319 attr_sq.c.conname, 

4320 sql.func.min(attr_sq.c.description).label("description"), 

4321 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"), 

4322 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label( 

4323 "indnullsnotdistinct" 

4324 ), 

4325 ) 

4326 .group_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4327 .order_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4328 ) 

4329 

4330 def _reflect_constraint( 

4331 self, connection, contype, schema, filter_names, scope, kind, **kw 

4332 ): 

4333 # used to reflect primary and unique constraint 

4334 table_oids = self._get_table_oids( 

4335 connection, schema, filter_names, scope, kind, **kw 

4336 ) 

4337 batches = list(table_oids) 

4338 is_unique = contype == "u" 

4339 

4340 while batches: 

4341 batch = batches[0:3000] 

4342 batches[0:3000] = [] 

4343 

4344 result = connection.execute( 

4345 self._constraint_query, 

4346 {"oids": [r[0] for r in batch], "contype": contype}, 

4347 ).mappings() 

4348 

4349 result_by_oid = defaultdict(list) 

4350 for row_dict in result: 

4351 result_by_oid[row_dict["conrelid"]].append(row_dict) 

4352 

4353 for oid, tablename in batch: 

4354 for_oid = result_by_oid.get(oid, ()) 

4355 if for_oid: 

4356 for row in for_oid: 

4357 # See note in get_multi_indexes 

4358 all_cols = row["cols"] 

4359 indnkeyatts = row["indnkeyatts"] 

4360 if len(all_cols) > indnkeyatts: 

4361 inc_cols = all_cols[indnkeyatts:] 

4362 cst_cols = all_cols[:indnkeyatts] 

4363 else: 

4364 inc_cols = [] 

4365 cst_cols = all_cols 

4366 

4367 opts = {} 

4368 if self.server_version_info >= (11,): 

4369 opts["postgresql_include"] = inc_cols 

4370 if is_unique: 

4371 opts["postgresql_nulls_not_distinct"] = row[ 

4372 "indnullsnotdistinct" 

4373 ] 

4374 yield ( 

4375 tablename, 

4376 cst_cols, 

4377 row["conname"], 

4378 row["description"], 

4379 opts, 

4380 ) 

4381 else: 

4382 yield tablename, None, None, None, None 

4383 

4384 @reflection.cache 

4385 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

4386 data = self.get_multi_pk_constraint( 

4387 connection, 

4388 schema=schema, 

4389 filter_names=[table_name], 

4390 scope=ObjectScope.ANY, 

4391 kind=ObjectKind.ANY, 

4392 **kw, 

4393 ) 

4394 return self._value_or_raise(data, table_name, schema) 

4395 

4396 def get_multi_pk_constraint( 

4397 self, connection, schema, filter_names, scope, kind, **kw 

4398 ): 

4399 result = self._reflect_constraint( 

4400 connection, "p", schema, filter_names, scope, kind, **kw 

4401 ) 

4402 

4403 # only a single pk can be present for each table. Return an entry 

4404 # even if a table has no primary key 

4405 default = ReflectionDefaults.pk_constraint 

4406 

4407 def pk_constraint(pk_name, cols, comment, opts): 

4408 info = { 

4409 "constrained_columns": cols, 

4410 "name": pk_name, 

4411 "comment": comment, 

4412 } 

4413 if opts: 

4414 info["dialect_options"] = opts 

4415 return info 

4416 

4417 return ( 

4418 ( 

4419 (schema, table_name), 

4420 ( 

4421 pk_constraint(pk_name, cols, comment, opts) 

4422 if pk_name is not None 

4423 else default() 

4424 ), 

4425 ) 

4426 for table_name, cols, pk_name, comment, opts in result 

4427 ) 

4428 

4429 @reflection.cache 

4430 def get_foreign_keys( 

4431 self, 

4432 connection, 

4433 table_name, 

4434 schema=None, 

4435 postgresql_ignore_search_path=False, 

4436 **kw, 

4437 ): 

4438 data = self.get_multi_foreign_keys( 

4439 connection, 

4440 schema=schema, 

4441 filter_names=[table_name], 

4442 postgresql_ignore_search_path=postgresql_ignore_search_path, 

4443 scope=ObjectScope.ANY, 

4444 kind=ObjectKind.ANY, 

4445 **kw, 

4446 ) 

4447 return self._value_or_raise(data, table_name, schema) 

4448 

4449 @lru_cache() 

4450 def _foreing_key_query(self, schema, has_filter_names, scope, kind): 

4451 pg_class_ref = pg_catalog.pg_class.alias("cls_ref") 

4452 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") 

4453 relkinds = self._kind_to_relkinds(kind) 

4454 query = ( 

4455 select( 

4456 pg_catalog.pg_class.c.relname, 

4457 pg_catalog.pg_constraint.c.conname, 

4458 # NOTE: avoid calling pg_get_constraintdef when not needed 

4459 # to speed up the query 

4460 sql.case( 

4461 ( 

4462 pg_catalog.pg_constraint.c.oid.is_not(None), 

4463 pg_catalog.pg_get_constraintdef( 

4464 pg_catalog.pg_constraint.c.oid, True 

4465 ), 

4466 ), 

4467 else_=None, 

4468 ), 

4469 pg_namespace_ref.c.nspname, 

4470 pg_catalog.pg_description.c.description, 

4471 ) 

4472 .select_from(pg_catalog.pg_class) 

4473 .outerjoin( 

4474 pg_catalog.pg_constraint, 

4475 sql.and_( 

4476 pg_catalog.pg_class.c.oid 

4477 == pg_catalog.pg_constraint.c.conrelid, 

4478 pg_catalog.pg_constraint.c.contype == "f", 

4479 ), 

4480 ) 

4481 .outerjoin( 

4482 pg_class_ref, 

4483 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, 

4484 ) 

4485 .outerjoin( 

4486 pg_namespace_ref, 

4487 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, 

4488 ) 

4489 .outerjoin( 

4490 pg_catalog.pg_description, 

4491 pg_catalog.pg_description.c.objoid 

4492 == pg_catalog.pg_constraint.c.oid, 

4493 ) 

4494 .order_by( 

4495 pg_catalog.pg_class.c.relname, 

4496 pg_catalog.pg_constraint.c.conname, 

4497 ) 

4498 .where(self._pg_class_relkind_condition(relkinds)) 

4499 ) 

4500 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4501 if has_filter_names: 

4502 query = query.where( 

4503 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4504 ) 

4505 return query 

4506 

4507 @util.memoized_property 

4508 def _fk_regex_pattern(self): 

4509 # optionally quoted token 

4510 qtoken = r'(?:"[^"]+"|[\w]+?)' 

4511 

4512 # https://www.postgresql.org/docs/current/static/sql-createtable.html 

4513 return re.compile( 

4514 r"FOREIGN KEY \((.*?)\) " 

4515 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501 

4516 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" 

4517 r"[\s]?(?:ON (UPDATE|DELETE) " 

4518 r"(CASCADE|RESTRICT|NO ACTION|" 

4519 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?" 

4520 r"[\s]?(?:ON (UPDATE|DELETE) " 

4521 r"(CASCADE|RESTRICT|NO ACTION|" 

4522 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?" 

4523 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" 

4524 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" 

4525 ) 

4526 

4527 def _parse_fk(self, condef): 

4528 FK_REGEX = self._fk_regex_pattern 

4529 m = re.search(FK_REGEX, condef).groups() 

4530 

4531 ( 

4532 constrained_columns, 

4533 referred_schema, 

4534 referred_table, 

4535 referred_columns, 

4536 _, 

4537 match, 

4538 upddelkey1, 

4539 upddelval1, 

4540 upddelkey2, 

4541 upddelval2, 

4542 deferrable, 

4543 _, 

4544 initially, 

4545 ) = m 

4546 

4547 onupdate = ( 

4548 upddelval1 

4549 if upddelkey1 == "UPDATE" 

4550 else upddelval2 if upddelkey2 == "UPDATE" else None 

4551 ) 

4552 ondelete = ( 

4553 upddelval1 

4554 if upddelkey1 == "DELETE" 

4555 else upddelval2 if upddelkey2 == "DELETE" else None 

4556 ) 

4557 

4558 return ( 

4559 constrained_columns, 

4560 referred_schema, 

4561 referred_table, 

4562 referred_columns, 

4563 match, 

4564 onupdate, 

4565 ondelete, 

4566 deferrable, 

4567 initially, 

4568 ) 

4569 

4570 def get_multi_foreign_keys( 

4571 self, 

4572 connection, 

4573 schema, 

4574 filter_names, 

4575 scope, 

4576 kind, 

4577 postgresql_ignore_search_path=False, 

4578 **kw, 

4579 ): 

4580 preparer = self.identifier_preparer 

4581 

4582 has_filter_names, params = self._prepare_filter_names(filter_names) 

4583 query = self._foreing_key_query(schema, has_filter_names, scope, kind) 

4584 result = connection.execute(query, params) 

4585 

4586 fkeys = defaultdict(list) 

4587 default = ReflectionDefaults.foreign_keys 

4588 for table_name, conname, condef, conschema, comment in result: 

4589 # ensure that each table has an entry, even if it has 

4590 # no foreign keys 

4591 if conname is None: 

4592 fkeys[(schema, table_name)] = default() 

4593 continue 

4594 table_fks = fkeys[(schema, table_name)] 

4595 

4596 ( 

4597 constrained_columns, 

4598 referred_schema, 

4599 referred_table, 

4600 referred_columns, 

4601 match, 

4602 onupdate, 

4603 ondelete, 

4604 deferrable, 

4605 initially, 

4606 ) = self._parse_fk(condef) 

4607 

4608 if deferrable is not None: 

4609 deferrable = True if deferrable == "DEFERRABLE" else False 

4610 constrained_columns = [ 

4611 preparer._unquote_identifier(x) 

4612 for x in re.split(r"\s*,\s*", constrained_columns) 

4613 ] 

4614 

4615 if postgresql_ignore_search_path: 

4616 # when ignoring search path, we use the actual schema 

4617 # provided it isn't the "default" schema 

4618 if conschema != self.default_schema_name: 

4619 referred_schema = conschema 

4620 else: 

4621 referred_schema = schema 

4622 elif referred_schema: 

4623 # referred_schema is the schema that we regexp'ed from 

4624 # pg_get_constraintdef(). If the schema is in the search 

4625 # path, pg_get_constraintdef() will give us None. 

4626 referred_schema = preparer._unquote_identifier(referred_schema) 

4627 elif schema is not None and schema == conschema: 

4628 # If the actual schema matches the schema of the table 

4629 # we're reflecting, then we will use that. 

4630 referred_schema = schema 

4631 

4632 referred_table = preparer._unquote_identifier(referred_table) 

4633 referred_columns = [ 

4634 preparer._unquote_identifier(x) 

4635 for x in re.split(r"\s*,\s", referred_columns) 

4636 ] 

4637 options = { 

4638 k: v 

4639 for k, v in [ 

4640 ("onupdate", onupdate), 

4641 ("ondelete", ondelete), 

4642 ("initially", initially), 

4643 ("deferrable", deferrable), 

4644 ("match", match), 

4645 ] 

4646 if v is not None and v != "NO ACTION" 

4647 } 

4648 fkey_d = { 

4649 "name": conname, 

4650 "constrained_columns": constrained_columns, 

4651 "referred_schema": referred_schema, 

4652 "referred_table": referred_table, 

4653 "referred_columns": referred_columns, 

4654 "options": options, 

4655 "comment": comment, 

4656 } 

4657 table_fks.append(fkey_d) 

4658 return fkeys.items() 

4659 

4660 @reflection.cache 

4661 def get_indexes(self, connection, table_name, schema=None, **kw): 

4662 data = self.get_multi_indexes( 

4663 connection, 

4664 schema=schema, 

4665 filter_names=[table_name], 

4666 scope=ObjectScope.ANY, 

4667 kind=ObjectKind.ANY, 

4668 **kw, 

4669 ) 

4670 return self._value_or_raise(data, table_name, schema) 

4671 

4672 @util.memoized_property 

4673 def _index_query(self): 

4674 # NOTE: pg_index is used as from two times to improve performance, 

4675 # since extraing all the index information from `idx_sq` to avoid 

4676 # the second pg_index use leads to a worse performing query in 

4677 # particular when querying for a single table (as of pg 17) 

4678 # NOTE: repeating oids clause improve query performance 

4679 

4680 # subquery to get the columns 

4681 idx_sq = ( 

4682 select( 

4683 pg_catalog.pg_index.c.indexrelid, 

4684 pg_catalog.pg_index.c.indrelid, 

4685 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4686 sql.func.unnest(pg_catalog.pg_index.c.indclass).label( 

4687 "att_opclass" 

4688 ), 

4689 sql.func.generate_subscripts( 

4690 pg_catalog.pg_index.c.indkey, 1 

4691 ).label("ord"), 

4692 ) 

4693 .where( 

4694 ~pg_catalog.pg_index.c.indisprimary, 

4695 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4696 ) 

4697 .subquery("idx") 

4698 ) 

4699 

4700 attr_sq = ( 

4701 select( 

4702 idx_sq.c.indexrelid, 

4703 idx_sq.c.indrelid, 

4704 idx_sq.c.ord, 

4705 # NOTE: always using pg_get_indexdef is too slow so just 

4706 # invoke when the element is an expression 

4707 sql.case( 

4708 ( 

4709 idx_sq.c.attnum == 0, 

4710 pg_catalog.pg_get_indexdef( 

4711 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True 

4712 ), 

4713 ), 

4714 # NOTE: need to cast this since attname is of type "name" 

4715 # that's limited to 63 bytes, while pg_get_indexdef 

4716 # returns "text" so its output may get cut 

4717 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), 

4718 ).label("element"), 

4719 (idx_sq.c.attnum == 0).label("is_expr"), 

4720 pg_catalog.pg_opclass.c.opcname, 

4721 pg_catalog.pg_opclass.c.opcdefault, 

4722 ) 

4723 .select_from(idx_sq) 

4724 .outerjoin( 

4725 # do not remove rows where idx_sq.c.attnum is 0 

4726 pg_catalog.pg_attribute, 

4727 sql.and_( 

4728 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, 

4729 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, 

4730 ), 

4731 ) 

4732 .outerjoin( 

4733 pg_catalog.pg_opclass, 

4734 pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass, 

4735 ) 

4736 .where(idx_sq.c.indrelid.in_(bindparam("oids"))) 

4737 .subquery("idx_attr") 

4738 ) 

4739 

4740 cols_sq = ( 

4741 select( 

4742 attr_sq.c.indexrelid, 

4743 sql.func.min(attr_sq.c.indrelid), 

4744 sql.func.array_agg( 

4745 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) 

4746 ).label("elements"), 

4747 sql.func.array_agg( 

4748 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) 

4749 ).label("elements_is_expr"), 

4750 sql.func.array_agg( 

4751 aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord) 

4752 ).label("elements_opclass"), 

4753 sql.func.array_agg( 

4754 aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord) 

4755 ).label("elements_opdefault"), 

4756 ) 

4757 .group_by(attr_sq.c.indexrelid) 

4758 .subquery("idx_cols") 

4759 ) 

4760 

4761 if self.server_version_info >= (11, 0): 

4762 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4763 else: 

4764 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4765 

4766 if self.server_version_info >= (15,): 

4767 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4768 else: 

4769 nulls_not_distinct = sql.false().label("indnullsnotdistinct") 

4770 

4771 return ( 

4772 select( 

4773 pg_catalog.pg_index.c.indrelid, 

4774 pg_catalog.pg_class.c.relname, 

4775 pg_catalog.pg_index.c.indisunique, 

4776 pg_catalog.pg_constraint.c.conrelid.is_not(None).label( 

4777 "has_constraint" 

4778 ), 

4779 pg_catalog.pg_index.c.indoption, 

4780 pg_catalog.pg_class.c.reloptions, 

4781 pg_catalog.pg_am.c.amname, 

4782 # NOTE: pg_get_expr is very fast so this case has almost no 

4783 # performance impact 

4784 sql.case( 

4785 ( 

4786 pg_catalog.pg_index.c.indpred.is_not(None), 

4787 pg_catalog.pg_get_expr( 

4788 pg_catalog.pg_index.c.indpred, 

4789 pg_catalog.pg_index.c.indrelid, 

4790 ), 

4791 ), 

4792 else_=None, 

4793 ).label("filter_definition"), 

4794 indnkeyatts, 

4795 nulls_not_distinct, 

4796 cols_sq.c.elements, 

4797 cols_sq.c.elements_is_expr, 

4798 cols_sq.c.elements_opclass, 

4799 cols_sq.c.elements_opdefault, 

4800 ) 

4801 .select_from(pg_catalog.pg_index) 

4802 .where( 

4803 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4804 ~pg_catalog.pg_index.c.indisprimary, 

4805 ) 

4806 .join( 

4807 pg_catalog.pg_class, 

4808 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid, 

4809 ) 

4810 .join( 

4811 pg_catalog.pg_am, 

4812 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid, 

4813 ) 

4814 .outerjoin( 

4815 cols_sq, 

4816 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, 

4817 ) 

4818 .outerjoin( 

4819 pg_catalog.pg_constraint, 

4820 sql.and_( 

4821 pg_catalog.pg_index.c.indrelid 

4822 == pg_catalog.pg_constraint.c.conrelid, 

4823 pg_catalog.pg_index.c.indexrelid 

4824 == pg_catalog.pg_constraint.c.conindid, 

4825 pg_catalog.pg_constraint.c.contype 

4826 == sql.any_(_array.array(("p", "u", "x"))), 

4827 ), 

4828 ) 

4829 .order_by( 

4830 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname 

4831 ) 

4832 ) 

4833 

4834 def get_multi_indexes( 

4835 self, connection, schema, filter_names, scope, kind, **kw 

4836 ): 

4837 table_oids = self._get_table_oids( 

4838 connection, schema, filter_names, scope, kind, **kw 

4839 ) 

4840 

4841 indexes = defaultdict(list) 

4842 default = ReflectionDefaults.indexes 

4843 

4844 batches = list(table_oids) 

4845 

4846 while batches: 

4847 batch = batches[0:3000] 

4848 batches[0:3000] = [] 

4849 

4850 result = connection.execute( 

4851 self._index_query, {"oids": [r[0] for r in batch]} 

4852 ).mappings() 

4853 

4854 result_by_oid = defaultdict(list) 

4855 for row_dict in result: 

4856 result_by_oid[row_dict["indrelid"]].append(row_dict) 

4857 

4858 for oid, table_name in batch: 

4859 if oid not in result_by_oid: 

4860 # ensure that each table has an entry, even if reflection 

4861 # is skipped because not supported 

4862 indexes[(schema, table_name)] = default() 

4863 continue 

4864 

4865 for row in result_by_oid[oid]: 

4866 index_name = row["relname"] 

4867 

4868 table_indexes = indexes[(schema, table_name)] 

4869 

4870 all_elements = row["elements"] 

4871 all_elements_is_expr = row["elements_is_expr"] 

4872 all_elements_opclass = row["elements_opclass"] 

4873 all_elements_opdefault = row["elements_opdefault"] 

4874 indnkeyatts = row["indnkeyatts"] 

4875 # "The number of key columns in the index, not counting any 

4876 # included columns, which are merely stored and do not 

4877 # participate in the index semantics" 

4878 if len(all_elements) > indnkeyatts: 

4879 # this is a "covering index" which has INCLUDE columns 

4880 # as well as regular index columns 

4881 inc_cols = all_elements[indnkeyatts:] 

4882 idx_elements = all_elements[:indnkeyatts] 

4883 idx_elements_is_expr = all_elements_is_expr[ 

4884 :indnkeyatts 

4885 ] 

4886 # postgresql does not support expression on included 

4887 # columns as of v14: "ERROR: expressions are not 

4888 # supported in included columns". 

4889 assert all( 

4890 not is_expr 

4891 for is_expr in all_elements_is_expr[indnkeyatts:] 

4892 ) 

4893 idx_elements_opclass = all_elements_opclass[ 

4894 :indnkeyatts 

4895 ] 

4896 idx_elements_opdefault = all_elements_opdefault[ 

4897 :indnkeyatts 

4898 ] 

4899 else: 

4900 idx_elements = all_elements 

4901 idx_elements_is_expr = all_elements_is_expr 

4902 inc_cols = [] 

4903 idx_elements_opclass = all_elements_opclass 

4904 idx_elements_opdefault = all_elements_opdefault 

4905 

4906 index = {"name": index_name, "unique": row["indisunique"]} 

4907 if any(idx_elements_is_expr): 

4908 index["column_names"] = [ 

4909 None if is_expr else expr 

4910 for expr, is_expr in zip( 

4911 idx_elements, idx_elements_is_expr 

4912 ) 

4913 ] 

4914 index["expressions"] = idx_elements 

4915 else: 

4916 index["column_names"] = idx_elements 

4917 

4918 dialect_options = {} 

4919 

4920 if not all(idx_elements_opdefault): 

4921 dialect_options["postgresql_ops"] = { 

4922 name: opclass 

4923 for name, opclass, is_default in zip( 

4924 idx_elements, 

4925 idx_elements_opclass, 

4926 idx_elements_opdefault, 

4927 ) 

4928 if not is_default 

4929 } 

4930 

4931 sorting = {} 

4932 for col_index, col_flags in enumerate(row["indoption"]): 

4933 col_sorting = () 

4934 # try to set flags only if they differ from PG 

4935 # defaults... 

4936 if col_flags & 0x01: 

4937 col_sorting += ("desc",) 

4938 if not (col_flags & 0x02): 

4939 col_sorting += ("nulls_last",) 

4940 else: 

4941 if col_flags & 0x02: 

4942 col_sorting += ("nulls_first",) 

4943 if col_sorting: 

4944 sorting[idx_elements[col_index]] = col_sorting 

4945 if sorting: 

4946 index["column_sorting"] = sorting 

4947 if row["has_constraint"]: 

4948 index["duplicates_constraint"] = index_name 

4949 

4950 if row["reloptions"]: 

4951 dialect_options["postgresql_with"] = dict( 

4952 [ 

4953 option.split("=", 1) 

4954 for option in row["reloptions"] 

4955 ] 

4956 ) 

4957 # it *might* be nice to include that this is 'btree' in the 

4958 # reflection info. But we don't want an Index object 

4959 # to have a ``postgresql_using`` in it that is just the 

4960 # default, so for the moment leaving this out. 

4961 amname = row["amname"] 

4962 if amname != "btree": 

4963 dialect_options["postgresql_using"] = row["amname"] 

4964 if row["filter_definition"]: 

4965 dialect_options["postgresql_where"] = row[ 

4966 "filter_definition" 

4967 ] 

4968 if self.server_version_info >= (11,): 

4969 # NOTE: this is legacy, this is part of 

4970 # dialect_options now as of #7382 

4971 index["include_columns"] = inc_cols 

4972 dialect_options["postgresql_include"] = inc_cols 

4973 if row["indnullsnotdistinct"]: 

4974 # the default is False, so ignore it. 

4975 dialect_options["postgresql_nulls_not_distinct"] = row[ 

4976 "indnullsnotdistinct" 

4977 ] 

4978 

4979 if dialect_options: 

4980 index["dialect_options"] = dialect_options 

4981 

4982 table_indexes.append(index) 

4983 return indexes.items() 

4984 

4985 @reflection.cache 

4986 def get_unique_constraints( 

4987 self, connection, table_name, schema=None, **kw 

4988 ): 

4989 data = self.get_multi_unique_constraints( 

4990 connection, 

4991 schema=schema, 

4992 filter_names=[table_name], 

4993 scope=ObjectScope.ANY, 

4994 kind=ObjectKind.ANY, 

4995 **kw, 

4996 ) 

4997 return self._value_or_raise(data, table_name, schema) 

4998 

4999 def get_multi_unique_constraints( 

5000 self, 

5001 connection, 

5002 schema, 

5003 filter_names, 

5004 scope, 

5005 kind, 

5006 **kw, 

5007 ): 

5008 result = self._reflect_constraint( 

5009 connection, "u", schema, filter_names, scope, kind, **kw 

5010 ) 

5011 

5012 # each table can have multiple unique constraints 

5013 uniques = defaultdict(list) 

5014 default = ReflectionDefaults.unique_constraints 

5015 for table_name, cols, con_name, comment, options in result: 

5016 # ensure a list is created for each table. leave it empty if 

5017 # the table has no unique constraint 

5018 if con_name is None: 

5019 uniques[(schema, table_name)] = default() 

5020 continue 

5021 

5022 uc_dict = { 

5023 "column_names": cols, 

5024 "name": con_name, 

5025 "comment": comment, 

5026 } 

5027 if options: 

5028 uc_dict["dialect_options"] = options 

5029 

5030 uniques[(schema, table_name)].append(uc_dict) 

5031 return uniques.items() 

5032 

5033 @reflection.cache 

5034 def get_table_comment(self, connection, table_name, schema=None, **kw): 

5035 data = self.get_multi_table_comment( 

5036 connection, 

5037 schema, 

5038 [table_name], 

5039 scope=ObjectScope.ANY, 

5040 kind=ObjectKind.ANY, 

5041 **kw, 

5042 ) 

5043 return self._value_or_raise(data, table_name, schema) 

5044 

5045 @lru_cache() 

5046 def _comment_query(self, schema, has_filter_names, scope, kind): 

5047 relkinds = self._kind_to_relkinds(kind) 

5048 query = ( 

5049 select( 

5050 pg_catalog.pg_class.c.relname, 

5051 pg_catalog.pg_description.c.description, 

5052 ) 

5053 .select_from(pg_catalog.pg_class) 

5054 .outerjoin( 

5055 pg_catalog.pg_description, 

5056 sql.and_( 

5057 pg_catalog.pg_class.c.oid 

5058 == pg_catalog.pg_description.c.objoid, 

5059 pg_catalog.pg_description.c.objsubid == 0, 

5060 pg_catalog.pg_description.c.classoid 

5061 == sql.func.cast("pg_catalog.pg_class", REGCLASS), 

5062 ), 

5063 ) 

5064 .where(self._pg_class_relkind_condition(relkinds)) 

5065 ) 

5066 query = self._pg_class_filter_scope_schema(query, schema, scope) 

5067 if has_filter_names: 

5068 query = query.where( 

5069 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

5070 ) 

5071 return query 

5072 

5073 def get_multi_table_comment( 

5074 self, connection, schema, filter_names, scope, kind, **kw 

5075 ): 

5076 has_filter_names, params = self._prepare_filter_names(filter_names) 

5077 query = self._comment_query(schema, has_filter_names, scope, kind) 

5078 result = connection.execute(query, params) 

5079 

5080 default = ReflectionDefaults.table_comment 

5081 return ( 

5082 ( 

5083 (schema, table), 

5084 {"text": comment} if comment is not None else default(), 

5085 ) 

5086 for table, comment in result 

5087 ) 

5088 

5089 @reflection.cache 

5090 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

5091 data = self.get_multi_check_constraints( 

5092 connection, 

5093 schema, 

5094 [table_name], 

5095 scope=ObjectScope.ANY, 

5096 kind=ObjectKind.ANY, 

5097 **kw, 

5098 ) 

5099 return self._value_or_raise(data, table_name, schema) 

5100 

5101 @lru_cache() 

5102 def _check_constraint_query(self, schema, has_filter_names, scope, kind): 

5103 relkinds = self._kind_to_relkinds(kind) 

5104 query = ( 

5105 select( 

5106 pg_catalog.pg_class.c.relname, 

5107 pg_catalog.pg_constraint.c.conname, 

5108 # NOTE: avoid calling pg_get_constraintdef when not needed 

5109 # to speed up the query 

5110 sql.case( 

5111 ( 

5112 pg_catalog.pg_constraint.c.oid.is_not(None), 

5113 pg_catalog.pg_get_constraintdef( 

5114 pg_catalog.pg_constraint.c.oid, True 

5115 ), 

5116 ), 

5117 else_=None, 

5118 ), 

5119 pg_catalog.pg_description.c.description, 

5120 ) 

5121 .select_from(pg_catalog.pg_class) 

5122 .outerjoin( 

5123 pg_catalog.pg_constraint, 

5124 sql.and_( 

5125 pg_catalog.pg_class.c.oid 

5126 == pg_catalog.pg_constraint.c.conrelid, 

5127 pg_catalog.pg_constraint.c.contype == "c", 

5128 ), 

5129 ) 

5130 .outerjoin( 

5131 pg_catalog.pg_description, 

5132 pg_catalog.pg_description.c.objoid 

5133 == pg_catalog.pg_constraint.c.oid, 

5134 ) 

5135 .order_by( 

5136 pg_catalog.pg_class.c.relname, 

5137 pg_catalog.pg_constraint.c.conname, 

5138 ) 

5139 .where(self._pg_class_relkind_condition(relkinds)) 

5140 ) 

5141 query = self._pg_class_filter_scope_schema(query, schema, scope) 

5142 if has_filter_names: 

5143 query = query.where( 

5144 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

5145 ) 

5146 return query 

5147 

5148 def get_multi_check_constraints( 

5149 self, connection, schema, filter_names, scope, kind, **kw 

5150 ): 

5151 has_filter_names, params = self._prepare_filter_names(filter_names) 

5152 query = self._check_constraint_query( 

5153 schema, has_filter_names, scope, kind 

5154 ) 

5155 result = connection.execute(query, params) 

5156 

5157 check_constraints = defaultdict(list) 

5158 default = ReflectionDefaults.check_constraints 

5159 for table_name, check_name, src, comment in result: 

5160 # only two cases for check_name and src: both null or both defined 

5161 if check_name is None and src is None: 

5162 check_constraints[(schema, table_name)] = default() 

5163 continue 

5164 # samples: 

5165 # "CHECK (((a > 1) AND (a < 5)))" 

5166 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" 

5167 # "CHECK (((a > 1) AND (a < 5))) NOT VALID" 

5168 # "CHECK (some_boolean_function(a))" 

5169 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" 

5170 # "CHECK (a NOT NULL) NO INHERIT" 

5171 # "CHECK (a NOT NULL) NO INHERIT NOT VALID" 

5172 

5173 m = re.match( 

5174 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", 

5175 src, 

5176 flags=re.DOTALL, 

5177 ) 

5178 if not m: 

5179 util.warn("Could not parse CHECK constraint text: %r" % src) 

5180 sqltext = "" 

5181 else: 

5182 sqltext = re.compile( 

5183 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL 

5184 ).sub(r"\1", m.group(1)) 

5185 entry = { 

5186 "name": check_name, 

5187 "sqltext": sqltext, 

5188 "comment": comment, 

5189 } 

5190 if m: 

5191 do = {} 

5192 if " NOT VALID" in m.groups(): 

5193 do["not_valid"] = True 

5194 if " NO INHERIT" in m.groups(): 

5195 do["no_inherit"] = True 

5196 if do: 

5197 entry["dialect_options"] = do 

5198 

5199 check_constraints[(schema, table_name)].append(entry) 

5200 return check_constraints.items() 

5201 

5202 def _pg_type_filter_schema(self, query, schema): 

5203 if schema is None: 

5204 query = query.where( 

5205 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

5206 # ignore pg_catalog schema 

5207 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

5208 ) 

5209 elif schema != "*": 

5210 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

5211 return query 

5212 

5213 @lru_cache() 

5214 def _enum_query(self, schema): 

5215 lbl_agg_sq = ( 

5216 select( 

5217 pg_catalog.pg_enum.c.enumtypid, 

5218 sql.func.array_agg( 

5219 aggregate_order_by( 

5220 # NOTE: cast since some postgresql derivatives may 

5221 # not support array_agg on the name type 

5222 pg_catalog.pg_enum.c.enumlabel.cast(TEXT), 

5223 pg_catalog.pg_enum.c.enumsortorder, 

5224 ) 

5225 ).label("labels"), 

5226 ) 

5227 .group_by(pg_catalog.pg_enum.c.enumtypid) 

5228 .subquery("lbl_agg") 

5229 ) 

5230 

5231 query = ( 

5232 select( 

5233 pg_catalog.pg_type.c.typname.label("name"), 

5234 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5235 "visible" 

5236 ), 

5237 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5238 lbl_agg_sq.c.labels.label("labels"), 

5239 ) 

5240 .join( 

5241 pg_catalog.pg_namespace, 

5242 pg_catalog.pg_namespace.c.oid 

5243 == pg_catalog.pg_type.c.typnamespace, 

5244 ) 

5245 .outerjoin( 

5246 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid 

5247 ) 

5248 .where(pg_catalog.pg_type.c.typtype == "e") 

5249 .order_by( 

5250 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5251 ) 

5252 ) 

5253 

5254 return self._pg_type_filter_schema(query, schema) 

5255 

5256 @reflection.cache 

5257 def _load_enums(self, connection, schema=None, **kw): 

5258 if not self.supports_native_enum: 

5259 return [] 

5260 

5261 result = connection.execute(self._enum_query(schema)) 

5262 

5263 enums = [] 

5264 for name, visible, schema, labels in result: 

5265 enums.append( 

5266 { 

5267 "name": name, 

5268 "schema": schema, 

5269 "visible": visible, 

5270 "labels": [] if labels is None else labels, 

5271 } 

5272 ) 

5273 return enums 

5274 

5275 @lru_cache() 

5276 def _domain_query(self, schema): 

5277 con_sq = ( 

5278 select( 

5279 pg_catalog.pg_constraint.c.contypid, 

5280 sql.func.array_agg( 

5281 pg_catalog.pg_get_constraintdef( 

5282 pg_catalog.pg_constraint.c.oid, True 

5283 ) 

5284 ).label("condefs"), 

5285 sql.func.array_agg( 

5286 # NOTE: cast since some postgresql derivatives may 

5287 # not support array_agg on the name type 

5288 pg_catalog.pg_constraint.c.conname.cast(TEXT) 

5289 ).label("connames"), 

5290 ) 

5291 # The domain this constraint is on; zero if not a domain constraint 

5292 .where(pg_catalog.pg_constraint.c.contypid != 0) 

5293 .group_by(pg_catalog.pg_constraint.c.contypid) 

5294 .subquery("domain_constraints") 

5295 ) 

5296 

5297 query = ( 

5298 select( 

5299 pg_catalog.pg_type.c.typname.label("name"), 

5300 pg_catalog.format_type( 

5301 pg_catalog.pg_type.c.typbasetype, 

5302 pg_catalog.pg_type.c.typtypmod, 

5303 ).label("attype"), 

5304 (~pg_catalog.pg_type.c.typnotnull).label("nullable"), 

5305 pg_catalog.pg_type.c.typdefault.label("default"), 

5306 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5307 "visible" 

5308 ), 

5309 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5310 con_sq.c.condefs, 

5311 con_sq.c.connames, 

5312 pg_catalog.pg_collation.c.collname, 

5313 ) 

5314 .join( 

5315 pg_catalog.pg_namespace, 

5316 pg_catalog.pg_namespace.c.oid 

5317 == pg_catalog.pg_type.c.typnamespace, 

5318 ) 

5319 .outerjoin( 

5320 pg_catalog.pg_collation, 

5321 pg_catalog.pg_type.c.typcollation 

5322 == pg_catalog.pg_collation.c.oid, 

5323 ) 

5324 .outerjoin( 

5325 con_sq, 

5326 pg_catalog.pg_type.c.oid == con_sq.c.contypid, 

5327 ) 

5328 .where(pg_catalog.pg_type.c.typtype == "d") 

5329 .order_by( 

5330 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5331 ) 

5332 ) 

5333 return self._pg_type_filter_schema(query, schema) 

5334 

5335 @reflection.cache 

5336 def _load_domains(self, connection, schema=None, **kw): 

5337 result = connection.execute(self._domain_query(schema)) 

5338 

5339 domains: List[ReflectedDomain] = [] 

5340 for domain in result.mappings(): 

5341 # strip (30) from character varying(30) 

5342 attype = re.search(r"([^\(]+)", domain["attype"]).group(1) 

5343 constraints: List[ReflectedDomainConstraint] = [] 

5344 if domain["connames"]: 

5345 # When a domain has multiple CHECK constraints, they will 

5346 # be tested in alphabetical order by name. 

5347 sorted_constraints = sorted( 

5348 zip(domain["connames"], domain["condefs"]), 

5349 key=lambda t: t[0], 

5350 ) 

5351 for name, def_ in sorted_constraints: 

5352 # constraint is in the form "CHECK (expression)" 

5353 # or "NOT NULL". Ignore the "NOT NULL" and 

5354 # remove "CHECK (" and the tailing ")". 

5355 if def_.casefold().startswith("check"): 

5356 check = def_[7:-1] 

5357 constraints.append({"name": name, "check": check}) 

5358 domain_rec: ReflectedDomain = { 

5359 "name": domain["name"], 

5360 "schema": domain["schema"], 

5361 "visible": domain["visible"], 

5362 "type": attype, 

5363 "nullable": domain["nullable"], 

5364 "default": domain["default"], 

5365 "constraints": constraints, 

5366 "collation": domain["collname"], 

5367 } 

5368 domains.append(domain_rec) 

5369 

5370 return domains 

5371 

5372 def _set_backslash_escapes(self, connection): 

5373 # this method is provided as an override hook for descendant 

5374 # dialects (e.g. Redshift), so removing it may break them 

5375 std_string = connection.exec_driver_sql( 

5376 "show standard_conforming_strings" 

5377 ).scalar() 

5378 self._backslash_escapes = std_string == "off"