Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py: 29%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1415 statements  

1# dialects/postgresql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql 

11 :name: PostgreSQL 

12 :normal_support: 9.6+ 

13 :best_effort: 9+ 

14 

15.. _postgresql_sequences: 

16 

17Sequences/SERIAL/IDENTITY 

18------------------------- 

19 

20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means 

21of creating new primary key values for integer-based primary key columns. When 

22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for 

23integer-based primary key columns, which generates a sequence and server side 

24default corresponding to the column. 

25 

26To specify a specific named sequence to be used for primary key generation, 

27use the :func:`~sqlalchemy.schema.Sequence` construct:: 

28 

29 Table( 

30 "sometable", 

31 metadata, 

32 Column( 

33 "id", Integer, Sequence("some_id_seq", start=1), primary_key=True 

34 ), 

35 ) 

36 

37When SQLAlchemy issues a single INSERT statement, to fulfill the contract of 

38having the "last insert identifier" available, a RETURNING clause is added to 

39the INSERT statement which specifies the primary key columns should be 

40returned after the statement completes. The RETURNING functionality only takes 

41place if PostgreSQL 8.2 or later is in use. As a fallback approach, the 

42sequence, whether specified explicitly or implicitly via ``SERIAL``, is 

43executed independently beforehand, the returned value to be used in the 

44subsequent insert. Note that when an 

45:func:`~sqlalchemy.sql.expression.insert()` construct is executed using 

46"executemany" semantics, the "last inserted identifier" functionality does not 

47apply; no RETURNING clause is emitted nor is the sequence pre-executed in this 

48case. 

49 

50 

51PostgreSQL 10 and above IDENTITY columns 

52^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

53 

54PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use 

55of SERIAL. The :class:`_schema.Identity` construct in a 

56:class:`_schema.Column` can be used to control its behavior:: 

57 

58 from sqlalchemy import Table, Column, MetaData, Integer, Computed 

59 

60 metadata = MetaData() 

61 

62 data = Table( 

63 "data", 

64 metadata, 

65 Column( 

66 "id", Integer, Identity(start=42, cycle=True), primary_key=True 

67 ), 

68 Column("data", String), 

69 ) 

70 

71The CREATE TABLE for the above :class:`_schema.Table` object would be: 

72 

73.. sourcecode:: sql 

74 

75 CREATE TABLE data ( 

76 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), 

77 data VARCHAR, 

78 PRIMARY KEY (id) 

79 ) 

80 

81.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

82 in a :class:`_schema.Column` to specify the option of an autoincrementing 

83 column. 

84 

85.. note:: 

86 

87 Previous versions of SQLAlchemy did not have built-in support for rendering 

88 of IDENTITY, and could use the following compilation hook to replace 

89 occurrences of SERIAL with IDENTITY:: 

90 

91 from sqlalchemy.schema import CreateColumn 

92 from sqlalchemy.ext.compiler import compiles 

93 

94 

95 @compiles(CreateColumn, "postgresql") 

96 def use_identity(element, compiler, **kw): 

97 text = compiler.visit_create_column(element, **kw) 

98 text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY") 

99 return text 

100 

101 Using the above, a table such as:: 

102 

103 t = Table( 

104 "t", m, Column("id", Integer, primary_key=True), Column("data", String) 

105 ) 

106 

107 Will generate on the backing database as: 

108 

109 .. sourcecode:: sql 

110 

111 CREATE TABLE t ( 

112 id INT GENERATED BY DEFAULT AS IDENTITY, 

113 data VARCHAR, 

114 PRIMARY KEY (id) 

115 ) 

116 

117.. _postgresql_ss_cursors: 

118 

119Server Side Cursors 

120------------------- 

121 

122Server-side cursor support is available for the psycopg2, asyncpg 

123dialects and may also be available in others. 

124 

125Server side cursors are enabled on a per-statement basis by using the 

126:paramref:`.Connection.execution_options.stream_results` connection execution 

127option:: 

128 

129 with engine.connect() as conn: 

130 result = conn.execution_options(stream_results=True).execute( 

131 text("select * from table") 

132 ) 

133 

134Note that some kinds of SQL statements may not be supported with 

135server side cursors; generally, only SQL statements that return rows should be 

136used with this option. 

137 

138.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated 

139 and will be removed in a future release. Please use the 

140 :paramref:`_engine.Connection.stream_results` execution option for 

141 unbuffered cursor support. 

142 

143.. seealso:: 

144 

145 :ref:`engine_stream_results` 

146 

147.. _postgresql_isolation_level: 

148 

149Transaction Isolation Level 

150--------------------------- 

151 

152Most SQLAlchemy dialects support setting of transaction isolation level 

153using the :paramref:`_sa.create_engine.isolation_level` parameter 

154at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` 

155level via the :paramref:`.Connection.execution_options.isolation_level` 

156parameter. 

157 

158For PostgreSQL dialects, this feature works either by making use of the 

159DBAPI-specific features, such as psycopg2's isolation level flags which will 

160embed the isolation level setting inline with the ``"BEGIN"`` statement, or for 

161DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS 

162TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement 

163emitted by the DBAPI. For the special AUTOCOMMIT isolation level, 

164DBAPI-specific techniques are used which is typically an ``.autocommit`` 

165flag on the DBAPI connection object. 

166 

167To set isolation level using :func:`_sa.create_engine`:: 

168 

169 engine = create_engine( 

170 "postgresql+pg8000://scott:tiger@localhost/test", 

171 isolation_level="REPEATABLE READ", 

172 ) 

173 

174To set using per-connection execution options:: 

175 

176 with engine.connect() as conn: 

177 conn = conn.execution_options(isolation_level="REPEATABLE READ") 

178 with conn.begin(): 

179 ... # work with transaction 

180 

181There are also more options for isolation level configurations, such as 

182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

183different isolation level settings. See the discussion at 

184:ref:`dbapi_autocommit` for background. 

185 

186Valid values for ``isolation_level`` on most PostgreSQL dialects include: 

187 

188* ``READ COMMITTED`` 

189* ``READ UNCOMMITTED`` 

190* ``REPEATABLE READ`` 

191* ``SERIALIZABLE`` 

192* ``AUTOCOMMIT`` 

193 

194.. seealso:: 

195 

196 :ref:`dbapi_autocommit` 

197 

198 :ref:`postgresql_readonly_deferrable` 

199 

200 :ref:`psycopg2_isolation_level` 

201 

202 :ref:`pg8000_isolation_level` 

203 

204.. _postgresql_readonly_deferrable: 

205 

206Setting READ ONLY / DEFERRABLE 

207------------------------------ 

208 

209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" 

210characteristics of the transaction, which is in addition to the isolation level 

211setting. These two attributes can be established either in conjunction with or 

212independently of the isolation level by passing the ``postgresql_readonly`` and 

213``postgresql_deferrable`` flags with 

214:meth:`_engine.Connection.execution_options`. The example below illustrates 

215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting 

216"READ ONLY" and "DEFERRABLE":: 

217 

218 with engine.connect() as conn: 

219 conn = conn.execution_options( 

220 isolation_level="SERIALIZABLE", 

221 postgresql_readonly=True, 

222 postgresql_deferrable=True, 

223 ) 

224 with conn.begin(): 

225 ... # work with transaction 

226 

227Note that some DBAPIs such as asyncpg only support "readonly" with 

228SERIALIZABLE isolation. 

229 

230.. versionadded:: 1.4 added support for the ``postgresql_readonly`` 

231 and ``postgresql_deferrable`` execution options. 

232 

233.. _postgresql_reset_on_return: 

234 

235Temporary Table / Resource Reset for Connection Pooling 

236------------------------------------------------------- 

237 

238The :class:`.QueuePool` connection pool implementation used 

239by the SQLAlchemy :class:`.Engine` object includes 

240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

241the DBAPI ``.rollback()`` method when connections are returned to the pool. 

242While this rollback will clear out the immediate state used by the previous 

243transaction, it does not cover a wider range of session-level state, including 

244temporary tables as well as other server state such as prepared statement 

245handles and statement caches. The PostgreSQL database includes a variety 

246of commands which may be used to reset this state, including 

247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``. 

248 

249 

250To install 

251one or more of these commands as the means of performing reset-on-return, 

252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated 

253in the example below. The implementation 

254will end transactions in progress as well as discard temporary tables 

255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL 

256documentation for background on what each of these statements do. 

257 

258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter 

259is set to ``None`` so that the custom scheme can replace the default behavior 

260completely. The custom hook implementation calls ``.rollback()`` in any case, 

261as it's usually important that the DBAPI's own tracking of commit/rollback 

262will remain consistent with the state of the transaction:: 

263 

264 

265 from sqlalchemy import create_engine 

266 from sqlalchemy import event 

267 

268 postgresql_engine = create_engine( 

269 "postgresql+psycopg2://scott:tiger@hostname/dbname", 

270 # disable default reset-on-return scheme 

271 pool_reset_on_return=None, 

272 ) 

273 

274 

275 @event.listens_for(postgresql_engine, "reset") 

276 def _reset_postgresql(dbapi_connection, connection_record, reset_state): 

277 if not reset_state.terminate_only: 

278 dbapi_connection.execute("CLOSE ALL") 

279 dbapi_connection.execute("RESET ALL") 

280 dbapi_connection.execute("DISCARD TEMP") 

281 

282 # so that the DBAPI itself knows that the connection has been 

283 # reset 

284 dbapi_connection.rollback() 

285 

286.. versionchanged:: 2.0.0b3 Added additional state arguments to 

287 the :meth:`.PoolEvents.reset` event and additionally ensured the event 

288 is invoked for all "reset" occurrences, so that it's appropriate 

289 as a place for custom "reset" handlers. Previous schemes which 

290 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

291 

292.. seealso:: 

293 

294 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

295 

296.. _postgresql_alternate_search_path: 

297 

298Setting Alternate Search Paths on Connect 

299------------------------------------------ 

300 

301The PostgreSQL ``search_path`` variable refers to the list of schema names 

302that will be implicitly referenced when a particular table or other 

303object is referenced in a SQL statement. As detailed in the next section 

304:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around 

305the concept of keeping this variable at its default value of ``public``, 

306however, in order to have it set to any arbitrary name or names when connections 

307are used automatically, the "SET SESSION search_path" command may be invoked 

308for all connections in a pool using the following event handler, as discussed 

309at :ref:`schema_set_default_connections`:: 

310 

311 from sqlalchemy import event 

312 from sqlalchemy import create_engine 

313 

314 engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname") 

315 

316 

317 @event.listens_for(engine, "connect", insert=True) 

318 def set_search_path(dbapi_connection, connection_record): 

319 existing_autocommit = dbapi_connection.autocommit 

320 dbapi_connection.autocommit = True 

321 cursor = dbapi_connection.cursor() 

322 cursor.execute("SET SESSION search_path='%s'" % schema_name) 

323 cursor.close() 

324 dbapi_connection.autocommit = existing_autocommit 

325 

326The reason the recipe is complicated by use of the ``.autocommit`` DBAPI 

327attribute is so that when the ``SET SESSION search_path`` directive is invoked, 

328it is invoked outside of the scope of any transaction and therefore will not 

329be reverted when the DBAPI connection has a rollback. 

330 

331.. seealso:: 

332 

333 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation 

334 

335.. _postgresql_schema_reflection: 

336 

337Remote-Schema Table Introspection and PostgreSQL search_path 

338------------------------------------------------------------ 

339 

340.. admonition:: Section Best Practices Summarized 

341 

342 keep the ``search_path`` variable set to its default of ``public``, without 

343 any other schema names. Ensure the username used to connect **does not** 

344 match remote schemas, or ensure the ``"$user"`` token is **removed** from 

345 ``search_path``. For other schema names, name these explicitly 

346 within :class:`_schema.Table` definitions. Alternatively, the 

347 ``postgresql_ignore_search_path`` option will cause all reflected 

348 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema` 

349 attribute set up. 

350 

351The PostgreSQL dialect can reflect tables from any schema, as outlined in 

352:ref:`metadata_reflection_schemas`. 

353 

354In all cases, the first thing SQLAlchemy does when reflecting tables is 

355to **determine the default schema for the current database connection**. 

356It does this using the PostgreSQL ``current_schema()`` 

357function, illustated below using a PostgreSQL client session (i.e. using 

358the ``psql`` tool): 

359 

360.. sourcecode:: sql 

361 

362 test=> select current_schema(); 

363 current_schema 

364 ---------------- 

365 public 

366 (1 row) 

367 

368Above we see that on a plain install of PostgreSQL, the default schema name 

369is the name ``public``. 

370 

371However, if your database username **matches the name of a schema**, PostgreSQL's 

372default is to then **use that name as the default schema**. Below, we log in 

373using the username ``scott``. When we create a schema named ``scott``, **it 

374implicitly changes the default schema**: 

375 

376.. sourcecode:: sql 

377 

378 test=> select current_schema(); 

379 current_schema 

380 ---------------- 

381 public 

382 (1 row) 

383 

384 test=> create schema scott; 

385 CREATE SCHEMA 

386 test=> select current_schema(); 

387 current_schema 

388 ---------------- 

389 scott 

390 (1 row) 

391 

392The behavior of ``current_schema()`` is derived from the 

393`PostgreSQL search path 

394<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

395variable ``search_path``, which in modern PostgreSQL versions defaults to this: 

396 

397.. sourcecode:: sql 

398 

399 test=> show search_path; 

400 search_path 

401 ----------------- 

402 "$user", public 

403 (1 row) 

404 

405Where above, the ``"$user"`` variable will inject the current username as the 

406default schema, if one exists. Otherwise, ``public`` is used. 

407 

408When a :class:`_schema.Table` object is reflected, if it is present in the 

409schema indicated by the ``current_schema()`` function, **the schema name assigned 

410to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the 

411".schema" attribute will be assigned the string name of that schema. 

412 

413With regards to tables which these :class:`_schema.Table` 

414objects refer to via foreign key constraint, a decision must be made as to how 

415the ``.schema`` is represented in those remote tables, in the case where that 

416remote schema name is also a member of the current ``search_path``. 

417 

418By default, the PostgreSQL dialect mimics the behavior encouraged by 

419PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function 

420returns a sample definition for a particular foreign key constraint, 

421omitting the referenced schema name from that definition when the name is 

422also in the PostgreSQL schema search path. The interaction below 

423illustrates this behavior: 

424 

425.. sourcecode:: sql 

426 

427 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); 

428 CREATE TABLE 

429 test=> CREATE TABLE referring( 

430 test(> id INTEGER PRIMARY KEY, 

431 test(> referred_id INTEGER REFERENCES test_schema.referred(id)); 

432 CREATE TABLE 

433 test=> SET search_path TO public, test_schema; 

434 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

435 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

436 test-> ON n.oid = c.relnamespace 

437 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

438 test-> WHERE c.relname='referring' AND r.contype = 'f' 

439 test-> ; 

440 pg_get_constraintdef 

441 --------------------------------------------------- 

442 FOREIGN KEY (referred_id) REFERENCES referred(id) 

443 (1 row) 

444 

445Above, we created a table ``referred`` as a member of the remote schema 

446``test_schema``, however when we added ``test_schema`` to the 

447PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the 

448``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of 

449the function. 

450 

451On the other hand, if we set the search path back to the typical default 

452of ``public``: 

453 

454.. sourcecode:: sql 

455 

456 test=> SET search_path TO public; 

457 SET 

458 

459The same query against ``pg_get_constraintdef()`` now returns the fully 

460schema-qualified name for us: 

461 

462.. sourcecode:: sql 

463 

464 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

465 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

466 test-> ON n.oid = c.relnamespace 

467 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

468 test-> WHERE c.relname='referring' AND r.contype = 'f'; 

469 pg_get_constraintdef 

470 --------------------------------------------------------------- 

471 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) 

472 (1 row) 

473 

474SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` 

475in order to determine the remote schema name. That is, if our ``search_path`` 

476were set to include ``test_schema``, and we invoked a table 

477reflection process as follows:: 

478 

479 >>> from sqlalchemy import Table, MetaData, create_engine, text 

480 >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

481 >>> with engine.connect() as conn: 

482 ... conn.execute(text("SET search_path TO test_schema, public")) 

483 ... metadata_obj = MetaData() 

484 ... referring = Table("referring", metadata_obj, autoload_with=conn) 

485 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0> 

486 

487The above process would deliver to the :attr:`_schema.MetaData.tables` 

488collection 

489``referred`` table named **without** the schema:: 

490 

491 >>> metadata_obj.tables["referred"].schema is None 

492 True 

493 

494To alter the behavior of reflection such that the referred schema is 

495maintained regardless of the ``search_path`` setting, use the 

496``postgresql_ignore_search_path`` option, which can be specified as a 

497dialect-specific argument to both :class:`_schema.Table` as well as 

498:meth:`_schema.MetaData.reflect`:: 

499 

500 >>> with engine.connect() as conn: 

501 ... conn.execute(text("SET search_path TO test_schema, public")) 

502 ... metadata_obj = MetaData() 

503 ... referring = Table( 

504 ... "referring", 

505 ... metadata_obj, 

506 ... autoload_with=conn, 

507 ... postgresql_ignore_search_path=True, 

508 ... ) 

509 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0> 

510 

511We will now have ``test_schema.referred`` stored as schema-qualified:: 

512 

513 >>> metadata_obj.tables["test_schema.referred"].schema 

514 'test_schema' 

515 

516.. sidebar:: Best Practices for PostgreSQL Schema reflection 

517 

518 The description of PostgreSQL schema reflection behavior is complex, and 

519 is the product of many years of dealing with widely varied use cases and 

520 user preferences. But in fact, there's no need to understand any of it if 

521 you just stick to the simplest use pattern: leave the ``search_path`` set 

522 to its default of ``public`` only, never refer to the name ``public`` as 

523 an explicit schema name otherwise, and refer to all other schema names 

524 explicitly when building up a :class:`_schema.Table` object. The options 

525 described here are only for those users who can't, or prefer not to, stay 

526 within these guidelines. 

527 

528.. seealso:: 

529 

530 :ref:`reflection_schema_qualified_interaction` - discussion of the issue 

531 from a backend-agnostic perspective 

532 

533 `The Schema Search Path 

534 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

535 - on the PostgreSQL website. 

536 

537INSERT/UPDATE...RETURNING 

538------------------------- 

539 

540The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and 

541``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default 

542for single-row INSERT statements in order to fetch newly generated 

543primary key identifiers. To specify an explicit ``RETURNING`` clause, 

544use the :meth:`._UpdateBase.returning` method on a per-statement basis:: 

545 

546 # INSERT..RETURNING 

547 result = ( 

548 table.insert().returning(table.c.col1, table.c.col2).values(name="foo") 

549 ) 

550 print(result.fetchall()) 

551 

552 # UPDATE..RETURNING 

553 result = ( 

554 table.update() 

555 .returning(table.c.col1, table.c.col2) 

556 .where(table.c.name == "foo") 

557 .values(name="bar") 

558 ) 

559 print(result.fetchall()) 

560 

561 # DELETE..RETURNING 

562 result = ( 

563 table.delete() 

564 .returning(table.c.col1, table.c.col2) 

565 .where(table.c.name == "foo") 

566 ) 

567 print(result.fetchall()) 

568 

569.. _postgresql_insert_on_conflict: 

570 

571INSERT...ON CONFLICT (Upsert) 

572------------------------------ 

573 

574Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of 

575rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A 

576candidate row will only be inserted if that row does not violate any unique 

577constraints. In the case of a unique constraint violation, a secondary action 

578can occur which can be either "DO UPDATE", indicating that the data in the 

579target row should be updated, or "DO NOTHING", which indicates to silently skip 

580this row. 

581 

582Conflicts are determined using existing unique constraints and indexes. These 

583constraints may be identified either using their name as stated in DDL, 

584or they may be inferred by stating the columns and conditions that comprise 

585the indexes. 

586 

587SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific 

588:func:`_postgresql.insert()` function, which provides 

589the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` 

590and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: 

591 

592.. sourcecode:: pycon+sql 

593 

594 >>> from sqlalchemy.dialects.postgresql import insert 

595 >>> insert_stmt = insert(my_table).values( 

596 ... id="some_existing_id", data="inserted value" 

597 ... ) 

598 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 

599 >>> print(do_nothing_stmt) 

600 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

601 ON CONFLICT (id) DO NOTHING 

602 {stop} 

603 

604 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

605 ... constraint="pk_my_table", set_=dict(data="updated value") 

606 ... ) 

607 >>> print(do_update_stmt) 

608 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

609 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s 

610 

611.. seealso:: 

612 

613 `INSERT .. ON CONFLICT 

614 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ 

615 - in the PostgreSQL documentation. 

616 

617Specifying the Target 

618^^^^^^^^^^^^^^^^^^^^^ 

619 

620Both methods supply the "target" of the conflict using either the 

621named constraint or by column inference: 

622 

623* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument 

624 specifies a sequence containing string column names, :class:`_schema.Column` 

625 objects, and/or SQL expression elements, which would identify a unique 

626 index: 

627 

628 .. sourcecode:: pycon+sql 

629 

630 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

631 ... index_elements=["id"], set_=dict(data="updated value") 

632 ... ) 

633 >>> print(do_update_stmt) 

634 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

635 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

636 {stop} 

637 

638 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

639 ... index_elements=[my_table.c.id], set_=dict(data="updated value") 

640 ... ) 

641 >>> print(do_update_stmt) 

642 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

643 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

644 

645* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to 

646 infer an index, a partial index can be inferred by also specifying the 

647 use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter: 

648 

649 .. sourcecode:: pycon+sql 

650 

651 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 

652 >>> stmt = stmt.on_conflict_do_update( 

653 ... index_elements=[my_table.c.user_email], 

654 ... index_where=my_table.c.user_email.like("%@gmail.com"), 

655 ... set_=dict(data=stmt.excluded.data), 

656 ... ) 

657 >>> print(stmt) 

658 {printsql}INSERT INTO my_table (data, user_email) 

659 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) 

660 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data 

661 

662* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is 

663 used to specify an index directly rather than inferring it. This can be 

664 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: 

665 

666 .. sourcecode:: pycon+sql 

667 

668 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

669 ... constraint="my_table_idx_1", set_=dict(data="updated value") 

670 ... ) 

671 >>> print(do_update_stmt) 

672 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

673 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s 

674 {stop} 

675 

676 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

677 ... constraint="my_table_pk", set_=dict(data="updated value") 

678 ... ) 

679 >>> print(do_update_stmt) 

680 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

681 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s 

682 {stop} 

683 

684* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may 

685 also refer to a SQLAlchemy construct representing a constraint, 

686 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, 

687 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, 

688 if the constraint has a name, it is used directly. Otherwise, if the 

689 constraint is unnamed, then inference will be used, where the expressions 

690 and optional WHERE clause of the constraint will be spelled out in the 

691 construct. This use is especially convenient 

692 to refer to the named or unnamed primary key of a :class:`_schema.Table` 

693 using the 

694 :attr:`_schema.Table.primary_key` attribute: 

695 

696 .. sourcecode:: pycon+sql 

697 

698 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

699 ... constraint=my_table.primary_key, set_=dict(data="updated value") 

700 ... ) 

701 >>> print(do_update_stmt) 

702 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

703 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

704 

705The SET Clause 

706^^^^^^^^^^^^^^^ 

707 

708``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

709existing row, using any combination of new values as well as values 

710from the proposed insertion. These values are specified using the 

711:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This 

712parameter accepts a dictionary which consists of direct values 

713for UPDATE: 

714 

715.. sourcecode:: pycon+sql 

716 

717 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

718 >>> do_update_stmt = stmt.on_conflict_do_update( 

719 ... index_elements=["id"], set_=dict(data="updated value") 

720 ... ) 

721 >>> print(do_update_stmt) 

722 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

723 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

724 

725.. warning:: 

726 

727 The :meth:`_expression.Insert.on_conflict_do_update` 

728 method does **not** take into 

729 account Python-side default UPDATE values or generation functions, e.g. 

730 those specified using :paramref:`_schema.Column.onupdate`. 

731 These values will not be exercised for an ON CONFLICT style of UPDATE, 

732 unless they are manually specified in the 

733 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary. 

734 

735Updating using the Excluded INSERT Values 

736^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

737 

738In order to refer to the proposed insertion row, the special alias 

739:attr:`~.postgresql.Insert.excluded` is available as an attribute on 

740the :class:`_postgresql.Insert` object; this object is a 

741:class:`_expression.ColumnCollection` 

742which alias contains all columns of the target 

743table: 

744 

745.. sourcecode:: pycon+sql 

746 

747 >>> stmt = insert(my_table).values( 

748 ... id="some_id", data="inserted value", author="jlh" 

749 ... ) 

750 >>> do_update_stmt = stmt.on_conflict_do_update( 

751 ... index_elements=["id"], 

752 ... set_=dict(data="updated value", author=stmt.excluded.author), 

753 ... ) 

754 >>> print(do_update_stmt) 

755 {printsql}INSERT INTO my_table (id, data, author) 

756 VALUES (%(id)s, %(data)s, %(author)s) 

757 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

758 

759Additional WHERE Criteria 

760^^^^^^^^^^^^^^^^^^^^^^^^^ 

761 

762The :meth:`_expression.Insert.on_conflict_do_update` method also accepts 

763a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` 

764parameter, which will limit those rows which receive an UPDATE: 

765 

766.. sourcecode:: pycon+sql 

767 

768 >>> stmt = insert(my_table).values( 

769 ... id="some_id", data="inserted value", author="jlh" 

770 ... ) 

771 >>> on_update_stmt = stmt.on_conflict_do_update( 

772 ... index_elements=["id"], 

773 ... set_=dict(data="updated value", author=stmt.excluded.author), 

774 ... where=(my_table.c.status == 2), 

775 ... ) 

776 >>> print(on_update_stmt) 

777 {printsql}INSERT INTO my_table (id, data, author) 

778 VALUES (%(id)s, %(data)s, %(author)s) 

779 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

780 WHERE my_table.status = %(status_1)s 

781 

782Skipping Rows with DO NOTHING 

783^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

784 

785``ON CONFLICT`` may be used to skip inserting a row entirely 

786if any conflict with a unique or exclusion constraint occurs; below 

787this is illustrated using the 

788:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: 

789 

790.. sourcecode:: pycon+sql 

791 

792 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

793 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 

794 >>> print(stmt) 

795 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

796 ON CONFLICT (id) DO NOTHING 

797 

798If ``DO NOTHING`` is used without specifying any columns or constraint, 

799it has the effect of skipping the INSERT for any unique or exclusion 

800constraint violation which occurs: 

801 

802.. sourcecode:: pycon+sql 

803 

804 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

805 >>> stmt = stmt.on_conflict_do_nothing() 

806 >>> print(stmt) 

807 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

808 ON CONFLICT DO NOTHING 

809 

810.. _postgresql_match: 

811 

812Full Text Search 

813---------------- 

814 

815PostgreSQL's full text search system is available through the use of the 

816:data:`.func` namespace, combined with the use of custom operators 

817via the :meth:`.Operators.bool_op` method. For simple cases with some 

818degree of cross-backend compatibility, the :meth:`.Operators.match` operator 

819may also be used. 

820 

821.. _postgresql_simple_match: 

822 

823Simple plain text matching with ``match()`` 

824^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

825 

826The :meth:`.Operators.match` operator provides for cross-compatible simple 

827text matching. For the PostgreSQL backend, it's hardcoded to generate 

828an expression using the ``@@`` operator in conjunction with the 

829``plainto_tsquery()`` PostgreSQL function. 

830 

831On the PostgreSQL dialect, an expression like the following:: 

832 

833 select(sometable.c.text.match("search string")) 

834 

835would emit to the database: 

836 

837.. sourcecode:: sql 

838 

839 SELECT text @@ plainto_tsquery('search string') FROM table 

840 

841Above, passing a plain string to :meth:`.Operators.match` will automatically 

842make use of ``plainto_tsquery()`` to specify the type of tsquery. This 

843establishes basic database cross-compatibility for :meth:`.Operators.match` 

844with other backends. 

845 

846.. versionchanged:: 2.0 The default tsquery generation function used by the 

847 PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. 

848 

849 To render exactly what was rendered in 1.4, use the following form:: 

850 

851 from sqlalchemy import func 

852 

853 select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string"))) 

854 

855 Which would emit: 

856 

857 .. sourcecode:: sql 

858 

859 SELECT text @@ to_tsquery('search string') FROM table 

860 

861Using PostgreSQL full text functions and operators directly 

862^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

863 

864Text search operations beyond the simple use of :meth:`.Operators.match` 

865may make use of the :data:`.func` namespace to generate PostgreSQL full-text 

866functions, in combination with :meth:`.Operators.bool_op` to generate 

867any boolean operator. 

868 

869For example, the query:: 

870 

871 select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat"))) 

872 

873would generate: 

874 

875.. sourcecode:: sql 

876 

877 SELECT to_tsquery('cat') @> to_tsquery('cat & rat') 

878 

879 

880The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: 

881 

882 from sqlalchemy.dialects.postgresql import TSVECTOR 

883 from sqlalchemy import select, cast 

884 

885 select(cast("some text", TSVECTOR)) 

886 

887produces a statement equivalent to: 

888 

889.. sourcecode:: sql 

890 

891 SELECT CAST('some text' AS TSVECTOR) AS anon_1 

892 

893The ``func`` namespace is augmented by the PostgreSQL dialect to set up 

894correct argument and return types for most full text search functions. 

895These functions are used automatically by the :attr:`_sql.func` namespace 

896assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, 

897or :func:`_sa.create_engine` has been invoked using a ``postgresql`` 

898dialect. These functions are documented at: 

899 

900* :class:`_postgresql.to_tsvector` 

901* :class:`_postgresql.to_tsquery` 

902* :class:`_postgresql.plainto_tsquery` 

903* :class:`_postgresql.phraseto_tsquery` 

904* :class:`_postgresql.websearch_to_tsquery` 

905* :class:`_postgresql.ts_headline` 

906 

907Specifying the "regconfig" with ``match()`` or custom operators 

908^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

909 

910PostgreSQL's ``plainto_tsquery()`` function accepts an optional 

911"regconfig" argument that is used to instruct PostgreSQL to use a 

912particular pre-computed GIN or GiST index in order to perform the search. 

913When using :meth:`.Operators.match`, this additional parameter may be 

914specified using the ``postgresql_regconfig`` parameter, such as:: 

915 

916 select(mytable.c.id).where( 

917 mytable.c.title.match("somestring", postgresql_regconfig="english") 

918 ) 

919 

920Which would emit: 

921 

922.. sourcecode:: sql 

923 

924 SELECT mytable.id FROM mytable 

925 WHERE mytable.title @@ plainto_tsquery('english', 'somestring') 

926 

927When using other PostgreSQL search functions with :data:`.func`, the 

928"regconfig" parameter may be passed directly as the initial argument:: 

929 

930 select(mytable.c.id).where( 

931 func.to_tsvector("english", mytable.c.title).bool_op("@@")( 

932 func.to_tsquery("english", "somestring") 

933 ) 

934 ) 

935 

936produces a statement equivalent to: 

937 

938.. sourcecode:: sql 

939 

940 SELECT mytable.id FROM mytable 

941 WHERE to_tsvector('english', mytable.title) @@ 

942 to_tsquery('english', 'somestring') 

943 

944It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from 

945PostgreSQL to ensure that you are generating queries with SQLAlchemy that 

946take full advantage of any indexes you may have created for full text search. 

947 

948.. seealso:: 

949 

950 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation 

951 

952 

953FROM ONLY ... 

954------------- 

955 

956The dialect supports PostgreSQL's ONLY keyword for targeting only a particular 

957table in an inheritance hierarchy. This can be used to produce the 

958``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` 

959syntaxes. It uses SQLAlchemy's hints mechanism:: 

960 

961 # SELECT ... FROM ONLY ... 

962 result = table.select().with_hint(table, "ONLY", "postgresql") 

963 print(result.fetchall()) 

964 

965 # UPDATE ONLY ... 

966 table.update(values=dict(foo="bar")).with_hint( 

967 "ONLY", dialect_name="postgresql" 

968 ) 

969 

970 # DELETE FROM ONLY ... 

971 table.delete().with_hint("ONLY", dialect_name="postgresql") 

972 

973.. _postgresql_indexes: 

974 

975PostgreSQL-Specific Index Options 

976--------------------------------- 

977 

978Several extensions to the :class:`.Index` construct are available, specific 

979to the PostgreSQL dialect. 

980 

981.. _postgresql_covering_indexes: 

982 

983Covering Indexes 

984^^^^^^^^^^^^^^^^ 

985 

986The ``postgresql_include`` option renders INCLUDE(colname) for the given 

987string names:: 

988 

989 Index("my_index", table.c.x, postgresql_include=["y"]) 

990 

991would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` 

992 

993Note that this feature requires PostgreSQL 11 or later. 

994 

995.. seealso:: 

996 

997 :ref:`postgresql_constraint_options` 

998 

999.. versionadded:: 1.4 

1000 

1001.. _postgresql_partial_indexes: 

1002 

1003Partial Indexes 

1004^^^^^^^^^^^^^^^ 

1005 

1006Partial indexes add criterion to the index definition so that the index is 

1007applied to a subset of rows. These can be specified on :class:`.Index` 

1008using the ``postgresql_where`` keyword argument:: 

1009 

1010 Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10) 

1011 

1012.. _postgresql_operator_classes: 

1013 

1014Operator Classes 

1015^^^^^^^^^^^^^^^^ 

1016 

1017PostgreSQL allows the specification of an *operator class* for each column of 

1018an index (see 

1019https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). 

1020The :class:`.Index` construct allows these to be specified via the 

1021``postgresql_ops`` keyword argument:: 

1022 

1023 Index( 

1024 "my_index", 

1025 my_table.c.id, 

1026 my_table.c.data, 

1027 postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"}, 

1028 ) 

1029 

1030Note that the keys in the ``postgresql_ops`` dictionaries are the 

1031"key" name of the :class:`_schema.Column`, i.e. the name used to access it from 

1032the ``.c`` collection of :class:`_schema.Table`, which can be configured to be 

1033different than the actual name of the column as expressed in the database. 

1034 

1035If ``postgresql_ops`` is to be used against a complex SQL expression such 

1036as a function call, then to apply to the column it must be given a label 

1037that is identified in the dictionary by name, e.g.:: 

1038 

1039 Index( 

1040 "my_index", 

1041 my_table.c.id, 

1042 func.lower(my_table.c.data).label("data_lower"), 

1043 postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"}, 

1044 ) 

1045 

1046Operator classes are also supported by the 

1047:class:`_postgresql.ExcludeConstraint` construct using the 

1048:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for 

1049details. 

1050 

1051.. versionadded:: 1.3.21 added support for operator classes with 

1052 :class:`_postgresql.ExcludeConstraint`. 

1053 

1054 

1055Index Types 

1056^^^^^^^^^^^ 

1057 

1058PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well 

1059as the ability for users to create their own (see 

1060https://www.postgresql.org/docs/current/static/indexes-types.html). These can be 

1061specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: 

1062 

1063 Index("my_index", my_table.c.data, postgresql_using="gin") 

1064 

1065The value passed to the keyword argument will be simply passed through to the 

1066underlying CREATE INDEX command, so it *must* be a valid index type for your 

1067version of PostgreSQL. 

1068 

1069.. _postgresql_index_storage: 

1070 

1071Index Storage Parameters 

1072^^^^^^^^^^^^^^^^^^^^^^^^ 

1073 

1074PostgreSQL allows storage parameters to be set on indexes. The storage 

1075parameters available depend on the index method used by the index. Storage 

1076parameters can be specified on :class:`.Index` using the ``postgresql_with`` 

1077keyword argument:: 

1078 

1079 Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50}) 

1080 

1081PostgreSQL allows to define the tablespace in which to create the index. 

1082The tablespace can be specified on :class:`.Index` using the 

1083``postgresql_tablespace`` keyword argument:: 

1084 

1085 Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace") 

1086 

1087Note that the same option is available on :class:`_schema.Table` as well. 

1088 

1089.. _postgresql_index_concurrently: 

1090 

1091Indexes with CONCURRENTLY 

1092^^^^^^^^^^^^^^^^^^^^^^^^^ 

1093 

1094The PostgreSQL index option CONCURRENTLY is supported by passing the 

1095flag ``postgresql_concurrently`` to the :class:`.Index` construct:: 

1096 

1097 tbl = Table("testtbl", m, Column("data", Integer)) 

1098 

1099 idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True) 

1100 

1101The above index construct will render DDL for CREATE INDEX, assuming 

1102PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as: 

1103 

1104.. sourcecode:: sql 

1105 

1106 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) 

1107 

1108For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for 

1109a connection-less dialect, it will emit: 

1110 

1111.. sourcecode:: sql 

1112 

1113 DROP INDEX CONCURRENTLY test_idx1 

1114 

1115When using CONCURRENTLY, the PostgreSQL database requires that the statement 

1116be invoked outside of a transaction block. The Python DBAPI enforces that 

1117even for a single statement, a transaction is present, so to use this 

1118construct, the DBAPI's "autocommit" mode must be used:: 

1119 

1120 metadata = MetaData() 

1121 table = Table("foo", metadata, Column("id", String)) 

1122 index = Index("foo_idx", table.c.id, postgresql_concurrently=True) 

1123 

1124 with engine.connect() as conn: 

1125 with conn.execution_options(isolation_level="AUTOCOMMIT"): 

1126 table.create(conn) 

1127 

1128.. seealso:: 

1129 

1130 :ref:`postgresql_isolation_level` 

1131 

1132.. _postgresql_index_reflection: 

1133 

1134PostgreSQL Index Reflection 

1135--------------------------- 

1136 

1137The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the 

1138UNIQUE CONSTRAINT construct is used. When inspecting a table using 

1139:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` 

1140and the :meth:`_reflection.Inspector.get_unique_constraints` 

1141will report on these 

1142two constructs distinctly; in the case of the index, the key 

1143``duplicates_constraint`` will be present in the index entry if it is 

1144detected as mirroring a constraint. When performing reflection using 

1145``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned 

1146in :attr:`_schema.Table.indexes` when it is detected as mirroring a 

1147:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection 

1148. 

1149 

1150Special Reflection Options 

1151-------------------------- 

1152 

1153The :class:`_reflection.Inspector` 

1154used for the PostgreSQL backend is an instance 

1155of :class:`.PGInspector`, which offers additional methods:: 

1156 

1157 from sqlalchemy import create_engine, inspect 

1158 

1159 engine = create_engine("postgresql+psycopg2://localhost/test") 

1160 insp = inspect(engine) # will be a PGInspector 

1161 

1162 print(insp.get_enums()) 

1163 

1164.. autoclass:: PGInspector 

1165 :members: 

1166 

1167.. _postgresql_table_options: 

1168 

1169PostgreSQL Table Options 

1170------------------------ 

1171 

1172Several options for CREATE TABLE are supported directly by the PostgreSQL 

1173dialect in conjunction with the :class:`_schema.Table` construct: 

1174 

1175* ``INHERITS``:: 

1176 

1177 Table("some_table", metadata, ..., postgresql_inherits="some_supertable") 

1178 

1179 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) 

1180 

1181* ``ON COMMIT``:: 

1182 

1183 Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS") 

1184 

1185* 

1186 ``PARTITION BY``:: 

1187 

1188 Table( 

1189 "some_table", 

1190 metadata, 

1191 ..., 

1192 postgresql_partition_by="LIST (part_column)", 

1193 ) 

1194 

1195 .. versionadded:: 1.2.6 

1196 

1197* 

1198 ``TABLESPACE``:: 

1199 

1200 Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace") 

1201 

1202 The above option is also available on the :class:`.Index` construct. 

1203 

1204* 

1205 ``USING``:: 

1206 

1207 Table("some_table", metadata, ..., postgresql_using="heap") 

1208 

1209 .. versionadded:: 2.0.26 

1210 

1211* ``WITH OIDS``:: 

1212 

1213 Table("some_table", metadata, ..., postgresql_with_oids=True) 

1214 

1215* ``WITHOUT OIDS``:: 

1216 

1217 Table("some_table", metadata, ..., postgresql_with_oids=False) 

1218 

1219.. seealso:: 

1220 

1221 `PostgreSQL CREATE TABLE options 

1222 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - 

1223 in the PostgreSQL documentation. 

1224 

1225.. _postgresql_constraint_options: 

1226 

1227PostgreSQL Constraint Options 

1228----------------------------- 

1229 

1230The following option(s) are supported by the PostgreSQL dialect in conjunction 

1231with selected constraint constructs: 

1232 

1233* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints 

1234 when the constraint is being added to an existing table via ALTER TABLE, 

1235 and has the effect that existing rows are not scanned during the ALTER 

1236 operation against the constraint being added. 

1237 

1238 When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_ 

1239 that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument 

1240 may be specified as an additional keyword argument within the operation 

1241 that creates the constraint, as in the following Alembic example:: 

1242 

1243 def update(): 

1244 op.create_foreign_key( 

1245 "fk_user_address", 

1246 "address", 

1247 "user", 

1248 ["user_id"], 

1249 ["id"], 

1250 postgresql_not_valid=True, 

1251 ) 

1252 

1253 The keyword is ultimately accepted directly by the 

1254 :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` 

1255 and :class:`_schema.ForeignKey` constructs; when using a tool like 

1256 Alembic, dialect-specific keyword arguments are passed through to 

1257 these constructs from the migration operation directives:: 

1258 

1259 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) 

1260 

1261 ForeignKeyConstraint( 

1262 ["some_id"], ["some_table.some_id"], postgresql_not_valid=True 

1263 ) 

1264 

1265 .. versionadded:: 1.4.32 

1266 

1267 .. seealso:: 

1268 

1269 `PostgreSQL ALTER TABLE options 

1270 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ - 

1271 in the PostgreSQL documentation. 

1272 

1273* ``INCLUDE``: This option adds one or more columns as a "payload" to the 

1274 unique index created automatically by PostgreSQL for the constraint. 

1275 For example, the following table definition:: 

1276 

1277 Table( 

1278 "mytable", 

1279 metadata, 

1280 Column("id", Integer, nullable=False), 

1281 Column("value", Integer, nullable=False), 

1282 UniqueConstraint("id", postgresql_include=["value"]), 

1283 ) 

1284 

1285 would produce the DDL statement 

1286 

1287 .. sourcecode:: sql 

1288 

1289 CREATE TABLE mytable ( 

1290 id INTEGER NOT NULL, 

1291 value INTEGER NOT NULL, 

1292 UNIQUE (id) INCLUDE (value) 

1293 ) 

1294 

1295 Note that this feature requires PostgreSQL 11 or later. 

1296 

1297 .. versionadded:: 2.0.41 

1298 

1299 .. seealso:: 

1300 

1301 :ref:`postgresql_covering_indexes` 

1302 

1303 .. seealso:: 

1304 

1305 `PostgreSQL CREATE TABLE options 

1306 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - 

1307 in the PostgreSQL documentation. 

1308 

1309* Column list with foreign key ``ON DELETE SET`` actions: This applies to 

1310 :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`, the :paramref:`.ForeignKey.ondelete` 

1311 parameter will accept on the PostgreSQL backend only a string list of column 

1312 names inside parenthesis, following the ``SET NULL`` or ``SET DEFAULT`` 

1313 phrases, which will limit the set of columns that are subject to the 

1314 action:: 

1315 

1316 fktable = Table( 

1317 "fktable", 

1318 metadata, 

1319 Column("tid", Integer), 

1320 Column("id", Integer), 

1321 Column("fk_id_del_set_null", Integer), 

1322 ForeignKeyConstraint( 

1323 columns=["tid", "fk_id_del_set_null"], 

1324 refcolumns=[pktable.c.tid, pktable.c.id], 

1325 ondelete="SET NULL (fk_id_del_set_null)", 

1326 ), 

1327 ) 

1328 

1329 .. versionadded:: 2.0.40 

1330 

1331 

1332.. _postgresql_table_valued_overview: 

1333 

1334Table values, Table and Column valued functions, Row and Tuple objects 

1335----------------------------------------------------------------------- 

1336 

1337PostgreSQL makes great use of modern SQL forms such as table-valued functions, 

1338tables and rows as values. These constructs are commonly used as part 

1339of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other 

1340datatypes. SQLAlchemy's SQL expression language has native support for 

1341most table-valued and row-valued forms. 

1342 

1343.. _postgresql_table_valued: 

1344 

1345Table-Valued Functions 

1346^^^^^^^^^^^^^^^^^^^^^^^ 

1347 

1348Many PostgreSQL built-in functions are intended to be used in the FROM clause 

1349of a SELECT statement, and are capable of returning table rows or sets of table 

1350rows. A large portion of PostgreSQL's JSON functions for example such as 

1351``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, 

1352``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such 

1353forms. These classes of SQL function calling forms in SQLAlchemy are available 

1354using the :meth:`_functions.FunctionElement.table_valued` method in conjunction 

1355with :class:`_functions.Function` objects generated from the :data:`_sql.func` 

1356namespace. 

1357 

1358Examples from PostgreSQL's reference documentation follow below: 

1359 

1360* ``json_each()``: 

1361 

1362 .. sourcecode:: pycon+sql 

1363 

1364 >>> from sqlalchemy import select, func 

1365 >>> stmt = select( 

1366 ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value") 

1367 ... ) 

1368 >>> print(stmt) 

1369 {printsql}SELECT anon_1.key, anon_1.value 

1370 FROM json_each(:json_each_1) AS anon_1 

1371 

1372* ``json_populate_record()``: 

1373 

1374 .. sourcecode:: pycon+sql 

1375 

1376 >>> from sqlalchemy import select, func, literal_column 

1377 >>> stmt = select( 

1378 ... func.json_populate_record( 

1379 ... literal_column("null::myrowtype"), '{"a":1,"b":2}' 

1380 ... ).table_valued("a", "b", name="x") 

1381 ... ) 

1382 >>> print(stmt) 

1383 {printsql}SELECT x.a, x.b 

1384 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x 

1385 

1386* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived 

1387 columns in the alias, where we may make use of :func:`_sql.column` elements with 

1388 types to produce them. The :meth:`_functions.FunctionElement.table_valued` 

1389 method produces a :class:`_sql.TableValuedAlias` construct, and the method 

1390 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived 

1391 columns specification: 

1392 

1393 .. sourcecode:: pycon+sql 

1394 

1395 >>> from sqlalchemy import select, func, column, Integer, Text 

1396 >>> stmt = select( 

1397 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}') 

1398 ... .table_valued( 

1399 ... column("a", Integer), 

1400 ... column("b", Text), 

1401 ... column("d", Text), 

1402 ... ) 

1403 ... .render_derived(name="x", with_types=True) 

1404 ... ) 

1405 >>> print(stmt) 

1406 {printsql}SELECT x.a, x.b, x.d 

1407 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) 

1408 

1409* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an 

1410 ordinal counter to the output of a function and is accepted by a limited set 

1411 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The 

1412 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword 

1413 parameter ``with_ordinality`` for this purpose, which accepts the string name 

1414 that will be applied to the "ordinality" column: 

1415 

1416 .. sourcecode:: pycon+sql 

1417 

1418 >>> from sqlalchemy import select, func 

1419 >>> stmt = select( 

1420 ... func.generate_series(4, 1, -1) 

1421 ... .table_valued("value", with_ordinality="ordinality") 

1422 ... .render_derived() 

1423 ... ) 

1424 >>> print(stmt) 

1425 {printsql}SELECT anon_1.value, anon_1.ordinality 

1426 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) 

1427 WITH ORDINALITY AS anon_1(value, ordinality) 

1428 

1429.. versionadded:: 1.4.0b2 

1430 

1431.. seealso:: 

1432 

1433 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` 

1434 

1435.. _postgresql_column_valued: 

1436 

1437Column Valued Functions 

1438^^^^^^^^^^^^^^^^^^^^^^^ 

1439 

1440Similar to the table valued function, a column valued function is present 

1441in the FROM clause, but delivers itself to the columns clause as a single 

1442scalar value. PostgreSQL functions such as ``json_array_elements()``, 

1443``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the 

1444:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: 

1445 

1446* ``json_array_elements()``: 

1447 

1448 .. sourcecode:: pycon+sql 

1449 

1450 >>> from sqlalchemy import select, func 

1451 >>> stmt = select( 

1452 ... func.json_array_elements('["one", "two"]').column_valued("x") 

1453 ... ) 

1454 >>> print(stmt) 

1455 {printsql}SELECT x 

1456 FROM json_array_elements(:json_array_elements_1) AS x 

1457 

1458* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the 

1459 :func:`_postgresql.array` construct may be used: 

1460 

1461 .. sourcecode:: pycon+sql 

1462 

1463 >>> from sqlalchemy.dialects.postgresql import array 

1464 >>> from sqlalchemy import select, func 

1465 >>> stmt = select(func.unnest(array([1, 2])).column_valued()) 

1466 >>> print(stmt) 

1467 {printsql}SELECT anon_1 

1468 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 

1469 

1470 The function can of course be used against an existing table-bound column 

1471 that's of type :class:`_types.ARRAY`: 

1472 

1473 .. sourcecode:: pycon+sql 

1474 

1475 >>> from sqlalchemy import table, column, ARRAY, Integer 

1476 >>> from sqlalchemy import select, func 

1477 >>> t = table("t", column("value", ARRAY(Integer))) 

1478 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) 

1479 >>> print(stmt) 

1480 {printsql}SELECT unnested_value 

1481 FROM unnest(t.value) AS unnested_value 

1482 

1483.. seealso:: 

1484 

1485 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` 

1486 

1487 

1488Row Types 

1489^^^^^^^^^ 

1490 

1491Built-in support for rendering a ``ROW`` may be approximated using 

1492``func.ROW`` with the :attr:`_sa.func` namespace, or by using the 

1493:func:`_sql.tuple_` construct: 

1494 

1495.. sourcecode:: pycon+sql 

1496 

1497 >>> from sqlalchemy import table, column, func, tuple_ 

1498 >>> t = table("t", column("id"), column("fk")) 

1499 >>> stmt = ( 

1500 ... t.select() 

1501 ... .where(tuple_(t.c.id, t.c.fk) > (1, 2)) 

1502 ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)) 

1503 ... ) 

1504 >>> print(stmt) 

1505 {printsql}SELECT t.id, t.fk 

1506 FROM t 

1507 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) 

1508 

1509.. seealso:: 

1510 

1511 `PostgreSQL Row Constructors 

1512 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_ 

1513 

1514 `PostgreSQL Row Constructor Comparison 

1515 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_ 

1516 

1517Table Types passed to Functions 

1518^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1519 

1520PostgreSQL supports passing a table as an argument to a function, which is 

1521known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects 

1522such as :class:`_schema.Table` support this special form using the 

1523:meth:`_sql.FromClause.table_valued` method, which is comparable to the 

1524:meth:`_functions.FunctionElement.table_valued` method except that the collection 

1525of columns is already established by that of the :class:`_sql.FromClause` 

1526itself: 

1527 

1528.. sourcecode:: pycon+sql 

1529 

1530 >>> from sqlalchemy import table, column, func, select 

1531 >>> a = table("a", column("id"), column("x"), column("y")) 

1532 >>> stmt = select(func.row_to_json(a.table_valued())) 

1533 >>> print(stmt) 

1534 {printsql}SELECT row_to_json(a) AS row_to_json_1 

1535 FROM a 

1536 

1537.. versionadded:: 1.4.0b2 

1538 

1539 

1540 

1541""" # noqa: E501 

1542 

1543from __future__ import annotations 

1544 

1545from collections import defaultdict 

1546from functools import lru_cache 

1547import re 

1548from typing import Any 

1549from typing import cast 

1550from typing import Dict 

1551from typing import List 

1552from typing import Optional 

1553from typing import Tuple 

1554from typing import TYPE_CHECKING 

1555from typing import Union 

1556 

1557from . import arraylib as _array 

1558from . import json as _json 

1559from . import pg_catalog 

1560from . import ranges as _ranges 

1561from .ext import _regconfig_fn 

1562from .ext import aggregate_order_by 

1563from .hstore import HSTORE 

1564from .named_types import CreateDomainType as CreateDomainType # noqa: F401 

1565from .named_types import CreateEnumType as CreateEnumType # noqa: F401 

1566from .named_types import DOMAIN as DOMAIN # noqa: F401 

1567from .named_types import DropDomainType as DropDomainType # noqa: F401 

1568from .named_types import DropEnumType as DropEnumType # noqa: F401 

1569from .named_types import ENUM as ENUM # noqa: F401 

1570from .named_types import NamedType as NamedType # noqa: F401 

1571from .types import _DECIMAL_TYPES # noqa: F401 

1572from .types import _FLOAT_TYPES # noqa: F401 

1573from .types import _INT_TYPES # noqa: F401 

1574from .types import BIT as BIT 

1575from .types import BYTEA as BYTEA 

1576from .types import CIDR as CIDR 

1577from .types import CITEXT as CITEXT 

1578from .types import INET as INET 

1579from .types import INTERVAL as INTERVAL 

1580from .types import MACADDR as MACADDR 

1581from .types import MACADDR8 as MACADDR8 

1582from .types import MONEY as MONEY 

1583from .types import OID as OID 

1584from .types import PGBit as PGBit # noqa: F401 

1585from .types import PGCidr as PGCidr # noqa: F401 

1586from .types import PGInet as PGInet # noqa: F401 

1587from .types import PGInterval as PGInterval # noqa: F401 

1588from .types import PGMacAddr as PGMacAddr # noqa: F401 

1589from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401 

1590from .types import PGUuid as PGUuid 

1591from .types import REGCLASS as REGCLASS 

1592from .types import REGCONFIG as REGCONFIG # noqa: F401 

1593from .types import TIME as TIME 

1594from .types import TIMESTAMP as TIMESTAMP 

1595from .types import TSVECTOR as TSVECTOR 

1596from ... import exc 

1597from ... import schema 

1598from ... import select 

1599from ... import sql 

1600from ... import util 

1601from ...engine import characteristics 

1602from ...engine import default 

1603from ...engine import interfaces 

1604from ...engine import ObjectKind 

1605from ...engine import ObjectScope 

1606from ...engine import reflection 

1607from ...engine import URL 

1608from ...engine.reflection import ReflectionDefaults 

1609from ...sql import bindparam 

1610from ...sql import coercions 

1611from ...sql import compiler 

1612from ...sql import elements 

1613from ...sql import expression 

1614from ...sql import roles 

1615from ...sql import sqltypes 

1616from ...sql import util as sql_util 

1617from ...sql.compiler import InsertmanyvaluesSentinelOpts 

1618from ...sql.visitors import InternalTraversal 

1619from ...types import BIGINT 

1620from ...types import BOOLEAN 

1621from ...types import CHAR 

1622from ...types import DATE 

1623from ...types import DOUBLE_PRECISION 

1624from ...types import FLOAT 

1625from ...types import INTEGER 

1626from ...types import NUMERIC 

1627from ...types import REAL 

1628from ...types import SMALLINT 

1629from ...types import TEXT 

1630from ...types import UUID as UUID 

1631from ...types import VARCHAR 

1632from ...util.typing import TypedDict 

1633 

1634IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) 

1635 

1636RESERVED_WORDS = { 

1637 "all", 

1638 "analyse", 

1639 "analyze", 

1640 "and", 

1641 "any", 

1642 "array", 

1643 "as", 

1644 "asc", 

1645 "asymmetric", 

1646 "both", 

1647 "case", 

1648 "cast", 

1649 "check", 

1650 "collate", 

1651 "column", 

1652 "constraint", 

1653 "create", 

1654 "current_catalog", 

1655 "current_date", 

1656 "current_role", 

1657 "current_time", 

1658 "current_timestamp", 

1659 "current_user", 

1660 "default", 

1661 "deferrable", 

1662 "desc", 

1663 "distinct", 

1664 "do", 

1665 "else", 

1666 "end", 

1667 "except", 

1668 "false", 

1669 "fetch", 

1670 "for", 

1671 "foreign", 

1672 "from", 

1673 "grant", 

1674 "group", 

1675 "having", 

1676 "in", 

1677 "initially", 

1678 "intersect", 

1679 "into", 

1680 "leading", 

1681 "limit", 

1682 "localtime", 

1683 "localtimestamp", 

1684 "new", 

1685 "not", 

1686 "null", 

1687 "of", 

1688 "off", 

1689 "offset", 

1690 "old", 

1691 "on", 

1692 "only", 

1693 "or", 

1694 "order", 

1695 "placing", 

1696 "primary", 

1697 "references", 

1698 "returning", 

1699 "select", 

1700 "session_user", 

1701 "some", 

1702 "symmetric", 

1703 "table", 

1704 "then", 

1705 "to", 

1706 "trailing", 

1707 "true", 

1708 "union", 

1709 "unique", 

1710 "user", 

1711 "using", 

1712 "variadic", 

1713 "when", 

1714 "where", 

1715 "window", 

1716 "with", 

1717 "authorization", 

1718 "between", 

1719 "binary", 

1720 "cross", 

1721 "current_schema", 

1722 "freeze", 

1723 "full", 

1724 "ilike", 

1725 "inner", 

1726 "is", 

1727 "isnull", 

1728 "join", 

1729 "left", 

1730 "like", 

1731 "natural", 

1732 "notnull", 

1733 "outer", 

1734 "over", 

1735 "overlaps", 

1736 "right", 

1737 "similar", 

1738 "verbose", 

1739} 

1740 

1741 

1742colspecs = { 

1743 sqltypes.ARRAY: _array.ARRAY, 

1744 sqltypes.Interval: INTERVAL, 

1745 sqltypes.Enum: ENUM, 

1746 sqltypes.JSON.JSONPathType: _json.JSONPATH, 

1747 sqltypes.JSON: _json.JSON, 

1748 sqltypes.Uuid: PGUuid, 

1749} 

1750 

1751 

1752ischema_names = { 

1753 "_array": _array.ARRAY, 

1754 "hstore": HSTORE, 

1755 "json": _json.JSON, 

1756 "jsonb": _json.JSONB, 

1757 "int4range": _ranges.INT4RANGE, 

1758 "int8range": _ranges.INT8RANGE, 

1759 "numrange": _ranges.NUMRANGE, 

1760 "daterange": _ranges.DATERANGE, 

1761 "tsrange": _ranges.TSRANGE, 

1762 "tstzrange": _ranges.TSTZRANGE, 

1763 "int4multirange": _ranges.INT4MULTIRANGE, 

1764 "int8multirange": _ranges.INT8MULTIRANGE, 

1765 "nummultirange": _ranges.NUMMULTIRANGE, 

1766 "datemultirange": _ranges.DATEMULTIRANGE, 

1767 "tsmultirange": _ranges.TSMULTIRANGE, 

1768 "tstzmultirange": _ranges.TSTZMULTIRANGE, 

1769 "integer": INTEGER, 

1770 "bigint": BIGINT, 

1771 "smallint": SMALLINT, 

1772 "character varying": VARCHAR, 

1773 "character": CHAR, 

1774 '"char"': sqltypes.String, 

1775 "name": sqltypes.String, 

1776 "text": TEXT, 

1777 "numeric": NUMERIC, 

1778 "float": FLOAT, 

1779 "real": REAL, 

1780 "inet": INET, 

1781 "cidr": CIDR, 

1782 "citext": CITEXT, 

1783 "uuid": UUID, 

1784 "bit": BIT, 

1785 "bit varying": BIT, 

1786 "macaddr": MACADDR, 

1787 "macaddr8": MACADDR8, 

1788 "money": MONEY, 

1789 "oid": OID, 

1790 "regclass": REGCLASS, 

1791 "double precision": DOUBLE_PRECISION, 

1792 "timestamp": TIMESTAMP, 

1793 "timestamp with time zone": TIMESTAMP, 

1794 "timestamp without time zone": TIMESTAMP, 

1795 "time with time zone": TIME, 

1796 "time without time zone": TIME, 

1797 "date": DATE, 

1798 "time": TIME, 

1799 "bytea": BYTEA, 

1800 "boolean": BOOLEAN, 

1801 "interval": INTERVAL, 

1802 "tsvector": TSVECTOR, 

1803} 

1804 

1805 

1806class PGCompiler(compiler.SQLCompiler): 

1807 def visit_to_tsvector_func(self, element, **kw): 

1808 return self._assert_pg_ts_ext(element, **kw) 

1809 

1810 def visit_to_tsquery_func(self, element, **kw): 

1811 return self._assert_pg_ts_ext(element, **kw) 

1812 

1813 def visit_plainto_tsquery_func(self, element, **kw): 

1814 return self._assert_pg_ts_ext(element, **kw) 

1815 

1816 def visit_phraseto_tsquery_func(self, element, **kw): 

1817 return self._assert_pg_ts_ext(element, **kw) 

1818 

1819 def visit_websearch_to_tsquery_func(self, element, **kw): 

1820 return self._assert_pg_ts_ext(element, **kw) 

1821 

1822 def visit_ts_headline_func(self, element, **kw): 

1823 return self._assert_pg_ts_ext(element, **kw) 

1824 

1825 def _assert_pg_ts_ext(self, element, **kw): 

1826 if not isinstance(element, _regconfig_fn): 

1827 # other options here include trying to rewrite the function 

1828 # with the correct types. however, that means we have to 

1829 # "un-SQL-ize" the first argument, which can't work in a 

1830 # generalized way. Also, parent compiler class has already added 

1831 # the incorrect return type to the result map. So let's just 

1832 # make sure the function we want is used up front. 

1833 

1834 raise exc.CompileError( 

1835 f'Can\'t compile "{element.name}()" full text search ' 

1836 f"function construct that does not originate from the " 

1837 f'"sqlalchemy.dialects.postgresql" package. ' 

1838 f'Please ensure "import sqlalchemy.dialects.postgresql" is ' 

1839 f"called before constructing " 

1840 f'"sqlalchemy.func.{element.name}()" to ensure registration ' 

1841 f"of the correct argument and return types." 

1842 ) 

1843 

1844 return f"{element.name}{self.function_argspec(element, **kw)}" 

1845 

1846 def render_bind_cast(self, type_, dbapi_type, sqltext): 

1847 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: 

1848 # use VARCHAR with no length for VARCHAR cast. 

1849 # see #9511 

1850 dbapi_type = sqltypes.STRINGTYPE 

1851 return f"""{sqltext}::{ 

1852 self.dialect.type_compiler_instance.process( 

1853 dbapi_type, identifier_preparer=self.preparer 

1854 ) 

1855 }""" 

1856 

1857 def visit_array(self, element, **kw): 

1858 if not element.clauses and not element.type.item_type._isnull: 

1859 return "ARRAY[]::%s" % element.type.compile(self.dialect) 

1860 return "ARRAY[%s]" % self.visit_clauselist(element, **kw) 

1861 

1862 def visit_slice(self, element, **kw): 

1863 return "%s:%s" % ( 

1864 self.process(element.start, **kw), 

1865 self.process(element.stop, **kw), 

1866 ) 

1867 

1868 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1869 return self._generate_generic_binary(binary, " # ", **kw) 

1870 

1871 def visit_json_getitem_op_binary( 

1872 self, binary, operator, _cast_applied=False, **kw 

1873 ): 

1874 if ( 

1875 not _cast_applied 

1876 and binary.type._type_affinity is not sqltypes.JSON 

1877 ): 

1878 kw["_cast_applied"] = True 

1879 return self.process(sql.cast(binary, binary.type), **kw) 

1880 

1881 kw["eager_grouping"] = True 

1882 

1883 return self._generate_generic_binary( 

1884 binary, " -> " if not _cast_applied else " ->> ", **kw 

1885 ) 

1886 

1887 def visit_json_path_getitem_op_binary( 

1888 self, binary, operator, _cast_applied=False, **kw 

1889 ): 

1890 if ( 

1891 not _cast_applied 

1892 and binary.type._type_affinity is not sqltypes.JSON 

1893 ): 

1894 kw["_cast_applied"] = True 

1895 return self.process(sql.cast(binary, binary.type), **kw) 

1896 

1897 kw["eager_grouping"] = True 

1898 return self._generate_generic_binary( 

1899 binary, " #> " if not _cast_applied else " #>> ", **kw 

1900 ) 

1901 

1902 def visit_getitem_binary(self, binary, operator, **kw): 

1903 return "%s[%s]" % ( 

1904 self.process(binary.left, **kw), 

1905 self.process(binary.right, **kw), 

1906 ) 

1907 

1908 def visit_aggregate_order_by(self, element, **kw): 

1909 return "%s ORDER BY %s" % ( 

1910 self.process(element.target, **kw), 

1911 self.process(element.order_by, **kw), 

1912 ) 

1913 

1914 def visit_match_op_binary(self, binary, operator, **kw): 

1915 if "postgresql_regconfig" in binary.modifiers: 

1916 regconfig = self.render_literal_value( 

1917 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE 

1918 ) 

1919 if regconfig: 

1920 return "%s @@ plainto_tsquery(%s, %s)" % ( 

1921 self.process(binary.left, **kw), 

1922 regconfig, 

1923 self.process(binary.right, **kw), 

1924 ) 

1925 return "%s @@ plainto_tsquery(%s)" % ( 

1926 self.process(binary.left, **kw), 

1927 self.process(binary.right, **kw), 

1928 ) 

1929 

1930 def visit_ilike_case_insensitive_operand(self, element, **kw): 

1931 return element.element._compiler_dispatch(self, **kw) 

1932 

1933 def visit_ilike_op_binary(self, binary, operator, **kw): 

1934 escape = binary.modifiers.get("escape", None) 

1935 

1936 return "%s ILIKE %s" % ( 

1937 self.process(binary.left, **kw), 

1938 self.process(binary.right, **kw), 

1939 ) + ( 

1940 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

1941 if escape is not None 

1942 else "" 

1943 ) 

1944 

1945 def visit_not_ilike_op_binary(self, binary, operator, **kw): 

1946 escape = binary.modifiers.get("escape", None) 

1947 return "%s NOT ILIKE %s" % ( 

1948 self.process(binary.left, **kw), 

1949 self.process(binary.right, **kw), 

1950 ) + ( 

1951 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

1952 if escape is not None 

1953 else "" 

1954 ) 

1955 

1956 def _regexp_match(self, base_op, binary, operator, kw): 

1957 flags = binary.modifiers["flags"] 

1958 if flags is None: 

1959 return self._generate_generic_binary( 

1960 binary, " %s " % base_op, **kw 

1961 ) 

1962 if flags == "i": 

1963 return self._generate_generic_binary( 

1964 binary, " %s* " % base_op, **kw 

1965 ) 

1966 return "%s %s CONCAT('(?', %s, ')', %s)" % ( 

1967 self.process(binary.left, **kw), 

1968 base_op, 

1969 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

1970 self.process(binary.right, **kw), 

1971 ) 

1972 

1973 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1974 return self._regexp_match("~", binary, operator, kw) 

1975 

1976 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1977 return self._regexp_match("!~", binary, operator, kw) 

1978 

1979 def visit_regexp_replace_op_binary(self, binary, operator, **kw): 

1980 string = self.process(binary.left, **kw) 

1981 pattern_replace = self.process(binary.right, **kw) 

1982 flags = binary.modifiers["flags"] 

1983 if flags is None: 

1984 return "REGEXP_REPLACE(%s, %s)" % ( 

1985 string, 

1986 pattern_replace, 

1987 ) 

1988 else: 

1989 return "REGEXP_REPLACE(%s, %s, %s)" % ( 

1990 string, 

1991 pattern_replace, 

1992 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

1993 ) 

1994 

1995 def visit_empty_set_expr(self, element_types, **kw): 

1996 # cast the empty set to the type we are comparing against. if 

1997 # we are comparing against the null type, pick an arbitrary 

1998 # datatype for the empty set 

1999 return "SELECT %s WHERE 1!=1" % ( 

2000 ", ".join( 

2001 "CAST(NULL AS %s)" 

2002 % self.dialect.type_compiler_instance.process( 

2003 INTEGER() if type_._isnull else type_ 

2004 ) 

2005 for type_ in element_types or [INTEGER()] 

2006 ), 

2007 ) 

2008 

2009 def render_literal_value(self, value, type_): 

2010 value = super().render_literal_value(value, type_) 

2011 

2012 if self.dialect._backslash_escapes: 

2013 value = value.replace("\\", "\\\\") 

2014 return value 

2015 

2016 def visit_aggregate_strings_func(self, fn, **kw): 

2017 return "string_agg%s" % self.function_argspec(fn) 

2018 

2019 def visit_sequence(self, seq, **kw): 

2020 return "nextval('%s')" % self.preparer.format_sequence(seq) 

2021 

2022 def limit_clause(self, select, **kw): 

2023 text = "" 

2024 if select._limit_clause is not None: 

2025 text += " \n LIMIT " + self.process(select._limit_clause, **kw) 

2026 if select._offset_clause is not None: 

2027 if select._limit_clause is None: 

2028 text += "\n LIMIT ALL" 

2029 text += " OFFSET " + self.process(select._offset_clause, **kw) 

2030 return text 

2031 

2032 def format_from_hint_text(self, sqltext, table, hint, iscrud): 

2033 if hint.upper() != "ONLY": 

2034 raise exc.CompileError("Unrecognized hint: %r" % hint) 

2035 return "ONLY " + sqltext 

2036 

2037 def get_select_precolumns(self, select, **kw): 

2038 # Do not call super().get_select_precolumns because 

2039 # it will warn/raise when distinct on is present 

2040 if select._distinct or select._distinct_on: 

2041 if select._distinct_on: 

2042 return ( 

2043 "DISTINCT ON (" 

2044 + ", ".join( 

2045 [ 

2046 self.process(col, **kw) 

2047 for col in select._distinct_on 

2048 ] 

2049 ) 

2050 + ") " 

2051 ) 

2052 else: 

2053 return "DISTINCT " 

2054 else: 

2055 return "" 

2056 

2057 def for_update_clause(self, select, **kw): 

2058 if select._for_update_arg.read: 

2059 if select._for_update_arg.key_share: 

2060 tmp = " FOR KEY SHARE" 

2061 else: 

2062 tmp = " FOR SHARE" 

2063 elif select._for_update_arg.key_share: 

2064 tmp = " FOR NO KEY UPDATE" 

2065 else: 

2066 tmp = " FOR UPDATE" 

2067 

2068 if select._for_update_arg.of: 

2069 tables = util.OrderedSet() 

2070 for c in select._for_update_arg.of: 

2071 tables.update(sql_util.surface_selectables_only(c)) 

2072 

2073 of_kw = dict(kw) 

2074 of_kw.update(ashint=True, use_schema=False) 

2075 tmp += " OF " + ", ".join( 

2076 self.process(table, **of_kw) for table in tables 

2077 ) 

2078 

2079 if select._for_update_arg.nowait: 

2080 tmp += " NOWAIT" 

2081 if select._for_update_arg.skip_locked: 

2082 tmp += " SKIP LOCKED" 

2083 

2084 return tmp 

2085 

2086 def visit_substring_func(self, func, **kw): 

2087 s = self.process(func.clauses.clauses[0], **kw) 

2088 start = self.process(func.clauses.clauses[1], **kw) 

2089 if len(func.clauses.clauses) > 2: 

2090 length = self.process(func.clauses.clauses[2], **kw) 

2091 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) 

2092 else: 

2093 return "SUBSTRING(%s FROM %s)" % (s, start) 

2094 

2095 def _on_conflict_target(self, clause, **kw): 

2096 if clause.constraint_target is not None: 

2097 # target may be a name of an Index, UniqueConstraint or 

2098 # ExcludeConstraint. While there is a separate 

2099 # "max_identifier_length" for indexes, PostgreSQL uses the same 

2100 # length for all objects so we can use 

2101 # truncate_and_render_constraint_name 

2102 target_text = ( 

2103 "ON CONSTRAINT %s" 

2104 % self.preparer.truncate_and_render_constraint_name( 

2105 clause.constraint_target 

2106 ) 

2107 ) 

2108 elif clause.inferred_target_elements is not None: 

2109 target_text = "(%s)" % ", ".join( 

2110 ( 

2111 self.preparer.quote(c) 

2112 if isinstance(c, str) 

2113 else self.process(c, include_table=False, use_schema=False) 

2114 ) 

2115 for c in clause.inferred_target_elements 

2116 ) 

2117 if clause.inferred_target_whereclause is not None: 

2118 target_text += " WHERE %s" % self.process( 

2119 clause.inferred_target_whereclause, 

2120 include_table=False, 

2121 use_schema=False, 

2122 ) 

2123 else: 

2124 target_text = "" 

2125 

2126 return target_text 

2127 

2128 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

2129 target_text = self._on_conflict_target(on_conflict, **kw) 

2130 

2131 if target_text: 

2132 return "ON CONFLICT %s DO NOTHING" % target_text 

2133 else: 

2134 return "ON CONFLICT DO NOTHING" 

2135 

2136 def visit_on_conflict_do_update(self, on_conflict, **kw): 

2137 clause = on_conflict 

2138 

2139 target_text = self._on_conflict_target(on_conflict, **kw) 

2140 

2141 action_set_ops = [] 

2142 

2143 set_parameters = dict(clause.update_values_to_set) 

2144 # create a list of column assignment clauses as tuples 

2145 

2146 insert_statement = self.stack[-1]["selectable"] 

2147 cols = insert_statement.table.c 

2148 for c in cols: 

2149 col_key = c.key 

2150 

2151 if col_key in set_parameters: 

2152 value = set_parameters.pop(col_key) 

2153 elif c in set_parameters: 

2154 value = set_parameters.pop(c) 

2155 else: 

2156 continue 

2157 

2158 # TODO: this coercion should be up front. we can't cache 

2159 # SQL constructs with non-bound literals buried in them 

2160 if coercions._is_literal(value): 

2161 value = elements.BindParameter(None, value, type_=c.type) 

2162 

2163 else: 

2164 if ( 

2165 isinstance(value, elements.BindParameter) 

2166 and value.type._isnull 

2167 ): 

2168 value = value._clone() 

2169 value.type = c.type 

2170 value_text = self.process(value.self_group(), use_schema=False) 

2171 

2172 key_text = self.preparer.quote(c.name) 

2173 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2174 

2175 # check for names that don't match columns 

2176 if set_parameters: 

2177 util.warn( 

2178 "Additional column names not matching " 

2179 "any column keys in table '%s': %s" 

2180 % ( 

2181 self.current_executable.table.name, 

2182 (", ".join("'%s'" % c for c in set_parameters)), 

2183 ) 

2184 ) 

2185 for k, v in set_parameters.items(): 

2186 key_text = ( 

2187 self.preparer.quote(k) 

2188 if isinstance(k, str) 

2189 else self.process(k, use_schema=False) 

2190 ) 

2191 value_text = self.process( 

2192 coercions.expect(roles.ExpressionElementRole, v), 

2193 use_schema=False, 

2194 ) 

2195 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2196 

2197 action_text = ", ".join(action_set_ops) 

2198 if clause.update_whereclause is not None: 

2199 action_text += " WHERE %s" % self.process( 

2200 clause.update_whereclause, include_table=True, use_schema=False 

2201 ) 

2202 

2203 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

2204 

2205 def update_from_clause( 

2206 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2207 ): 

2208 kw["asfrom"] = True 

2209 return "FROM " + ", ".join( 

2210 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2211 for t in extra_froms 

2212 ) 

2213 

2214 def delete_extra_from_clause( 

2215 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2216 ): 

2217 """Render the DELETE .. USING clause specific to PostgreSQL.""" 

2218 kw["asfrom"] = True 

2219 return "USING " + ", ".join( 

2220 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2221 for t in extra_froms 

2222 ) 

2223 

2224 def fetch_clause(self, select, **kw): 

2225 # pg requires parens for non literal clauses. It's also required for 

2226 # bind parameters if a ::type casts is used by the driver (asyncpg), 

2227 # so it's easiest to just always add it 

2228 text = "" 

2229 if select._offset_clause is not None: 

2230 text += "\n OFFSET (%s) ROWS" % self.process( 

2231 select._offset_clause, **kw 

2232 ) 

2233 if select._fetch_clause is not None: 

2234 text += "\n FETCH FIRST (%s)%s ROWS %s" % ( 

2235 self.process(select._fetch_clause, **kw), 

2236 " PERCENT" if select._fetch_clause_options["percent"] else "", 

2237 ( 

2238 "WITH TIES" 

2239 if select._fetch_clause_options["with_ties"] 

2240 else "ONLY" 

2241 ), 

2242 ) 

2243 return text 

2244 

2245 

2246class PGDDLCompiler(compiler.DDLCompiler): 

2247 def get_column_specification(self, column, **kwargs): 

2248 colspec = self.preparer.format_column(column) 

2249 impl_type = column.type.dialect_impl(self.dialect) 

2250 if isinstance(impl_type, sqltypes.TypeDecorator): 

2251 impl_type = impl_type.impl 

2252 

2253 has_identity = ( 

2254 column.identity is not None 

2255 and self.dialect.supports_identity_columns 

2256 ) 

2257 

2258 if ( 

2259 column.primary_key 

2260 and column is column.table._autoincrement_column 

2261 and ( 

2262 self.dialect.supports_smallserial 

2263 or not isinstance(impl_type, sqltypes.SmallInteger) 

2264 ) 

2265 and not has_identity 

2266 and ( 

2267 column.default is None 

2268 or ( 

2269 isinstance(column.default, schema.Sequence) 

2270 and column.default.optional 

2271 ) 

2272 ) 

2273 ): 

2274 if isinstance(impl_type, sqltypes.BigInteger): 

2275 colspec += " BIGSERIAL" 

2276 elif isinstance(impl_type, sqltypes.SmallInteger): 

2277 colspec += " SMALLSERIAL" 

2278 else: 

2279 colspec += " SERIAL" 

2280 else: 

2281 colspec += " " + self.dialect.type_compiler_instance.process( 

2282 column.type, 

2283 type_expression=column, 

2284 identifier_preparer=self.preparer, 

2285 ) 

2286 default = self.get_column_default_string(column) 

2287 if default is not None: 

2288 colspec += " DEFAULT " + default 

2289 

2290 if column.computed is not None: 

2291 colspec += " " + self.process(column.computed) 

2292 if has_identity: 

2293 colspec += " " + self.process(column.identity) 

2294 

2295 if not column.nullable and not has_identity: 

2296 colspec += " NOT NULL" 

2297 elif column.nullable and has_identity: 

2298 colspec += " NULL" 

2299 return colspec 

2300 

2301 def _define_constraint_validity(self, constraint): 

2302 not_valid = constraint.dialect_options["postgresql"]["not_valid"] 

2303 return " NOT VALID" if not_valid else "" 

2304 

2305 def _define_include(self, obj): 

2306 includeclause = obj.dialect_options["postgresql"]["include"] 

2307 if not includeclause: 

2308 return "" 

2309 inclusions = [ 

2310 obj.table.c[col] if isinstance(col, str) else col 

2311 for col in includeclause 

2312 ] 

2313 return " INCLUDE (%s)" % ", ".join( 

2314 [self.preparer.quote(c.name) for c in inclusions] 

2315 ) 

2316 

2317 def visit_check_constraint(self, constraint, **kw): 

2318 if constraint._type_bound: 

2319 typ = list(constraint.columns)[0].type 

2320 if ( 

2321 isinstance(typ, sqltypes.ARRAY) 

2322 and isinstance(typ.item_type, sqltypes.Enum) 

2323 and not typ.item_type.native_enum 

2324 ): 

2325 raise exc.CompileError( 

2326 "PostgreSQL dialect cannot produce the CHECK constraint " 

2327 "for ARRAY of non-native ENUM; please specify " 

2328 "create_constraint=False on this Enum datatype." 

2329 ) 

2330 

2331 text = super().visit_check_constraint(constraint) 

2332 text += self._define_constraint_validity(constraint) 

2333 return text 

2334 

2335 def visit_foreign_key_constraint(self, constraint, **kw): 

2336 text = super().visit_foreign_key_constraint(constraint) 

2337 text += self._define_constraint_validity(constraint) 

2338 return text 

2339 

2340 def visit_primary_key_constraint(self, constraint, **kw): 

2341 text = super().visit_primary_key_constraint(constraint) 

2342 text += self._define_include(constraint) 

2343 return text 

2344 

2345 def visit_unique_constraint(self, constraint, **kw): 

2346 text = super().visit_unique_constraint(constraint) 

2347 text += self._define_include(constraint) 

2348 return text 

2349 

2350 @util.memoized_property 

2351 def _fk_ondelete_pattern(self): 

2352 return re.compile( 

2353 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?" 

2354 r"|NO ACTION)$", 

2355 re.I, 

2356 ) 

2357 

2358 def define_constraint_ondelete_cascade(self, constraint): 

2359 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

2360 constraint.ondelete, self._fk_ondelete_pattern 

2361 ) 

2362 

2363 def visit_create_enum_type(self, create, **kw): 

2364 type_ = create.element 

2365 

2366 return "CREATE TYPE %s AS ENUM (%s)" % ( 

2367 self.preparer.format_type(type_), 

2368 ", ".join( 

2369 self.sql_compiler.process(sql.literal(e), literal_binds=True) 

2370 for e in type_.enums 

2371 ), 

2372 ) 

2373 

2374 def visit_drop_enum_type(self, drop, **kw): 

2375 type_ = drop.element 

2376 

2377 return "DROP TYPE %s" % (self.preparer.format_type(type_)) 

2378 

2379 def visit_create_domain_type(self, create, **kw): 

2380 domain: DOMAIN = create.element 

2381 

2382 options = [] 

2383 if domain.collation is not None: 

2384 options.append(f"COLLATE {self.preparer.quote(domain.collation)}") 

2385 if domain.default is not None: 

2386 default = self.render_default_string(domain.default) 

2387 options.append(f"DEFAULT {default}") 

2388 if domain.constraint_name is not None: 

2389 name = self.preparer.truncate_and_render_constraint_name( 

2390 domain.constraint_name 

2391 ) 

2392 options.append(f"CONSTRAINT {name}") 

2393 if domain.not_null: 

2394 options.append("NOT NULL") 

2395 if domain.check is not None: 

2396 check = self.sql_compiler.process( 

2397 domain.check, include_table=False, literal_binds=True 

2398 ) 

2399 options.append(f"CHECK ({check})") 

2400 

2401 return ( 

2402 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " 

2403 f"{self.type_compiler.process(domain.data_type)} " 

2404 f"{' '.join(options)}" 

2405 ) 

2406 

2407 def visit_drop_domain_type(self, drop, **kw): 

2408 domain = drop.element 

2409 return f"DROP DOMAIN {self.preparer.format_type(domain)}" 

2410 

2411 def visit_create_index(self, create, **kw): 

2412 preparer = self.preparer 

2413 index = create.element 

2414 self._verify_index_table(index) 

2415 text = "CREATE " 

2416 if index.unique: 

2417 text += "UNIQUE " 

2418 

2419 text += "INDEX " 

2420 

2421 if self.dialect._supports_create_index_concurrently: 

2422 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2423 if concurrently: 

2424 text += "CONCURRENTLY " 

2425 

2426 if create.if_not_exists: 

2427 text += "IF NOT EXISTS " 

2428 

2429 text += "%s ON %s " % ( 

2430 self._prepared_index_name(index, include_schema=False), 

2431 preparer.format_table(index.table), 

2432 ) 

2433 

2434 using = index.dialect_options["postgresql"]["using"] 

2435 if using: 

2436 text += ( 

2437 "USING %s " 

2438 % self.preparer.validate_sql_phrase(using, IDX_USING).lower() 

2439 ) 

2440 

2441 ops = index.dialect_options["postgresql"]["ops"] 

2442 text += "(%s)" % ( 

2443 ", ".join( 

2444 [ 

2445 self.sql_compiler.process( 

2446 ( 

2447 expr.self_group() 

2448 if not isinstance(expr, expression.ColumnClause) 

2449 else expr 

2450 ), 

2451 include_table=False, 

2452 literal_binds=True, 

2453 ) 

2454 + ( 

2455 (" " + ops[expr.key]) 

2456 if hasattr(expr, "key") and expr.key in ops 

2457 else "" 

2458 ) 

2459 for expr in index.expressions 

2460 ] 

2461 ) 

2462 ) 

2463 

2464 text += self._define_include(index) 

2465 

2466 nulls_not_distinct = index.dialect_options["postgresql"][ 

2467 "nulls_not_distinct" 

2468 ] 

2469 if nulls_not_distinct is True: 

2470 text += " NULLS NOT DISTINCT" 

2471 elif nulls_not_distinct is False: 

2472 text += " NULLS DISTINCT" 

2473 

2474 withclause = index.dialect_options["postgresql"]["with"] 

2475 if withclause: 

2476 text += " WITH (%s)" % ( 

2477 ", ".join( 

2478 [ 

2479 "%s = %s" % storage_parameter 

2480 for storage_parameter in withclause.items() 

2481 ] 

2482 ) 

2483 ) 

2484 

2485 tablespace_name = index.dialect_options["postgresql"]["tablespace"] 

2486 if tablespace_name: 

2487 text += " TABLESPACE %s" % preparer.quote(tablespace_name) 

2488 

2489 whereclause = index.dialect_options["postgresql"]["where"] 

2490 if whereclause is not None: 

2491 whereclause = coercions.expect( 

2492 roles.DDLExpressionRole, whereclause 

2493 ) 

2494 

2495 where_compiled = self.sql_compiler.process( 

2496 whereclause, include_table=False, literal_binds=True 

2497 ) 

2498 text += " WHERE " + where_compiled 

2499 

2500 return text 

2501 

2502 def define_unique_constraint_distinct(self, constraint, **kw): 

2503 nulls_not_distinct = constraint.dialect_options["postgresql"][ 

2504 "nulls_not_distinct" 

2505 ] 

2506 if nulls_not_distinct is True: 

2507 nulls_not_distinct_param = "NULLS NOT DISTINCT " 

2508 elif nulls_not_distinct is False: 

2509 nulls_not_distinct_param = "NULLS DISTINCT " 

2510 else: 

2511 nulls_not_distinct_param = "" 

2512 return nulls_not_distinct_param 

2513 

2514 def visit_drop_index(self, drop, **kw): 

2515 index = drop.element 

2516 

2517 text = "\nDROP INDEX " 

2518 

2519 if self.dialect._supports_drop_index_concurrently: 

2520 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2521 if concurrently: 

2522 text += "CONCURRENTLY " 

2523 

2524 if drop.if_exists: 

2525 text += "IF EXISTS " 

2526 

2527 text += self._prepared_index_name(index, include_schema=True) 

2528 return text 

2529 

2530 def visit_exclude_constraint(self, constraint, **kw): 

2531 text = "" 

2532 if constraint.name is not None: 

2533 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2534 constraint 

2535 ) 

2536 elements = [] 

2537 kw["include_table"] = False 

2538 kw["literal_binds"] = True 

2539 for expr, name, op in constraint._render_exprs: 

2540 exclude_element = self.sql_compiler.process(expr, **kw) + ( 

2541 (" " + constraint.ops[expr.key]) 

2542 if hasattr(expr, "key") and expr.key in constraint.ops 

2543 else "" 

2544 ) 

2545 

2546 elements.append("%s WITH %s" % (exclude_element, op)) 

2547 text += "EXCLUDE USING %s (%s)" % ( 

2548 self.preparer.validate_sql_phrase( 

2549 constraint.using, IDX_USING 

2550 ).lower(), 

2551 ", ".join(elements), 

2552 ) 

2553 if constraint.where is not None: 

2554 text += " WHERE (%s)" % self.sql_compiler.process( 

2555 constraint.where, literal_binds=True 

2556 ) 

2557 text += self.define_constraint_deferrability(constraint) 

2558 return text 

2559 

2560 def post_create_table(self, table): 

2561 table_opts = [] 

2562 pg_opts = table.dialect_options["postgresql"] 

2563 

2564 inherits = pg_opts.get("inherits") 

2565 if inherits is not None: 

2566 if not isinstance(inherits, (list, tuple)): 

2567 inherits = (inherits,) 

2568 table_opts.append( 

2569 "\n INHERITS ( " 

2570 + ", ".join(self.preparer.quote(name) for name in inherits) 

2571 + " )" 

2572 ) 

2573 

2574 if pg_opts["partition_by"]: 

2575 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) 

2576 

2577 if pg_opts["using"]: 

2578 table_opts.append("\n USING %s" % pg_opts["using"]) 

2579 

2580 if pg_opts["with_oids"] is True: 

2581 table_opts.append("\n WITH OIDS") 

2582 elif pg_opts["with_oids"] is False: 

2583 table_opts.append("\n WITHOUT OIDS") 

2584 

2585 if pg_opts["on_commit"]: 

2586 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() 

2587 table_opts.append("\n ON COMMIT %s" % on_commit_options) 

2588 

2589 if pg_opts["tablespace"]: 

2590 tablespace_name = pg_opts["tablespace"] 

2591 table_opts.append( 

2592 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) 

2593 ) 

2594 

2595 return "".join(table_opts) 

2596 

2597 def visit_computed_column(self, generated, **kw): 

2598 if generated.persisted is False: 

2599 raise exc.CompileError( 

2600 "PostrgreSQL computed columns do not support 'virtual' " 

2601 "persistence; set the 'persisted' flag to None or True for " 

2602 "PostgreSQL support." 

2603 ) 

2604 

2605 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( 

2606 generated.sqltext, include_table=False, literal_binds=True 

2607 ) 

2608 

2609 def visit_create_sequence(self, create, **kw): 

2610 prefix = None 

2611 if create.element.data_type is not None: 

2612 prefix = " AS %s" % self.type_compiler.process( 

2613 create.element.data_type 

2614 ) 

2615 

2616 return super().visit_create_sequence(create, prefix=prefix, **kw) 

2617 

2618 def _can_comment_on_constraint(self, ddl_instance): 

2619 constraint = ddl_instance.element 

2620 if constraint.name is None: 

2621 raise exc.CompileError( 

2622 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2623 "it has no name" 

2624 ) 

2625 if constraint.table is None: 

2626 raise exc.CompileError( 

2627 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2628 "it has no associated table" 

2629 ) 

2630 

2631 def visit_set_constraint_comment(self, create, **kw): 

2632 self._can_comment_on_constraint(create) 

2633 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % ( 

2634 self.preparer.format_constraint(create.element), 

2635 self.preparer.format_table(create.element.table), 

2636 self.sql_compiler.render_literal_value( 

2637 create.element.comment, sqltypes.String() 

2638 ), 

2639 ) 

2640 

2641 def visit_drop_constraint_comment(self, drop, **kw): 

2642 self._can_comment_on_constraint(drop) 

2643 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % ( 

2644 self.preparer.format_constraint(drop.element), 

2645 self.preparer.format_table(drop.element.table), 

2646 ) 

2647 

2648 

2649class PGTypeCompiler(compiler.GenericTypeCompiler): 

2650 def visit_TSVECTOR(self, type_, **kw): 

2651 return "TSVECTOR" 

2652 

2653 def visit_TSQUERY(self, type_, **kw): 

2654 return "TSQUERY" 

2655 

2656 def visit_INET(self, type_, **kw): 

2657 return "INET" 

2658 

2659 def visit_CIDR(self, type_, **kw): 

2660 return "CIDR" 

2661 

2662 def visit_CITEXT(self, type_, **kw): 

2663 return "CITEXT" 

2664 

2665 def visit_MACADDR(self, type_, **kw): 

2666 return "MACADDR" 

2667 

2668 def visit_MACADDR8(self, type_, **kw): 

2669 return "MACADDR8" 

2670 

2671 def visit_MONEY(self, type_, **kw): 

2672 return "MONEY" 

2673 

2674 def visit_OID(self, type_, **kw): 

2675 return "OID" 

2676 

2677 def visit_REGCONFIG(self, type_, **kw): 

2678 return "REGCONFIG" 

2679 

2680 def visit_REGCLASS(self, type_, **kw): 

2681 return "REGCLASS" 

2682 

2683 def visit_FLOAT(self, type_, **kw): 

2684 if not type_.precision: 

2685 return "FLOAT" 

2686 else: 

2687 return "FLOAT(%(precision)s)" % {"precision": type_.precision} 

2688 

2689 def visit_double(self, type_, **kw): 

2690 return self.visit_DOUBLE_PRECISION(type, **kw) 

2691 

2692 def visit_BIGINT(self, type_, **kw): 

2693 return "BIGINT" 

2694 

2695 def visit_HSTORE(self, type_, **kw): 

2696 return "HSTORE" 

2697 

2698 def visit_JSON(self, type_, **kw): 

2699 return "JSON" 

2700 

2701 def visit_JSONB(self, type_, **kw): 

2702 return "JSONB" 

2703 

2704 def visit_INT4MULTIRANGE(self, type_, **kw): 

2705 return "INT4MULTIRANGE" 

2706 

2707 def visit_INT8MULTIRANGE(self, type_, **kw): 

2708 return "INT8MULTIRANGE" 

2709 

2710 def visit_NUMMULTIRANGE(self, type_, **kw): 

2711 return "NUMMULTIRANGE" 

2712 

2713 def visit_DATEMULTIRANGE(self, type_, **kw): 

2714 return "DATEMULTIRANGE" 

2715 

2716 def visit_TSMULTIRANGE(self, type_, **kw): 

2717 return "TSMULTIRANGE" 

2718 

2719 def visit_TSTZMULTIRANGE(self, type_, **kw): 

2720 return "TSTZMULTIRANGE" 

2721 

2722 def visit_INT4RANGE(self, type_, **kw): 

2723 return "INT4RANGE" 

2724 

2725 def visit_INT8RANGE(self, type_, **kw): 

2726 return "INT8RANGE" 

2727 

2728 def visit_NUMRANGE(self, type_, **kw): 

2729 return "NUMRANGE" 

2730 

2731 def visit_DATERANGE(self, type_, **kw): 

2732 return "DATERANGE" 

2733 

2734 def visit_TSRANGE(self, type_, **kw): 

2735 return "TSRANGE" 

2736 

2737 def visit_TSTZRANGE(self, type_, **kw): 

2738 return "TSTZRANGE" 

2739 

2740 def visit_json_int_index(self, type_, **kw): 

2741 return "INT" 

2742 

2743 def visit_json_str_index(self, type_, **kw): 

2744 return "TEXT" 

2745 

2746 def visit_datetime(self, type_, **kw): 

2747 return self.visit_TIMESTAMP(type_, **kw) 

2748 

2749 def visit_enum(self, type_, **kw): 

2750 if not type_.native_enum or not self.dialect.supports_native_enum: 

2751 return super().visit_enum(type_, **kw) 

2752 else: 

2753 return self.visit_ENUM(type_, **kw) 

2754 

2755 def visit_ENUM(self, type_, identifier_preparer=None, **kw): 

2756 if identifier_preparer is None: 

2757 identifier_preparer = self.dialect.identifier_preparer 

2758 return identifier_preparer.format_type(type_) 

2759 

2760 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw): 

2761 if identifier_preparer is None: 

2762 identifier_preparer = self.dialect.identifier_preparer 

2763 return identifier_preparer.format_type(type_) 

2764 

2765 def visit_TIMESTAMP(self, type_, **kw): 

2766 return "TIMESTAMP%s %s" % ( 

2767 ( 

2768 "(%d)" % type_.precision 

2769 if getattr(type_, "precision", None) is not None 

2770 else "" 

2771 ), 

2772 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2773 ) 

2774 

2775 def visit_TIME(self, type_, **kw): 

2776 return "TIME%s %s" % ( 

2777 ( 

2778 "(%d)" % type_.precision 

2779 if getattr(type_, "precision", None) is not None 

2780 else "" 

2781 ), 

2782 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2783 ) 

2784 

2785 def visit_INTERVAL(self, type_, **kw): 

2786 text = "INTERVAL" 

2787 if type_.fields is not None: 

2788 text += " " + type_.fields 

2789 if type_.precision is not None: 

2790 text += " (%d)" % type_.precision 

2791 return text 

2792 

2793 def visit_BIT(self, type_, **kw): 

2794 if type_.varying: 

2795 compiled = "BIT VARYING" 

2796 if type_.length is not None: 

2797 compiled += "(%d)" % type_.length 

2798 else: 

2799 compiled = "BIT(%d)" % type_.length 

2800 return compiled 

2801 

2802 def visit_uuid(self, type_, **kw): 

2803 if type_.native_uuid: 

2804 return self.visit_UUID(type_, **kw) 

2805 else: 

2806 return super().visit_uuid(type_, **kw) 

2807 

2808 def visit_UUID(self, type_, **kw): 

2809 return "UUID" 

2810 

2811 def visit_large_binary(self, type_, **kw): 

2812 return self.visit_BYTEA(type_, **kw) 

2813 

2814 def visit_BYTEA(self, type_, **kw): 

2815 return "BYTEA" 

2816 

2817 def visit_ARRAY(self, type_, **kw): 

2818 inner = self.process(type_.item_type, **kw) 

2819 return re.sub( 

2820 r"((?: COLLATE.*)?)$", 

2821 ( 

2822 r"%s\1" 

2823 % ( 

2824 "[]" 

2825 * (type_.dimensions if type_.dimensions is not None else 1) 

2826 ) 

2827 ), 

2828 inner, 

2829 count=1, 

2830 ) 

2831 

2832 def visit_json_path(self, type_, **kw): 

2833 return self.visit_JSONPATH(type_, **kw) 

2834 

2835 def visit_JSONPATH(self, type_, **kw): 

2836 return "JSONPATH" 

2837 

2838 

2839class PGIdentifierPreparer(compiler.IdentifierPreparer): 

2840 reserved_words = RESERVED_WORDS 

2841 

2842 def _unquote_identifier(self, value): 

2843 if value[0] == self.initial_quote: 

2844 value = value[1:-1].replace( 

2845 self.escape_to_quote, self.escape_quote 

2846 ) 

2847 return value 

2848 

2849 def format_type(self, type_, use_schema=True): 

2850 if not type_.name: 

2851 raise exc.CompileError( 

2852 f"PostgreSQL {type_.__class__.__name__} type requires a name." 

2853 ) 

2854 

2855 name = self.quote(type_.name) 

2856 effective_schema = self.schema_for_object(type_) 

2857 

2858 if ( 

2859 not self.omit_schema 

2860 and use_schema 

2861 and effective_schema is not None 

2862 ): 

2863 name = f"{self.quote_schema(effective_schema)}.{name}" 

2864 return name 

2865 

2866 

2867class ReflectedNamedType(TypedDict): 

2868 """Represents a reflected named type.""" 

2869 

2870 name: str 

2871 """Name of the type.""" 

2872 schema: str 

2873 """The schema of the type.""" 

2874 visible: bool 

2875 """Indicates if this type is in the current search path.""" 

2876 

2877 

2878class ReflectedDomainConstraint(TypedDict): 

2879 """Represents a reflect check constraint of a domain.""" 

2880 

2881 name: str 

2882 """Name of the constraint.""" 

2883 check: str 

2884 """The check constraint text.""" 

2885 

2886 

2887class ReflectedDomain(ReflectedNamedType): 

2888 """Represents a reflected enum.""" 

2889 

2890 type: str 

2891 """The string name of the underlying data type of the domain.""" 

2892 nullable: bool 

2893 """Indicates if the domain allows null or not.""" 

2894 default: Optional[str] 

2895 """The string representation of the default value of this domain 

2896 or ``None`` if none present. 

2897 """ 

2898 constraints: List[ReflectedDomainConstraint] 

2899 """The constraints defined in the domain, if any. 

2900 The constraint are in order of evaluation by postgresql. 

2901 """ 

2902 collation: Optional[str] 

2903 """The collation for the domain.""" 

2904 

2905 

2906class ReflectedEnum(ReflectedNamedType): 

2907 """Represents a reflected enum.""" 

2908 

2909 labels: List[str] 

2910 """The labels that compose the enum.""" 

2911 

2912 

2913class PGInspector(reflection.Inspector): 

2914 dialect: PGDialect 

2915 

2916 def get_table_oid( 

2917 self, table_name: str, schema: Optional[str] = None 

2918 ) -> int: 

2919 """Return the OID for the given table name. 

2920 

2921 :param table_name: string name of the table. For special quoting, 

2922 use :class:`.quoted_name`. 

2923 

2924 :param schema: string schema name; if omitted, uses the default schema 

2925 of the database connection. For special quoting, 

2926 use :class:`.quoted_name`. 

2927 

2928 """ 

2929 

2930 with self._operation_context() as conn: 

2931 return self.dialect.get_table_oid( 

2932 conn, table_name, schema, info_cache=self.info_cache 

2933 ) 

2934 

2935 def get_domains( 

2936 self, schema: Optional[str] = None 

2937 ) -> List[ReflectedDomain]: 

2938 """Return a list of DOMAIN objects. 

2939 

2940 Each member is a dictionary containing these fields: 

2941 

2942 * name - name of the domain 

2943 * schema - the schema name for the domain. 

2944 * visible - boolean, whether or not this domain is visible 

2945 in the default search path. 

2946 * type - the type defined by this domain. 

2947 * nullable - Indicates if this domain can be ``NULL``. 

2948 * default - The default value of the domain or ``None`` if the 

2949 domain has no default. 

2950 * constraints - A list of dict wit the constraint defined by this 

2951 domain. Each element constaints two keys: ``name`` of the 

2952 constraint and ``check`` with the constraint text. 

2953 

2954 :param schema: schema name. If None, the default schema 

2955 (typically 'public') is used. May also be set to ``'*'`` to 

2956 indicate load domains for all schemas. 

2957 

2958 .. versionadded:: 2.0 

2959 

2960 """ 

2961 with self._operation_context() as conn: 

2962 return self.dialect._load_domains( 

2963 conn, schema, info_cache=self.info_cache 

2964 ) 

2965 

2966 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]: 

2967 """Return a list of ENUM objects. 

2968 

2969 Each member is a dictionary containing these fields: 

2970 

2971 * name - name of the enum 

2972 * schema - the schema name for the enum. 

2973 * visible - boolean, whether or not this enum is visible 

2974 in the default search path. 

2975 * labels - a list of string labels that apply to the enum. 

2976 

2977 :param schema: schema name. If None, the default schema 

2978 (typically 'public') is used. May also be set to ``'*'`` to 

2979 indicate load enums for all schemas. 

2980 

2981 """ 

2982 with self._operation_context() as conn: 

2983 return self.dialect._load_enums( 

2984 conn, schema, info_cache=self.info_cache 

2985 ) 

2986 

2987 def get_foreign_table_names( 

2988 self, schema: Optional[str] = None 

2989 ) -> List[str]: 

2990 """Return a list of FOREIGN TABLE names. 

2991 

2992 Behavior is similar to that of 

2993 :meth:`_reflection.Inspector.get_table_names`, 

2994 except that the list is limited to those tables that report a 

2995 ``relkind`` value of ``f``. 

2996 

2997 """ 

2998 with self._operation_context() as conn: 

2999 return self.dialect._get_foreign_table_names( 

3000 conn, schema, info_cache=self.info_cache 

3001 ) 

3002 

3003 def has_type( 

3004 self, type_name: str, schema: Optional[str] = None, **kw: Any 

3005 ) -> bool: 

3006 """Return if the database has the specified type in the provided 

3007 schema. 

3008 

3009 :param type_name: the type to check. 

3010 :param schema: schema name. If None, the default schema 

3011 (typically 'public') is used. May also be set to ``'*'`` to 

3012 check in all schemas. 

3013 

3014 .. versionadded:: 2.0 

3015 

3016 """ 

3017 with self._operation_context() as conn: 

3018 return self.dialect.has_type( 

3019 conn, type_name, schema, info_cache=self.info_cache 

3020 ) 

3021 

3022 

3023class PGExecutionContext(default.DefaultExecutionContext): 

3024 def fire_sequence(self, seq, type_): 

3025 return self._execute_scalar( 

3026 ( 

3027 "select nextval('%s')" 

3028 % self.identifier_preparer.format_sequence(seq) 

3029 ), 

3030 type_, 

3031 ) 

3032 

3033 def get_insert_default(self, column): 

3034 if column.primary_key and column is column.table._autoincrement_column: 

3035 if column.server_default and column.server_default.has_argument: 

3036 # pre-execute passive defaults on primary key columns 

3037 return self._execute_scalar( 

3038 "select %s" % column.server_default.arg, column.type 

3039 ) 

3040 

3041 elif column.default is None or ( 

3042 column.default.is_sequence and column.default.optional 

3043 ): 

3044 # execute the sequence associated with a SERIAL primary 

3045 # key column. for non-primary-key SERIAL, the ID just 

3046 # generates server side. 

3047 

3048 try: 

3049 seq_name = column._postgresql_seq_name 

3050 except AttributeError: 

3051 tab = column.table.name 

3052 col = column.name 

3053 tab = tab[0 : 29 + max(0, (29 - len(col)))] 

3054 col = col[0 : 29 + max(0, (29 - len(tab)))] 

3055 name = "%s_%s_seq" % (tab, col) 

3056 column._postgresql_seq_name = seq_name = name 

3057 

3058 if column.table is not None: 

3059 effective_schema = self.connection.schema_for_object( 

3060 column.table 

3061 ) 

3062 else: 

3063 effective_schema = None 

3064 

3065 if effective_schema is not None: 

3066 exc = 'select nextval(\'"%s"."%s"\')' % ( 

3067 effective_schema, 

3068 seq_name, 

3069 ) 

3070 else: 

3071 exc = "select nextval('\"%s\"')" % (seq_name,) 

3072 

3073 return self._execute_scalar(exc, column.type) 

3074 

3075 return super().get_insert_default(column) 

3076 

3077 

3078class PGReadOnlyConnectionCharacteristic( 

3079 characteristics.ConnectionCharacteristic 

3080): 

3081 transactional = True 

3082 

3083 def reset_characteristic(self, dialect, dbapi_conn): 

3084 dialect.set_readonly(dbapi_conn, False) 

3085 

3086 def set_characteristic(self, dialect, dbapi_conn, value): 

3087 dialect.set_readonly(dbapi_conn, value) 

3088 

3089 def get_characteristic(self, dialect, dbapi_conn): 

3090 return dialect.get_readonly(dbapi_conn) 

3091 

3092 

3093class PGDeferrableConnectionCharacteristic( 

3094 characteristics.ConnectionCharacteristic 

3095): 

3096 transactional = True 

3097 

3098 def reset_characteristic(self, dialect, dbapi_conn): 

3099 dialect.set_deferrable(dbapi_conn, False) 

3100 

3101 def set_characteristic(self, dialect, dbapi_conn, value): 

3102 dialect.set_deferrable(dbapi_conn, value) 

3103 

3104 def get_characteristic(self, dialect, dbapi_conn): 

3105 return dialect.get_deferrable(dbapi_conn) 

3106 

3107 

3108class PGDialect(default.DefaultDialect): 

3109 name = "postgresql" 

3110 supports_statement_cache = True 

3111 supports_alter = True 

3112 max_identifier_length = 63 

3113 supports_sane_rowcount = True 

3114 

3115 bind_typing = interfaces.BindTyping.RENDER_CASTS 

3116 

3117 supports_native_enum = True 

3118 supports_native_boolean = True 

3119 supports_native_uuid = True 

3120 supports_smallserial = True 

3121 

3122 supports_sequences = True 

3123 sequences_optional = True 

3124 preexecute_autoincrement_sequences = True 

3125 postfetch_lastrowid = False 

3126 use_insertmanyvalues = True 

3127 

3128 returns_native_bytes = True 

3129 

3130 insertmanyvalues_implicit_sentinel = ( 

3131 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT 

3132 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

3133 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS 

3134 ) 

3135 

3136 supports_comments = True 

3137 supports_constraint_comments = True 

3138 supports_default_values = True 

3139 

3140 supports_default_metavalue = True 

3141 

3142 supports_empty_insert = False 

3143 supports_multivalues_insert = True 

3144 

3145 supports_identity_columns = True 

3146 

3147 default_paramstyle = "pyformat" 

3148 ischema_names = ischema_names 

3149 colspecs = colspecs 

3150 

3151 statement_compiler = PGCompiler 

3152 ddl_compiler = PGDDLCompiler 

3153 type_compiler_cls = PGTypeCompiler 

3154 preparer = PGIdentifierPreparer 

3155 execution_ctx_cls = PGExecutionContext 

3156 inspector = PGInspector 

3157 

3158 update_returning = True 

3159 delete_returning = True 

3160 insert_returning = True 

3161 update_returning_multifrom = True 

3162 delete_returning_multifrom = True 

3163 

3164 connection_characteristics = ( 

3165 default.DefaultDialect.connection_characteristics 

3166 ) 

3167 connection_characteristics = connection_characteristics.union( 

3168 { 

3169 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), 

3170 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), 

3171 } 

3172 ) 

3173 

3174 construct_arguments = [ 

3175 ( 

3176 schema.Index, 

3177 { 

3178 "using": False, 

3179 "include": None, 

3180 "where": None, 

3181 "ops": {}, 

3182 "concurrently": False, 

3183 "with": {}, 

3184 "tablespace": None, 

3185 "nulls_not_distinct": None, 

3186 }, 

3187 ), 

3188 ( 

3189 schema.Table, 

3190 { 

3191 "ignore_search_path": False, 

3192 "tablespace": None, 

3193 "partition_by": None, 

3194 "with_oids": None, 

3195 "on_commit": None, 

3196 "inherits": None, 

3197 "using": None, 

3198 }, 

3199 ), 

3200 ( 

3201 schema.CheckConstraint, 

3202 { 

3203 "not_valid": False, 

3204 }, 

3205 ), 

3206 ( 

3207 schema.ForeignKeyConstraint, 

3208 { 

3209 "not_valid": False, 

3210 }, 

3211 ), 

3212 ( 

3213 schema.PrimaryKeyConstraint, 

3214 {"include": None}, 

3215 ), 

3216 ( 

3217 schema.UniqueConstraint, 

3218 { 

3219 "include": None, 

3220 "nulls_not_distinct": None, 

3221 }, 

3222 ), 

3223 ] 

3224 

3225 reflection_options = ("postgresql_ignore_search_path",) 

3226 

3227 _backslash_escapes = True 

3228 _supports_create_index_concurrently = True 

3229 _supports_drop_index_concurrently = True 

3230 

3231 def __init__( 

3232 self, 

3233 native_inet_types=None, 

3234 json_serializer=None, 

3235 json_deserializer=None, 

3236 **kwargs, 

3237 ): 

3238 default.DefaultDialect.__init__(self, **kwargs) 

3239 

3240 self._native_inet_types = native_inet_types 

3241 self._json_deserializer = json_deserializer 

3242 self._json_serializer = json_serializer 

3243 

3244 def initialize(self, connection): 

3245 super().initialize(connection) 

3246 

3247 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 

3248 self.supports_smallserial = self.server_version_info >= (9, 2) 

3249 

3250 self._set_backslash_escapes(connection) 

3251 

3252 self._supports_drop_index_concurrently = self.server_version_info >= ( 

3253 9, 

3254 2, 

3255 ) 

3256 self.supports_identity_columns = self.server_version_info >= (10,) 

3257 

3258 def get_isolation_level_values(self, dbapi_conn): 

3259 # note the generic dialect doesn't have AUTOCOMMIT, however 

3260 # all postgresql dialects should include AUTOCOMMIT. 

3261 return ( 

3262 "SERIALIZABLE", 

3263 "READ UNCOMMITTED", 

3264 "READ COMMITTED", 

3265 "REPEATABLE READ", 

3266 ) 

3267 

3268 def set_isolation_level(self, dbapi_connection, level): 

3269 cursor = dbapi_connection.cursor() 

3270 cursor.execute( 

3271 "SET SESSION CHARACTERISTICS AS TRANSACTION " 

3272 f"ISOLATION LEVEL {level}" 

3273 ) 

3274 cursor.execute("COMMIT") 

3275 cursor.close() 

3276 

3277 def get_isolation_level(self, dbapi_connection): 

3278 cursor = dbapi_connection.cursor() 

3279 cursor.execute("show transaction isolation level") 

3280 val = cursor.fetchone()[0] 

3281 cursor.close() 

3282 return val.upper() 

3283 

3284 def set_readonly(self, connection, value): 

3285 raise NotImplementedError() 

3286 

3287 def get_readonly(self, connection): 

3288 raise NotImplementedError() 

3289 

3290 def set_deferrable(self, connection, value): 

3291 raise NotImplementedError() 

3292 

3293 def get_deferrable(self, connection): 

3294 raise NotImplementedError() 

3295 

3296 def _split_multihost_from_url(self, url: URL) -> Union[ 

3297 Tuple[None, None], 

3298 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], 

3299 ]: 

3300 hosts: Optional[Tuple[Optional[str], ...]] = None 

3301 ports_str: Union[str, Tuple[Optional[str], ...], None] = None 

3302 

3303 integrated_multihost = False 

3304 

3305 if "host" in url.query: 

3306 if isinstance(url.query["host"], (list, tuple)): 

3307 integrated_multihost = True 

3308 hosts, ports_str = zip( 

3309 *[ 

3310 token.split(":") if ":" in token else (token, None) 

3311 for token in url.query["host"] 

3312 ] 

3313 ) 

3314 

3315 elif isinstance(url.query["host"], str): 

3316 hosts = tuple(url.query["host"].split(",")) 

3317 

3318 if ( 

3319 "port" not in url.query 

3320 and len(hosts) == 1 

3321 and ":" in hosts[0] 

3322 ): 

3323 # internet host is alphanumeric plus dots or hyphens. 

3324 # this is essentially rfc1123, which refers to rfc952. 

3325 # https://stackoverflow.com/questions/3523028/ 

3326 # valid-characters-of-a-hostname 

3327 host_port_match = re.match( 

3328 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0] 

3329 ) 

3330 if host_port_match: 

3331 integrated_multihost = True 

3332 h, p = host_port_match.group(1, 2) 

3333 if TYPE_CHECKING: 

3334 assert isinstance(h, str) 

3335 assert isinstance(p, str) 

3336 hosts = (h,) 

3337 ports_str = cast( 

3338 "Tuple[Optional[str], ...]", (p,) if p else (None,) 

3339 ) 

3340 

3341 if "port" in url.query: 

3342 if integrated_multihost: 

3343 raise exc.ArgumentError( 

3344 "Can't mix 'multihost' formats together; use " 

3345 '"host=h1,h2,h3&port=p1,p2,p3" or ' 

3346 '"host=h1:p1&host=h2:p2&host=h3:p3" separately' 

3347 ) 

3348 if isinstance(url.query["port"], (list, tuple)): 

3349 ports_str = url.query["port"] 

3350 elif isinstance(url.query["port"], str): 

3351 ports_str = tuple(url.query["port"].split(",")) 

3352 

3353 ports: Optional[Tuple[Optional[int], ...]] = None 

3354 

3355 if ports_str: 

3356 try: 

3357 ports = tuple(int(x) if x else None for x in ports_str) 

3358 except ValueError: 

3359 raise exc.ArgumentError( 

3360 f"Received non-integer port arguments: {ports_str}" 

3361 ) from None 

3362 

3363 if ports and ( 

3364 (not hosts and len(ports) > 1) 

3365 or ( 

3366 hosts 

3367 and ports 

3368 and len(hosts) != len(ports) 

3369 and (len(hosts) > 1 or len(ports) > 1) 

3370 ) 

3371 ): 

3372 raise exc.ArgumentError("number of hosts and ports don't match") 

3373 

3374 if hosts is not None: 

3375 if ports is None: 

3376 ports = tuple(None for _ in hosts) 

3377 

3378 return hosts, ports # type: ignore 

3379 

3380 def do_begin_twophase(self, connection, xid): 

3381 self.do_begin(connection.connection) 

3382 

3383 def do_prepare_twophase(self, connection, xid): 

3384 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) 

3385 

3386 def do_rollback_twophase( 

3387 self, connection, xid, is_prepared=True, recover=False 

3388 ): 

3389 if is_prepared: 

3390 if recover: 

3391 # FIXME: ugly hack to get out of transaction 

3392 # context when committing recoverable transactions 

3393 # Must find out a way how to make the dbapi not 

3394 # open a transaction. 

3395 connection.exec_driver_sql("ROLLBACK") 

3396 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) 

3397 connection.exec_driver_sql("BEGIN") 

3398 self.do_rollback(connection.connection) 

3399 else: 

3400 self.do_rollback(connection.connection) 

3401 

3402 def do_commit_twophase( 

3403 self, connection, xid, is_prepared=True, recover=False 

3404 ): 

3405 if is_prepared: 

3406 if recover: 

3407 connection.exec_driver_sql("ROLLBACK") 

3408 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) 

3409 connection.exec_driver_sql("BEGIN") 

3410 self.do_rollback(connection.connection) 

3411 else: 

3412 self.do_commit(connection.connection) 

3413 

3414 def do_recover_twophase(self, connection): 

3415 return connection.scalars( 

3416 sql.text("SELECT gid FROM pg_prepared_xacts") 

3417 ).all() 

3418 

3419 def _get_default_schema_name(self, connection): 

3420 return connection.exec_driver_sql("select current_schema()").scalar() 

3421 

3422 @reflection.cache 

3423 def has_schema(self, connection, schema, **kw): 

3424 query = select(pg_catalog.pg_namespace.c.nspname).where( 

3425 pg_catalog.pg_namespace.c.nspname == schema 

3426 ) 

3427 return bool(connection.scalar(query)) 

3428 

3429 def _pg_class_filter_scope_schema( 

3430 self, query, schema, scope, pg_class_table=None 

3431 ): 

3432 if pg_class_table is None: 

3433 pg_class_table = pg_catalog.pg_class 

3434 query = query.join( 

3435 pg_catalog.pg_namespace, 

3436 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, 

3437 ) 

3438 

3439 if scope is ObjectScope.DEFAULT: 

3440 query = query.where(pg_class_table.c.relpersistence != "t") 

3441 elif scope is ObjectScope.TEMPORARY: 

3442 query = query.where(pg_class_table.c.relpersistence == "t") 

3443 

3444 if schema is None: 

3445 query = query.where( 

3446 pg_catalog.pg_table_is_visible(pg_class_table.c.oid), 

3447 # ignore pg_catalog schema 

3448 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3449 ) 

3450 else: 

3451 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3452 return query 

3453 

3454 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None): 

3455 if pg_class_table is None: 

3456 pg_class_table = pg_catalog.pg_class 

3457 # uses the any form instead of in otherwise postgresql complaings 

3458 # that 'IN could not convert type character to "char"' 

3459 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds)) 

3460 

3461 @lru_cache() 

3462 def _has_table_query(self, schema): 

3463 query = select(pg_catalog.pg_class.c.relname).where( 

3464 pg_catalog.pg_class.c.relname == bindparam("table_name"), 

3465 self._pg_class_relkind_condition( 

3466 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3467 ), 

3468 ) 

3469 return self._pg_class_filter_scope_schema( 

3470 query, schema, scope=ObjectScope.ANY 

3471 ) 

3472 

3473 @reflection.cache 

3474 def has_table(self, connection, table_name, schema=None, **kw): 

3475 self._ensure_has_table_connection(connection) 

3476 query = self._has_table_query(schema) 

3477 return bool(connection.scalar(query, {"table_name": table_name})) 

3478 

3479 @reflection.cache 

3480 def has_sequence(self, connection, sequence_name, schema=None, **kw): 

3481 query = select(pg_catalog.pg_class.c.relname).where( 

3482 pg_catalog.pg_class.c.relkind == "S", 

3483 pg_catalog.pg_class.c.relname == sequence_name, 

3484 ) 

3485 query = self._pg_class_filter_scope_schema( 

3486 query, schema, scope=ObjectScope.ANY 

3487 ) 

3488 return bool(connection.scalar(query)) 

3489 

3490 @reflection.cache 

3491 def has_type(self, connection, type_name, schema=None, **kw): 

3492 query = ( 

3493 select(pg_catalog.pg_type.c.typname) 

3494 .join( 

3495 pg_catalog.pg_namespace, 

3496 pg_catalog.pg_namespace.c.oid 

3497 == pg_catalog.pg_type.c.typnamespace, 

3498 ) 

3499 .where(pg_catalog.pg_type.c.typname == type_name) 

3500 ) 

3501 if schema is None: 

3502 query = query.where( 

3503 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

3504 # ignore pg_catalog schema 

3505 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3506 ) 

3507 elif schema != "*": 

3508 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3509 

3510 return bool(connection.scalar(query)) 

3511 

3512 def _get_server_version_info(self, connection): 

3513 v = connection.exec_driver_sql("select pg_catalog.version()").scalar() 

3514 m = re.match( 

3515 r".*(?:PostgreSQL|EnterpriseDB) " 

3516 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", 

3517 v, 

3518 ) 

3519 if not m: 

3520 raise AssertionError( 

3521 "Could not determine version from string '%s'" % v 

3522 ) 

3523 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) 

3524 

3525 @reflection.cache 

3526 def get_table_oid(self, connection, table_name, schema=None, **kw): 

3527 """Fetch the oid for schema.table_name.""" 

3528 query = select(pg_catalog.pg_class.c.oid).where( 

3529 pg_catalog.pg_class.c.relname == table_name, 

3530 self._pg_class_relkind_condition( 

3531 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3532 ), 

3533 ) 

3534 query = self._pg_class_filter_scope_schema( 

3535 query, schema, scope=ObjectScope.ANY 

3536 ) 

3537 table_oid = connection.scalar(query) 

3538 if table_oid is None: 

3539 raise exc.NoSuchTableError( 

3540 f"{schema}.{table_name}" if schema else table_name 

3541 ) 

3542 return table_oid 

3543 

3544 @reflection.cache 

3545 def get_schema_names(self, connection, **kw): 

3546 query = ( 

3547 select(pg_catalog.pg_namespace.c.nspname) 

3548 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%")) 

3549 .order_by(pg_catalog.pg_namespace.c.nspname) 

3550 ) 

3551 return connection.scalars(query).all() 

3552 

3553 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope): 

3554 query = select(pg_catalog.pg_class.c.relname).where( 

3555 self._pg_class_relkind_condition(relkinds) 

3556 ) 

3557 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3558 return connection.scalars(query).all() 

3559 

3560 @reflection.cache 

3561 def get_table_names(self, connection, schema=None, **kw): 

3562 return self._get_relnames_for_relkinds( 

3563 connection, 

3564 schema, 

3565 pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3566 scope=ObjectScope.DEFAULT, 

3567 ) 

3568 

3569 @reflection.cache 

3570 def get_temp_table_names(self, connection, **kw): 

3571 return self._get_relnames_for_relkinds( 

3572 connection, 

3573 schema=None, 

3574 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3575 scope=ObjectScope.TEMPORARY, 

3576 ) 

3577 

3578 @reflection.cache 

3579 def _get_foreign_table_names(self, connection, schema=None, **kw): 

3580 return self._get_relnames_for_relkinds( 

3581 connection, schema, relkinds=("f",), scope=ObjectScope.ANY 

3582 ) 

3583 

3584 @reflection.cache 

3585 def get_view_names(self, connection, schema=None, **kw): 

3586 return self._get_relnames_for_relkinds( 

3587 connection, 

3588 schema, 

3589 pg_catalog.RELKINDS_VIEW, 

3590 scope=ObjectScope.DEFAULT, 

3591 ) 

3592 

3593 @reflection.cache 

3594 def get_materialized_view_names(self, connection, schema=None, **kw): 

3595 return self._get_relnames_for_relkinds( 

3596 connection, 

3597 schema, 

3598 pg_catalog.RELKINDS_MAT_VIEW, 

3599 scope=ObjectScope.DEFAULT, 

3600 ) 

3601 

3602 @reflection.cache 

3603 def get_temp_view_names(self, connection, schema=None, **kw): 

3604 return self._get_relnames_for_relkinds( 

3605 connection, 

3606 schema, 

3607 # NOTE: do not include temp materialzied views (that do not 

3608 # seem to be a thing at least up to version 14) 

3609 pg_catalog.RELKINDS_VIEW, 

3610 scope=ObjectScope.TEMPORARY, 

3611 ) 

3612 

3613 @reflection.cache 

3614 def get_sequence_names(self, connection, schema=None, **kw): 

3615 return self._get_relnames_for_relkinds( 

3616 connection, schema, relkinds=("S",), scope=ObjectScope.ANY 

3617 ) 

3618 

3619 @reflection.cache 

3620 def get_view_definition(self, connection, view_name, schema=None, **kw): 

3621 query = ( 

3622 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid)) 

3623 .select_from(pg_catalog.pg_class) 

3624 .where( 

3625 pg_catalog.pg_class.c.relname == view_name, 

3626 self._pg_class_relkind_condition( 

3627 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW 

3628 ), 

3629 ) 

3630 ) 

3631 query = self._pg_class_filter_scope_schema( 

3632 query, schema, scope=ObjectScope.ANY 

3633 ) 

3634 res = connection.scalar(query) 

3635 if res is None: 

3636 raise exc.NoSuchTableError( 

3637 f"{schema}.{view_name}" if schema else view_name 

3638 ) 

3639 else: 

3640 return res 

3641 

3642 def _value_or_raise(self, data, table, schema): 

3643 try: 

3644 return dict(data)[(schema, table)] 

3645 except KeyError: 

3646 raise exc.NoSuchTableError( 

3647 f"{schema}.{table}" if schema else table 

3648 ) from None 

3649 

3650 def _prepare_filter_names(self, filter_names): 

3651 if filter_names: 

3652 return True, {"filter_names": filter_names} 

3653 else: 

3654 return False, {} 

3655 

3656 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: 

3657 if kind is ObjectKind.ANY: 

3658 return pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3659 relkinds = () 

3660 if ObjectKind.TABLE in kind: 

3661 relkinds += pg_catalog.RELKINDS_TABLE 

3662 if ObjectKind.VIEW in kind: 

3663 relkinds += pg_catalog.RELKINDS_VIEW 

3664 if ObjectKind.MATERIALIZED_VIEW in kind: 

3665 relkinds += pg_catalog.RELKINDS_MAT_VIEW 

3666 return relkinds 

3667 

3668 @reflection.cache 

3669 def get_columns(self, connection, table_name, schema=None, **kw): 

3670 data = self.get_multi_columns( 

3671 connection, 

3672 schema=schema, 

3673 filter_names=[table_name], 

3674 scope=ObjectScope.ANY, 

3675 kind=ObjectKind.ANY, 

3676 **kw, 

3677 ) 

3678 return self._value_or_raise(data, table_name, schema) 

3679 

3680 @lru_cache() 

3681 def _columns_query(self, schema, has_filter_names, scope, kind): 

3682 # NOTE: the query with the default and identity options scalar 

3683 # subquery is faster than trying to use outer joins for them 

3684 generated = ( 

3685 pg_catalog.pg_attribute.c.attgenerated.label("generated") 

3686 if self.server_version_info >= (12,) 

3687 else sql.null().label("generated") 

3688 ) 

3689 if self.server_version_info >= (10,): 

3690 # join lateral performs worse (~2x slower) than a scalar_subquery 

3691 identity = ( 

3692 select( 

3693 sql.func.json_build_object( 

3694 "always", 

3695 pg_catalog.pg_attribute.c.attidentity == "a", 

3696 "start", 

3697 pg_catalog.pg_sequence.c.seqstart, 

3698 "increment", 

3699 pg_catalog.pg_sequence.c.seqincrement, 

3700 "minvalue", 

3701 pg_catalog.pg_sequence.c.seqmin, 

3702 "maxvalue", 

3703 pg_catalog.pg_sequence.c.seqmax, 

3704 "cache", 

3705 pg_catalog.pg_sequence.c.seqcache, 

3706 "cycle", 

3707 pg_catalog.pg_sequence.c.seqcycle, 

3708 type_=sqltypes.JSON(), 

3709 ) 

3710 ) 

3711 .select_from(pg_catalog.pg_sequence) 

3712 .where( 

3713 # attidentity != '' is required or it will reflect also 

3714 # serial columns as identity. 

3715 pg_catalog.pg_attribute.c.attidentity != "", 

3716 pg_catalog.pg_sequence.c.seqrelid 

3717 == sql.cast( 

3718 sql.cast( 

3719 pg_catalog.pg_get_serial_sequence( 

3720 sql.cast( 

3721 sql.cast( 

3722 pg_catalog.pg_attribute.c.attrelid, 

3723 REGCLASS, 

3724 ), 

3725 TEXT, 

3726 ), 

3727 pg_catalog.pg_attribute.c.attname, 

3728 ), 

3729 REGCLASS, 

3730 ), 

3731 OID, 

3732 ), 

3733 ) 

3734 .correlate(pg_catalog.pg_attribute) 

3735 .scalar_subquery() 

3736 .label("identity_options") 

3737 ) 

3738 else: 

3739 identity = sql.null().label("identity_options") 

3740 

3741 # join lateral performs the same as scalar_subquery here 

3742 default = ( 

3743 select( 

3744 pg_catalog.pg_get_expr( 

3745 pg_catalog.pg_attrdef.c.adbin, 

3746 pg_catalog.pg_attrdef.c.adrelid, 

3747 ) 

3748 ) 

3749 .select_from(pg_catalog.pg_attrdef) 

3750 .where( 

3751 pg_catalog.pg_attrdef.c.adrelid 

3752 == pg_catalog.pg_attribute.c.attrelid, 

3753 pg_catalog.pg_attrdef.c.adnum 

3754 == pg_catalog.pg_attribute.c.attnum, 

3755 pg_catalog.pg_attribute.c.atthasdef, 

3756 ) 

3757 .correlate(pg_catalog.pg_attribute) 

3758 .scalar_subquery() 

3759 .label("default") 

3760 ) 

3761 relkinds = self._kind_to_relkinds(kind) 

3762 query = ( 

3763 select( 

3764 pg_catalog.pg_attribute.c.attname.label("name"), 

3765 pg_catalog.format_type( 

3766 pg_catalog.pg_attribute.c.atttypid, 

3767 pg_catalog.pg_attribute.c.atttypmod, 

3768 ).label("format_type"), 

3769 default, 

3770 pg_catalog.pg_attribute.c.attnotnull.label("not_null"), 

3771 pg_catalog.pg_class.c.relname.label("table_name"), 

3772 pg_catalog.pg_description.c.description.label("comment"), 

3773 generated, 

3774 identity, 

3775 ) 

3776 .select_from(pg_catalog.pg_class) 

3777 # NOTE: postgresql support table with no user column, meaning 

3778 # there is no row with pg_attribute.attnum > 0. use a left outer 

3779 # join to avoid filtering these tables. 

3780 .outerjoin( 

3781 pg_catalog.pg_attribute, 

3782 sql.and_( 

3783 pg_catalog.pg_class.c.oid 

3784 == pg_catalog.pg_attribute.c.attrelid, 

3785 pg_catalog.pg_attribute.c.attnum > 0, 

3786 ~pg_catalog.pg_attribute.c.attisdropped, 

3787 ), 

3788 ) 

3789 .outerjoin( 

3790 pg_catalog.pg_description, 

3791 sql.and_( 

3792 pg_catalog.pg_description.c.objoid 

3793 == pg_catalog.pg_attribute.c.attrelid, 

3794 pg_catalog.pg_description.c.objsubid 

3795 == pg_catalog.pg_attribute.c.attnum, 

3796 ), 

3797 ) 

3798 .where(self._pg_class_relkind_condition(relkinds)) 

3799 .order_by( 

3800 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum 

3801 ) 

3802 ) 

3803 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3804 if has_filter_names: 

3805 query = query.where( 

3806 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

3807 ) 

3808 return query 

3809 

3810 def get_multi_columns( 

3811 self, connection, schema, filter_names, scope, kind, **kw 

3812 ): 

3813 has_filter_names, params = self._prepare_filter_names(filter_names) 

3814 query = self._columns_query(schema, has_filter_names, scope, kind) 

3815 rows = connection.execute(query, params).mappings() 

3816 

3817 # dictionary with (name, ) if default search path or (schema, name) 

3818 # as keys 

3819 domains = { 

3820 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d 

3821 for d in self._load_domains( 

3822 connection, schema="*", info_cache=kw.get("info_cache") 

3823 ) 

3824 } 

3825 

3826 # dictionary with (name, ) if default search path or (schema, name) 

3827 # as keys 

3828 enums = dict( 

3829 ( 

3830 ((rec["name"],), rec) 

3831 if rec["visible"] 

3832 else ((rec["schema"], rec["name"]), rec) 

3833 ) 

3834 for rec in self._load_enums( 

3835 connection, schema="*", info_cache=kw.get("info_cache") 

3836 ) 

3837 ) 

3838 

3839 columns = self._get_columns_info(rows, domains, enums, schema) 

3840 

3841 return columns.items() 

3842 

3843 _format_type_args_pattern = re.compile(r"\((.*)\)") 

3844 _format_type_args_delim = re.compile(r"\s*,\s*") 

3845 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$") 

3846 

3847 def _reflect_type( 

3848 self, 

3849 format_type: Optional[str], 

3850 domains: Dict[str, ReflectedDomain], 

3851 enums: Dict[str, ReflectedEnum], 

3852 type_description: str, 

3853 ) -> sqltypes.TypeEngine[Any]: 

3854 """ 

3855 Attempts to reconstruct a column type defined in ischema_names based 

3856 on the information available in the format_type. 

3857 

3858 If the `format_type` cannot be associated with a known `ischema_names`, 

3859 it is treated as a reference to a known PostgreSQL named `ENUM` or 

3860 `DOMAIN` type. 

3861 """ 

3862 type_description = type_description or "unknown type" 

3863 if format_type is None: 

3864 util.warn( 

3865 "PostgreSQL format_type() returned NULL for %s" 

3866 % type_description 

3867 ) 

3868 return sqltypes.NULLTYPE 

3869 

3870 attype_args_match = self._format_type_args_pattern.search(format_type) 

3871 if attype_args_match and attype_args_match.group(1): 

3872 attype_args = self._format_type_args_delim.split( 

3873 attype_args_match.group(1) 

3874 ) 

3875 else: 

3876 attype_args = () 

3877 

3878 match_array_dim = self._format_array_spec_pattern.search(format_type) 

3879 # Each "[]" in array specs corresponds to an array dimension 

3880 array_dim = len(match_array_dim.group(1) or "") // 2 

3881 

3882 # Remove all parameters and array specs from format_type to obtain an 

3883 # ischema_name candidate 

3884 attype = self._format_type_args_pattern.sub("", format_type) 

3885 attype = self._format_array_spec_pattern.sub("", attype) 

3886 

3887 schema_type = self.ischema_names.get(attype.lower(), None) 

3888 args, kwargs = (), {} 

3889 

3890 if attype == "numeric": 

3891 if len(attype_args) == 2: 

3892 precision, scale = map(int, attype_args) 

3893 args = (precision, scale) 

3894 

3895 elif attype == "double precision": 

3896 args = (53,) 

3897 

3898 elif attype == "integer": 

3899 args = () 

3900 

3901 elif attype in ("timestamp with time zone", "time with time zone"): 

3902 kwargs["timezone"] = True 

3903 if len(attype_args) == 1: 

3904 kwargs["precision"] = int(attype_args[0]) 

3905 

3906 elif attype in ( 

3907 "timestamp without time zone", 

3908 "time without time zone", 

3909 "time", 

3910 ): 

3911 kwargs["timezone"] = False 

3912 if len(attype_args) == 1: 

3913 kwargs["precision"] = int(attype_args[0]) 

3914 

3915 elif attype == "bit varying": 

3916 kwargs["varying"] = True 

3917 if len(attype_args) == 1: 

3918 charlen = int(attype_args[0]) 

3919 args = (charlen,) 

3920 

3921 elif attype.startswith("interval"): 

3922 schema_type = INTERVAL 

3923 

3924 field_match = re.match(r"interval (.+)", attype) 

3925 if field_match: 

3926 kwargs["fields"] = field_match.group(1) 

3927 

3928 if len(attype_args) == 1: 

3929 kwargs["precision"] = int(attype_args[0]) 

3930 

3931 else: 

3932 enum_or_domain_key = tuple(util.quoted_token_parser(attype)) 

3933 

3934 if enum_or_domain_key in enums: 

3935 schema_type = ENUM 

3936 enum = enums[enum_or_domain_key] 

3937 

3938 kwargs["name"] = enum["name"] 

3939 

3940 if not enum["visible"]: 

3941 kwargs["schema"] = enum["schema"] 

3942 args = tuple(enum["labels"]) 

3943 elif enum_or_domain_key in domains: 

3944 schema_type = DOMAIN 

3945 domain = domains[enum_or_domain_key] 

3946 

3947 data_type = self._reflect_type( 

3948 domain["type"], 

3949 domains, 

3950 enums, 

3951 type_description="DOMAIN '%s'" % domain["name"], 

3952 ) 

3953 args = (domain["name"], data_type) 

3954 

3955 kwargs["collation"] = domain["collation"] 

3956 kwargs["default"] = domain["default"] 

3957 kwargs["not_null"] = not domain["nullable"] 

3958 kwargs["create_type"] = False 

3959 

3960 if domain["constraints"]: 

3961 # We only support a single constraint 

3962 check_constraint = domain["constraints"][0] 

3963 

3964 kwargs["constraint_name"] = check_constraint["name"] 

3965 kwargs["check"] = check_constraint["check"] 

3966 

3967 if not domain["visible"]: 

3968 kwargs["schema"] = domain["schema"] 

3969 

3970 else: 

3971 try: 

3972 charlen = int(attype_args[0]) 

3973 args = (charlen, *attype_args[1:]) 

3974 except (ValueError, IndexError): 

3975 args = attype_args 

3976 

3977 if not schema_type: 

3978 util.warn( 

3979 "Did not recognize type '%s' of %s" 

3980 % (attype, type_description) 

3981 ) 

3982 return sqltypes.NULLTYPE 

3983 

3984 data_type = schema_type(*args, **kwargs) 

3985 if array_dim >= 1: 

3986 # postgres does not preserve dimensionality or size of array types. 

3987 data_type = _array.ARRAY(data_type) 

3988 

3989 return data_type 

3990 

3991 def _get_columns_info(self, rows, domains, enums, schema): 

3992 columns = defaultdict(list) 

3993 for row_dict in rows: 

3994 # ensure that each table has an entry, even if it has no columns 

3995 if row_dict["name"] is None: 

3996 columns[(schema, row_dict["table_name"])] = ( 

3997 ReflectionDefaults.columns() 

3998 ) 

3999 continue 

4000 table_cols = columns[(schema, row_dict["table_name"])] 

4001 

4002 coltype = self._reflect_type( 

4003 row_dict["format_type"], 

4004 domains, 

4005 enums, 

4006 type_description="column '%s'" % row_dict["name"], 

4007 ) 

4008 

4009 default = row_dict["default"] 

4010 name = row_dict["name"] 

4011 generated = row_dict["generated"] 

4012 nullable = not row_dict["not_null"] 

4013 

4014 if isinstance(coltype, DOMAIN): 

4015 if not default: 

4016 # domain can override the default value but 

4017 # cant set it to None 

4018 if coltype.default is not None: 

4019 default = coltype.default 

4020 

4021 nullable = nullable and not coltype.not_null 

4022 

4023 identity = row_dict["identity_options"] 

4024 

4025 # If a zero byte or blank string depending on driver (is also 

4026 # absent for older PG versions), then not a generated column. 

4027 # Otherwise, s = stored. (Other values might be added in the 

4028 # future.) 

4029 if generated not in (None, "", b"\x00"): 

4030 computed = dict( 

4031 sqltext=default, persisted=generated in ("s", b"s") 

4032 ) 

4033 default = None 

4034 else: 

4035 computed = None 

4036 

4037 # adjust the default value 

4038 autoincrement = False 

4039 if default is not None: 

4040 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) 

4041 if match is not None: 

4042 if issubclass(coltype._type_affinity, sqltypes.Integer): 

4043 autoincrement = True 

4044 # the default is related to a Sequence 

4045 if "." not in match.group(2) and schema is not None: 

4046 # unconditionally quote the schema name. this could 

4047 # later be enhanced to obey quoting rules / 

4048 # "quote schema" 

4049 default = ( 

4050 match.group(1) 

4051 + ('"%s"' % schema) 

4052 + "." 

4053 + match.group(2) 

4054 + match.group(3) 

4055 ) 

4056 

4057 column_info = { 

4058 "name": name, 

4059 "type": coltype, 

4060 "nullable": nullable, 

4061 "default": default, 

4062 "autoincrement": autoincrement or identity is not None, 

4063 "comment": row_dict["comment"], 

4064 } 

4065 if computed is not None: 

4066 column_info["computed"] = computed 

4067 if identity is not None: 

4068 column_info["identity"] = identity 

4069 

4070 table_cols.append(column_info) 

4071 

4072 return columns 

4073 

4074 @lru_cache() 

4075 def _table_oids_query(self, schema, has_filter_names, scope, kind): 

4076 relkinds = self._kind_to_relkinds(kind) 

4077 oid_q = select( 

4078 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname 

4079 ).where(self._pg_class_relkind_condition(relkinds)) 

4080 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) 

4081 

4082 if has_filter_names: 

4083 oid_q = oid_q.where( 

4084 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4085 ) 

4086 return oid_q 

4087 

4088 @reflection.flexi_cache( 

4089 ("schema", InternalTraversal.dp_string), 

4090 ("filter_names", InternalTraversal.dp_string_list), 

4091 ("kind", InternalTraversal.dp_plain_obj), 

4092 ("scope", InternalTraversal.dp_plain_obj), 

4093 ) 

4094 def _get_table_oids( 

4095 self, connection, schema, filter_names, scope, kind, **kw 

4096 ): 

4097 has_filter_names, params = self._prepare_filter_names(filter_names) 

4098 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) 

4099 result = connection.execute(oid_q, params) 

4100 return result.all() 

4101 

4102 @util.memoized_property 

4103 def _constraint_query(self): 

4104 if self.server_version_info >= (11, 0): 

4105 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4106 else: 

4107 indnkeyatts = sql.null().label("indnkeyatts") 

4108 

4109 if self.server_version_info >= (15,): 

4110 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4111 else: 

4112 indnullsnotdistinct = sql.false().label("indnullsnotdistinct") 

4113 

4114 con_sq = ( 

4115 select( 

4116 pg_catalog.pg_constraint.c.conrelid, 

4117 pg_catalog.pg_constraint.c.conname, 

4118 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4119 sql.func.generate_subscripts( 

4120 pg_catalog.pg_index.c.indkey, 1 

4121 ).label("ord"), 

4122 indnkeyatts, 

4123 indnullsnotdistinct, 

4124 pg_catalog.pg_description.c.description, 

4125 ) 

4126 .join( 

4127 pg_catalog.pg_index, 

4128 pg_catalog.pg_constraint.c.conindid 

4129 == pg_catalog.pg_index.c.indexrelid, 

4130 ) 

4131 .outerjoin( 

4132 pg_catalog.pg_description, 

4133 pg_catalog.pg_description.c.objoid 

4134 == pg_catalog.pg_constraint.c.oid, 

4135 ) 

4136 .where( 

4137 pg_catalog.pg_constraint.c.contype == bindparam("contype"), 

4138 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), 

4139 # NOTE: filtering also on pg_index.indrelid for oids does 

4140 # not seem to have a performance effect, but it may be an 

4141 # option if perf problems are reported 

4142 ) 

4143 .subquery("con") 

4144 ) 

4145 

4146 attr_sq = ( 

4147 select( 

4148 con_sq.c.conrelid, 

4149 con_sq.c.conname, 

4150 con_sq.c.description, 

4151 con_sq.c.ord, 

4152 con_sq.c.indnkeyatts, 

4153 con_sq.c.indnullsnotdistinct, 

4154 pg_catalog.pg_attribute.c.attname, 

4155 ) 

4156 .select_from(pg_catalog.pg_attribute) 

4157 .join( 

4158 con_sq, 

4159 sql.and_( 

4160 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, 

4161 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, 

4162 ), 

4163 ) 

4164 .where( 

4165 # NOTE: restate the condition here, since pg15 otherwise 

4166 # seems to get confused on pscopg2 sometimes, doing 

4167 # a sequential scan of pg_attribute. 

4168 # The condition in the con_sq subquery is not actually needed 

4169 # in pg15, but it may be needed in older versions. Keeping it 

4170 # does not seems to have any inpact in any case. 

4171 con_sq.c.conrelid.in_(bindparam("oids")) 

4172 ) 

4173 .subquery("attr") 

4174 ) 

4175 

4176 return ( 

4177 select( 

4178 attr_sq.c.conrelid, 

4179 sql.func.array_agg( 

4180 # NOTE: cast since some postgresql derivatives may 

4181 # not support array_agg on the name type 

4182 aggregate_order_by( 

4183 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord 

4184 ) 

4185 ).label("cols"), 

4186 attr_sq.c.conname, 

4187 sql.func.min(attr_sq.c.description).label("description"), 

4188 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"), 

4189 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label( 

4190 "indnullsnotdistinct" 

4191 ), 

4192 ) 

4193 .group_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4194 .order_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4195 ) 

4196 

4197 def _reflect_constraint( 

4198 self, connection, contype, schema, filter_names, scope, kind, **kw 

4199 ): 

4200 # used to reflect primary and unique constraint 

4201 table_oids = self._get_table_oids( 

4202 connection, schema, filter_names, scope, kind, **kw 

4203 ) 

4204 batches = list(table_oids) 

4205 is_unique = contype == "u" 

4206 

4207 while batches: 

4208 batch = batches[0:3000] 

4209 batches[0:3000] = [] 

4210 

4211 result = connection.execute( 

4212 self._constraint_query, 

4213 {"oids": [r[0] for r in batch], "contype": contype}, 

4214 ).mappings() 

4215 

4216 result_by_oid = defaultdict(list) 

4217 for row_dict in result: 

4218 result_by_oid[row_dict["conrelid"]].append(row_dict) 

4219 

4220 for oid, tablename in batch: 

4221 for_oid = result_by_oid.get(oid, ()) 

4222 if for_oid: 

4223 for row in for_oid: 

4224 # See note in get_multi_indexes 

4225 all_cols = row["cols"] 

4226 indnkeyatts = row["indnkeyatts"] 

4227 if ( 

4228 indnkeyatts is not None 

4229 and len(all_cols) > indnkeyatts 

4230 ): 

4231 inc_cols = all_cols[indnkeyatts:] 

4232 cst_cols = all_cols[:indnkeyatts] 

4233 else: 

4234 inc_cols = [] 

4235 cst_cols = all_cols 

4236 

4237 opts = {} 

4238 if self.server_version_info >= (11,): 

4239 opts["postgresql_include"] = inc_cols 

4240 if is_unique: 

4241 opts["postgresql_nulls_not_distinct"] = row[ 

4242 "indnullsnotdistinct" 

4243 ] 

4244 yield ( 

4245 tablename, 

4246 cst_cols, 

4247 row["conname"], 

4248 row["description"], 

4249 opts, 

4250 ) 

4251 else: 

4252 yield tablename, None, None, None, None 

4253 

4254 @reflection.cache 

4255 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

4256 data = self.get_multi_pk_constraint( 

4257 connection, 

4258 schema=schema, 

4259 filter_names=[table_name], 

4260 scope=ObjectScope.ANY, 

4261 kind=ObjectKind.ANY, 

4262 **kw, 

4263 ) 

4264 return self._value_or_raise(data, table_name, schema) 

4265 

4266 def get_multi_pk_constraint( 

4267 self, connection, schema, filter_names, scope, kind, **kw 

4268 ): 

4269 result = self._reflect_constraint( 

4270 connection, "p", schema, filter_names, scope, kind, **kw 

4271 ) 

4272 

4273 # only a single pk can be present for each table. Return an entry 

4274 # even if a table has no primary key 

4275 default = ReflectionDefaults.pk_constraint 

4276 

4277 def pk_constraint(pk_name, cols, comment, opts): 

4278 info = { 

4279 "constrained_columns": cols, 

4280 "name": pk_name, 

4281 "comment": comment, 

4282 } 

4283 if opts: 

4284 info["dialect_options"] = opts 

4285 return info 

4286 

4287 return ( 

4288 ( 

4289 (schema, table_name), 

4290 ( 

4291 pk_constraint(pk_name, cols, comment, opts) 

4292 if pk_name is not None 

4293 else default() 

4294 ), 

4295 ) 

4296 for table_name, cols, pk_name, comment, opts in result 

4297 ) 

4298 

4299 @reflection.cache 

4300 def get_foreign_keys( 

4301 self, 

4302 connection, 

4303 table_name, 

4304 schema=None, 

4305 postgresql_ignore_search_path=False, 

4306 **kw, 

4307 ): 

4308 data = self.get_multi_foreign_keys( 

4309 connection, 

4310 schema=schema, 

4311 filter_names=[table_name], 

4312 postgresql_ignore_search_path=postgresql_ignore_search_path, 

4313 scope=ObjectScope.ANY, 

4314 kind=ObjectKind.ANY, 

4315 **kw, 

4316 ) 

4317 return self._value_or_raise(data, table_name, schema) 

4318 

4319 @lru_cache() 

4320 def _foreing_key_query(self, schema, has_filter_names, scope, kind): 

4321 pg_class_ref = pg_catalog.pg_class.alias("cls_ref") 

4322 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") 

4323 relkinds = self._kind_to_relkinds(kind) 

4324 query = ( 

4325 select( 

4326 pg_catalog.pg_class.c.relname, 

4327 pg_catalog.pg_constraint.c.conname, 

4328 # NOTE: avoid calling pg_get_constraintdef when not needed 

4329 # to speed up the query 

4330 sql.case( 

4331 ( 

4332 pg_catalog.pg_constraint.c.oid.is_not(None), 

4333 pg_catalog.pg_get_constraintdef( 

4334 pg_catalog.pg_constraint.c.oid, True 

4335 ), 

4336 ), 

4337 else_=None, 

4338 ), 

4339 pg_namespace_ref.c.nspname, 

4340 pg_catalog.pg_description.c.description, 

4341 ) 

4342 .select_from(pg_catalog.pg_class) 

4343 .outerjoin( 

4344 pg_catalog.pg_constraint, 

4345 sql.and_( 

4346 pg_catalog.pg_class.c.oid 

4347 == pg_catalog.pg_constraint.c.conrelid, 

4348 pg_catalog.pg_constraint.c.contype == "f", 

4349 ), 

4350 ) 

4351 .outerjoin( 

4352 pg_class_ref, 

4353 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, 

4354 ) 

4355 .outerjoin( 

4356 pg_namespace_ref, 

4357 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, 

4358 ) 

4359 .outerjoin( 

4360 pg_catalog.pg_description, 

4361 pg_catalog.pg_description.c.objoid 

4362 == pg_catalog.pg_constraint.c.oid, 

4363 ) 

4364 .order_by( 

4365 pg_catalog.pg_class.c.relname, 

4366 pg_catalog.pg_constraint.c.conname, 

4367 ) 

4368 .where(self._pg_class_relkind_condition(relkinds)) 

4369 ) 

4370 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4371 if has_filter_names: 

4372 query = query.where( 

4373 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4374 ) 

4375 return query 

4376 

4377 @util.memoized_property 

4378 def _fk_regex_pattern(self): 

4379 # optionally quoted token 

4380 qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)' 

4381 

4382 # https://www.postgresql.org/docs/current/static/sql-createtable.html 

4383 return re.compile( 

4384 r"FOREIGN KEY \((.*?)\) " 

4385 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501 

4386 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" 

4387 r"[\s]?(ON UPDATE " 

4388 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" 

4389 r"[\s]?(ON DELETE " 

4390 r"(CASCADE|RESTRICT|NO ACTION|" 

4391 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?" 

4392 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" 

4393 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" 

4394 ) 

4395 

4396 def get_multi_foreign_keys( 

4397 self, 

4398 connection, 

4399 schema, 

4400 filter_names, 

4401 scope, 

4402 kind, 

4403 postgresql_ignore_search_path=False, 

4404 **kw, 

4405 ): 

4406 preparer = self.identifier_preparer 

4407 

4408 has_filter_names, params = self._prepare_filter_names(filter_names) 

4409 query = self._foreing_key_query(schema, has_filter_names, scope, kind) 

4410 result = connection.execute(query, params) 

4411 

4412 FK_REGEX = self._fk_regex_pattern 

4413 

4414 fkeys = defaultdict(list) 

4415 default = ReflectionDefaults.foreign_keys 

4416 for table_name, conname, condef, conschema, comment in result: 

4417 # ensure that each table has an entry, even if it has 

4418 # no foreign keys 

4419 if conname is None: 

4420 fkeys[(schema, table_name)] = default() 

4421 continue 

4422 table_fks = fkeys[(schema, table_name)] 

4423 m = re.search(FK_REGEX, condef).groups() 

4424 

4425 ( 

4426 constrained_columns, 

4427 referred_schema, 

4428 referred_table, 

4429 referred_columns, 

4430 _, 

4431 match, 

4432 _, 

4433 onupdate, 

4434 _, 

4435 ondelete, 

4436 deferrable, 

4437 _, 

4438 initially, 

4439 ) = m 

4440 

4441 if deferrable is not None: 

4442 deferrable = True if deferrable == "DEFERRABLE" else False 

4443 constrained_columns = [ 

4444 preparer._unquote_identifier(x) 

4445 for x in re.split(r"\s*,\s*", constrained_columns) 

4446 ] 

4447 

4448 if postgresql_ignore_search_path: 

4449 # when ignoring search path, we use the actual schema 

4450 # provided it isn't the "default" schema 

4451 if conschema != self.default_schema_name: 

4452 referred_schema = conschema 

4453 else: 

4454 referred_schema = schema 

4455 elif referred_schema: 

4456 # referred_schema is the schema that we regexp'ed from 

4457 # pg_get_constraintdef(). If the schema is in the search 

4458 # path, pg_get_constraintdef() will give us None. 

4459 referred_schema = preparer._unquote_identifier(referred_schema) 

4460 elif schema is not None and schema == conschema: 

4461 # If the actual schema matches the schema of the table 

4462 # we're reflecting, then we will use that. 

4463 referred_schema = schema 

4464 

4465 referred_table = preparer._unquote_identifier(referred_table) 

4466 referred_columns = [ 

4467 preparer._unquote_identifier(x) 

4468 for x in re.split(r"\s*,\s", referred_columns) 

4469 ] 

4470 options = { 

4471 k: v 

4472 for k, v in [ 

4473 ("onupdate", onupdate), 

4474 ("ondelete", ondelete), 

4475 ("initially", initially), 

4476 ("deferrable", deferrable), 

4477 ("match", match), 

4478 ] 

4479 if v is not None and v != "NO ACTION" 

4480 } 

4481 fkey_d = { 

4482 "name": conname, 

4483 "constrained_columns": constrained_columns, 

4484 "referred_schema": referred_schema, 

4485 "referred_table": referred_table, 

4486 "referred_columns": referred_columns, 

4487 "options": options, 

4488 "comment": comment, 

4489 } 

4490 table_fks.append(fkey_d) 

4491 return fkeys.items() 

4492 

4493 @reflection.cache 

4494 def get_indexes(self, connection, table_name, schema=None, **kw): 

4495 data = self.get_multi_indexes( 

4496 connection, 

4497 schema=schema, 

4498 filter_names=[table_name], 

4499 scope=ObjectScope.ANY, 

4500 kind=ObjectKind.ANY, 

4501 **kw, 

4502 ) 

4503 return self._value_or_raise(data, table_name, schema) 

4504 

4505 @util.memoized_property 

4506 def _index_query(self): 

4507 # NOTE: pg_index is used as from two times to improve performance, 

4508 # since extraing all the index information from `idx_sq` to avoid 

4509 # the second pg_index use leads to a worse performing query in 

4510 # particular when querying for a single table (as of pg 17) 

4511 # NOTE: repeating oids clause improve query performance 

4512 

4513 # subquery to get the columns 

4514 idx_sq = ( 

4515 select( 

4516 pg_catalog.pg_index.c.indexrelid, 

4517 pg_catalog.pg_index.c.indrelid, 

4518 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4519 sql.func.generate_subscripts( 

4520 pg_catalog.pg_index.c.indkey, 1 

4521 ).label("ord"), 

4522 ) 

4523 .where( 

4524 ~pg_catalog.pg_index.c.indisprimary, 

4525 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4526 ) 

4527 .subquery("idx") 

4528 ) 

4529 

4530 attr_sq = ( 

4531 select( 

4532 idx_sq.c.indexrelid, 

4533 idx_sq.c.indrelid, 

4534 idx_sq.c.ord, 

4535 # NOTE: always using pg_get_indexdef is too slow so just 

4536 # invoke when the element is an expression 

4537 sql.case( 

4538 ( 

4539 idx_sq.c.attnum == 0, 

4540 pg_catalog.pg_get_indexdef( 

4541 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True 

4542 ), 

4543 ), 

4544 # NOTE: need to cast this since attname is of type "name" 

4545 # that's limited to 63 bytes, while pg_get_indexdef 

4546 # returns "text" so its output may get cut 

4547 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), 

4548 ).label("element"), 

4549 (idx_sq.c.attnum == 0).label("is_expr"), 

4550 ) 

4551 .select_from(idx_sq) 

4552 .outerjoin( 

4553 # do not remove rows where idx_sq.c.attnum is 0 

4554 pg_catalog.pg_attribute, 

4555 sql.and_( 

4556 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, 

4557 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, 

4558 ), 

4559 ) 

4560 .where(idx_sq.c.indrelid.in_(bindparam("oids"))) 

4561 .subquery("idx_attr") 

4562 ) 

4563 

4564 cols_sq = ( 

4565 select( 

4566 attr_sq.c.indexrelid, 

4567 sql.func.min(attr_sq.c.indrelid), 

4568 sql.func.array_agg( 

4569 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) 

4570 ).label("elements"), 

4571 sql.func.array_agg( 

4572 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) 

4573 ).label("elements_is_expr"), 

4574 ) 

4575 .group_by(attr_sq.c.indexrelid) 

4576 .subquery("idx_cols") 

4577 ) 

4578 

4579 if self.server_version_info >= (11, 0): 

4580 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4581 else: 

4582 indnkeyatts = sql.null().label("indnkeyatts") 

4583 

4584 if self.server_version_info >= (15,): 

4585 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4586 else: 

4587 nulls_not_distinct = sql.false().label("indnullsnotdistinct") 

4588 

4589 return ( 

4590 select( 

4591 pg_catalog.pg_index.c.indrelid, 

4592 pg_catalog.pg_class.c.relname, 

4593 pg_catalog.pg_index.c.indisunique, 

4594 pg_catalog.pg_constraint.c.conrelid.is_not(None).label( 

4595 "has_constraint" 

4596 ), 

4597 pg_catalog.pg_index.c.indoption, 

4598 pg_catalog.pg_class.c.reloptions, 

4599 pg_catalog.pg_am.c.amname, 

4600 # NOTE: pg_get_expr is very fast so this case has almost no 

4601 # performance impact 

4602 sql.case( 

4603 ( 

4604 pg_catalog.pg_index.c.indpred.is_not(None), 

4605 pg_catalog.pg_get_expr( 

4606 pg_catalog.pg_index.c.indpred, 

4607 pg_catalog.pg_index.c.indrelid, 

4608 ), 

4609 ), 

4610 else_=None, 

4611 ).label("filter_definition"), 

4612 indnkeyatts, 

4613 nulls_not_distinct, 

4614 cols_sq.c.elements, 

4615 cols_sq.c.elements_is_expr, 

4616 ) 

4617 .select_from(pg_catalog.pg_index) 

4618 .where( 

4619 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4620 ~pg_catalog.pg_index.c.indisprimary, 

4621 ) 

4622 .join( 

4623 pg_catalog.pg_class, 

4624 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid, 

4625 ) 

4626 .join( 

4627 pg_catalog.pg_am, 

4628 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid, 

4629 ) 

4630 .outerjoin( 

4631 cols_sq, 

4632 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, 

4633 ) 

4634 .outerjoin( 

4635 pg_catalog.pg_constraint, 

4636 sql.and_( 

4637 pg_catalog.pg_index.c.indrelid 

4638 == pg_catalog.pg_constraint.c.conrelid, 

4639 pg_catalog.pg_index.c.indexrelid 

4640 == pg_catalog.pg_constraint.c.conindid, 

4641 pg_catalog.pg_constraint.c.contype 

4642 == sql.any_(_array.array(("p", "u", "x"))), 

4643 ), 

4644 ) 

4645 .order_by( 

4646 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname 

4647 ) 

4648 ) 

4649 

4650 def get_multi_indexes( 

4651 self, connection, schema, filter_names, scope, kind, **kw 

4652 ): 

4653 table_oids = self._get_table_oids( 

4654 connection, schema, filter_names, scope, kind, **kw 

4655 ) 

4656 

4657 indexes = defaultdict(list) 

4658 default = ReflectionDefaults.indexes 

4659 

4660 batches = list(table_oids) 

4661 

4662 while batches: 

4663 batch = batches[0:3000] 

4664 batches[0:3000] = [] 

4665 

4666 result = connection.execute( 

4667 self._index_query, {"oids": [r[0] for r in batch]} 

4668 ).mappings() 

4669 

4670 result_by_oid = defaultdict(list) 

4671 for row_dict in result: 

4672 result_by_oid[row_dict["indrelid"]].append(row_dict) 

4673 

4674 for oid, table_name in batch: 

4675 if oid not in result_by_oid: 

4676 # ensure that each table has an entry, even if reflection 

4677 # is skipped because not supported 

4678 indexes[(schema, table_name)] = default() 

4679 continue 

4680 

4681 for row in result_by_oid[oid]: 

4682 index_name = row["relname"] 

4683 

4684 table_indexes = indexes[(schema, table_name)] 

4685 

4686 all_elements = row["elements"] 

4687 all_elements_is_expr = row["elements_is_expr"] 

4688 indnkeyatts = row["indnkeyatts"] 

4689 # "The number of key columns in the index, not counting any 

4690 # included columns, which are merely stored and do not 

4691 # participate in the index semantics" 

4692 if ( 

4693 indnkeyatts is not None 

4694 and len(all_elements) > indnkeyatts 

4695 ): 

4696 # this is a "covering index" which has INCLUDE columns 

4697 # as well as regular index columns 

4698 inc_cols = all_elements[indnkeyatts:] 

4699 idx_elements = all_elements[:indnkeyatts] 

4700 idx_elements_is_expr = all_elements_is_expr[ 

4701 :indnkeyatts 

4702 ] 

4703 # postgresql does not support expression on included 

4704 # columns as of v14: "ERROR: expressions are not 

4705 # supported in included columns". 

4706 assert all( 

4707 not is_expr 

4708 for is_expr in all_elements_is_expr[indnkeyatts:] 

4709 ) 

4710 else: 

4711 idx_elements = all_elements 

4712 idx_elements_is_expr = all_elements_is_expr 

4713 inc_cols = [] 

4714 

4715 index = {"name": index_name, "unique": row["indisunique"]} 

4716 if any(idx_elements_is_expr): 

4717 index["column_names"] = [ 

4718 None if is_expr else expr 

4719 for expr, is_expr in zip( 

4720 idx_elements, idx_elements_is_expr 

4721 ) 

4722 ] 

4723 index["expressions"] = idx_elements 

4724 else: 

4725 index["column_names"] = idx_elements 

4726 

4727 sorting = {} 

4728 for col_index, col_flags in enumerate(row["indoption"]): 

4729 col_sorting = () 

4730 # try to set flags only if they differ from PG 

4731 # defaults... 

4732 if col_flags & 0x01: 

4733 col_sorting += ("desc",) 

4734 if not (col_flags & 0x02): 

4735 col_sorting += ("nulls_last",) 

4736 else: 

4737 if col_flags & 0x02: 

4738 col_sorting += ("nulls_first",) 

4739 if col_sorting: 

4740 sorting[idx_elements[col_index]] = col_sorting 

4741 if sorting: 

4742 index["column_sorting"] = sorting 

4743 if row["has_constraint"]: 

4744 index["duplicates_constraint"] = index_name 

4745 

4746 dialect_options = {} 

4747 if row["reloptions"]: 

4748 dialect_options["postgresql_with"] = dict( 

4749 [ 

4750 option.split("=", 1) 

4751 for option in row["reloptions"] 

4752 ] 

4753 ) 

4754 # it *might* be nice to include that this is 'btree' in the 

4755 # reflection info. But we don't want an Index object 

4756 # to have a ``postgresql_using`` in it that is just the 

4757 # default, so for the moment leaving this out. 

4758 amname = row["amname"] 

4759 if amname != "btree": 

4760 dialect_options["postgresql_using"] = row["amname"] 

4761 if row["filter_definition"]: 

4762 dialect_options["postgresql_where"] = row[ 

4763 "filter_definition" 

4764 ] 

4765 if self.server_version_info >= (11,): 

4766 # NOTE: this is legacy, this is part of 

4767 # dialect_options now as of #7382 

4768 index["include_columns"] = inc_cols 

4769 dialect_options["postgresql_include"] = inc_cols 

4770 if row["indnullsnotdistinct"]: 

4771 # the default is False, so ignore it. 

4772 dialect_options["postgresql_nulls_not_distinct"] = row[ 

4773 "indnullsnotdistinct" 

4774 ] 

4775 

4776 if dialect_options: 

4777 index["dialect_options"] = dialect_options 

4778 

4779 table_indexes.append(index) 

4780 return indexes.items() 

4781 

4782 @reflection.cache 

4783 def get_unique_constraints( 

4784 self, connection, table_name, schema=None, **kw 

4785 ): 

4786 data = self.get_multi_unique_constraints( 

4787 connection, 

4788 schema=schema, 

4789 filter_names=[table_name], 

4790 scope=ObjectScope.ANY, 

4791 kind=ObjectKind.ANY, 

4792 **kw, 

4793 ) 

4794 return self._value_or_raise(data, table_name, schema) 

4795 

4796 def get_multi_unique_constraints( 

4797 self, 

4798 connection, 

4799 schema, 

4800 filter_names, 

4801 scope, 

4802 kind, 

4803 **kw, 

4804 ): 

4805 result = self._reflect_constraint( 

4806 connection, "u", schema, filter_names, scope, kind, **kw 

4807 ) 

4808 

4809 # each table can have multiple unique constraints 

4810 uniques = defaultdict(list) 

4811 default = ReflectionDefaults.unique_constraints 

4812 for table_name, cols, con_name, comment, options in result: 

4813 # ensure a list is created for each table. leave it empty if 

4814 # the table has no unique cosntraint 

4815 if con_name is None: 

4816 uniques[(schema, table_name)] = default() 

4817 continue 

4818 

4819 uc_dict = { 

4820 "column_names": cols, 

4821 "name": con_name, 

4822 "comment": comment, 

4823 } 

4824 if options: 

4825 uc_dict["dialect_options"] = options 

4826 

4827 uniques[(schema, table_name)].append(uc_dict) 

4828 return uniques.items() 

4829 

4830 @reflection.cache 

4831 def get_table_comment(self, connection, table_name, schema=None, **kw): 

4832 data = self.get_multi_table_comment( 

4833 connection, 

4834 schema, 

4835 [table_name], 

4836 scope=ObjectScope.ANY, 

4837 kind=ObjectKind.ANY, 

4838 **kw, 

4839 ) 

4840 return self._value_or_raise(data, table_name, schema) 

4841 

4842 @lru_cache() 

4843 def _comment_query(self, schema, has_filter_names, scope, kind): 

4844 relkinds = self._kind_to_relkinds(kind) 

4845 query = ( 

4846 select( 

4847 pg_catalog.pg_class.c.relname, 

4848 pg_catalog.pg_description.c.description, 

4849 ) 

4850 .select_from(pg_catalog.pg_class) 

4851 .outerjoin( 

4852 pg_catalog.pg_description, 

4853 sql.and_( 

4854 pg_catalog.pg_class.c.oid 

4855 == pg_catalog.pg_description.c.objoid, 

4856 pg_catalog.pg_description.c.objsubid == 0, 

4857 pg_catalog.pg_description.c.classoid 

4858 == sql.func.cast("pg_catalog.pg_class", REGCLASS), 

4859 ), 

4860 ) 

4861 .where(self._pg_class_relkind_condition(relkinds)) 

4862 ) 

4863 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4864 if has_filter_names: 

4865 query = query.where( 

4866 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4867 ) 

4868 return query 

4869 

4870 def get_multi_table_comment( 

4871 self, connection, schema, filter_names, scope, kind, **kw 

4872 ): 

4873 has_filter_names, params = self._prepare_filter_names(filter_names) 

4874 query = self._comment_query(schema, has_filter_names, scope, kind) 

4875 result = connection.execute(query, params) 

4876 

4877 default = ReflectionDefaults.table_comment 

4878 return ( 

4879 ( 

4880 (schema, table), 

4881 {"text": comment} if comment is not None else default(), 

4882 ) 

4883 for table, comment in result 

4884 ) 

4885 

4886 @reflection.cache 

4887 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

4888 data = self.get_multi_check_constraints( 

4889 connection, 

4890 schema, 

4891 [table_name], 

4892 scope=ObjectScope.ANY, 

4893 kind=ObjectKind.ANY, 

4894 **kw, 

4895 ) 

4896 return self._value_or_raise(data, table_name, schema) 

4897 

4898 @lru_cache() 

4899 def _check_constraint_query(self, schema, has_filter_names, scope, kind): 

4900 relkinds = self._kind_to_relkinds(kind) 

4901 query = ( 

4902 select( 

4903 pg_catalog.pg_class.c.relname, 

4904 pg_catalog.pg_constraint.c.conname, 

4905 # NOTE: avoid calling pg_get_constraintdef when not needed 

4906 # to speed up the query 

4907 sql.case( 

4908 ( 

4909 pg_catalog.pg_constraint.c.oid.is_not(None), 

4910 pg_catalog.pg_get_constraintdef( 

4911 pg_catalog.pg_constraint.c.oid, True 

4912 ), 

4913 ), 

4914 else_=None, 

4915 ), 

4916 pg_catalog.pg_description.c.description, 

4917 ) 

4918 .select_from(pg_catalog.pg_class) 

4919 .outerjoin( 

4920 pg_catalog.pg_constraint, 

4921 sql.and_( 

4922 pg_catalog.pg_class.c.oid 

4923 == pg_catalog.pg_constraint.c.conrelid, 

4924 pg_catalog.pg_constraint.c.contype == "c", 

4925 ), 

4926 ) 

4927 .outerjoin( 

4928 pg_catalog.pg_description, 

4929 pg_catalog.pg_description.c.objoid 

4930 == pg_catalog.pg_constraint.c.oid, 

4931 ) 

4932 .order_by( 

4933 pg_catalog.pg_class.c.relname, 

4934 pg_catalog.pg_constraint.c.conname, 

4935 ) 

4936 .where(self._pg_class_relkind_condition(relkinds)) 

4937 ) 

4938 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4939 if has_filter_names: 

4940 query = query.where( 

4941 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4942 ) 

4943 return query 

4944 

4945 def get_multi_check_constraints( 

4946 self, connection, schema, filter_names, scope, kind, **kw 

4947 ): 

4948 has_filter_names, params = self._prepare_filter_names(filter_names) 

4949 query = self._check_constraint_query( 

4950 schema, has_filter_names, scope, kind 

4951 ) 

4952 result = connection.execute(query, params) 

4953 

4954 check_constraints = defaultdict(list) 

4955 default = ReflectionDefaults.check_constraints 

4956 for table_name, check_name, src, comment in result: 

4957 # only two cases for check_name and src: both null or both defined 

4958 if check_name is None and src is None: 

4959 check_constraints[(schema, table_name)] = default() 

4960 continue 

4961 # samples: 

4962 # "CHECK (((a > 1) AND (a < 5)))" 

4963 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" 

4964 # "CHECK (((a > 1) AND (a < 5))) NOT VALID" 

4965 # "CHECK (some_boolean_function(a))" 

4966 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" 

4967 # "CHECK (a NOT NULL) NO INHERIT" 

4968 # "CHECK (a NOT NULL) NO INHERIT NOT VALID" 

4969 

4970 m = re.match( 

4971 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", 

4972 src, 

4973 flags=re.DOTALL, 

4974 ) 

4975 if not m: 

4976 util.warn("Could not parse CHECK constraint text: %r" % src) 

4977 sqltext = "" 

4978 else: 

4979 sqltext = re.compile( 

4980 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL 

4981 ).sub(r"\1", m.group(1)) 

4982 entry = { 

4983 "name": check_name, 

4984 "sqltext": sqltext, 

4985 "comment": comment, 

4986 } 

4987 if m: 

4988 do = {} 

4989 if " NOT VALID" in m.groups(): 

4990 do["not_valid"] = True 

4991 if " NO INHERIT" in m.groups(): 

4992 do["no_inherit"] = True 

4993 if do: 

4994 entry["dialect_options"] = do 

4995 

4996 check_constraints[(schema, table_name)].append(entry) 

4997 return check_constraints.items() 

4998 

4999 def _pg_type_filter_schema(self, query, schema): 

5000 if schema is None: 

5001 query = query.where( 

5002 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

5003 # ignore pg_catalog schema 

5004 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

5005 ) 

5006 elif schema != "*": 

5007 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

5008 return query 

5009 

5010 @lru_cache() 

5011 def _enum_query(self, schema): 

5012 lbl_agg_sq = ( 

5013 select( 

5014 pg_catalog.pg_enum.c.enumtypid, 

5015 sql.func.array_agg( 

5016 aggregate_order_by( 

5017 # NOTE: cast since some postgresql derivatives may 

5018 # not support array_agg on the name type 

5019 pg_catalog.pg_enum.c.enumlabel.cast(TEXT), 

5020 pg_catalog.pg_enum.c.enumsortorder, 

5021 ) 

5022 ).label("labels"), 

5023 ) 

5024 .group_by(pg_catalog.pg_enum.c.enumtypid) 

5025 .subquery("lbl_agg") 

5026 ) 

5027 

5028 query = ( 

5029 select( 

5030 pg_catalog.pg_type.c.typname.label("name"), 

5031 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5032 "visible" 

5033 ), 

5034 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5035 lbl_agg_sq.c.labels.label("labels"), 

5036 ) 

5037 .join( 

5038 pg_catalog.pg_namespace, 

5039 pg_catalog.pg_namespace.c.oid 

5040 == pg_catalog.pg_type.c.typnamespace, 

5041 ) 

5042 .outerjoin( 

5043 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid 

5044 ) 

5045 .where(pg_catalog.pg_type.c.typtype == "e") 

5046 .order_by( 

5047 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5048 ) 

5049 ) 

5050 

5051 return self._pg_type_filter_schema(query, schema) 

5052 

5053 @reflection.cache 

5054 def _load_enums(self, connection, schema=None, **kw): 

5055 if not self.supports_native_enum: 

5056 return [] 

5057 

5058 result = connection.execute(self._enum_query(schema)) 

5059 

5060 enums = [] 

5061 for name, visible, schema, labels in result: 

5062 enums.append( 

5063 { 

5064 "name": name, 

5065 "schema": schema, 

5066 "visible": visible, 

5067 "labels": [] if labels is None else labels, 

5068 } 

5069 ) 

5070 return enums 

5071 

5072 @lru_cache() 

5073 def _domain_query(self, schema): 

5074 con_sq = ( 

5075 select( 

5076 pg_catalog.pg_constraint.c.contypid, 

5077 sql.func.array_agg( 

5078 pg_catalog.pg_get_constraintdef( 

5079 pg_catalog.pg_constraint.c.oid, True 

5080 ) 

5081 ).label("condefs"), 

5082 sql.func.array_agg( 

5083 # NOTE: cast since some postgresql derivatives may 

5084 # not support array_agg on the name type 

5085 pg_catalog.pg_constraint.c.conname.cast(TEXT) 

5086 ).label("connames"), 

5087 ) 

5088 # The domain this constraint is on; zero if not a domain constraint 

5089 .where(pg_catalog.pg_constraint.c.contypid != 0) 

5090 .group_by(pg_catalog.pg_constraint.c.contypid) 

5091 .subquery("domain_constraints") 

5092 ) 

5093 

5094 query = ( 

5095 select( 

5096 pg_catalog.pg_type.c.typname.label("name"), 

5097 pg_catalog.format_type( 

5098 pg_catalog.pg_type.c.typbasetype, 

5099 pg_catalog.pg_type.c.typtypmod, 

5100 ).label("attype"), 

5101 (~pg_catalog.pg_type.c.typnotnull).label("nullable"), 

5102 pg_catalog.pg_type.c.typdefault.label("default"), 

5103 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5104 "visible" 

5105 ), 

5106 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5107 con_sq.c.condefs, 

5108 con_sq.c.connames, 

5109 pg_catalog.pg_collation.c.collname, 

5110 ) 

5111 .join( 

5112 pg_catalog.pg_namespace, 

5113 pg_catalog.pg_namespace.c.oid 

5114 == pg_catalog.pg_type.c.typnamespace, 

5115 ) 

5116 .outerjoin( 

5117 pg_catalog.pg_collation, 

5118 pg_catalog.pg_type.c.typcollation 

5119 == pg_catalog.pg_collation.c.oid, 

5120 ) 

5121 .outerjoin( 

5122 con_sq, 

5123 pg_catalog.pg_type.c.oid == con_sq.c.contypid, 

5124 ) 

5125 .where(pg_catalog.pg_type.c.typtype == "d") 

5126 .order_by( 

5127 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5128 ) 

5129 ) 

5130 return self._pg_type_filter_schema(query, schema) 

5131 

5132 @reflection.cache 

5133 def _load_domains(self, connection, schema=None, **kw): 

5134 result = connection.execute(self._domain_query(schema)) 

5135 

5136 domains: List[ReflectedDomain] = [] 

5137 for domain in result.mappings(): 

5138 # strip (30) from character varying(30) 

5139 attype = re.search(r"([^\(]+)", domain["attype"]).group(1) 

5140 constraints: List[ReflectedDomainConstraint] = [] 

5141 if domain["connames"]: 

5142 # When a domain has multiple CHECK constraints, they will 

5143 # be tested in alphabetical order by name. 

5144 sorted_constraints = sorted( 

5145 zip(domain["connames"], domain["condefs"]), 

5146 key=lambda t: t[0], 

5147 ) 

5148 for name, def_ in sorted_constraints: 

5149 # constraint is in the form "CHECK (expression)" 

5150 # or "NOT NULL". Ignore the "NOT NULL" and 

5151 # remove "CHECK (" and the tailing ")". 

5152 if def_.casefold().startswith("check"): 

5153 check = def_[7:-1] 

5154 constraints.append({"name": name, "check": check}) 

5155 domain_rec: ReflectedDomain = { 

5156 "name": domain["name"], 

5157 "schema": domain["schema"], 

5158 "visible": domain["visible"], 

5159 "type": attype, 

5160 "nullable": domain["nullable"], 

5161 "default": domain["default"], 

5162 "constraints": constraints, 

5163 "collation": domain["collname"], 

5164 } 

5165 domains.append(domain_rec) 

5166 

5167 return domains 

5168 

5169 def _set_backslash_escapes(self, connection): 

5170 # this method is provided as an override hook for descendant 

5171 # dialects (e.g. Redshift), so removing it may break them 

5172 std_string = connection.exec_driver_sql( 

5173 "show standard_conforming_strings" 

5174 ).scalar() 

5175 self._backslash_escapes = std_string == "off"