Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/base.py: 29%

1427 statements  

1# dialects/postgresql/base.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql 

11 :name: PostgreSQL 

12 :normal_support: 9.6+ 

13 :best_effort: 9+ 

14 

15.. _postgresql_sequences: 

16 

17Sequences/SERIAL/IDENTITY 

18------------------------- 

19 

20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means 

21of creating new primary key values for integer-based primary key columns. When 

22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for 

23integer-based primary key columns, which generates a sequence and server side 

24default corresponding to the column. 

25 

26To specify a specific named sequence to be used for primary key generation, 

27use the :func:`~sqlalchemy.schema.Sequence` construct:: 

28 

    Table(
        "sometable",
        metadata,
        Column(
            "id", Integer, Sequence("some_id_seq", start=1), primary_key=True
        ),
    )

36 

37When SQLAlchemy issues a single INSERT statement, to fulfill the contract of 

38having the "last insert identifier" available, a RETURNING clause is added to 

39the INSERT statement which specifies the primary key columns should be 

40returned after the statement completes. The RETURNING functionality only takes 

41place if PostgreSQL 8.2 or later is in use. As a fallback approach, the 

42sequence, whether specified explicitly or implicitly via ``SERIAL``, is 

43executed independently beforehand, the returned value to be used in the 

44subsequent insert. Note that when an 

45:func:`~sqlalchemy.sql.expression.insert()` construct is executed using 

46"executemany" semantics, the "last inserted identifier" functionality does not 

47apply; no RETURNING clause is emitted nor is the sequence pre-executed in this 

48case. 

49 
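The implicit ``SERIAL`` behavior described above can be checked without a
database by compiling the DDL against the PostgreSQL dialect. The following
is a minimal sketch; the table and column names are illustrative only, and it
assumes a SQLAlchemy installation in which ``postgresql.dialect()`` can be
instantiated without a DBAPI driver:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# hypothetical table; an integer primary key with no explicit Sequence
sometable = Table(
    "sometable",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String),
)

# compile the CREATE TABLE statement as a string; no connection is made
ddl = str(CreateTable(sometable).compile(dialect=postgresql.dialect()))
print(ddl)
```

The printed DDL should render the ``id`` column using ``SERIAL`` rather than
a plain ``INTEGER``.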

50 

51PostgreSQL 10 and above IDENTITY columns 

52^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

53 

54PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use 

55of SERIAL. The :class:`_schema.Identity` construct in a 

56:class:`_schema.Column` can be used to control its behavior:: 

57 

    from sqlalchemy import Table, Column, MetaData, Integer, String, Identity

    metadata = MetaData()

    data = Table(
        "data",
        metadata,
        Column(
            "id", Integer, Identity(start=42, cycle=True), primary_key=True
        ),
        Column("data", String),
    )

70 

71The CREATE TABLE for the above :class:`_schema.Table` object would be: 

72 

73.. sourcecode:: sql 

74 

75 CREATE TABLE data ( 

76 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE), 

77 data VARCHAR, 

78 PRIMARY KEY (id) 

79 ) 

80 

81.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct 

82 in a :class:`_schema.Column` to specify the option of an autoincrementing 

83 column. 

84 
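As a further illustration, the :class:`_schema.Identity` construct also
accepts an ``always`` flag. The sketch below, which uses a hypothetical
``account`` table and assumes SQLAlchemy 1.4 or later, compiles the DDL
offline to show the effect of that flag:

```python
from sqlalchemy import Column, Identity, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# hypothetical table; always=True requests GENERATED ALWAYS
account = Table(
    "account",
    metadata,
    Column("id", Integer, Identity(always=True), primary_key=True),
    Column("name", String),
)

# compile only; nothing is sent to a database
ddl = str(CreateTable(account).compile(dialect=postgresql.dialect()))
print(ddl)
```

With ``always=True`` the column is rendered with ``GENERATED ALWAYS AS
IDENTITY`` in place of ``GENERATED BY DEFAULT AS IDENTITY``.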

85.. note:: 

86 

    Previous versions of SQLAlchemy did not have built-in support for rendering
    of IDENTITY, and could use the following compilation hook to replace
    occurrences of SERIAL with IDENTITY::

        from sqlalchemy.schema import CreateColumn
        from sqlalchemy.ext.compiler import compiles


        @compiles(CreateColumn, "postgresql")
        def use_identity(element, compiler, **kw):
            text = compiler.visit_create_column(element, **kw)
            text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
            return text

    Using the above, a table such as::

        t = Table(
            "t", m, Column("id", Integer, primary_key=True), Column("data", String)
        )

    Will generate on the backing database as:

    .. sourcecode:: sql

        CREATE TABLE t (
            id INT GENERATED BY DEFAULT AS IDENTITY,
            data VARCHAR,
            PRIMARY KEY (id)
        )

116 

117.. _postgresql_ss_cursors: 

118 

119Server Side Cursors 

120------------------- 

121 

Server-side cursor support is available for the psycopg2 and asyncpg
dialects and may also be available in others.

124 

125Server side cursors are enabled on a per-statement basis by using the 

126:paramref:`.Connection.execution_options.stream_results` connection execution 

127option:: 

128 

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("select * from table")
        )

133 

134Note that some kinds of SQL statements may not be supported with 

135server side cursors; generally, only SQL statements that return rows should be 

136used with this option. 

137 
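A streamed result is normally consumed in chunks rather than all at once.
The sketch below shows one way to do this using ``Result.partitions()``,
assuming SQLAlchemy 1.4 or later. So that it runs without a PostgreSQL
server, it uses an in-memory SQLite engine as a stand-in; SQLite does not
provide true server side cursors, so only the consumption pattern is
illustrated, and the ``fetch_in_chunks`` helper is hypothetical:

```python
from sqlalchemy import create_engine, text

# stand-in engine; with PostgreSQL this would be a postgresql:// URL
engine = create_engine("sqlite://")


def fetch_in_chunks(conn, sql, chunk_size):
    """Execute ``sql`` with stream_results and return the size of each chunk."""
    result = conn.execution_options(stream_results=True).execute(text(sql))
    # partitions() yields lists of at most chunk_size rows
    return [len(chunk) for chunk in result.partitions(chunk_size)]


with engine.connect() as conn:
    conn.execute(text("CREATE TABLE t (x INTEGER)"))
    conn.execute(text("INSERT INTO t (x) VALUES (1), (2), (3), (4), (5)"))
    chunk_sizes = fetch_in_chunks(conn, "SELECT x FROM t", 2)

print(chunk_sizes)
```

On a dialect with real server side cursor support, each partition would be
fetched from the server as it is requested instead of the entire result being
buffered up front.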

.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
    and will be removed in a future release. Please use the
    :paramref:`_engine.Connection.execution_options.stream_results` execution
    option for unbuffered cursor support.

142 

143.. seealso:: 

144 

145 :ref:`engine_stream_results` 

146 

147.. _postgresql_isolation_level: 

148 

149Transaction Isolation Level 

150--------------------------- 

151 

152Most SQLAlchemy dialects support setting of transaction isolation level 

153using the :paramref:`_sa.create_engine.isolation_level` parameter 

154at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection` 

155level via the :paramref:`.Connection.execution_options.isolation_level` 

156parameter. 

157 

158For PostgreSQL dialects, this feature works either by making use of the 

159DBAPI-specific features, such as psycopg2's isolation level flags which will 

160embed the isolation level setting inline with the ``"BEGIN"`` statement, or for 

161DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS 

162TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement 

emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
DBAPI-specific techniques are used, typically an ``.autocommit``
flag on the DBAPI connection object.

166 

167To set isolation level using :func:`_sa.create_engine`:: 

168 

    engine = create_engine(
        "postgresql+pg8000://scott:tiger@localhost/test",
        isolation_level="REPEATABLE READ",
    )

173 

174To set using per-connection execution options:: 

175 

    with engine.connect() as conn:
        conn = conn.execution_options(isolation_level="REPEATABLE READ")
        with conn.begin():
            ...  # work with transaction

180 

181There are also more options for isolation level configurations, such as 

182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply 

183different isolation level settings. See the discussion at 

184:ref:`dbapi_autocommit` for background. 

185 
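The "sub-engine" pattern mentioned above can be sketched using
:meth:`_engine.Engine.execution_options`. The example below is illustrative
only: it uses an in-memory SQLite engine as a stand-in so that it runs without
a PostgreSQL driver installed, and the variable names are hypothetical. No
connection is opened, so the isolation level itself is never applied here:

```python
from sqlalchemy import create_engine

# stand-in URL; a PostgreSQL application would use a postgresql:// URL
engine = create_engine("sqlite://")

# a second Engine-like object that applies AUTOCOMMIT to its connections
autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")

# both objects draw connections from the same underlying pool
shares_pool = autocommit_engine.pool is engine.pool
options = autocommit_engine.get_execution_options()

print(shares_pool, options["isolation_level"])
```

Because the pool is shared, code can choose between ``engine`` and
``autocommit_engine`` on a per-operation basis without opening additional
pools.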

186Valid values for ``isolation_level`` on most PostgreSQL dialects include: 

187 

188* ``READ COMMITTED`` 

189* ``READ UNCOMMITTED`` 

190* ``REPEATABLE READ`` 

191* ``SERIALIZABLE`` 

192* ``AUTOCOMMIT`` 

193 

194.. seealso:: 

195 

196 :ref:`dbapi_autocommit` 

197 

198 :ref:`postgresql_readonly_deferrable` 

199 

200 :ref:`psycopg2_isolation_level` 

201 

202 :ref:`pg8000_isolation_level` 

203 

204.. _postgresql_readonly_deferrable: 

205 

206Setting READ ONLY / DEFERRABLE 

207------------------------------ 

208 

209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE" 

210characteristics of the transaction, which is in addition to the isolation level 

211setting. These two attributes can be established either in conjunction with or 

212independently of the isolation level by passing the ``postgresql_readonly`` and 

213``postgresql_deferrable`` flags with 

214:meth:`_engine.Connection.execution_options`. The example below illustrates 

215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting 

216"READ ONLY" and "DEFERRABLE":: 

217 

    with engine.connect() as conn:
        conn = conn.execution_options(
            isolation_level="SERIALIZABLE",
            postgresql_readonly=True,
            postgresql_deferrable=True,
        )
        with conn.begin():
            ...  # work with transaction

226 

227Note that some DBAPIs such as asyncpg only support "readonly" with 

228SERIALIZABLE isolation. 

229 

230.. versionadded:: 1.4 added support for the ``postgresql_readonly`` 

231 and ``postgresql_deferrable`` execution options. 

232 

233.. _postgresql_reset_on_return: 

234 

235Temporary Table / Resource Reset for Connection Pooling 

236------------------------------------------------------- 

237 

238The :class:`.QueuePool` connection pool implementation used 

239by the SQLAlchemy :class:`.Engine` object includes 

240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke 

241the DBAPI ``.rollback()`` method when connections are returned to the pool. 

242While this rollback will clear out the immediate state used by the previous 

243transaction, it does not cover a wider range of session-level state, including 

244temporary tables as well as other server state such as prepared statement 

245handles and statement caches. The PostgreSQL database includes a variety 

246of commands which may be used to reset this state, including 

247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``. 

248 

249 

250To install 

251one or more of these commands as the means of performing reset-on-return, 

252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated 

253in the example below. The implementation 

254will end transactions in progress as well as discard temporary tables 

using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL
documentation for background on what each of these statements does.

257 

258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter 

259is set to ``None`` so that the custom scheme can replace the default behavior 

260completely. The custom hook implementation calls ``.rollback()`` in any case, 

261as it's usually important that the DBAPI's own tracking of commit/rollback 

262will remain consistent with the state of the transaction:: 

263 

264 

    from sqlalchemy import create_engine
    from sqlalchemy import event

    postgresql_engine = create_engine(
        "postgresql+psycopg2://scott:tiger@hostname/dbname",
        # disable default reset-on-return scheme
        pool_reset_on_return=None,
    )


    @event.listens_for(postgresql_engine, "reset")
    def _reset_postgresql(dbapi_connection, connection_record, reset_state):
        if not reset_state.terminate_only:
            # psycopg2 connections have no .execute() method; use a cursor
            cursor = dbapi_connection.cursor()
            cursor.execute("CLOSE ALL")
            cursor.execute("RESET ALL")
            cursor.execute("DISCARD TEMP")
            cursor.close()

        # so that the DBAPI itself knows that the connection has been
        # reset
        dbapi_connection.rollback()

285 

286.. versionchanged:: 2.0.0b3 Added additional state arguments to 

287 the :meth:`.PoolEvents.reset` event and additionally ensured the event 

288 is invoked for all "reset" occurrences, so that it's appropriate 

289 as a place for custom "reset" handlers. Previous schemes which 

290 use the :meth:`.PoolEvents.checkin` handler remain usable as well. 

291 

292.. seealso:: 

293 

294 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation 

295 

296.. _postgresql_alternate_search_path: 

297 

298Setting Alternate Search Paths on Connect 

299------------------------------------------ 

300 

301The PostgreSQL ``search_path`` variable refers to the list of schema names 

302that will be implicitly referenced when a particular table or other 

303object is referenced in a SQL statement. As detailed in the next section 

304:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around 

the concept of keeping this variable at its default value of ``public``.
However, to have it set automatically to any arbitrary name or names whenever
connections are used, the ``SET SESSION search_path`` command may be invoked
for all connections in a pool using the following event handler, as discussed
at :ref:`schema_set_default_connections`::

310 

    from sqlalchemy import event
    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")

    # placeholder; substitute the schema name the application requires
    schema_name = "my_schema"


    @event.listens_for(engine, "connect", insert=True)
    def set_search_path(dbapi_connection, connection_record):
        # run the SET outside of any transaction so that a later
        # rollback does not revert it
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        cursor = dbapi_connection.cursor()
        cursor.execute("SET SESSION search_path='%s'" % schema_name)
        cursor.close()
        dbapi_connection.autocommit = existing_autocommit

325 

326The reason the recipe is complicated by use of the ``.autocommit`` DBAPI 

327attribute is so that when the ``SET SESSION search_path`` directive is invoked, 

328it is invoked outside of the scope of any transaction and therefore will not 

329be reverted when the DBAPI connection has a rollback. 

330 
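Since the recipe interpolates a schema name directly into SQL text, an
application that accepts schema names from configuration may want to quote
them first. The following is a minimal sketch using only the standard
library; the helper names are hypothetical and are not part of SQLAlchemy.
Note that quoted identifiers are case sensitive in PostgreSQL:

```python
def quote_identifier(name):
    """Quote a PostgreSQL identifier, doubling any embedded double quotes."""
    return '"' + name.replace('"', '""') + '"'


def build_search_path_statement(schema_names):
    """Build a SET SESSION search_path statement for the given schema names."""
    if not schema_names:
        raise ValueError("at least one schema name is required")
    quoted = ", ".join(quote_identifier(name) for name in schema_names)
    return "SET SESSION search_path TO " + quoted


statement = build_search_path_statement(["my_schema", "public"])
print(statement)
```

The resulting string could then be passed to ``cursor.execute()`` inside an
event handler such as the one shown above.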

331.. seealso:: 

332 

333 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation 

334 

335.. _postgresql_schema_reflection: 

336 

337Remote-Schema Table Introspection and PostgreSQL search_path 

338------------------------------------------------------------ 

339 

340.. admonition:: Section Best Practices Summarized 

341 

    Keep the ``search_path`` variable set to its default of ``public``, without
    any other schema names. Ensure the username used to connect **does not**
    match remote schemas, or ensure the ``"$user"`` token is **removed** from
    ``search_path``. For other schema names, name these explicitly
    within :class:`_schema.Table` definitions. Alternatively, the
    ``postgresql_ignore_search_path`` option will cause all reflected
    :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
    attribute set up.

350 

351The PostgreSQL dialect can reflect tables from any schema, as outlined in 

352:ref:`metadata_reflection_schemas`. 

353 

354In all cases, the first thing SQLAlchemy does when reflecting tables is 

355to **determine the default schema for the current database connection**. 

356It does this using the PostgreSQL ``current_schema()`` 

function, illustrated below using a PostgreSQL client session (i.e. using

358the ``psql`` tool): 

359 

360.. sourcecode:: sql 

361 

362 test=> select current_schema(); 

363 current_schema 

364 ---------------- 

365 public 

366 (1 row) 

367 

368Above we see that on a plain install of PostgreSQL, the default schema name 

369is the name ``public``. 

370 

371However, if your database username **matches the name of a schema**, PostgreSQL's 

372default is to then **use that name as the default schema**. Below, we log in 

373using the username ``scott``. When we create a schema named ``scott``, **it 

374implicitly changes the default schema**: 

375 

376.. sourcecode:: sql 

377 

378 test=> select current_schema(); 

379 current_schema 

380 ---------------- 

381 public 

382 (1 row) 

383 

384 test=> create schema scott; 

385 CREATE SCHEMA 

386 test=> select current_schema(); 

387 current_schema 

388 ---------------- 

389 scott 

390 (1 row) 

391 

392The behavior of ``current_schema()`` is derived from the 

393`PostgreSQL search path 

394<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

395variable ``search_path``, which in modern PostgreSQL versions defaults to this: 

396 

397.. sourcecode:: sql 

398 

399 test=> show search_path; 

400 search_path 

401 ----------------- 

402 "$user", public 

403 (1 row) 

404 

405Where above, the ``"$user"`` variable will inject the current username as the 

406default schema, if one exists. Otherwise, ``public`` is used. 

407 
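The resolution rule described above can be modeled in a few lines of plain
Python. This is only a sketch of the documented behavior, not PostgreSQL's
or SQLAlchemy's implementation, and the function name is hypothetical:

```python
def resolve_current_schema(search_path, username, existing_schemas):
    """Return the first entry of search_path that names an existing schema.

    The special "$user" entry stands for the current username. None is
    returned if no entry in the path matches an existing schema.
    """
    for entry in search_path:
        name = username if entry == "$user" else entry
        if name in existing_schemas:
            return name
    return None


default_path = ["$user", "public"]

# no schema named "scott" exists yet, so "public" is chosen
before = resolve_current_schema(default_path, "scott", {"public"})

# once a schema named "scott" exists, it takes precedence
after = resolve_current_schema(default_path, "scott", {"public", "scott"})

print(before, after)
```

This mirrors the ``psql`` session shown earlier, in which creating the
``scott`` schema changes the result of ``current_schema()``.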

408When a :class:`_schema.Table` object is reflected, if it is present in the 

409schema indicated by the ``current_schema()`` function, **the schema name assigned 

410to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the 

411".schema" attribute will be assigned the string name of that schema. 

412 

With regard to tables which these :class:`_schema.Table`

414objects refer to via foreign key constraint, a decision must be made as to how 

415the ``.schema`` is represented in those remote tables, in the case where that 

416remote schema name is also a member of the current ``search_path``. 

417 

418By default, the PostgreSQL dialect mimics the behavior encouraged by 

419PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function 

420returns a sample definition for a particular foreign key constraint, 

421omitting the referenced schema name from that definition when the name is 

422also in the PostgreSQL schema search path. The interaction below 

423illustrates this behavior: 

424 

425.. sourcecode:: sql 

426 

427 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY); 

428 CREATE TABLE 

429 test=> CREATE TABLE referring( 

430 test(> id INTEGER PRIMARY KEY, 

431 test(> referred_id INTEGER REFERENCES test_schema.referred(id)); 

432 CREATE TABLE 

433 test=> SET search_path TO public, test_schema; 

434 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

435 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

436 test-> ON n.oid = c.relnamespace 

437 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

438 test-> WHERE c.relname='referring' AND r.contype = 'f' 

439 test-> ; 

440 pg_get_constraintdef 

441 --------------------------------------------------- 

442 FOREIGN KEY (referred_id) REFERENCES referred(id) 

443 (1 row) 

444 

445Above, we created a table ``referred`` as a member of the remote schema 

446``test_schema``, however when we added ``test_schema`` to the 

447PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the 

448``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of 

449the function. 

450 

451On the other hand, if we set the search path back to the typical default 

452of ``public``: 

453 

454.. sourcecode:: sql 

455 

456 test=> SET search_path TO public; 

457 SET 

458 

459The same query against ``pg_get_constraintdef()`` now returns the fully 

460schema-qualified name for us: 

461 

462.. sourcecode:: sql 

463 

464 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM 

465 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n 

466 test-> ON n.oid = c.relnamespace 

467 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid 

468 test-> WHERE c.relname='referring' AND r.contype = 'f'; 

469 pg_get_constraintdef 

470 --------------------------------------------------------------- 

471 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id) 

472 (1 row) 

473 

474SQLAlchemy will by default use the return value of ``pg_get_constraintdef()`` 

475in order to determine the remote schema name. That is, if our ``search_path`` 

476were set to include ``test_schema``, and we invoked a table 

477reflection process as follows:: 

478 

479 >>> from sqlalchemy import Table, MetaData, create_engine, text 

480 >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test") 

481 >>> with engine.connect() as conn: 

482 ... conn.execute(text("SET search_path TO test_schema, public")) 

483 ... metadata_obj = MetaData() 

484 ... referring = Table("referring", metadata_obj, autoload_with=conn) 

485 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0> 

486 

The above process would deliver to the :attr:`_schema.MetaData.tables`
collection a ``referred`` table named **without** the schema::

490 

491 >>> metadata_obj.tables["referred"].schema is None 

492 True 

493 

494To alter the behavior of reflection such that the referred schema is 

495maintained regardless of the ``search_path`` setting, use the 

496``postgresql_ignore_search_path`` option, which can be specified as a 

497dialect-specific argument to both :class:`_schema.Table` as well as 

498:meth:`_schema.MetaData.reflect`:: 

499 

500 >>> with engine.connect() as conn: 

501 ... conn.execute(text("SET search_path TO test_schema, public")) 

502 ... metadata_obj = MetaData() 

503 ... referring = Table( 

504 ... "referring", 

505 ... metadata_obj, 

506 ... autoload_with=conn, 

507 ... postgresql_ignore_search_path=True, 

508 ... ) 

509 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0> 

510 

511We will now have ``test_schema.referred`` stored as schema-qualified:: 

512 

513 >>> metadata_obj.tables["test_schema.referred"].schema 

514 'test_schema' 

515 

516.. sidebar:: Best Practices for PostgreSQL Schema reflection 

517 

518 The description of PostgreSQL schema reflection behavior is complex, and 

519 is the product of many years of dealing with widely varied use cases and 

520 user preferences. But in fact, there's no need to understand any of it if 

521 you just stick to the simplest use pattern: leave the ``search_path`` set 

522 to its default of ``public`` only, never refer to the name ``public`` as 

523 an explicit schema name otherwise, and refer to all other schema names 

524 explicitly when building up a :class:`_schema.Table` object. The options 

525 described here are only for those users who can't, or prefer not to, stay 

526 within these guidelines. 

527 

528.. seealso:: 

529 

530 :ref:`reflection_schema_qualified_interaction` - discussion of the issue 

531 from a backend-agnostic perspective 

532 

533 `The Schema Search Path 

534 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_ 

535 - on the PostgreSQL website. 

536 

537INSERT/UPDATE...RETURNING 

538------------------------- 

539 

540The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and 

541``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default 

542for single-row INSERT statements in order to fetch newly generated 

543primary key identifiers. To specify an explicit ``RETURNING`` clause, 

544use the :meth:`._UpdateBase.returning` method on a per-statement basis:: 

545 

    # the examples assume "connection" is an open Connection and "table"
    # is an existing Table object

    # INSERT..RETURNING
    stmt = table.insert().returning(table.c.col1, table.c.col2).values(name="foo")
    result = connection.execute(stmt)
    print(result.fetchall())

    # UPDATE..RETURNING
    stmt = (
        table.update()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
        .values(name="bar")
    )
    result = connection.execute(stmt)
    print(result.fetchall())

    # DELETE..RETURNING
    stmt = (
        table.delete()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
    )
    result = connection.execute(stmt)
    print(result.fetchall())

568 
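To inspect the SQL that an explicit ``RETURNING`` clause produces without
connecting to a database, a statement can be compiled against the PostgreSQL
dialect. The sketch below uses a hypothetical ``sometable`` definition:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# hypothetical table used only for illustration
table = Table(
    "sometable",
    metadata,
    Column("col1", Integer, primary_key=True),
    Column("col2", String),
    Column("name", String),
)

stmt = table.insert().returning(table.c.col1, table.c.col2).values(name="foo")

# compile only; the statement is not executed
sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

The compiled string ends with a ``RETURNING`` clause naming the two requested
columns.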

569.. _postgresql_insert_on_conflict: 

570 

571INSERT...ON CONFLICT (Upsert) 

572------------------------------ 

573 

574Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of 

575rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A 

576candidate row will only be inserted if that row does not violate any unique 

577constraints. In the case of a unique constraint violation, a secondary action 

578can occur which can be either "DO UPDATE", indicating that the data in the 

579target row should be updated, or "DO NOTHING", which indicates to silently skip 

580this row. 

581 

582Conflicts are determined using existing unique constraints and indexes. These 

583constraints may be identified either using their name as stated in DDL, 

584or they may be inferred by stating the columns and conditions that comprise 

585the indexes. 

586 

587SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific 

588:func:`_postgresql.insert()` function, which provides 

589the generative methods :meth:`_postgresql.Insert.on_conflict_do_update` 

590and :meth:`~.postgresql.Insert.on_conflict_do_nothing`: 

591 

592.. sourcecode:: pycon+sql 

593 

594 >>> from sqlalchemy.dialects.postgresql import insert 

595 >>> insert_stmt = insert(my_table).values( 

596 ... id="some_existing_id", data="inserted value" 

597 ... ) 

598 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"]) 

599 >>> print(do_nothing_stmt) 

600 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

601 ON CONFLICT (id) DO NOTHING 

602 {stop} 

603 

604 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

605 ... constraint="pk_my_table", set_=dict(data="updated value") 

606 ... ) 

607 >>> print(do_update_stmt) 

608 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

609 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s 

610 

611.. seealso:: 

612 

613 `INSERT .. ON CONFLICT 

614 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_ 

615 - in the PostgreSQL documentation. 

616 

617Specifying the Target 

618^^^^^^^^^^^^^^^^^^^^^ 

619 

620Both methods supply the "target" of the conflict using either the 

621named constraint or by column inference: 

622 

623* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument 

624 specifies a sequence containing string column names, :class:`_schema.Column` 

625 objects, and/or SQL expression elements, which would identify a unique 

626 index: 

627 

628 .. sourcecode:: pycon+sql 

629 

630 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

631 ... index_elements=["id"], set_=dict(data="updated value") 

632 ... ) 

633 >>> print(do_update_stmt) 

634 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

635 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

636 {stop} 

637 

638 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

639 ... index_elements=[my_table.c.id], set_=dict(data="updated value") 

640 ... ) 

641 >>> print(do_update_stmt) 

642 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

643 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

644 

* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
  infer an index, a partial index can be inferred by also specifying the
  :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:

648 

649 .. sourcecode:: pycon+sql 

650 

651 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data") 

652 >>> stmt = stmt.on_conflict_do_update( 

653 ... index_elements=[my_table.c.user_email], 

654 ... index_where=my_table.c.user_email.like("%@gmail.com"), 

655 ... set_=dict(data=stmt.excluded.data), 

656 ... ) 

657 >>> print(stmt) 

658 {printsql}INSERT INTO my_table (data, user_email) 

659 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email) 

660 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data 

661 

662* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is 

663 used to specify an index directly rather than inferring it. This can be 

664 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX: 

665 

666 .. sourcecode:: pycon+sql 

667 

668 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

669 ... constraint="my_table_idx_1", set_=dict(data="updated value") 

670 ... ) 

671 >>> print(do_update_stmt) 

672 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

673 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s 

674 {stop} 

675 

676 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

677 ... constraint="my_table_pk", set_=dict(data="updated value") 

678 ... ) 

679 >>> print(do_update_stmt) 

680 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

681 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s 

682 {stop} 

683 

684* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may 

685 also refer to a SQLAlchemy construct representing a constraint, 

686 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`, 

687 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use, 

688 if the constraint has a name, it is used directly. Otherwise, if the 

689 constraint is unnamed, then inference will be used, where the expressions 

690 and optional WHERE clause of the constraint will be spelled out in the 

691 construct. This use is especially convenient 

692 to refer to the named or unnamed primary key of a :class:`_schema.Table` 

693 using the 

694 :attr:`_schema.Table.primary_key` attribute: 

695 

696 .. sourcecode:: pycon+sql 

697 

698 >>> do_update_stmt = insert_stmt.on_conflict_do_update( 

699 ... constraint=my_table.primary_key, set_=dict(data="updated value") 

700 ... ) 

701 >>> print(do_update_stmt) 

702 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

703 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

704 

705The SET Clause 

706^^^^^^^^^^^^^^^ 

707 

708``ON CONFLICT...DO UPDATE`` is used to perform an update of the already 

709existing row, using any combination of new values as well as values 

710from the proposed insertion. These values are specified using the 

711:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This 

712parameter accepts a dictionary which consists of direct values 

713for UPDATE: 

714 

715.. sourcecode:: pycon+sql 

716 

717 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

718 >>> do_update_stmt = stmt.on_conflict_do_update( 

719 ... index_elements=["id"], set_=dict(data="updated value") 

720 ... ) 

721 >>> print(do_update_stmt) 

722 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

723 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s 

724 

725.. warning:: 

726 

    The :meth:`_postgresql.Insert.on_conflict_do_update`
    method does **not** take into
    account Python-side default UPDATE values or generation functions, e.g.
    those specified using :paramref:`_schema.Column.onupdate`.
    These values will not be exercised for an ON CONFLICT style of UPDATE,
    unless they are manually specified in the
    :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.

734 
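One way to account for the limitation described in the warning is to repeat
the ``onupdate`` expression explicitly in the ``set_`` dictionary. The sketch
below assumes a hypothetical ``updated_at`` column and compiles the statement
offline to show the result:

```python
from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table, func
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()

# hypothetical table; updated_at has an onupdate expression
my_table = Table(
    "my_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String),
    Column("updated_at", DateTime, onupdate=func.now()),
)

stmt = insert(my_table).values(id=1, data="inserted value")
stmt = stmt.on_conflict_do_update(
    index_elements=[my_table.c.id],
    # onupdate is not applied automatically, so it is stated explicitly
    set_={"data": stmt.excluded.data, "updated_at": func.now()},
)

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```

If the ``updated_at`` entry were left out of ``set_``, the column would not
appear in the ``DO UPDATE SET`` clause at all.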

735Updating using the Excluded INSERT Values 

736^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

737 

In order to refer to the proposed insertion row, the special alias
:attr:`~.postgresql.Insert.excluded` is available as an attribute on
the :class:`_postgresql.Insert` object; this object is a
:class:`_expression.ColumnCollection` which contains all columns of the
target table:

744 

745.. sourcecode:: pycon+sql 

746 

747 >>> stmt = insert(my_table).values( 

748 ... id="some_id", data="inserted value", author="jlh" 

749 ... ) 

750 >>> do_update_stmt = stmt.on_conflict_do_update( 

751 ... index_elements=["id"], 

752 ... set_=dict(data="updated value", author=stmt.excluded.author), 

753 ... ) 

754 >>> print(do_update_stmt) 

755 {printsql}INSERT INTO my_table (id, data, author) 

756 VALUES (%(id)s, %(data)s, %(author)s) 

757 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

758 
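When every non-key column should take its value from the proposed row, the
``set_`` dictionary can be built programmatically from the ``excluded``
collection. This is a sketch only, using a hypothetical table definition and
compiling the statement offline:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert

metadata = MetaData()

# hypothetical table used only for illustration
my_table = Table(
    "my_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String),
    Column("author", String),
)

stmt = insert(my_table).values(id=1, data="inserted value", author="jlh")

# map each non-primary-key column to its counterpart in "excluded"
update_cols = {
    col.name: stmt.excluded[col.name]
    for col in my_table.columns
    if not col.primary_key
}

upsert = stmt.on_conflict_do_update(
    index_elements=[my_table.c.id], set_=update_cols
)
sql = str(upsert.compile(dialect=postgresql.dialect()))
print(sql)
```

Building the dictionary this way keeps the upsert in step with the table
definition as columns are added, at the cost of overwriting every non-key
column on conflict.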

759Additional WHERE Criteria 

760^^^^^^^^^^^^^^^^^^^^^^^^^ 

761 

The :meth:`_postgresql.Insert.on_conflict_do_update` method also accepts

763a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where` 

764parameter, which will limit those rows which receive an UPDATE: 

765 

766.. sourcecode:: pycon+sql 

767 

768 >>> stmt = insert(my_table).values( 

769 ... id="some_id", data="inserted value", author="jlh" 

770 ... ) 

771 >>> on_update_stmt = stmt.on_conflict_do_update( 

772 ... index_elements=["id"], 

773 ... set_=dict(data="updated value", author=stmt.excluded.author), 

774 ... where=(my_table.c.status == 2), 

775 ... ) 

776 >>> print(on_update_stmt) 

777 {printsql}INSERT INTO my_table (id, data, author) 

778 VALUES (%(id)s, %(data)s, %(author)s) 

779 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author 

780 WHERE my_table.status = %(status_1)s 

781 

782Skipping Rows with DO NOTHING 

783^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

784 

785``ON CONFLICT`` may be used to skip inserting a row entirely 

786if any conflict with a unique or exclusion constraint occurs; below 

787this is illustrated using the 

788:meth:`~.postgresql.Insert.on_conflict_do_nothing` method: 

789 

790.. sourcecode:: pycon+sql 

791 

792 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

793 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"]) 

794 >>> print(stmt) 

795 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

796 ON CONFLICT (id) DO NOTHING 

797 

798If ``DO NOTHING`` is used without specifying any columns or constraint, 

799it has the effect of skipping the INSERT for any unique or exclusion 

800constraint violation which occurs: 

801 

802.. sourcecode:: pycon+sql 

803 

804 >>> stmt = insert(my_table).values(id="some_id", data="inserted value") 

805 >>> stmt = stmt.on_conflict_do_nothing() 

806 >>> print(stmt) 

807 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s) 

808 ON CONFLICT DO NOTHING 

809 

810.. _postgresql_match: 

811 

812Full Text Search 

813---------------- 

814 

815PostgreSQL's full text search system is available through the use of the 

816:data:`.func` namespace, combined with the use of custom operators 

817via the :meth:`.Operators.bool_op` method. For simple cases with some 

818degree of cross-backend compatibility, the :meth:`.Operators.match` operator 

819may also be used. 

820 

821.. _postgresql_simple_match: 

822 

823Simple plain text matching with ``match()`` 

824^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

825 

826The :meth:`.Operators.match` operator provides for cross-compatible simple 

827text matching. For the PostgreSQL backend, it's hardcoded to generate 

828an expression using the ``@@`` operator in conjunction with the 

829``plainto_tsquery()`` PostgreSQL function. 

830 

831On the PostgreSQL dialect, an expression like the following:: 

832 

833 select(sometable.c.text.match("search string")) 

834 

835would emit to the database: 

836 

837.. sourcecode:: sql 

838 

839 SELECT text @@ plainto_tsquery('search string') FROM table 

840 

841Above, passing a plain string to :meth:`.Operators.match` will automatically 

842make use of ``plainto_tsquery()`` to specify the type of tsquery. This 

843establishes basic database cross-compatibility for :meth:`.Operators.match` 

844with other backends. 

845 

846.. versionchanged:: 2.0 The default tsquery generation function used by the 

847 PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``. 

848 

849 To render exactly what was rendered in 1.4, use the following form:: 

850 

851 from sqlalchemy import func 

852 

853 select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string"))) 

854 

855 Which would emit: 

856 

857 .. sourcecode:: sql 

858 

859 SELECT text @@ to_tsquery('search string') FROM table 

860 

861Using PostgreSQL full text functions and operators directly 

862^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

863 

864Text search operations beyond the simple use of :meth:`.Operators.match` 

865may make use of the :data:`.func` namespace to generate PostgreSQL full-text 

866functions, in combination with :meth:`.Operators.bool_op` to generate 

867any boolean operator. 

868 

869For example, the query:: 

870 

871 select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat"))) 

872 

873would generate: 

874 

875.. sourcecode:: sql 

876 

877 SELECT to_tsquery('cat') @> to_tsquery('cat & rat') 

878 

879 

880The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST:: 

881 

882 from sqlalchemy.dialects.postgresql import TSVECTOR 

883 from sqlalchemy import select, cast 

884 

885 select(cast("some text", TSVECTOR)) 

886 

887produces a statement equivalent to: 

888 

889.. sourcecode:: sql 

890 

891 SELECT CAST('some text' AS TSVECTOR) AS anon_1 

892 

893The ``func`` namespace is augmented by the PostgreSQL dialect to set up 

894correct argument and return types for most full text search functions. 

895These functions are used automatically by the :attr:`_sql.func` namespace 

896assuming the ``sqlalchemy.dialects.postgresql`` package has been imported, 

897or :func:`_sa.create_engine` has been invoked using a ``postgresql`` 

898dialect. These functions are documented at: 

899 

900* :class:`_postgresql.to_tsvector` 

901* :class:`_postgresql.to_tsquery` 

902* :class:`_postgresql.plainto_tsquery` 

903* :class:`_postgresql.phraseto_tsquery` 

904* :class:`_postgresql.websearch_to_tsquery` 

905* :class:`_postgresql.ts_headline` 

906 

907Specifying the "regconfig" with ``match()`` or custom operators 

908^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

909 

910PostgreSQL's ``plainto_tsquery()`` function accepts an optional 

911"regconfig" argument naming the text search configuration to use; it must 

912match the configuration of any pre-computed GIN or GiST index to be used. 

913When using :meth:`.Operators.match`, this additional parameter may be 

914specified using the ``postgresql_regconfig`` parameter, such as:: 

915 

916 select(mytable.c.id).where( 

917 mytable.c.title.match("somestring", postgresql_regconfig="english") 

918 ) 

919 

920Which would emit: 

921 

922.. sourcecode:: sql 

923 

924 SELECT mytable.id FROM mytable 

925 WHERE mytable.title @@ plainto_tsquery('english', 'somestring') 

926 

927When using other PostgreSQL search functions with :data:`.func`, the 

928"regconfig" parameter may be passed directly as the initial argument:: 

929 

930 select(mytable.c.id).where( 

931 func.to_tsvector("english", mytable.c.title).bool_op("@@")( 

932 func.to_tsquery("english", "somestring") 

933 ) 

934 ) 

935 

936produces a statement equivalent to: 

937 

938.. sourcecode:: sql 

939 

940 SELECT mytable.id FROM mytable 

941 WHERE to_tsvector('english', mytable.title) @@ 

942 to_tsquery('english', 'somestring') 

943 

944It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from 

945PostgreSQL to ensure that you are generating queries with SQLAlchemy that 

946take full advantage of any indexes you may have created for full text search. 

947 

948.. seealso:: 

949 

950 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation 

951 

952 

953FROM ONLY ... 

954------------- 

955 

956The dialect supports PostgreSQL's ONLY keyword for targeting only a particular 

957table in an inheritance hierarchy. This can be used to produce the 

958``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...`` 

959syntaxes. It uses SQLAlchemy's hints mechanism:: 

960 

961 # SELECT ... FROM ONLY ... (conn is assumed to be an open Connection) 

962 stmt = table.select().with_hint(table, "ONLY", "postgresql") 

963 print(conn.execute(stmt).fetchall()) 

964 

965 # UPDATE ONLY ... 

966 table.update().values(foo="bar").with_hint( 

967 "ONLY", dialect_name="postgresql" 

968 ) 

969 

970 # DELETE FROM ONLY ... 

971 table.delete().with_hint("ONLY", dialect_name="postgresql") 

972 
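To see the effect of the ``ONLY`` hint without a database, the SELECT form can be compiled against the PostgreSQL dialect. This is a minimal sketch; the ``parent`` table is hypothetical:

```python
from sqlalchemy import Column, Integer, MetaData, Table, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()

# Hypothetical parent table in an inheritance hierarchy.
parent = Table("parent", metadata, Column("id", Integer, primary_key=True))

# The hint is tied to the "postgresql" dialect name, so other dialects
# ignore it.
stmt = select(parent).with_hint(parent, "ONLY", "postgresql")

sql = str(stmt.compile(dialect=postgresql.dialect()))
print(sql)
```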

973.. _postgresql_indexes: 

974 

975PostgreSQL-Specific Index Options 

976--------------------------------- 

977 

978Several extensions to the :class:`.Index` construct are available, specific 

979to the PostgreSQL dialect. 

980 

981.. _postgresql_covering_indexes: 

982 

983Covering Indexes 

984^^^^^^^^^^^^^^^^ 

985 

986The ``postgresql_include`` option renders INCLUDE(colname) for the given 

987string names:: 

988 

989 Index("my_index", table.c.x, postgresql_include=["y"]) 

990 

991would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)`` 

992 

993Note that this feature requires PostgreSQL 11 or later. 

994 

995.. seealso:: 

996 

997 :ref:`postgresql_constraint_options` 

998 

999.. versionadded:: 1.4 

1000 

1001.. _postgresql_partial_indexes: 

1002 

1003Partial Indexes 

1004^^^^^^^^^^^^^^^ 

1005 

1006Partial indexes add criteria to the index definition so that the index is 

1007applied to a subset of rows. These can be specified on :class:`.Index` 

1008using the ``postgresql_where`` keyword argument:: 

1009 

1010 Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10) 

1011 
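The DDL produced for a partial index can be inspected by compiling a ``CreateIndex`` construct against the PostgreSQL dialect. The sketch below assumes a hypothetical ``my_table`` with ``id`` and ``value`` columns:

```python
from sqlalchemy import Column, Index, Integer, MetaData, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateIndex

metadata = MetaData()

# Hypothetical table matching the names used in the example above.
my_table = Table(
    "my_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("value", Integer),
)

idx = Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10)

# Compile only; nothing is sent to a database.
ddl = str(CreateIndex(idx).compile(dialect=postgresql.dialect()))
print(ddl)
```

The criteria are rendered inline in the ``WHERE`` clause of the ``CREATE INDEX`` statement rather than as bound parameters.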

1012.. _postgresql_operator_classes: 

1013 

1014Operator Classes 

1015^^^^^^^^^^^^^^^^ 

1016 

1017PostgreSQL allows the specification of an *operator class* for each column of 

1018an index (see 

1019https://www.postgresql.org/docs/current/interactive/indexes-opclass.html). 

1020The :class:`.Index` construct allows these to be specified via the 

1021``postgresql_ops`` keyword argument:: 

1022 

1023 Index( 

1024 "my_index", 

1025 my_table.c.id, 

1026 my_table.c.data, 

1027 postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"}, 

1028 ) 

1029 

1030Note that the keys in the ``postgresql_ops`` dictionaries are the 

1031"key" name of the :class:`_schema.Column`, i.e. the name used to access it from 

1032the ``.c`` collection of :class:`_schema.Table`, which can be configured to be 

1033different than the actual name of the column as expressed in the database. 

1034 

1035If ``postgresql_ops`` is to be used against a complex SQL expression such 

1036as a function call, then to apply to the column it must be given a label 

1037that is identified in the dictionary by name, e.g.:: 

1038 

1039 Index( 

1040 "my_index", 

1041 my_table.c.id, 

1042 func.lower(my_table.c.data).label("data_lower"), 

1043 postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"}, 

1044 ) 

1045 

1046Operator classes are also supported by the 

1047:class:`_postgresql.ExcludeConstraint` construct using the 

1048:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for 

1049details. 

1050 

1051.. versionadded:: 1.3.21 added support for operator classes with 

1052 :class:`_postgresql.ExcludeConstraint`. 

1053 

1054 

1055Index Types 

1056^^^^^^^^^^^ 

1057 

1058PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well 

1059as the ability for users to create their own (see 

1060https://www.postgresql.org/docs/current/static/indexes-types.html). These can be 

1061specified on :class:`.Index` using the ``postgresql_using`` keyword argument:: 

1062 

1063 Index("my_index", my_table.c.data, postgresql_using="gin") 

1064 

1065The value passed to the keyword argument will be simply passed through to the 

1066underlying CREATE INDEX command, so it *must* be a valid index type for your 

1067version of PostgreSQL. 

1068 

1069.. _postgresql_index_storage: 

1070 

1071Index Storage Parameters 

1072^^^^^^^^^^^^^^^^^^^^^^^^ 

1073 

1074PostgreSQL allows storage parameters to be set on indexes. The storage 

1075parameters available depend on the index method used by the index. Storage 

1076parameters can be specified on :class:`.Index` using the ``postgresql_with`` 

1077keyword argument:: 

1078 

1079 Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50}) 

1080 

1081PostgreSQL allows the tablespace in which the index is created to be specified. 

1082The tablespace can be specified on :class:`.Index` using the 

1083``postgresql_tablespace`` keyword argument:: 

1084 

1085 Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace") 

1086 

1087Note that the same option is available on :class:`_schema.Table` as well. 

1088 
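The index options described in the preceding sections can be combined on a single :class:`.Index`. The following sketch, which assumes a hypothetical ``my_table`` and tablespace name, compiles the resulting ``CREATE INDEX`` statement so the placement of each clause can be seen:

```python
from sqlalchemy import Column, Index, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateIndex

metadata = MetaData()

# Hypothetical table; only the "data" column is indexed.
my_table = Table(
    "my_table",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("data", String),
)

idx = Index(
    "my_index",
    my_table.c.data,
    postgresql_using="btree",  # index type
    postgresql_ops={"data": "text_pattern_ops"},  # operator class per column key
    postgresql_with={"fillfactor": 50},  # storage parameters
    postgresql_tablespace="my_tablespace",  # tablespace (assumed to exist)
)

ddl = str(CreateIndex(idx).compile(dialect=postgresql.dialect()))
print(ddl)
```

Since the values are passed through to the DDL largely as given, whether a particular combination is accepted is decided by the PostgreSQL server, not by SQLAlchemy.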

1089.. _postgresql_index_concurrently: 

1090 

1091Indexes with CONCURRENTLY 

1092^^^^^^^^^^^^^^^^^^^^^^^^^ 

1093 

1094The PostgreSQL index option CONCURRENTLY is supported by passing the 

1095flag ``postgresql_concurrently`` to the :class:`.Index` construct:: 

1096 

1097 tbl = Table("testtbl", m, Column("data", Integer)) 

1098 

1099 idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True) 

1100 

1101The above index construct will render DDL for CREATE INDEX, assuming 

1102PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as: 

1103 

1104.. sourcecode:: sql 

1105 

1106 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data) 

1107 

1108For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for 

1109a connection-less dialect, it will emit: 

1110 

1111.. sourcecode:: sql 

1112 

1113 DROP INDEX CONCURRENTLY test_idx1 

1114 

1115When using CONCURRENTLY, the PostgreSQL database requires that the statement 

1116be invoked outside of a transaction block. The Python DBAPI enforces that 

1117even for a single statement, a transaction is present, so to use this 

1118construct, the DBAPI's "autocommit" mode must be used:: 

1119 

1120 metadata = MetaData() 

1121 table = Table("foo", metadata, Column("id", String)) 

1122 index = Index("foo_idx", table.c.id, postgresql_concurrently=True) 

1123 

1124 with engine.connect() as conn: 

1125 with conn.execution_options(isolation_level="AUTOCOMMIT"): 

1126 table.create(conn) 

1127 

1128.. seealso:: 

1129 

1130 :ref:`postgresql_isolation_level` 

1131 

1132.. _postgresql_index_reflection: 

1133 

1134PostgreSQL Index Reflection 

1135--------------------------- 

1136 

1137The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the 

1138UNIQUE CONSTRAINT construct is used. When inspecting a table using 

1139:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes` 

1140and the :meth:`_reflection.Inspector.get_unique_constraints` 

1141will report on these 

1142two constructs distinctly; in the case of the index, the key 

1143``duplicates_constraint`` will be present in the index entry if it is 

1144detected as mirroring a constraint. When performing reflection using 

1145``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned 

1146in :attr:`_schema.Table.indexes` when it is detected as mirroring a 

1147:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` 

1148collection. 

1149 
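When working with :meth:`_reflection.Inspector.get_indexes` directly, the ``duplicates_constraint`` key can be used to separate implicit indexes from explicitly created ones. The helper below is a hypothetical sketch, not part of SQLAlchemy, and the sample entries are hand-written stand-ins for what the inspector might return:

```python
from typing import Any, Dict, List, Tuple

IndexEntry = Dict[str, Any]


def split_mirrored_indexes(
    index_entries: List[IndexEntry],
) -> Tuple[List[IndexEntry], List[IndexEntry]]:
    """Partition index entries into (standalone, mirrored).

    An entry counts as mirrored when it carries the
    ``duplicates_constraint`` key.
    """
    standalone: List[IndexEntry] = []
    mirrored: List[IndexEntry] = []
    for entry in index_entries:
        if "duplicates_constraint" in entry:
            mirrored.append(entry)
        else:
            standalone.append(entry)
    return standalone, mirrored


# Hypothetical entries shaped like Inspector.get_indexes() output.
sample = [
    {"name": "ix_user_email", "column_names": ["email"], "unique": False},
    {
        "name": "uq_user_name",
        "column_names": ["name"],
        "unique": True,
        "duplicates_constraint": "uq_user_name",
    },
]

standalone, mirrored = split_mirrored_indexes(sample)
print([e["name"] for e in standalone])
print([e["name"] for e in mirrored])
```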

1150Special Reflection Options 

1151-------------------------- 

1152 

1153The :class:`_reflection.Inspector` 

1154used for the PostgreSQL backend is an instance 

1155of :class:`.PGInspector`, which offers additional methods:: 

1156 

1157 from sqlalchemy import create_engine, inspect 

1158 

1159 engine = create_engine("postgresql+psycopg2://localhost/test") 

1160 insp = inspect(engine) # will be a PGInspector 

1161 

1162 print(insp.get_enums()) 

1163 

1164.. autoclass:: PGInspector 

1165 :members: 

1166 

1167.. _postgresql_table_options: 

1168 

1169PostgreSQL Table Options 

1170------------------------ 

1171 

1172Several options for CREATE TABLE are supported directly by the PostgreSQL 

1173dialect in conjunction with the :class:`_schema.Table` construct: 

1174 

1175* ``INHERITS``:: 

1176 

1177 Table("some_table", metadata, ..., postgresql_inherits="some_supertable") 

1178 

1179 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...)) 

1180 

1181* ``ON COMMIT``:: 

1182 

1183 Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS") 

1184 

1185* 

1186 ``PARTITION BY``:: 

1187 

1188 Table( 

1189 "some_table", 

1190 metadata, 

1191 ..., 

1192 postgresql_partition_by="LIST (part_column)", 

1193 ) 

1194 

1195 .. versionadded:: 1.2.6 

1196 

1197* 

1198 ``TABLESPACE``:: 

1199 

1200 Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace") 

1201 

1202 The above option is also available on the :class:`.Index` construct. 

1203 

1204* 

1205 ``USING``:: 

1206 

1207 Table("some_table", metadata, ..., postgresql_using="heap") 

1208 

1209 .. versionadded:: 2.0.26 

1210 

1211* ``WITH OIDS``:: 

1212 

1213 Table("some_table", metadata, ..., postgresql_with_oids=True) 

1214 

1215* ``WITHOUT OIDS``:: 

1216 

1217 Table("some_table", metadata, ..., postgresql_with_oids=False) 

1218 

1219.. seealso:: 

1220 

1221 `PostgreSQL CREATE TABLE options 

1222 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - 

1223 in the PostgreSQL documentation. 

1224 
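As a rough illustration of how the table options listed above appear in DDL, the sketch below compiles a ``CREATE TABLE`` statement that uses ``postgresql_partition_by`` and ``postgresql_tablespace`` together. The table, column, and tablespace names are assumptions:

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.schema import CreateTable

metadata = MetaData()

# Hypothetical partitioned table placed in a named tablespace.
some_table = Table(
    "some_table",
    metadata,
    Column("id", Integer),
    Column("part_column", String),
    postgresql_partition_by="LIST (part_column)",
    postgresql_tablespace="some_tablespace",
)

# Compile only; nothing is sent to a database.
ddl = str(CreateTable(some_table).compile(dialect=postgresql.dialect()))
print(ddl)
```

The options are rendered after the closing parenthesis of the column list.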

1225.. _postgresql_constraint_options: 

1226 

1227PostgreSQL Constraint Options 

1228----------------------------- 

1229 

1230The following option(s) are supported by the PostgreSQL dialect in conjunction 

1231with selected constraint constructs: 

1232 

1233* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints 

1234 when the constraint is being added to an existing table via ALTER TABLE, 

1235 and has the effect that existing rows are not scanned during the ALTER 

1236 operation against the constraint being added. 

1237 

1238 When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_ 

1239 that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument 

1240 may be specified as an additional keyword argument within the operation 

1241 that creates the constraint, as in the following Alembic example:: 

1242 

1243 def upgrade(): 

1244 op.create_foreign_key( 

1245 "fk_user_address", 

1246 "address", 

1247 "user", 

1248 ["user_id"], 

1249 ["id"], 

1250 postgresql_not_valid=True, 

1251 ) 

1252 

1253 The keyword is ultimately accepted directly by the 

1254 :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint` 

1255 and :class:`_schema.ForeignKey` constructs; when using a tool like 

1256 Alembic, dialect-specific keyword arguments are passed through to 

1257 these constructs from the migration operation directives:: 

1258 

1259 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True) 

1260 

1261 ForeignKeyConstraint( 

1262 ["some_id"], ["some_table.some_id"], postgresql_not_valid=True 

1263 ) 

1264 

1265 .. versionadded:: 1.4.32 

1266 

1267 .. seealso:: 

1268 

1269 `PostgreSQL ALTER TABLE options 

1270 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ - 

1271 in the PostgreSQL documentation. 

1272 

1273* ``INCLUDE``: This option adds one or more columns as a "payload" to the 

1274 unique index created automatically by PostgreSQL for the constraint. 

1275 For example, the following table definition:: 

1276 

1277 Table( 

1278 "mytable", 

1279 metadata, 

1280 Column("id", Integer, nullable=False), 

1281 Column("value", Integer, nullable=False), 

1282 UniqueConstraint("id", postgresql_include=["value"]), 

1283 ) 

1284 

1285 would produce the DDL statement 

1286 

1287 .. sourcecode:: sql 

1288 

1289 CREATE TABLE mytable ( 

1290 id INTEGER NOT NULL, 

1291 value INTEGER NOT NULL, 

1292 UNIQUE (id) INCLUDE (value) 

1293 ) 

1294 

1295 Note that this feature requires PostgreSQL 11 or later. 

1296 

1297 .. versionadded:: 2.0.41 

1298 

1299 .. seealso:: 

1300 

1301 :ref:`postgresql_covering_indexes` 

1302 

1303 .. seealso:: 

1304 

1305 `PostgreSQL CREATE TABLE options 

1306 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ - 

1307 in the PostgreSQL documentation. 

1308 

1309* Column list with foreign key ``ON DELETE SET`` actions: This applies to 

1310 :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`. On the PostgreSQL 

1311 backend only, the :paramref:`.ForeignKey.ondelete` parameter accepts a 

1312 parenthesized, comma-separated list of column names following the ``SET NULL`` 

1313 or ``SET DEFAULT`` phrase, which limits the set of columns that are subject 

1314 to the action:: 

1315 

1316 fktable = Table( 

1317 "fktable", 

1318 metadata, 

1319 Column("tid", Integer), 

1320 Column("id", Integer), 

1321 Column("fk_id_del_set_null", Integer), 

1322 ForeignKeyConstraint( 

1323 columns=["tid", "fk_id_del_set_null"], 

1324 refcolumns=[pktable.c.tid, pktable.c.id], 

1325 ondelete="SET NULL (fk_id_del_set_null)", 

1326 ), 

1327 ) 

1328 

1329 .. versionadded:: 2.0.40 

1330 

1331 

1332.. _postgresql_table_valued_overview: 

1333 

1334Table values, Table and Column valued functions, Row and Tuple objects 

1335----------------------------------------------------------------------- 

1336 

1337PostgreSQL makes great use of modern SQL forms such as table-valued functions, 

1338tables and rows as values. These constructs are commonly used as part 

1339of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other 

1340datatypes. SQLAlchemy's SQL expression language has native support for 

1341most table-valued and row-valued forms. 

1342 

1343.. _postgresql_table_valued: 

1344 

1345Table-Valued Functions 

1346^^^^^^^^^^^^^^^^^^^^^^^ 

1347 

1348Many PostgreSQL built-in functions are intended to be used in the FROM clause 

1349of a SELECT statement, and are capable of returning table rows or sets of table 

1350rows. A large portion of PostgreSQL's JSON functions for example such as 

1351``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``, 

1352``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such 

1353forms. These classes of SQL function calling forms in SQLAlchemy are available 

1354using the :meth:`_functions.FunctionElement.table_valued` method in conjunction 

1355with :class:`_functions.Function` objects generated from the :data:`_sql.func` 

1356namespace. 

1357 

1358Examples from PostgreSQL's reference documentation follow below: 

1359 

1360* ``json_each()``: 

1361 

1362 .. sourcecode:: pycon+sql 

1363 

1364 >>> from sqlalchemy import select, func 

1365 >>> stmt = select( 

1366 ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value") 

1367 ... ) 

1368 >>> print(stmt) 

1369 {printsql}SELECT anon_1.key, anon_1.value 

1370 FROM json_each(:json_each_1) AS anon_1 

1371 

1372* ``json_populate_record()``: 

1373 

1374 .. sourcecode:: pycon+sql 

1375 

1376 >>> from sqlalchemy import select, func, literal_column 

1377 >>> stmt = select( 

1378 ... func.json_populate_record( 

1379 ... literal_column("null::myrowtype"), '{"a":1,"b":2}' 

1380 ... ).table_valued("a", "b", name="x") 

1381 ... ) 

1382 >>> print(stmt) 

1383 {printsql}SELECT x.a, x.b 

1384 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x 

1385 

1386* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived 

1387 columns in the alias, where we may make use of :func:`_sql.column` elements with 

1388 types to produce them. The :meth:`_functions.FunctionElement.table_valued` 

1389 method produces a :class:`_sql.TableValuedAlias` construct, and the 

1390 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived 

1391 columns specification: 

1392 

1393 .. sourcecode:: pycon+sql 

1394 

1395 >>> from sqlalchemy import select, func, column, Integer, Text 

1396 >>> stmt = select( 

1397 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}') 

1398 ... .table_valued( 

1399 ... column("a", Integer), 

1400 ... column("b", Text), 

1401 ... column("d", Text), 

1402 ... ) 

1403 ... .render_derived(name="x", with_types=True) 

1404 ... ) 

1405 >>> print(stmt) 

1406 {printsql}SELECT x.a, x.b, x.d 

1407 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT) 

1408 

1409* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an 

1410 ordinal counter to the output of a function and is accepted by a limited set 

1411 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The 

1412 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword 

1413 parameter ``with_ordinality`` for this purpose, which accepts the string name 

1414 that will be applied to the "ordinality" column: 

1415 

1416 .. sourcecode:: pycon+sql 

1417 

1418 >>> from sqlalchemy import select, func 

1419 >>> stmt = select( 

1420 ... func.generate_series(4, 1, -1) 

1421 ... .table_valued("value", with_ordinality="ordinality") 

1422 ... .render_derived() 

1423 ... ) 

1424 >>> print(stmt) 

1425 {printsql}SELECT anon_1.value, anon_1.ordinality 

1426 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3) 

1427 WITH ORDINALITY AS anon_1(value, ordinality) 

1428 

1429.. versionadded:: 1.4.0b2 

1430 

1431.. seealso:: 

1432 

1433 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial` 

1434 

1435.. _postgresql_column_valued: 

1436 

1437Column Valued Functions 

1438^^^^^^^^^^^^^^^^^^^^^^^ 

1439 

1440Similar to the table valued function, a column valued function is present 

1441in the FROM clause, but delivers itself to the columns clause as a single 

1442scalar value. PostgreSQL functions such as ``json_array_elements()``, 

1443``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the 

1444:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`: 

1445 

1446* ``json_array_elements()``: 

1447 

1448 .. sourcecode:: pycon+sql 

1449 

1450 >>> from sqlalchemy import select, func 

1451 >>> stmt = select( 

1452 ... func.json_array_elements('["one", "two"]').column_valued("x") 

1453 ... ) 

1454 >>> print(stmt) 

1455 {printsql}SELECT x 

1456 FROM json_array_elements(:json_array_elements_1) AS x 

1457 

1458* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the 

1459 :func:`_postgresql.array` construct may be used: 

1460 

1461 .. sourcecode:: pycon+sql 

1462 

1463 >>> from sqlalchemy.dialects.postgresql import array 

1464 >>> from sqlalchemy import select, func 

1465 >>> stmt = select(func.unnest(array([1, 2])).column_valued()) 

1466 >>> print(stmt) 

1467 {printsql}SELECT anon_1 

1468 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1 

1469 

1470 The function can of course be used against an existing table-bound column 

1471 that's of type :class:`_types.ARRAY`: 

1472 

1473 .. sourcecode:: pycon+sql 

1474 

1475 >>> from sqlalchemy import table, column, ARRAY, Integer 

1476 >>> from sqlalchemy import select, func 

1477 >>> t = table("t", column("value", ARRAY(Integer))) 

1478 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value")) 

1479 >>> print(stmt) 

1480 {printsql}SELECT unnested_value 

1481 FROM unnest(t.value) AS unnested_value 

1482 

1483.. seealso:: 

1484 

1485 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial` 

1486 

1487 

1488Row Types 

1489^^^^^^^^^ 

1490 

1491Built-in support for rendering a ``ROW`` may be approximated using 

1492``func.ROW`` with the :attr:`_sa.func` namespace, or by using the 

1493:func:`_sql.tuple_` construct: 

1494 

1495.. sourcecode:: pycon+sql 

1496 

1497 >>> from sqlalchemy import table, column, func, tuple_ 

1498 >>> t = table("t", column("id"), column("fk")) 

1499 >>> stmt = ( 

1500 ... t.select() 

1501 ... .where(tuple_(t.c.id, t.c.fk) > (1, 2)) 

1502 ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)) 

1503 ... ) 

1504 >>> print(stmt) 

1505 {printsql}SELECT t.id, t.fk 

1506 FROM t 

1507 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2) 

1508 

1509.. seealso:: 

1510 

1511 `PostgreSQL Row Constructors 

1512 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_ 

1513 

1514 `PostgreSQL Row Constructor Comparison 

1515 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_ 

1516 

1517Table Types passed to Functions 

1518^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ 

1519 

1520PostgreSQL supports passing a table as an argument to a function, which is 

1521known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects 

1522such as :class:`_schema.Table` support this special form using the 

1523:meth:`_sql.FromClause.table_valued` method, which is comparable to the 

1524:meth:`_functions.FunctionElement.table_valued` method except that the collection 

1525of columns is already established by that of the :class:`_sql.FromClause` 

1526itself: 

1527 

1528.. sourcecode:: pycon+sql 

1529 

1530 >>> from sqlalchemy import table, column, func, select 

1531 >>> a = table("a", column("id"), column("x"), column("y")) 

1532 >>> stmt = select(func.row_to_json(a.table_valued())) 

1533 >>> print(stmt) 

1534 {printsql}SELECT row_to_json(a) AS row_to_json_1 

1535 FROM a 

1536 

1537.. versionadded:: 1.4.0b2 

1538 

1539 

1540 

1541""" # noqa: E501 

1542 

1543from __future__ import annotations 

1544 

1545from collections import defaultdict 

1546from functools import lru_cache 

1547import re 

1548from typing import Any 

1549from typing import cast 

1550from typing import Dict 

1551from typing import List 

1552from typing import Optional 

1553from typing import Tuple 

1554from typing import TYPE_CHECKING 

1555from typing import Union 

1556 

1557from . import arraylib as _array 

1558from . import json as _json 

1559from . import pg_catalog 

1560from . import ranges as _ranges 

1561from .ext import _regconfig_fn 

1562from .ext import aggregate_order_by 

1563from .hstore import HSTORE 

1564from .named_types import CreateDomainType as CreateDomainType # noqa: F401 

1565from .named_types import CreateEnumType as CreateEnumType # noqa: F401 

1566from .named_types import DOMAIN as DOMAIN # noqa: F401 

1567from .named_types import DropDomainType as DropDomainType # noqa: F401 

1568from .named_types import DropEnumType as DropEnumType # noqa: F401 

1569from .named_types import ENUM as ENUM # noqa: F401 

1570from .named_types import NamedType as NamedType # noqa: F401 

1571from .types import _DECIMAL_TYPES # noqa: F401 

1572from .types import _FLOAT_TYPES # noqa: F401 

1573from .types import _INT_TYPES # noqa: F401 

1574from .types import BIT as BIT 

1575from .types import BYTEA as BYTEA 

1576from .types import CIDR as CIDR 

1577from .types import CITEXT as CITEXT 

1578from .types import INET as INET 

1579from .types import INTERVAL as INTERVAL 

1580from .types import MACADDR as MACADDR 

1581from .types import MACADDR8 as MACADDR8 

1582from .types import MONEY as MONEY 

1583from .types import OID as OID 

1584from .types import PGBit as PGBit # noqa: F401 

1585from .types import PGCidr as PGCidr # noqa: F401 

1586from .types import PGInet as PGInet # noqa: F401 

1587from .types import PGInterval as PGInterval # noqa: F401 

1588from .types import PGMacAddr as PGMacAddr # noqa: F401 

1589from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401 

1590from .types import PGUuid as PGUuid 

1591from .types import REGCLASS as REGCLASS 

1592from .types import REGCONFIG as REGCONFIG # noqa: F401 

1593from .types import TIME as TIME 

1594from .types import TIMESTAMP as TIMESTAMP 

1595from .types import TSVECTOR as TSVECTOR 

1596from ... import exc 

1597from ... import schema 

1598from ... import select 

1599from ... import sql 

1600from ... import util 

1601from ...engine import characteristics 

1602from ...engine import default 

1603from ...engine import interfaces 

1604from ...engine import ObjectKind 

1605from ...engine import ObjectScope 

1606from ...engine import reflection 

1607from ...engine import URL 

1608from ...engine.reflection import ReflectionDefaults 

1609from ...sql import bindparam 

1610from ...sql import coercions 

1611from ...sql import compiler 

1612from ...sql import elements 

1613from ...sql import expression 

1614from ...sql import roles 

1615from ...sql import sqltypes 

1616from ...sql import util as sql_util 

1617from ...sql.compiler import InsertmanyvaluesSentinelOpts 

1618from ...sql.visitors import InternalTraversal 

1619from ...types import BIGINT 

1620from ...types import BOOLEAN 

1621from ...types import CHAR 

1622from ...types import DATE 

1623from ...types import DOUBLE_PRECISION 

1624from ...types import FLOAT 

1625from ...types import INTEGER 

1626from ...types import NUMERIC 

1627from ...types import REAL 

1628from ...types import SMALLINT 

1629from ...types import TEXT 

1630from ...types import UUID as UUID 

1631from ...types import VARCHAR 

1632from ...util.typing import TypedDict 

1633 

1634IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I) 
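
The `IDX_USING` pattern above can be exercised on its own. The sketch below copies the pattern verbatim and wraps it in a hypothetical helper, `is_valid_index_method` (not part of SQLAlchemy), to show which strings pass. Because the final alternative `[\w_]+` already covers the named methods, the pattern in effect accepts any single identifier-like word and rejects anything containing spaces or punctuation.

```python
import re

# Same pattern as IDX_USING in the listing above.
IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)


def is_valid_index_method(name: str) -> bool:
    """Hypothetical helper: True if ``name`` is one identifier-like word."""
    return IDX_USING.match(name) is not None


# Single words pass, including methods not named in the pattern.
accepted = [
    m
    for m in ("btree", "GIN", "brin", "my_ext_method")
    if is_valid_index_method(m)
]

# Anything with spaces or punctuation, or an empty string, is rejected.
rejected = [
    m
    for m in ("gin (col)", "btree; DROP TABLE t", "")
    if not is_valid_index_method(m)
]
```
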

1635 

1636RESERVED_WORDS = { 

1637 "all", 

1638 "analyse", 

1639 "analyze", 

1640 "and", 

1641 "any", 

1642 "array", 

1643 "as", 

1644 "asc", 

1645 "asymmetric", 

1646 "both", 

1647 "case", 

1648 "cast", 

1649 "check", 

1650 "collate", 

1651 "column", 

1652 "constraint", 

1653 "create", 

1654 "current_catalog", 

1655 "current_date", 

1656 "current_role", 

1657 "current_time", 

1658 "current_timestamp", 

1659 "current_user", 

1660 "default", 

1661 "deferrable", 

1662 "desc", 

1663 "distinct", 

1664 "do", 

1665 "else", 

1666 "end", 

1667 "except", 

1668 "false", 

1669 "fetch", 

1670 "for", 

1671 "foreign", 

1672 "from", 

1673 "grant", 

1674 "group", 

1675 "having", 

1676 "in", 

1677 "initially", 

1678 "intersect", 

1679 "into", 

1680 "leading", 

1681 "limit", 

1682 "localtime", 

1683 "localtimestamp", 

1684 "new", 

1685 "not", 

1686 "null", 

1687 "of", 

1688 "off", 

1689 "offset", 

1690 "old", 

1691 "on", 

1692 "only", 

1693 "or", 

1694 "order", 

1695 "placing", 

1696 "primary", 

1697 "references", 

1698 "returning", 

1699 "select", 

1700 "session_user", 

1701 "some", 

1702 "symmetric", 

1703 "table", 

1704 "then", 

1705 "to", 

1706 "trailing", 

1707 "true", 

1708 "union", 

1709 "unique", 

1710 "user", 

1711 "using", 

1712 "variadic", 

1713 "when", 

1714 "where", 

1715 "window", 

1716 "with", 

1717 "authorization", 

1718 "between", 

1719 "binary", 

1720 "cross", 

1721 "current_schema", 

1722 "freeze", 

1723 "full", 

1724 "ilike", 

1725 "inner", 

1726 "is", 

1727 "isnull", 

1728 "join", 

1729 "left", 

1730 "like", 

1731 "natural", 

1732 "notnull", 

1733 "outer", 

1734 "over", 

1735 "overlaps", 

1736 "right", 

1737 "similar", 

1738 "verbose", 

1739} 
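
A set like `RESERVED_WORDS` is typically consulted when deciding whether an identifier must be double-quoted. The following is a minimal sketch of that idea, not SQLAlchemy's actual identifier preparer: the names `RESERVED_SUBSET`, `needs_quoting` and `quote_if_needed` are hypothetical, and the "legal characters" rule is an assumption made for illustration.

```python
import re

# Small illustrative subset of the RESERVED_WORDS set above.
RESERVED_SUBSET = {"select", "table", "user", "order", "group"}

# Assumption: an unquoted identifier consists of lowercase letters, digits,
# underscores or "$", and does not start with a digit or "$".
_LEGAL = re.compile(r"^[a-z_][a-z0-9_$]*$")


def needs_quoting(name: str) -> bool:
    """Hypothetical check: reserved word, or contains non-legal characters."""
    return name.lower() in RESERVED_SUBSET or _LEGAL.match(name) is None


def quote_if_needed(name: str) -> str:
    """Wrap ``name`` in double quotes when needed, doubling embedded quotes."""
    if needs_quoting(name):
        return '"%s"' % name.replace('"', '""')
    return name
```

Under these assumptions, `quote_if_needed("user")` gives `"user"` with quotes, while `quote_if_needed("account")` is returned unchanged.
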

1740 

1741 

1742colspecs = { 

1743 sqltypes.ARRAY: _array.ARRAY, 

1744 sqltypes.Interval: INTERVAL, 

1745 sqltypes.Enum: ENUM, 

1746 sqltypes.JSON.JSONPathType: _json.JSONPATH, 

1747 sqltypes.JSON: _json.JSON, 

1748 sqltypes.Uuid: PGUuid, 

1749} 

1750 

1751 

1752ischema_names = { 

1753 "_array": _array.ARRAY, 

1754 "hstore": HSTORE, 

1755 "json": _json.JSON, 

1756 "jsonb": _json.JSONB, 

1757 "int4range": _ranges.INT4RANGE, 

1758 "int8range": _ranges.INT8RANGE, 

1759 "numrange": _ranges.NUMRANGE, 

1760 "daterange": _ranges.DATERANGE, 

1761 "tsrange": _ranges.TSRANGE, 

1762 "tstzrange": _ranges.TSTZRANGE, 

1763 "int4multirange": _ranges.INT4MULTIRANGE, 

1764 "int8multirange": _ranges.INT8MULTIRANGE, 

1765 "nummultirange": _ranges.NUMMULTIRANGE, 

1766 "datemultirange": _ranges.DATEMULTIRANGE, 

1767 "tsmultirange": _ranges.TSMULTIRANGE, 

1768 "tstzmultirange": _ranges.TSTZMULTIRANGE, 

1769 "integer": INTEGER, 

1770 "bigint": BIGINT, 

1771 "smallint": SMALLINT, 

1772 "character varying": VARCHAR, 

1773 "character": CHAR, 

1774 '"char"': sqltypes.String, 

1775 "name": sqltypes.String, 

1776 "text": TEXT, 

1777 "numeric": NUMERIC, 

1778 "float": FLOAT, 

1779 "real": REAL, 

1780 "inet": INET, 

1781 "cidr": CIDR, 

1782 "citext": CITEXT, 

1783 "uuid": UUID, 

1784 "bit": BIT, 

1785 "bit varying": BIT, 

1786 "macaddr": MACADDR, 

1787 "macaddr8": MACADDR8, 

1788 "money": MONEY, 

1789 "oid": OID, 

1790 "regclass": REGCLASS, 

1791 "double precision": DOUBLE_PRECISION, 

1792 "timestamp": TIMESTAMP, 

1793 "timestamp with time zone": TIMESTAMP, 

1794 "timestamp without time zone": TIMESTAMP, 

1795 "time with time zone": TIME, 

1796 "time without time zone": TIME, 

1797 "date": DATE, 

1798 "time": TIME, 

1799 "bytea": BYTEA, 

1800 "boolean": BOOLEAN, 

1801 "interval": INTERVAL, 

1802 "tsvector": TSVECTOR, 

1803} 

1804 

1805 

1806class PGCompiler(compiler.SQLCompiler): 

1807 def visit_to_tsvector_func(self, element, **kw): 

1808 return self._assert_pg_ts_ext(element, **kw) 

1809 

1810 def visit_to_tsquery_func(self, element, **kw): 

1811 return self._assert_pg_ts_ext(element, **kw) 

1812 

1813 def visit_plainto_tsquery_func(self, element, **kw): 

1814 return self._assert_pg_ts_ext(element, **kw) 

1815 

1816 def visit_phraseto_tsquery_func(self, element, **kw): 

1817 return self._assert_pg_ts_ext(element, **kw) 

1818 

1819 def visit_websearch_to_tsquery_func(self, element, **kw): 

1820 return self._assert_pg_ts_ext(element, **kw) 

1821 

1822 def visit_ts_headline_func(self, element, **kw): 

1823 return self._assert_pg_ts_ext(element, **kw) 

1824 

1825 def _assert_pg_ts_ext(self, element, **kw): 

1826 if not isinstance(element, _regconfig_fn): 

1827 # other options here include trying to rewrite the function 

1828 # with the correct types. however, that means we have to 

1829 # "un-SQL-ize" the first argument, which can't work in a 

1830 # generalized way. Also, parent compiler class has already added 

1831 # the incorrect return type to the result map. So let's just 

1832 # make sure the function we want is used up front. 

1833 

1834 raise exc.CompileError( 

1835 f'Can\'t compile "{element.name}()" full text search ' 

1836 f"function construct that does not originate from the " 

1837 f'"sqlalchemy.dialects.postgresql" package. ' 

1838 f'Please ensure "import sqlalchemy.dialects.postgresql" is ' 

1839 f"called before constructing " 

1840 f'"sqlalchemy.func.{element.name}()" to ensure registration ' 

1841 f"of the correct argument and return types." 

1842 ) 

1843 

1844 return f"{element.name}{self.function_argspec(element, **kw)}" 

1845 

1846 def render_bind_cast(self, type_, dbapi_type, sqltext): 

1847 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length: 

1848 # use VARCHAR with no length for VARCHAR cast. 

1849 # see #9511 

1850 dbapi_type = sqltypes.STRINGTYPE 

1851 return f"""{sqltext}::{ 

1852 self.dialect.type_compiler_instance.process( 

1853 dbapi_type, identifier_preparer=self.preparer 

1854 ) 

1855 }""" 

1856 

1857 def visit_array(self, element, **kw): 

1858 if not element.clauses and not element.type.item_type._isnull: 

1859 return "ARRAY[]::%s" % element.type.compile(self.dialect) 

1860 return "ARRAY[%s]" % self.visit_clauselist(element, **kw) 

1861 

1862 def visit_slice(self, element, **kw): 

1863 return "%s:%s" % ( 

1864 self.process(element.start, **kw), 

1865 self.process(element.stop, **kw), 

1866 ) 

1867 

1868 def visit_bitwise_xor_op_binary(self, binary, operator, **kw): 

1869 return self._generate_generic_binary(binary, " # ", **kw) 

1870 

1871 def visit_json_getitem_op_binary( 

1872 self, binary, operator, _cast_applied=False, **kw 

1873 ): 

1874 if ( 

1875 not _cast_applied 

1876 and binary.type._type_affinity is not sqltypes.JSON 

1877 ): 

1878 kw["_cast_applied"] = True 

1879 return self.process(sql.cast(binary, binary.type), **kw) 

1880 

1881 kw["eager_grouping"] = True 

1882 

1883 if ( 

1884 not _cast_applied 

1885 and isinstance(binary.left.type, _json.JSONB) 

1886 and self.dialect._supports_jsonb_subscripting 

1887 ): 

1888 # for pg14+JSONB use subscript notation: col['key'] instead 

1889 # of col -> 'key' 

1890 return "%s[%s]" % ( 

1891 self.process(binary.left, **kw), 

1892 self.process(binary.right, **kw), 

1893 ) 

1894 else: 

1895 # Fall back to arrow notation for older versions or when cast 

1896 # is applied 

1897 return self._generate_generic_binary( 

1898 binary, " -> " if not _cast_applied else " ->> ", **kw 

1899 ) 

1900 

1901 def visit_json_path_getitem_op_binary( 

1902 self, binary, operator, _cast_applied=False, **kw 

1903 ): 

1904 if ( 

1905 not _cast_applied 

1906 and binary.type._type_affinity is not sqltypes.JSON 

1907 ): 

1908 kw["_cast_applied"] = True 

1909 return self.process(sql.cast(binary, binary.type), **kw) 

1910 

1911 kw["eager_grouping"] = True 

1912 return self._generate_generic_binary( 

1913 binary, " #> " if not _cast_applied else " #>> ", **kw 

1914 ) 

1915 

1916 def visit_getitem_binary(self, binary, operator, **kw): 

1917 return "%s[%s]" % ( 

1918 self.process(binary.left, **kw), 

1919 self.process(binary.right, **kw), 

1920 ) 

1921 

1922 def visit_aggregate_order_by(self, element, **kw): 

1923 return "%s ORDER BY %s" % ( 

1924 self.process(element.target, **kw), 

1925 self.process(element.order_by, **kw), 

1926 ) 

1927 

1928 def visit_match_op_binary(self, binary, operator, **kw): 

1929 if "postgresql_regconfig" in binary.modifiers: 

1930 regconfig = self.render_literal_value( 

1931 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE 

1932 ) 

1933 if regconfig: 

1934 return "%s @@ plainto_tsquery(%s, %s)" % ( 

1935 self.process(binary.left, **kw), 

1936 regconfig, 

1937 self.process(binary.right, **kw), 

1938 ) 

1939 return "%s @@ plainto_tsquery(%s)" % ( 

1940 self.process(binary.left, **kw), 

1941 self.process(binary.right, **kw), 

1942 ) 

1943 

1944 def visit_ilike_case_insensitive_operand(self, element, **kw): 

1945 return element.element._compiler_dispatch(self, **kw) 

1946 

1947 def visit_ilike_op_binary(self, binary, operator, **kw): 

1948 escape = binary.modifiers.get("escape", None) 

1949 

1950 return "%s ILIKE %s" % ( 

1951 self.process(binary.left, **kw), 

1952 self.process(binary.right, **kw), 

1953 ) + ( 

1954 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

1955 if escape is not None 

1956 else "" 

1957 ) 

1958 

1959 def visit_not_ilike_op_binary(self, binary, operator, **kw): 

1960 escape = binary.modifiers.get("escape", None) 

1961 return "%s NOT ILIKE %s" % ( 

1962 self.process(binary.left, **kw), 

1963 self.process(binary.right, **kw), 

1964 ) + ( 

1965 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE) 

1966 if escape is not None 

1967 else "" 

1968 ) 

1969 

1970 def _regexp_match(self, base_op, binary, operator, kw): 

1971 flags = binary.modifiers["flags"] 

1972 if flags is None: 

1973 return self._generate_generic_binary( 

1974 binary, " %s " % base_op, **kw 

1975 ) 

1976 if flags == "i": 

1977 return self._generate_generic_binary( 

1978 binary, " %s* " % base_op, **kw 

1979 ) 

1980 return "%s %s CONCAT('(?', %s, ')', %s)" % ( 

1981 self.process(binary.left, **kw), 

1982 base_op, 

1983 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

1984 self.process(binary.right, **kw), 

1985 ) 
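
The three branches of `_regexp_match` above can be illustrated with a simplified stand-in that works on already-rendered SQL fragments. The function `render_regexp_match` is hypothetical, and it quotes the flags naively, whereas the real method goes through `render_literal_value`.

```python
from typing import Optional


def render_regexp_match(
    left: str, right: str, base_op: str = "~", flags: Optional[str] = None
) -> str:
    """Simplified sketch of the flag handling in _regexp_match()."""
    if flags is None:
        # No flags: plain operator, e.g. "col ~ pattern".
        return "%s %s %s" % (left, base_op, right)
    if flags == "i":
        # Case-insensitive only: use the "*" variant of the operator.
        return "%s %s* %s" % (left, base_op, right)
    # Any other flags: embed them in the pattern as "(?flags)".
    # Assumption: flags contain no quote characters, so naive quoting is safe.
    return "%s %s CONCAT('(?', '%s', ')', %s)" % (left, base_op, flags, right)
```
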

1986 

1987 def visit_regexp_match_op_binary(self, binary, operator, **kw): 

1988 return self._regexp_match("~", binary, operator, kw) 

1989 

1990 def visit_not_regexp_match_op_binary(self, binary, operator, **kw): 

1991 return self._regexp_match("!~", binary, operator, kw) 

1992 

1993 def visit_regexp_replace_op_binary(self, binary, operator, **kw): 

1994 string = self.process(binary.left, **kw) 

1995 pattern_replace = self.process(binary.right, **kw) 

1996 flags = binary.modifiers["flags"] 

1997 if flags is None: 

1998 return "REGEXP_REPLACE(%s, %s)" % ( 

1999 string, 

2000 pattern_replace, 

2001 ) 

2002 else: 

2003 return "REGEXP_REPLACE(%s, %s, %s)" % ( 

2004 string, 

2005 pattern_replace, 

2006 self.render_literal_value(flags, sqltypes.STRINGTYPE), 

2007 ) 

2008 

2009 def visit_empty_set_expr(self, element_types, **kw): 

2010 # cast the empty set to the type we are comparing against. if 

2011 # we are comparing against the null type, pick an arbitrary 

2012 # datatype for the empty set 

2013 return "SELECT %s WHERE 1!=1" % ( 

2014 ", ".join( 

2015 "CAST(NULL AS %s)" 

2016 % self.dialect.type_compiler_instance.process( 

2017 INTEGER() if type_._isnull else type_ 

2018 ) 

2019 for type_ in element_types or [INTEGER()] 

2020 ), 

2021 ) 

2022 

2023 def render_literal_value(self, value, type_): 

2024 value = super().render_literal_value(value, type_) 

2025 

2026 if self.dialect._backslash_escapes: 

2027 value = value.replace("\\", "\\\\") 

2028 return value 

2029 

2030 def visit_aggregate_strings_func(self, fn, **kw): 

2031 return "string_agg%s" % self.function_argspec(fn, **kw)

2032 

2033 def visit_sequence(self, seq, **kw): 

2034 return "nextval('%s')" % self.preparer.format_sequence(seq) 

2035 

2036 def limit_clause(self, select, **kw): 

2037 text = "" 

2038 if select._limit_clause is not None: 

2039 text += " \n LIMIT " + self.process(select._limit_clause, **kw) 

2040 if select._offset_clause is not None: 

2041 if select._limit_clause is None: 

2042 text += "\n LIMIT ALL" 

2043 text += " OFFSET " + self.process(select._offset_clause, **kw) 

2044 return text 
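
The `limit_clause` method above emits `LIMIT ALL` when an OFFSET is given without a LIMIT. A minimal sketch of the same branching, using plain integers instead of SQL expression objects (the function name `render_limit_offset` is hypothetical):

```python
from typing import Optional


def render_limit_offset(limit: Optional[int], offset: Optional[int]) -> str:
    """Simplified stand-in for limit_clause(), using plain integers."""
    text = ""
    if limit is not None:
        text += " \n LIMIT %d" % limit
    if offset is not None:
        if limit is None:
            # OFFSET without LIMIT: render an explicit LIMIT ALL first.
            text += "\n LIMIT ALL"
        text += " OFFSET %d" % offset
    return text
```
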

2045 

2046 def format_from_hint_text(self, sqltext, table, hint, iscrud): 

2047 if hint.upper() != "ONLY": 

2048 raise exc.CompileError("Unrecognized hint: %r" % hint) 

2049 return "ONLY " + sqltext 

2050 

2051 def get_select_precolumns(self, select, **kw): 

2052 # Do not call super().get_select_precolumns because 

2053 # it will warn/raise when distinct on is present 

2054 if select._distinct or select._distinct_on: 

2055 if select._distinct_on: 

2056 return ( 

2057 "DISTINCT ON (" 

2058 + ", ".join( 

2059 [ 

2060 self.process(col, **kw) 

2061 for col in select._distinct_on 

2062 ] 

2063 ) 

2064 + ") " 

2065 ) 

2066 else: 

2067 return "DISTINCT " 

2068 else: 

2069 return "" 

2070 

2071 def for_update_clause(self, select, **kw): 

2072 if select._for_update_arg.read: 

2073 if select._for_update_arg.key_share: 

2074 tmp = " FOR KEY SHARE" 

2075 else: 

2076 tmp = " FOR SHARE" 

2077 elif select._for_update_arg.key_share: 

2078 tmp = " FOR NO KEY UPDATE" 

2079 else: 

2080 tmp = " FOR UPDATE" 

2081 

2082 if select._for_update_arg.of: 

2083 tables = util.OrderedSet() 

2084 for c in select._for_update_arg.of: 

2085 tables.update(sql_util.surface_selectables_only(c)) 

2086 

2087 of_kw = dict(kw) 

2088 of_kw.update(ashint=True, use_schema=False) 

2089 tmp += " OF " + ", ".join( 

2090 self.process(table, **of_kw) for table in tables 

2091 ) 

2092 

2093 if select._for_update_arg.nowait: 

2094 tmp += " NOWAIT" 

2095 if select._for_update_arg.skip_locked: 

2096 tmp += " SKIP LOCKED" 

2097 

2098 return tmp 
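
The lock-mode selection in `for_update_clause` above reduces to a small decision table over the `read` and `key_share` flags. The sketch below reproduces only that part as a standalone function. `lock_clause` is a hypothetical name, and the `OF` table list is omitted for brevity.

```python
def lock_clause(
    read: bool = False,
    key_share: bool = False,
    nowait: bool = False,
    skip_locked: bool = False,
) -> str:
    """Simplified sketch of for_update_clause() without the OF list."""
    if read:
        clause = " FOR KEY SHARE" if key_share else " FOR SHARE"
    elif key_share:
        clause = " FOR NO KEY UPDATE"
    else:
        clause = " FOR UPDATE"

    # Modifiers are appended independently of the lock mode.
    if nowait:
        clause += " NOWAIT"
    if skip_locked:
        clause += " SKIP LOCKED"
    return clause
```
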

2099 

2100 def visit_substring_func(self, func, **kw): 

2101 s = self.process(func.clauses.clauses[0], **kw) 

2102 start = self.process(func.clauses.clauses[1], **kw) 

2103 if len(func.clauses.clauses) > 2: 

2104 length = self.process(func.clauses.clauses[2], **kw) 

2105 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length) 

2106 else: 

2107 return "SUBSTRING(%s FROM %s)" % (s, start) 

2108 

2109 def _on_conflict_target(self, clause, **kw): 

2110 if clause.constraint_target is not None: 

2111 # target may be a name of an Index, UniqueConstraint or 

2112 # ExcludeConstraint. While there is a separate 

2113 # "max_identifier_length" for indexes, PostgreSQL uses the same 

2114 # length for all objects so we can use 

2115 # truncate_and_render_constraint_name 

2116 target_text = ( 

2117 "ON CONSTRAINT %s" 

2118 % self.preparer.truncate_and_render_constraint_name( 

2119 clause.constraint_target 

2120 ) 

2121 ) 

2122 elif clause.inferred_target_elements is not None: 

2123 target_text = "(%s)" % ", ".join( 

2124 ( 

2125 self.preparer.quote(c) 

2126 if isinstance(c, str) 

2127 else self.process(c, include_table=False, use_schema=False) 

2128 ) 

2129 for c in clause.inferred_target_elements 

2130 ) 

2131 if clause.inferred_target_whereclause is not None: 

2132 target_text += " WHERE %s" % self.process( 

2133 clause.inferred_target_whereclause, 

2134 include_table=False, 

2135 use_schema=False, 

2136 ) 

2137 else: 

2138 target_text = "" 

2139 

2140 return target_text 

2141 

2142 def visit_on_conflict_do_nothing(self, on_conflict, **kw): 

2143 target_text = self._on_conflict_target(on_conflict, **kw) 

2144 

2145 if target_text: 

2146 return "ON CONFLICT %s DO NOTHING" % target_text 

2147 else: 

2148 return "ON CONFLICT DO NOTHING" 

2149 

2150 def visit_on_conflict_do_update(self, on_conflict, **kw): 

2151 clause = on_conflict 

2152 

2153 target_text = self._on_conflict_target(on_conflict, **kw) 

2154 

2155 action_set_ops = [] 

2156 

2157 set_parameters = dict(clause.update_values_to_set) 

2158 # create a list of column assignment clauses as tuples 

2159 

2160 insert_statement = self.stack[-1]["selectable"] 

2161 cols = insert_statement.table.c 

2162 for c in cols: 

2163 col_key = c.key 

2164 

2165 if col_key in set_parameters: 

2166 value = set_parameters.pop(col_key) 

2167 elif c in set_parameters: 

2168 value = set_parameters.pop(c) 

2169 else: 

2170 continue 

2171 

2172 # TODO: this coercion should be up front. we can't cache 

2173 # SQL constructs with non-bound literals buried in them 

2174 if coercions._is_literal(value): 

2175 value = elements.BindParameter(None, value, type_=c.type) 

2176 

2177 else: 

2178 if ( 

2179 isinstance(value, elements.BindParameter) 

2180 and value.type._isnull 

2181 ): 

2182 value = value._clone() 

2183 value.type = c.type 

2184 value_text = self.process(value.self_group(), use_schema=False) 

2185 

2186 key_text = self.preparer.quote(c.name) 

2187 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2188 

2189 # check for names that don't match columns 

2190 if set_parameters: 

2191 util.warn( 

2192 "Additional column names not matching " 

2193 "any column keys in table '%s': %s" 

2194 % ( 

2195 self.current_executable.table.name, 

2196 (", ".join("'%s'" % c for c in set_parameters)), 

2197 ) 

2198 ) 

2199 for k, v in set_parameters.items(): 

2200 key_text = ( 

2201 self.preparer.quote(k) 

2202 if isinstance(k, str) 

2203 else self.process(k, use_schema=False) 

2204 ) 

2205 value_text = self.process( 

2206 coercions.expect(roles.ExpressionElementRole, v), 

2207 use_schema=False, 

2208 ) 

2209 action_set_ops.append("%s = %s" % (key_text, value_text)) 

2210 

2211 action_text = ", ".join(action_set_ops) 

2212 if clause.update_whereclause is not None: 

2213 action_text += " WHERE %s" % self.process( 

2214 clause.update_whereclause, include_table=True, use_schema=False 

2215 ) 

2216 

2217 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text) 

2218 

2219 def update_from_clause( 

2220 self, update_stmt, from_table, extra_froms, from_hints, **kw 

2221 ): 

2222 kw["asfrom"] = True 

2223 return "FROM " + ", ".join( 

2224 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2225 for t in extra_froms 

2226 ) 

2227 

2228 def delete_extra_from_clause( 

2229 self, delete_stmt, from_table, extra_froms, from_hints, **kw 

2230 ): 

2231 """Render the DELETE .. USING clause specific to PostgreSQL.""" 

2232 kw["asfrom"] = True 

2233 return "USING " + ", ".join( 

2234 t._compiler_dispatch(self, fromhints=from_hints, **kw) 

2235 for t in extra_froms 

2236 ) 

2237 

2238 def fetch_clause(self, select, **kw): 

2239 # pg requires parens for non literal clauses. It's also required for 

2240 # bind parameters if a ::type cast is used by the driver (asyncpg),

2241 # so it's easiest to just always add it 

2242 text = "" 

2243 if select._offset_clause is not None: 

2244 text += "\n OFFSET (%s) ROWS" % self.process( 

2245 select._offset_clause, **kw 

2246 ) 

2247 if select._fetch_clause is not None: 

2248 text += "\n FETCH FIRST (%s)%s ROWS %s" % ( 

2249 self.process(select._fetch_clause, **kw), 

2250 " PERCENT" if select._fetch_clause_options["percent"] else "", 

2251 ( 

2252 "WITH TIES" 

2253 if select._fetch_clause_options["with_ties"] 

2254 else "ONLY" 

2255 ), 

2256 ) 

2257 return text 

2258 

2259 

2260class PGDDLCompiler(compiler.DDLCompiler): 

2261 def get_column_specification(self, column, **kwargs): 

2262 colspec = self.preparer.format_column(column) 

2263 impl_type = column.type.dialect_impl(self.dialect) 

2264 if isinstance(impl_type, sqltypes.TypeDecorator): 

2265 impl_type = impl_type.impl 

2266 

2267 has_identity = ( 

2268 column.identity is not None 

2269 and self.dialect.supports_identity_columns 

2270 ) 

2271 

2272 if ( 

2273 column.primary_key 

2274 and column is column.table._autoincrement_column 

2275 and ( 

2276 self.dialect.supports_smallserial 

2277 or not isinstance(impl_type, sqltypes.SmallInteger) 

2278 ) 

2279 and not has_identity 

2280 and ( 

2281 column.default is None 

2282 or ( 

2283 isinstance(column.default, schema.Sequence) 

2284 and column.default.optional 

2285 ) 

2286 ) 

2287 ): 

2288 if isinstance(impl_type, sqltypes.BigInteger): 

2289 colspec += " BIGSERIAL" 

2290 elif isinstance(impl_type, sqltypes.SmallInteger): 

2291 colspec += " SMALLSERIAL" 

2292 else: 

2293 colspec += " SERIAL" 

2294 else: 

2295 colspec += " " + self.dialect.type_compiler_instance.process( 

2296 column.type, 

2297 type_expression=column, 

2298 identifier_preparer=self.preparer, 

2299 ) 

2300 default = self.get_column_default_string(column) 

2301 if default is not None: 

2302 colspec += " DEFAULT " + default 

2303 

2304 if column.computed is not None: 

2305 colspec += " " + self.process(column.computed) 

2306 if has_identity: 

2307 colspec += " " + self.process(column.identity) 

2308 

2309 if not column.nullable and not has_identity: 

2310 colspec += " NOT NULL" 

2311 elif column.nullable and has_identity: 

2312 colspec += " NULL" 

2313 return colspec 

2314 

2315 def _define_constraint_validity(self, constraint): 

2316 not_valid = constraint.dialect_options["postgresql"]["not_valid"] 

2317 return " NOT VALID" if not_valid else "" 

2318 

2319 def _define_include(self, obj): 

2320 includeclause = obj.dialect_options["postgresql"]["include"] 

2321 if not includeclause: 

2322 return "" 

2323 inclusions = [ 

2324 obj.table.c[col] if isinstance(col, str) else col 

2325 for col in includeclause 

2326 ] 

2327 return " INCLUDE (%s)" % ", ".join( 

2328 [self.preparer.quote(c.name) for c in inclusions] 

2329 ) 

2330 

2331 def visit_check_constraint(self, constraint, **kw): 

2332 if constraint._type_bound: 

2333 typ = list(constraint.columns)[0].type 

2334 if ( 

2335 isinstance(typ, sqltypes.ARRAY) 

2336 and isinstance(typ.item_type, sqltypes.Enum) 

2337 and not typ.item_type.native_enum 

2338 ): 

2339 raise exc.CompileError( 

2340 "PostgreSQL dialect cannot produce the CHECK constraint " 

2341 "for ARRAY of non-native ENUM; please specify " 

2342 "create_constraint=False on this Enum datatype." 

2343 ) 

2344 

2345 text = super().visit_check_constraint(constraint) 

2346 text += self._define_constraint_validity(constraint) 

2347 return text 

2348 

2349 def visit_foreign_key_constraint(self, constraint, **kw): 

2350 text = super().visit_foreign_key_constraint(constraint) 

2351 text += self._define_constraint_validity(constraint) 

2352 return text 

2353 

2354 def visit_primary_key_constraint(self, constraint, **kw): 

2355 text = super().visit_primary_key_constraint(constraint) 

2356 text += self._define_include(constraint) 

2357 return text 

2358 

2359 def visit_unique_constraint(self, constraint, **kw): 

2360 text = super().visit_unique_constraint(constraint) 

2361 text += self._define_include(constraint) 

2362 return text 

2363 

2364 @util.memoized_property 

2365 def _fk_ondelete_pattern(self): 

2366 return re.compile( 

2367 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?" 

2368 r"|NO ACTION)$", 

2369 re.I, 

2370 ) 

2371 

2372 def define_constraint_ondelete_cascade(self, constraint): 

2373 return " ON DELETE %s" % self.preparer.validate_sql_phrase( 

2374 constraint.ondelete, self._fk_ondelete_pattern 

2375 ) 
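
The `_fk_ondelete_pattern` regular expression above restricts which phrases may follow `ON DELETE`. The sketch below copies the pattern and pairs it with a hypothetical `render_ondelete` function that raises `ValueError` on a mismatch. This is a stand-in for the validation step, not the behavior of `validate_sql_phrase` itself.

```python
import re

# Same pattern as _fk_ondelete_pattern in the listing above.
FK_ONDELETE = re.compile(
    r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?"
    r"|NO ACTION)$",
    re.I,
)


def render_ondelete(phrase: str) -> str:
    """Hypothetical stand-in: validate the phrase, then render the clause."""
    if FK_ONDELETE.match(phrase) is None:
        raise ValueError("Unexpected ON DELETE phrase: %r" % phrase)
    return " ON DELETE %s" % phrase
```

With this pattern, `SET NULL` and `SET DEFAULT` may carry an optional parenthesized column list, while trailing text such as a second statement is rejected.
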

2376 

2377 def visit_create_enum_type(self, create, **kw): 

2378 type_ = create.element 

2379 

2380 return "CREATE TYPE %s AS ENUM (%s)" % ( 

2381 self.preparer.format_type(type_), 

2382 ", ".join( 

2383 self.sql_compiler.process(sql.literal(e), literal_binds=True) 

2384 for e in type_.enums 

2385 ), 

2386 ) 

2387 

2388 def visit_drop_enum_type(self, drop, **kw): 

2389 type_ = drop.element 

2390 

2391 return "DROP TYPE %s" % (self.preparer.format_type(type_)) 

2392 

2393 def visit_create_domain_type(self, create, **kw): 

2394 domain: DOMAIN = create.element 

2395 

2396 options = [] 

2397 if domain.collation is not None: 

2398 options.append(f"COLLATE {self.preparer.quote(domain.collation)}") 

2399 if domain.default is not None: 

2400 default = self.render_default_string(domain.default) 

2401 options.append(f"DEFAULT {default}") 

2402 if domain.constraint_name is not None: 

2403 name = self.preparer.truncate_and_render_constraint_name( 

2404 domain.constraint_name 

2405 ) 

2406 options.append(f"CONSTRAINT {name}") 

2407 if domain.not_null: 

2408 options.append("NOT NULL") 

2409 if domain.check is not None: 

2410 check = self.sql_compiler.process( 

2411 domain.check, include_table=False, literal_binds=True 

2412 ) 

2413 options.append(f"CHECK ({check})") 

2414 

2415 return ( 

2416 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS " 

2417 f"{self.type_compiler.process(domain.data_type)} " 

2418 f"{' '.join(options)}" 

2419 ) 

2420 

2421 def visit_drop_domain_type(self, drop, **kw): 

2422 domain = drop.element 

2423 return f"DROP DOMAIN {self.preparer.format_type(domain)}" 

2424 

2425 def visit_create_index(self, create, **kw): 

2426 preparer = self.preparer 

2427 index = create.element 

2428 self._verify_index_table(index) 

2429 text = "CREATE " 

2430 if index.unique: 

2431 text += "UNIQUE " 

2432 

2433 text += "INDEX " 

2434 

2435 if self.dialect._supports_create_index_concurrently: 

2436 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2437 if concurrently: 

2438 text += "CONCURRENTLY " 

2439 

2440 if create.if_not_exists: 

2441 text += "IF NOT EXISTS " 

2442 

2443 text += "%s ON %s " % ( 

2444 self._prepared_index_name(index, include_schema=False), 

2445 preparer.format_table(index.table), 

2446 ) 

2447 

2448 using = index.dialect_options["postgresql"]["using"] 

2449 if using: 

2450 text += ( 

2451 "USING %s " 

2452 % self.preparer.validate_sql_phrase(using, IDX_USING).lower() 

2453 ) 

2454 

2455 ops = index.dialect_options["postgresql"]["ops"] 

2456 text += "(%s)" % ( 

2457 ", ".join( 

2458 [ 

2459 self.sql_compiler.process( 

2460 ( 

2461 expr.self_group() 

2462 if not isinstance(expr, expression.ColumnClause) 

2463 else expr 

2464 ), 

2465 include_table=False, 

2466 literal_binds=True, 

2467 ) 

2468 + ( 

2469 (" " + ops[expr.key]) 

2470 if hasattr(expr, "key") and expr.key in ops 

2471 else "" 

2472 ) 

2473 for expr in index.expressions 

2474 ] 

2475 ) 

2476 ) 

2477 

2478 text += self._define_include(index) 

2479 

2480 nulls_not_distinct = index.dialect_options["postgresql"][ 

2481 "nulls_not_distinct" 

2482 ] 

2483 if nulls_not_distinct is True: 

2484 text += " NULLS NOT DISTINCT" 

2485 elif nulls_not_distinct is False: 

2486 text += " NULLS DISTINCT" 

2487 

2488 withclause = index.dialect_options["postgresql"]["with"] 

2489 if withclause: 

2490 text += " WITH (%s)" % ( 

2491 ", ".join( 

2492 [ 

2493 "%s = %s" % storage_parameter 

2494 for storage_parameter in withclause.items() 

2495 ] 

2496 ) 

2497 ) 

2498 

2499 tablespace_name = index.dialect_options["postgresql"]["tablespace"] 

2500 if tablespace_name: 

2501 text += " TABLESPACE %s" % preparer.quote(tablespace_name) 

2502 

2503 whereclause = index.dialect_options["postgresql"]["where"] 

2504 if whereclause is not None: 

2505 whereclause = coercions.expect( 

2506 roles.DDLExpressionRole, whereclause 

2507 ) 

2508 

2509 where_compiled = self.sql_compiler.process( 

2510 whereclause, include_table=False, literal_binds=True 

2511 ) 

2512 text += " WHERE " + where_compiled 

2513 

2514 return text 

2515 

2516 def define_unique_constraint_distinct(self, constraint, **kw): 

2517 nulls_not_distinct = constraint.dialect_options["postgresql"][ 

2518 "nulls_not_distinct" 

2519 ] 

2520 if nulls_not_distinct is True: 

2521 nulls_not_distinct_param = "NULLS NOT DISTINCT " 

2522 elif nulls_not_distinct is False: 

2523 nulls_not_distinct_param = "NULLS DISTINCT " 

2524 else: 

2525 nulls_not_distinct_param = "" 

2526 return nulls_not_distinct_param 

2527 

2528 def visit_drop_index(self, drop, **kw): 

2529 index = drop.element 

2530 

2531 text = "\nDROP INDEX " 

2532 

2533 if self.dialect._supports_drop_index_concurrently: 

2534 concurrently = index.dialect_options["postgresql"]["concurrently"] 

2535 if concurrently: 

2536 text += "CONCURRENTLY " 

2537 

2538 if drop.if_exists: 

2539 text += "IF EXISTS " 

2540 

2541 text += self._prepared_index_name(index, include_schema=True) 

2542 return text 

2543 

2544 def visit_exclude_constraint(self, constraint, **kw): 

2545 text = "" 

2546 if constraint.name is not None: 

2547 text += "CONSTRAINT %s " % self.preparer.format_constraint( 

2548 constraint 

2549 ) 

2550 elements = [] 

2551 kw["include_table"] = False 

2552 kw["literal_binds"] = True 

2553 for expr, name, op in constraint._render_exprs: 

2554 exclude_element = self.sql_compiler.process(expr, **kw) + ( 

2555 (" " + constraint.ops[expr.key]) 

2556 if hasattr(expr, "key") and expr.key in constraint.ops 

2557 else "" 

2558 ) 

2559 

2560 elements.append("%s WITH %s" % (exclude_element, op)) 

2561 text += "EXCLUDE USING %s (%s)" % ( 

2562 self.preparer.validate_sql_phrase( 

2563 constraint.using, IDX_USING 

2564 ).lower(), 

2565 ", ".join(elements), 

2566 ) 

2567 if constraint.where is not None: 

2568 text += " WHERE (%s)" % self.sql_compiler.process( 

2569 constraint.where, literal_binds=True 

2570 ) 

2571 text += self.define_constraint_deferrability(constraint) 

2572 return text 

2573 

2574 def post_create_table(self, table): 

2575 table_opts = [] 

2576 pg_opts = table.dialect_options["postgresql"] 

2577 

2578 inherits = pg_opts.get("inherits") 

2579 if inherits is not None: 

2580 if not isinstance(inherits, (list, tuple)): 

2581 inherits = (inherits,) 

2582 table_opts.append( 

2583 "\n INHERITS ( " 

2584 + ", ".join(self.preparer.quote(name) for name in inherits) 

2585 + " )" 

2586 ) 

2587 

2588 if pg_opts["partition_by"]: 

2589 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"]) 

2590 

2591 if pg_opts["using"]: 

2592 table_opts.append("\n USING %s" % pg_opts["using"]) 

2593 

2594 if pg_opts["with_oids"] is True: 

2595 table_opts.append("\n WITH OIDS") 

2596 elif pg_opts["with_oids"] is False: 

2597 table_opts.append("\n WITHOUT OIDS") 

2598 

2599 if pg_opts["on_commit"]: 

2600 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper() 

2601 table_opts.append("\n ON COMMIT %s" % on_commit_options) 

2602 

2603 if pg_opts["tablespace"]: 

2604 tablespace_name = pg_opts["tablespace"] 

2605 table_opts.append( 

2606 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name) 

2607 ) 

2608 

2609 return "".join(table_opts) 

2610 

2611 def visit_computed_column(self, generated, **kw): 

2612 if generated.persisted is False: 

2613 raise exc.CompileError( 

2614 "PostgreSQL computed columns do not support 'virtual' "

2615 "persistence; set the 'persisted' flag to None or True for " 

2616 "PostgreSQL support." 

2617 ) 

2618 

2619 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process( 

2620 generated.sqltext, include_table=False, literal_binds=True 

2621 ) 

2622 

2623 def visit_create_sequence(self, create, **kw): 

2624 prefix = None 

2625 if create.element.data_type is not None: 

2626 prefix = " AS %s" % self.type_compiler.process( 

2627 create.element.data_type 

2628 ) 

2629 

2630 return super().visit_create_sequence(create, prefix=prefix, **kw) 

2631 

2632 def _can_comment_on_constraint(self, ddl_instance): 

2633 constraint = ddl_instance.element 

2634 if constraint.name is None: 

2635 raise exc.CompileError( 

2636 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2637 "it has no name" 

2638 ) 

2639 if constraint.table is None: 

2640 raise exc.CompileError( 

2641 f"Can't emit COMMENT ON for constraint {constraint!r}: " 

2642 "it has no associated table" 

2643 ) 

2644 

2645 def visit_set_constraint_comment(self, create, **kw): 

2646 self._can_comment_on_constraint(create) 

2647 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % ( 

2648 self.preparer.format_constraint(create.element), 

2649 self.preparer.format_table(create.element.table), 

2650 self.sql_compiler.render_literal_value( 

2651 create.element.comment, sqltypes.String() 

2652 ), 

2653 ) 

2654 

2655 def visit_drop_constraint_comment(self, drop, **kw): 

2656 self._can_comment_on_constraint(drop) 

2657 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % ( 

2658 self.preparer.format_constraint(drop.element), 

2659 self.preparer.format_table(drop.element.table), 

2660 ) 

2661 

2662 

2663class PGTypeCompiler(compiler.GenericTypeCompiler): 

2664 def visit_TSVECTOR(self, type_, **kw): 

2665 return "TSVECTOR" 

2666 

2667 def visit_TSQUERY(self, type_, **kw): 

2668 return "TSQUERY" 

2669 

2670 def visit_INET(self, type_, **kw): 

2671 return "INET" 

2672 

2673 def visit_CIDR(self, type_, **kw): 

2674 return "CIDR" 

2675 

2676 def visit_CITEXT(self, type_, **kw): 

2677 return "CITEXT" 

2678 

2679 def visit_MACADDR(self, type_, **kw): 

2680 return "MACADDR" 

2681 

2682 def visit_MACADDR8(self, type_, **kw): 

2683 return "MACADDR8" 

2684 

2685 def visit_MONEY(self, type_, **kw): 

2686 return "MONEY" 

2687 

2688 def visit_OID(self, type_, **kw): 

2689 return "OID" 

2690 

2691 def visit_REGCONFIG(self, type_, **kw): 

2692 return "REGCONFIG" 

2693 

2694 def visit_REGCLASS(self, type_, **kw): 

2695 return "REGCLASS" 

2696 

2697 def visit_FLOAT(self, type_, **kw): 

2698 if not type_.precision: 

2699 return "FLOAT" 

2700 else: 

2701 return "FLOAT(%(precision)s)" % {"precision": type_.precision} 

2702 

2703 def visit_double(self, type_, **kw): 

2704 return self.visit_DOUBLE_PRECISION(type_, **kw) 

2705 

2706 def visit_BIGINT(self, type_, **kw): 

2707 return "BIGINT" 

2708 

2709 def visit_HSTORE(self, type_, **kw): 

2710 return "HSTORE" 

2711 

2712 def visit_JSON(self, type_, **kw): 

2713 return "JSON" 

2714 

2715 def visit_JSONB(self, type_, **kw): 

2716 return "JSONB" 

2717 

2718 def visit_INT4MULTIRANGE(self, type_, **kw): 

2719 return "INT4MULTIRANGE" 

2720 

2721 def visit_INT8MULTIRANGE(self, type_, **kw): 

2722 return "INT8MULTIRANGE" 

2723 

2724 def visit_NUMMULTIRANGE(self, type_, **kw): 

2725 return "NUMMULTIRANGE" 

2726 

2727 def visit_DATEMULTIRANGE(self, type_, **kw): 

2728 return "DATEMULTIRANGE" 

2729 

2730 def visit_TSMULTIRANGE(self, type_, **kw): 

2731 return "TSMULTIRANGE" 

2732 

2733 def visit_TSTZMULTIRANGE(self, type_, **kw): 

2734 return "TSTZMULTIRANGE" 

2735 

2736 def visit_INT4RANGE(self, type_, **kw): 

2737 return "INT4RANGE" 

2738 

2739 def visit_INT8RANGE(self, type_, **kw): 

2740 return "INT8RANGE" 

2741 

2742 def visit_NUMRANGE(self, type_, **kw): 

2743 return "NUMRANGE" 

2744 

2745 def visit_DATERANGE(self, type_, **kw): 

2746 return "DATERANGE" 

2747 

2748 def visit_TSRANGE(self, type_, **kw): 

2749 return "TSRANGE" 

2750 

2751 def visit_TSTZRANGE(self, type_, **kw): 

2752 return "TSTZRANGE" 

2753 

2754 def visit_json_int_index(self, type_, **kw): 

2755 return "INT" 

2756 

2757 def visit_json_str_index(self, type_, **kw): 

2758 return "TEXT" 

2759 

2760 def visit_datetime(self, type_, **kw): 

2761 return self.visit_TIMESTAMP(type_, **kw) 

2762 

2763 def visit_enum(self, type_, **kw): 

2764 if not type_.native_enum or not self.dialect.supports_native_enum: 

2765 return super().visit_enum(type_, **kw) 

2766 else: 

2767 return self.visit_ENUM(type_, **kw) 

2768 

2769 def visit_ENUM(self, type_, identifier_preparer=None, **kw): 

2770 if identifier_preparer is None: 

2771 identifier_preparer = self.dialect.identifier_preparer 

2772 return identifier_preparer.format_type(type_) 

2773 

2774 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw): 

2775 if identifier_preparer is None: 

2776 identifier_preparer = self.dialect.identifier_preparer 

2777 return identifier_preparer.format_type(type_) 

2778 

2779 def visit_TIMESTAMP(self, type_, **kw): 

2780 return "TIMESTAMP%s %s" % ( 

2781 ( 

2782 "(%d)" % type_.precision 

2783 if getattr(type_, "precision", None) is not None 

2784 else "" 

2785 ), 

2786 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2787 ) 

2788 

2789 def visit_TIME(self, type_, **kw): 

2790 return "TIME%s %s" % ( 

2791 ( 

2792 "(%d)" % type_.precision 

2793 if getattr(type_, "precision", None) is not None 

2794 else "" 

2795 ), 

2796 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE", 

2797 ) 

2798 
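`visit_TIMESTAMP` and `visit_TIME` above share one rendering pattern: an optional precision in parentheses followed by an explicit time zone clause. A minimal standalone sketch of that pattern, assuming a hypothetical `render_temporal` helper that takes plain arguments instead of a type object:

```python
def render_temporal(keyword, precision=None, timezone=False):
    """Hypothetical helper mirroring the TIMESTAMP / TIME rendering."""
    return "%s%s %s" % (
        keyword,
        # Precision is only rendered when it is explicitly set.
        ("(%d)" % precision) if precision is not None else "",
        # The time zone clause is always rendered, one way or the other.
        ("WITH" if timezone else "WITHOUT") + " TIME ZONE",
    )
```

Note that the sketch always spells out `WITHOUT TIME ZONE` rather than relying on the server default, as the methods above do.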

2799 def visit_INTERVAL(self, type_, **kw): 

2800 text = "INTERVAL" 

2801 if type_.fields is not None: 

2802 text += " " + type_.fields 

2803 if type_.precision is not None: 

2804 text += " (%d)" % type_.precision 

2805 return text 

2806 

2807 def visit_BIT(self, type_, **kw): 

2808 if type_.varying: 

2809 compiled = "BIT VARYING" 

2810 if type_.length is not None: 

2811 compiled += "(%d)" % type_.length 

2812 else: 

2813 compiled = "BIT(%d)" % type_.length 

2814 return compiled 

2815 

2816 def visit_uuid(self, type_, **kw): 

2817 if type_.native_uuid: 

2818 return self.visit_UUID(type_, **kw) 

2819 else: 

2820 return super().visit_uuid(type_, **kw) 

2821 

2822 def visit_UUID(self, type_, **kw): 

2823 return "UUID" 

2824 

2825 def visit_large_binary(self, type_, **kw): 

2826 return self.visit_BYTEA(type_, **kw) 

2827 

2828 def visit_BYTEA(self, type_, **kw): 

2829 return "BYTEA" 

2830 

2831 def visit_ARRAY(self, type_, **kw): 

2832 inner = self.process(type_.item_type, **kw) 

2833 return re.sub( 

2834 r"((?: COLLATE.*)?)$", 

2835 ( 

2836 r"%s\1" 

2837 % ( 

2838 "[]" 

2839 * (type_.dimensions if type_.dimensions is not None else 1) 

2840 ) 

2841 ), 

2842 inner, 

2843 count=1, 

2844 ) 

2845 
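The regular expression in `visit_ARRAY` above places the `[]` suffix before any trailing ` COLLATE ...` clause of the inner type, instead of appending it at the very end. A minimal standalone sketch of the same substitution, assuming a hypothetical `render_array` helper that receives the already-compiled inner type as a string:

```python
import re


def render_array(inner, dimensions=None):
    """Hypothetical helper: add array brackets to a compiled type string."""
    brackets = "[]" * (dimensions if dimensions is not None else 1)
    # Group 1 captures an optional trailing COLLATE clause (possibly empty);
    # the brackets are inserted immediately in front of it.
    return re.sub(
        r"((?: COLLATE.*)?)$",
        r"%s\1" % brackets,
        inner,
        count=1,
    )
```

With no `COLLATE` clause the group matches the empty string at the end of the input, so the brackets are simply appended.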

2846 def visit_json_path(self, type_, **kw): 

2847 return self.visit_JSONPATH(type_, **kw) 

2848 

2849 def visit_JSONPATH(self, type_, **kw): 

2850 return "JSONPATH" 

2851 

2852 

2853class PGIdentifierPreparer(compiler.IdentifierPreparer): 

2854 reserved_words = RESERVED_WORDS 

2855 

2856 def _unquote_identifier(self, value): 

2857 if value[0] == self.initial_quote: 

2858 value = value[1:-1].replace( 

2859 self.escape_to_quote, self.escape_quote 

2860 ) 

2861 return value 

2862 
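`_unquote_identifier` above strips the surrounding quotes and collapses escaped quotes. A minimal standalone sketch, assuming the preparer's defaults are a double quote for `initial_quote` and `escape_quote`, with `escape_to_quote` being that character doubled (these values are assumptions here; they are not shown in this listing):

```python
def unquote_identifier(value, quote='"'):
    """Hypothetical standalone version; `quote` stands in for the
    preparer's quote attributes."""
    if value[0] == quote:
        # Drop the outer quotes, then turn each doubled quote into one.
        value = value[1:-1].replace(quote * 2, quote)
    return value
```

An identifier that does not start with the quote character is returned unchanged.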

2863 def format_type(self, type_, use_schema=True): 

2864 if not type_.name: 

2865 raise exc.CompileError( 

2866 f"PostgreSQL {type_.__class__.__name__} type requires a name." 

2867 ) 

2868 

2869 name = self.quote(type_.name) 

2870 effective_schema = self.schema_for_object(type_) 

2871 

2872 if ( 

2873 not self.omit_schema 

2874 and use_schema 

2875 and effective_schema is not None 

2876 ): 

2877 name = f"{self.quote_schema(effective_schema)}.{name}" 

2878 return name 

2879 

2880 

2881class ReflectedNamedType(TypedDict): 

2882 """Represents a reflected named type.""" 

2883 

2884 name: str 

2885 """Name of the type.""" 

2886 schema: str 

2887 """The schema of the type.""" 

2888 visible: bool 

2889 """Indicates if this type is in the current search path.""" 

2890 

2891 

2892class ReflectedDomainConstraint(TypedDict): 

2893 """Represents a reflected check constraint of a domain.""" 

2894 

2895 name: str 

2896 """Name of the constraint.""" 

2897 check: str 

2898 """The check constraint text.""" 

2899 

2900 

2901class ReflectedDomain(ReflectedNamedType): 

2902 """Represents a reflected domain.""" 

2903 

2904 type: str 

2905 """The string name of the underlying data type of the domain.""" 

2906 nullable: bool 

2907 """Indicates if the domain allows null or not.""" 

2908 default: Optional[str] 

2909 """The string representation of the default value of this domain 

2910 or ``None`` if none present. 

2911 """ 

2912 constraints: List[ReflectedDomainConstraint] 

2913 """The constraints defined in the domain, if any. 

2914 The constraints are in order of evaluation by PostgreSQL. 

2915 """ 

2916 collation: Optional[str] 

2917 """The collation for the domain.""" 

2918 

2919 

2920class ReflectedEnum(ReflectedNamedType): 

2921 """Represents a reflected enum.""" 

2922 

2923 labels: List[str] 

2924 """The labels that compose the enum.""" 

2925 

2926 

2927class PGInspector(reflection.Inspector): 

2928 dialect: PGDialect 

2929 

2930 def get_table_oid( 

2931 self, table_name: str, schema: Optional[str] = None 

2932 ) -> int: 

2933 """Return the OID for the given table name. 

2934 

2935 :param table_name: string name of the table. For special quoting, 

2936 use :class:`.quoted_name`. 

2937 

2938 :param schema: string schema name; if omitted, uses the default schema 

2939 of the database connection. For special quoting, 

2940 use :class:`.quoted_name`. 

2941 

2942 """ 

2943 

2944 with self._operation_context() as conn: 

2945 return self.dialect.get_table_oid( 

2946 conn, table_name, schema, info_cache=self.info_cache 

2947 ) 

2948 

2949 def get_domains( 

2950 self, schema: Optional[str] = None 

2951 ) -> List[ReflectedDomain]: 

2952 """Return a list of DOMAIN objects. 

2953 

2954 Each member is a dictionary containing these fields: 

2955 

2956 * name - name of the domain 

2957 * schema - the schema name for the domain. 

2958 * visible - boolean, whether or not this domain is visible 

2959 in the default search path. 

2960 * type - the type defined by this domain. 

2961 * nullable - Indicates if this domain can be ``NULL``. 

2962 * default - The default value of the domain or ``None`` if the 

2963 domain has no default. 

2964 * constraints - A list of dicts with the constraints defined by this 

2965 domain. Each element contains two keys: ``name`` of the 

2966 constraint and ``check`` with the constraint text. 

2967 

2968 :param schema: schema name. If None, the default schema 

2969 (typically 'public') is used. May also be set to ``'*'`` to 

2970 indicate load domains for all schemas. 

2971 

2972 .. versionadded:: 2.0 

2973 

2974 """ 

2975 with self._operation_context() as conn: 

2976 return self.dialect._load_domains( 

2977 conn, schema, info_cache=self.info_cache 

2978 ) 

2979 

2980 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]: 

2981 """Return a list of ENUM objects. 

2982 

2983 Each member is a dictionary containing these fields: 

2984 

2985 * name - name of the enum 

2986 * schema - the schema name for the enum. 

2987 * visible - boolean, whether or not this enum is visible 

2988 in the default search path. 

2989 * labels - a list of string labels that apply to the enum. 

2990 

2991 :param schema: schema name. If None, the default schema 

2992 (typically 'public') is used. May also be set to ``'*'`` to 

2993 indicate load enums for all schemas. 

2994 

2995 """ 

2996 with self._operation_context() as conn: 

2997 return self.dialect._load_enums( 

2998 conn, schema, info_cache=self.info_cache 

2999 ) 

3000 

3001 def get_foreign_table_names( 

3002 self, schema: Optional[str] = None 

3003 ) -> List[str]: 

3004 """Return a list of FOREIGN TABLE names. 

3005 

3006 Behavior is similar to that of 

3007 :meth:`_reflection.Inspector.get_table_names`, 

3008 except that the list is limited to those tables that report a 

3009 ``relkind`` value of ``f``. 

3010 

3011 """ 

3012 with self._operation_context() as conn: 

3013 return self.dialect._get_foreign_table_names( 

3014 conn, schema, info_cache=self.info_cache 

3015 ) 

3016 

3017 def has_type( 

3018 self, type_name: str, schema: Optional[str] = None, **kw: Any 

3019 ) -> bool: 

3020 """Return whether the database has the specified type in the provided 

3021 schema. 

3022 

3023 :param type_name: the type to check. 

3024 :param schema: schema name. If None, the default schema 

3025 (typically 'public') is used. May also be set to ``'*'`` to 

3026 check in all schemas. 

3027 

3028 .. versionadded:: 2.0 

3029 

3030 """ 

3031 with self._operation_context() as conn: 

3032 return self.dialect.has_type( 

3033 conn, type_name, schema, info_cache=self.info_cache 

3034 ) 

3035 

3036 

3037class PGExecutionContext(default.DefaultExecutionContext): 

3038 def fire_sequence(self, seq, type_): 

3039 return self._execute_scalar( 

3040 ( 

3041 "select nextval('%s')" 

3042 % self.identifier_preparer.format_sequence(seq) 

3043 ), 

3044 type_, 

3045 ) 

3046 

3047 def get_insert_default(self, column): 

3048 if column.primary_key and column is column.table._autoincrement_column: 

3049 if column.server_default and column.server_default.has_argument: 

3050 # pre-execute passive defaults on primary key columns 

3051 return self._execute_scalar( 

3052 "select %s" % column.server_default.arg, column.type 

3053 ) 

3054 

3055 elif column.default is None or ( 

3056 column.default.is_sequence and column.default.optional 

3057 ): 

3058 # execute the sequence associated with a SERIAL primary 

3059 # key column. for non-primary-key SERIAL, the ID just 

3060 # generates server side. 

3061 

3062 try: 

3063 seq_name = column._postgresql_seq_name 

3064 except AttributeError: 

3065 tab = column.table.name 

3066 col = column.name 

3067 tab = tab[0 : 29 + max(0, (29 - len(col)))] 

3068 col = col[0 : 29 + max(0, (29 - len(tab)))] 

3069 name = "%s_%s_seq" % (tab, col) 

3070 column._postgresql_seq_name = seq_name = name 

3071 

3072 if column.table is not None: 

3073 effective_schema = self.connection.schema_for_object( 

3074 column.table 

3075 ) 

3076 else: 

3077 effective_schema = None 

3078 

3079 if effective_schema is not None: 

3080 stmt = 'select nextval(\'"%s"."%s"\')' % ( 

3081 effective_schema, 

3082 seq_name, 

3083 ) 

3084 else: 

3085 stmt = "select nextval('\"%s\"')" % (seq_name,) 

3086 

3087 return self._execute_scalar(stmt, column.type) 

3088 

3089 return super().get_insert_default(column) 

3090 
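The fallback branch of `get_insert_default` above derives the implicit sequence name for a `SERIAL` primary key by truncating the table and column names before joining them. A minimal standalone sketch of that arithmetic, assuming a hypothetical `implicit_serial_seq_name` helper that takes the two names as plain strings:

```python
def implicit_serial_seq_name(table, column):
    """Hypothetical helper mirroring the truncation used above."""
    # Each part gets 29 characters, plus whatever the other part leaves
    # unused of its own 29.
    tab = table[0 : 29 + max(0, 29 - len(column))]
    col = column[0 : 29 + max(0, 29 - len(tab))]
    return "%s_%s_seq" % (tab, col)
```

When both names are long, the result is 29 + 1 + 29 + 4 = 63 characters, which equals the `max_identifier_length` declared on `PGDialect`.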

3091 

3092class PGReadOnlyConnectionCharacteristic( 

3093 characteristics.ConnectionCharacteristic 

3094): 

3095 transactional = True 

3096 

3097 def reset_characteristic(self, dialect, dbapi_conn): 

3098 dialect.set_readonly(dbapi_conn, False) 

3099 

3100 def set_characteristic(self, dialect, dbapi_conn, value): 

3101 dialect.set_readonly(dbapi_conn, value) 

3102 

3103 def get_characteristic(self, dialect, dbapi_conn): 

3104 return dialect.get_readonly(dbapi_conn) 

3105 

3106 

3107class PGDeferrableConnectionCharacteristic( 

3108 characteristics.ConnectionCharacteristic 

3109): 

3110 transactional = True 

3111 

3112 def reset_characteristic(self, dialect, dbapi_conn): 

3113 dialect.set_deferrable(dbapi_conn, False) 

3114 

3115 def set_characteristic(self, dialect, dbapi_conn, value): 

3116 dialect.set_deferrable(dbapi_conn, value) 

3117 

3118 def get_characteristic(self, dialect, dbapi_conn): 

3119 return dialect.get_deferrable(dbapi_conn) 

3120 

3121 

3122class PGDialect(default.DefaultDialect): 

3123 name = "postgresql" 

3124 supports_statement_cache = True 

3125 supports_alter = True 

3126 max_identifier_length = 63 

3127 supports_sane_rowcount = True 

3128 

3129 bind_typing = interfaces.BindTyping.RENDER_CASTS 

3130 

3131 supports_native_enum = True 

3132 supports_native_boolean = True 

3133 supports_native_uuid = True 

3134 supports_smallserial = True 

3135 

3136 supports_sequences = True 

3137 sequences_optional = True 

3138 preexecute_autoincrement_sequences = True 

3139 postfetch_lastrowid = False 

3140 use_insertmanyvalues = True 

3141 

3142 returns_native_bytes = True 

3143 

3144 insertmanyvalues_implicit_sentinel = ( 

3145 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT 

3146 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT 

3147 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS 

3148 ) 

3149 

3150 supports_comments = True 

3151 supports_constraint_comments = True 

3152 supports_default_values = True 

3153 

3154 supports_default_metavalue = True 

3155 

3156 supports_empty_insert = False 

3157 supports_multivalues_insert = True 

3158 

3159 supports_identity_columns = True 

3160 

3161 default_paramstyle = "pyformat" 

3162 ischema_names = ischema_names 

3163 colspecs = colspecs 

3164 

3165 statement_compiler = PGCompiler 

3166 ddl_compiler = PGDDLCompiler 

3167 type_compiler_cls = PGTypeCompiler 

3168 preparer = PGIdentifierPreparer 

3169 execution_ctx_cls = PGExecutionContext 

3170 inspector = PGInspector 

3171 

3172 update_returning = True 

3173 delete_returning = True 

3174 insert_returning = True 

3175 update_returning_multifrom = True 

3176 delete_returning_multifrom = True 

3177 

3178 connection_characteristics = ( 

3179 default.DefaultDialect.connection_characteristics 

3180 ) 

3181 connection_characteristics = connection_characteristics.union( 

3182 { 

3183 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(), 

3184 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(), 

3185 } 

3186 ) 

3187 

3188 construct_arguments = [ 

3189 ( 

3190 schema.Index, 

3191 { 

3192 "using": False, 

3193 "include": None, 

3194 "where": None, 

3195 "ops": {}, 

3196 "concurrently": False, 

3197 "with": {}, 

3198 "tablespace": None, 

3199 "nulls_not_distinct": None, 

3200 }, 

3201 ), 

3202 ( 

3203 schema.Table, 

3204 { 

3205 "ignore_search_path": False, 

3206 "tablespace": None, 

3207 "partition_by": None, 

3208 "with_oids": None, 

3209 "on_commit": None, 

3210 "inherits": None, 

3211 "using": None, 

3212 }, 

3213 ), 

3214 ( 

3215 schema.CheckConstraint, 

3216 { 

3217 "not_valid": False, 

3218 }, 

3219 ), 

3220 ( 

3221 schema.ForeignKeyConstraint, 

3222 { 

3223 "not_valid": False, 

3224 }, 

3225 ), 

3226 ( 

3227 schema.PrimaryKeyConstraint, 

3228 {"include": None}, 

3229 ), 

3230 ( 

3231 schema.UniqueConstraint, 

3232 { 

3233 "include": None, 

3234 "nulls_not_distinct": None, 

3235 }, 

3236 ), 

3237 ] 

3238 

3239 reflection_options = ("postgresql_ignore_search_path",) 

3240 

3241 _backslash_escapes = True 

3242 _supports_create_index_concurrently = True 

3243 _supports_drop_index_concurrently = True 

3244 _supports_jsonb_subscripting = True 

3245 

3246 def __init__( 

3247 self, 

3248 native_inet_types=None, 

3249 json_serializer=None, 

3250 json_deserializer=None, 

3251 **kwargs, 

3252 ): 

3253 default.DefaultDialect.__init__(self, **kwargs) 

3254 

3255 self._native_inet_types = native_inet_types 

3256 self._json_deserializer = json_deserializer 

3257 self._json_serializer = json_serializer 

3258 

3259 def initialize(self, connection): 

3260 super().initialize(connection) 

3261 

3262 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689 

3263 self.supports_smallserial = self.server_version_info >= (9, 2) 

3264 

3265 self._set_backslash_escapes(connection) 

3266 

3267 self._supports_drop_index_concurrently = self.server_version_info >= ( 

3268 9, 

3269 2, 

3270 ) 

3271 self.supports_identity_columns = self.server_version_info >= (10,) 

3272 

3273 self._supports_jsonb_subscripting = self.server_version_info >= (14,) 

3274 

3275 def get_isolation_level_values(self, dbapi_conn): 

3276 # note the generic dialect doesn't have AUTOCOMMIT, however 

3277 # all postgresql dialects should include AUTOCOMMIT. 

3278 return ( 

3279 "SERIALIZABLE", 

3280 "READ UNCOMMITTED", 

3281 "READ COMMITTED", 

3282 "REPEATABLE READ", 

3283 ) 

3284 

3285 def set_isolation_level(self, dbapi_connection, level): 

3286 cursor = dbapi_connection.cursor() 

3287 cursor.execute( 

3288 "SET SESSION CHARACTERISTICS AS TRANSACTION " 

3289 f"ISOLATION LEVEL {level}" 

3290 ) 

3291 cursor.execute("COMMIT") 

3292 cursor.close() 

3293 

3294 def get_isolation_level(self, dbapi_connection): 

3295 cursor = dbapi_connection.cursor() 

3296 cursor.execute("show transaction isolation level") 

3297 val = cursor.fetchone()[0] 

3298 cursor.close() 

3299 return val.upper() 

3300 

3301 def set_readonly(self, connection, value): 

3302 raise NotImplementedError() 

3303 

3304 def get_readonly(self, connection): 

3305 raise NotImplementedError() 

3306 

3307 def set_deferrable(self, connection, value): 

3308 raise NotImplementedError() 

3309 

3310 def get_deferrable(self, connection): 

3311 raise NotImplementedError() 

3312 

3313 def _split_multihost_from_url(self, url: URL) -> Union[ 

3314 Tuple[None, None], 

3315 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]], 

3316 ]: 

3317 hosts: Optional[Tuple[Optional[str], ...]] = None 

3318 ports_str: Union[str, Tuple[Optional[str], ...], None] = None 

3319 

3320 integrated_multihost = False 

3321 

3322 if "host" in url.query: 

3323 if isinstance(url.query["host"], (list, tuple)): 

3324 integrated_multihost = True 

3325 hosts, ports_str = zip( 

3326 *[ 

3327 token.split(":") if ":" in token else (token, None) 

3328 for token in url.query["host"] 

3329 ] 

3330 ) 

3331 

3332 elif isinstance(url.query["host"], str): 

3333 hosts = tuple(url.query["host"].split(",")) 

3334 

3335 if ( 

3336 "port" not in url.query 

3337 and len(hosts) == 1 

3338 and ":" in hosts[0] 

3339 ): 

3340 # internet host is alphanumeric plus dots or hyphens. 

3341 # this is essentially rfc1123, which refers to rfc952. 

3342 # https://stackoverflow.com/questions/3523028/ 

3343 # valid-characters-of-a-hostname 

3344 host_port_match = re.match( 

3345 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0] 

3346 ) 

3347 if host_port_match: 

3348 integrated_multihost = True 

3349 h, p = host_port_match.group(1, 2) 

3350 if TYPE_CHECKING: 

3351 assert isinstance(h, str) 

3352 assert isinstance(p, str) 

3353 hosts = (h,) 

3354 ports_str = cast( 

3355 "Tuple[Optional[str], ...]", (p,) if p else (None,) 

3356 ) 

3357 

3358 if "port" in url.query: 

3359 if integrated_multihost: 

3360 raise exc.ArgumentError( 

3361 "Can't mix 'multihost' formats together; use " 

3362 '"host=h1,h2,h3&port=p1,p2,p3" or ' 

3363 '"host=h1:p1&host=h2:p2&host=h3:p3" separately' 

3364 ) 

3365 if isinstance(url.query["port"], (list, tuple)): 

3366 ports_str = url.query["port"] 

3367 elif isinstance(url.query["port"], str): 

3368 ports_str = tuple(url.query["port"].split(",")) 

3369 

3370 ports: Optional[Tuple[Optional[int], ...]] = None 

3371 

3372 if ports_str: 

3373 try: 

3374 ports = tuple(int(x) if x else None for x in ports_str) 

3375 except ValueError: 

3376 raise exc.ArgumentError( 

3377 f"Received non-integer port arguments: {ports_str}" 

3378 ) from None 

3379 

3380 if ports and ( 

3381 (not hosts and len(ports) > 1) 

3382 or ( 

3383 hosts 

3384 and ports 

3385 and len(hosts) != len(ports) 

3386 and (len(hosts) > 1 or len(ports) > 1) 

3387 ) 

3388 ): 

3389 raise exc.ArgumentError("number of hosts and ports don't match") 

3390 

3391 if hosts is not None: 

3392 if ports is None: 

3393 ports = tuple(None for _ in hosts) 

3394 

3395 return hosts, ports # type: ignore 

3396 
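`_split_multihost_from_url` above accepts two multihost spellings: a comma-separated `host=h1,h2&port=p1,p2`, or a repeated `host=h1:p1&host=h2:p2`. The following is a simplified standalone sketch of that split, not the method itself. `split_multihost` is a hypothetical function that takes the query as a plain mapping, raises `ValueError` where the original raises `exc.ArgumentError`, and omits both the single `host:port` string case and the host/port count check.

```python
def split_multihost(query):
    """Hypothetical, simplified variant of the host/port splitting."""
    hosts = None
    ports_str = None
    integrated = False  # True when ports are embedded in the host entries

    host = query.get("host")
    if isinstance(host, (list, tuple)):
        # Repeated form: host=h1:p1&host=h2:p2
        integrated = True
        hosts, ports_str = zip(
            *[tok.split(":") if ":" in tok else (tok, None) for tok in host]
        )
    elif isinstance(host, str):
        # Comma-separated form: host=h1,h2
        hosts = tuple(host.split(","))

    port = query.get("port")
    if port is not None:
        if integrated:
            raise ValueError("can't mix the two multihost formats")
        if isinstance(port, (list, tuple)):
            ports_str = tuple(port)
        else:
            ports_str = tuple(port.split(","))

    ports = None
    if ports_str:
        try:
            ports = tuple(int(p) if p else None for p in ports_str)
        except ValueError:
            raise ValueError(
                f"non-integer port arguments: {ports_str}"
            ) from None

    # Hosts without any port information get a None placeholder each.
    if hosts is not None and ports is None:
        ports = tuple(None for _ in hosts)

    return hosts, ports
```

A host entry without a port yields `None` in the matching position, so the two returned tuples stay aligned.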

3397 def do_begin_twophase(self, connection, xid): 

3398 self.do_begin(connection.connection) 

3399 

3400 def do_prepare_twophase(self, connection, xid): 

3401 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid) 

3402 

3403 def do_rollback_twophase( 

3404 self, connection, xid, is_prepared=True, recover=False 

3405 ): 

3406 if is_prepared: 

3407 if recover: 

3408 # FIXME: ugly hack to get out of transaction 

3409 # context when committing recoverable transactions 

3410 # Must find out a way how to make the dbapi not 

3411 # open a transaction. 

3412 connection.exec_driver_sql("ROLLBACK") 

3413 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid) 

3414 connection.exec_driver_sql("BEGIN") 

3415 self.do_rollback(connection.connection) 

3416 else: 

3417 self.do_rollback(connection.connection) 

3418 

3419 def do_commit_twophase( 

3420 self, connection, xid, is_prepared=True, recover=False 

3421 ): 

3422 if is_prepared: 

3423 if recover: 

3424 connection.exec_driver_sql("ROLLBACK") 

3425 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid) 

3426 connection.exec_driver_sql("BEGIN") 

3427 self.do_rollback(connection.connection) 

3428 else: 

3429 self.do_commit(connection.connection) 

3430 

3431 def do_recover_twophase(self, connection): 

3432 return connection.scalars( 

3433 sql.text("SELECT gid FROM pg_prepared_xacts") 

3434 ).all() 

3435 

3436 def _get_default_schema_name(self, connection): 

3437 return connection.exec_driver_sql("select current_schema()").scalar() 

3438 

3439 @reflection.cache 

3440 def has_schema(self, connection, schema, **kw): 

3441 query = select(pg_catalog.pg_namespace.c.nspname).where( 

3442 pg_catalog.pg_namespace.c.nspname == schema 

3443 ) 

3444 return bool(connection.scalar(query)) 

3445 

3446 def _pg_class_filter_scope_schema( 

3447 self, query, schema, scope, pg_class_table=None 

3448 ): 

3449 if pg_class_table is None: 

3450 pg_class_table = pg_catalog.pg_class 

3451 query = query.join( 

3452 pg_catalog.pg_namespace, 

3453 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace, 

3454 ) 

3455 

3456 if scope is ObjectScope.DEFAULT: 

3457 query = query.where(pg_class_table.c.relpersistence != "t") 

3458 elif scope is ObjectScope.TEMPORARY: 

3459 query = query.where(pg_class_table.c.relpersistence == "t") 

3460 

3461 if schema is None: 

3462 query = query.where( 

3463 pg_catalog.pg_table_is_visible(pg_class_table.c.oid), 

3464 # ignore pg_catalog schema 

3465 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3466 ) 

3467 else: 

3468 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3469 return query 

3470 

3471 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None): 

3472 if pg_class_table is None: 

3473 pg_class_table = pg_catalog.pg_class 

3474 # uses the ANY form instead of IN, otherwise PostgreSQL complains 

3475 # that 'IN could not convert type character to "char"' 

3476 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds)) 

3477 

3478 @lru_cache() 

3479 def _has_table_query(self, schema): 

3480 query = select(pg_catalog.pg_class.c.relname).where( 

3481 pg_catalog.pg_class.c.relname == bindparam("table_name"), 

3482 self._pg_class_relkind_condition( 

3483 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3484 ), 

3485 ) 

3486 return self._pg_class_filter_scope_schema( 

3487 query, schema, scope=ObjectScope.ANY 

3488 ) 

3489 

3490 @reflection.cache 

3491 def has_table(self, connection, table_name, schema=None, **kw): 

3492 self._ensure_has_table_connection(connection) 

3493 query = self._has_table_query(schema) 

3494 return bool(connection.scalar(query, {"table_name": table_name})) 

3495 

3496 @reflection.cache 

3497 def has_sequence(self, connection, sequence_name, schema=None, **kw): 

3498 query = select(pg_catalog.pg_class.c.relname).where( 

3499 pg_catalog.pg_class.c.relkind == "S", 

3500 pg_catalog.pg_class.c.relname == sequence_name, 

3501 ) 

3502 query = self._pg_class_filter_scope_schema( 

3503 query, schema, scope=ObjectScope.ANY 

3504 ) 

3505 return bool(connection.scalar(query)) 

3506 

3507 @reflection.cache 

3508 def has_type(self, connection, type_name, schema=None, **kw): 

3509 query = ( 

3510 select(pg_catalog.pg_type.c.typname) 

3511 .join( 

3512 pg_catalog.pg_namespace, 

3513 pg_catalog.pg_namespace.c.oid 

3514 == pg_catalog.pg_type.c.typnamespace, 

3515 ) 

3516 .where(pg_catalog.pg_type.c.typname == type_name) 

3517 ) 

3518 if schema is None: 

3519 query = query.where( 

3520 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

3521 # ignore pg_catalog schema 

3522 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

3523 ) 

3524 elif schema != "*": 

3525 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

3526 

3527 return bool(connection.scalar(query)) 

3528 

3529 def _get_server_version_info(self, connection): 

3530 v = connection.exec_driver_sql("select pg_catalog.version()").scalar() 

3531 m = re.match( 

3532 r".*(?:PostgreSQL|EnterpriseDB) " 

3533 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?", 

3534 v, 

3535 ) 

3536 if not m: 

3537 raise AssertionError( 

3538 "Could not determine version from string '%s'" % v 

3539 ) 

3540 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None]) 

3541 
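The version parsing in `_get_server_version_info` above can be exercised without a database connection. A minimal standalone sketch that reuses the same regular expression; `parse_server_version` is a hypothetical function name and the banner strings in the usage note are illustrative:

```python
import re

_VERSION_RE = re.compile(
    r".*(?:PostgreSQL|EnterpriseDB) "
    r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?"
)


def parse_server_version(banner):
    """Hypothetical helper: turn a version() banner into an int tuple."""
    m = _VERSION_RE.match(banner)
    if not m:
        raise AssertionError(
            "Could not determine version from string '%s'" % banner
        )
    # Groups that did not participate in the match are None and skipped,
    # so the tuple has one to three elements.
    return tuple(int(x) for x in m.group(1, 2, 3) if x is not None)
```

Because the result is a plain tuple, feature checks such as `server_version_info >= (14,)` in `initialize` work by ordinary tuple comparison: a banner like `PostgreSQL 14.5 (Debian ...)` parses to `(14, 5)`, which compares greater than `(14,)`.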

3542 @reflection.cache 

3543 def get_table_oid(self, connection, table_name, schema=None, **kw): 

3544 """Fetch the oid for schema.table_name.""" 

3545 query = select(pg_catalog.pg_class.c.oid).where( 

3546 pg_catalog.pg_class.c.relname == table_name, 

3547 self._pg_class_relkind_condition( 

3548 pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3549 ), 

3550 ) 

3551 query = self._pg_class_filter_scope_schema( 

3552 query, schema, scope=ObjectScope.ANY 

3553 ) 

3554 table_oid = connection.scalar(query) 

3555 if table_oid is None: 

3556 raise exc.NoSuchTableError( 

3557 f"{schema}.{table_name}" if schema else table_name 

3558 ) 

3559 return table_oid 

3560 

3561 @reflection.cache 

3562 def get_schema_names(self, connection, **kw): 

3563 query = ( 

3564 select(pg_catalog.pg_namespace.c.nspname) 

3565 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%")) 

3566 .order_by(pg_catalog.pg_namespace.c.nspname) 

3567 ) 

3568 return connection.scalars(query).all() 

3569 

3570 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope): 

3571 query = select(pg_catalog.pg_class.c.relname).where( 

3572 self._pg_class_relkind_condition(relkinds) 

3573 ) 

3574 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3575 return connection.scalars(query).all() 

3576 

3577 @reflection.cache 

3578 def get_table_names(self, connection, schema=None, **kw): 

3579 return self._get_relnames_for_relkinds( 

3580 connection, 

3581 schema, 

3582 pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3583 scope=ObjectScope.DEFAULT, 

3584 ) 

3585 

3586 @reflection.cache 

3587 def get_temp_table_names(self, connection, **kw): 

3588 return self._get_relnames_for_relkinds( 

3589 connection, 

3590 schema=None, 

3591 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN, 

3592 scope=ObjectScope.TEMPORARY, 

3593 ) 

3594 

3595 @reflection.cache 

3596 def _get_foreign_table_names(self, connection, schema=None, **kw): 

3597 return self._get_relnames_for_relkinds( 

3598 connection, schema, relkinds=("f",), scope=ObjectScope.ANY 

3599 ) 

3600 

3601 @reflection.cache 

3602 def get_view_names(self, connection, schema=None, **kw): 

3603 return self._get_relnames_for_relkinds( 

3604 connection, 

3605 schema, 

3606 pg_catalog.RELKINDS_VIEW, 

3607 scope=ObjectScope.DEFAULT, 

3608 ) 

3609 

3610 @reflection.cache 

3611 def get_materialized_view_names(self, connection, schema=None, **kw): 

3612 return self._get_relnames_for_relkinds( 

3613 connection, 

3614 schema, 

3615 pg_catalog.RELKINDS_MAT_VIEW, 

3616 scope=ObjectScope.DEFAULT, 

3617 ) 

3618 

3619 @reflection.cache 

3620 def get_temp_view_names(self, connection, schema=None, **kw): 

3621 return self._get_relnames_for_relkinds( 

3622 connection, 

3623 schema, 

3624 # NOTE: do not include temp materialized views (that do not

3625 # seem to be a thing at least up to version 14) 

3626 pg_catalog.RELKINDS_VIEW, 

3627 scope=ObjectScope.TEMPORARY, 

3628 ) 

3629 

3630 @reflection.cache 

3631 def get_sequence_names(self, connection, schema=None, **kw): 

3632 return self._get_relnames_for_relkinds( 

3633 connection, schema, relkinds=("S",), scope=ObjectScope.ANY 

3634 ) 

3635 

3636 @reflection.cache 

3637 def get_view_definition(self, connection, view_name, schema=None, **kw): 

3638 query = ( 

3639 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid)) 

3640 .select_from(pg_catalog.pg_class) 

3641 .where( 

3642 pg_catalog.pg_class.c.relname == view_name, 

3643 self._pg_class_relkind_condition( 

3644 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW 

3645 ), 

3646 ) 

3647 ) 

3648 query = self._pg_class_filter_scope_schema( 

3649 query, schema, scope=ObjectScope.ANY 

3650 ) 

3651 res = connection.scalar(query) 

3652 if res is None: 

3653 raise exc.NoSuchTableError( 

3654 f"{schema}.{view_name}" if schema else view_name 

3655 ) 

3656 else: 

3657 return res 

3658 

3659 def _value_or_raise(self, data, table, schema): 

3660 try: 

3661 return dict(data)[(schema, table)] 

3662 except KeyError: 

3663 raise exc.NoSuchTableError( 

3664 f"{schema}.{table}" if schema else table 

3665 ) from None 

3666 

3667 def _prepare_filter_names(self, filter_names): 

3668 if filter_names: 

3669 return True, {"filter_names": filter_names} 

3670 else: 

3671 return False, {} 

3672 

3673 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]: 

3674 if kind is ObjectKind.ANY: 

3675 return pg_catalog.RELKINDS_ALL_TABLE_LIKE 

3676 relkinds = () 

3677 if ObjectKind.TABLE in kind: 

3678 relkinds += pg_catalog.RELKINDS_TABLE 

3679 if ObjectKind.VIEW in kind: 

3680 relkinds += pg_catalog.RELKINDS_VIEW 

3681 if ObjectKind.MATERIALIZED_VIEW in kind: 

3682 relkinds += pg_catalog.RELKINDS_MAT_VIEW 

3683 return relkinds 
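The flag-to-relkind mapping above can be tried outside the dialect. This is a minimal sketch, not the library's code: `Kind` is a hypothetical stand-in for `ObjectKind`, and the tuple contents are assumptions based on the PostgreSQL `pg_class.relkind` letters (`r` ordinary table, `p` partitioned table, `f` foreign table, `v` view, `m` materialized view) rather than values copied from `pg_catalog`.

```python
import enum


class Kind(enum.Flag):
    """Hypothetical stand-in for ObjectKind."""

    TABLE = 1
    VIEW = 2
    MATERIALIZED_VIEW = 4
    ANY = TABLE | VIEW | MATERIALIZED_VIEW


# Assumed tuple contents, following the pg_class.relkind letters.
RELKINDS_TABLE = ("r", "p", "f")
RELKINDS_VIEW = ("v",)
RELKINDS_MAT_VIEW = ("m",)


def kind_to_relkinds(kind):
    """Concatenate one relkind tuple per flag that is set in ``kind``."""
    relkinds = ()
    if Kind.TABLE in kind:
        relkinds += RELKINDS_TABLE
    if Kind.VIEW in kind:
        relkinds += RELKINDS_VIEW
    if Kind.MATERIALIZED_VIEW in kind:
        relkinds += RELKINDS_MAT_VIEW
    return relkinds


print(kind_to_relkinds(Kind.TABLE | Kind.VIEW))
```

Because the flags combine with `|`, a caller can request any mix of tables, views and materialized views in a single catalog query.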

3684 

3685 @reflection.cache 

3686 def get_columns(self, connection, table_name, schema=None, **kw): 

3687 data = self.get_multi_columns( 

3688 connection, 

3689 schema=schema, 

3690 filter_names=[table_name], 

3691 scope=ObjectScope.ANY, 

3692 kind=ObjectKind.ANY, 

3693 **kw, 

3694 ) 

3695 return self._value_or_raise(data, table_name, schema) 

3696 

3697 @lru_cache() 

3698 def _columns_query(self, schema, has_filter_names, scope, kind): 

3699 # NOTE: the query with the default and identity options scalar 

3700 # subquery is faster than trying to use outer joins for them 

3701 generated = ( 

3702 pg_catalog.pg_attribute.c.attgenerated.label("generated") 

3703 if self.server_version_info >= (12,) 

3704 else sql.null().label("generated") 

3705 ) 

3706 if self.server_version_info >= (10,): 

3707 # join lateral performs worse (~2x slower) than a scalar_subquery 

3708 identity = ( 

3709 select( 

3710 sql.func.json_build_object( 

3711 "always", 

3712 pg_catalog.pg_attribute.c.attidentity == "a", 

3713 "start", 

3714 pg_catalog.pg_sequence.c.seqstart, 

3715 "increment", 

3716 pg_catalog.pg_sequence.c.seqincrement, 

3717 "minvalue", 

3718 pg_catalog.pg_sequence.c.seqmin, 

3719 "maxvalue", 

3720 pg_catalog.pg_sequence.c.seqmax, 

3721 "cache", 

3722 pg_catalog.pg_sequence.c.seqcache, 

3723 "cycle", 

3724 pg_catalog.pg_sequence.c.seqcycle, 

3725 type_=sqltypes.JSON(), 

3726 ) 

3727 ) 

3728 .select_from(pg_catalog.pg_sequence) 

3729 .where( 

3730 # attidentity != '' is required or it will reflect also 

3731 # serial columns as identity. 

3732 pg_catalog.pg_attribute.c.attidentity != "", 

3733 pg_catalog.pg_sequence.c.seqrelid 

3734 == sql.cast( 

3735 sql.cast( 

3736 pg_catalog.pg_get_serial_sequence( 

3737 sql.cast( 

3738 sql.cast( 

3739 pg_catalog.pg_attribute.c.attrelid, 

3740 REGCLASS, 

3741 ), 

3742 TEXT, 

3743 ), 

3744 pg_catalog.pg_attribute.c.attname, 

3745 ), 

3746 REGCLASS, 

3747 ), 

3748 OID, 

3749 ), 

3750 ) 

3751 .correlate(pg_catalog.pg_attribute) 

3752 .scalar_subquery() 

3753 .label("identity_options") 

3754 ) 

3755 else: 

3756 identity = sql.null().label("identity_options") 

3757 

3758 # join lateral performs the same as scalar_subquery here 

3759 default = ( 

3760 select( 

3761 pg_catalog.pg_get_expr( 

3762 pg_catalog.pg_attrdef.c.adbin, 

3763 pg_catalog.pg_attrdef.c.adrelid, 

3764 ) 

3765 ) 

3766 .select_from(pg_catalog.pg_attrdef) 

3767 .where( 

3768 pg_catalog.pg_attrdef.c.adrelid 

3769 == pg_catalog.pg_attribute.c.attrelid, 

3770 pg_catalog.pg_attrdef.c.adnum 

3771 == pg_catalog.pg_attribute.c.attnum, 

3772 pg_catalog.pg_attribute.c.atthasdef, 

3773 ) 

3774 .correlate(pg_catalog.pg_attribute) 

3775 .scalar_subquery() 

3776 .label("default") 

3777 ) 

3778 relkinds = self._kind_to_relkinds(kind) 

3779 query = ( 

3780 select( 

3781 pg_catalog.pg_attribute.c.attname.label("name"), 

3782 pg_catalog.format_type( 

3783 pg_catalog.pg_attribute.c.atttypid, 

3784 pg_catalog.pg_attribute.c.atttypmod, 

3785 ).label("format_type"), 

3786 default, 

3787 pg_catalog.pg_attribute.c.attnotnull.label("not_null"), 

3788 pg_catalog.pg_class.c.relname.label("table_name"), 

3789 pg_catalog.pg_description.c.description.label("comment"), 

3790 generated, 

3791 identity, 

3792 ) 

3793 .select_from(pg_catalog.pg_class) 

3794 # NOTE: PostgreSQL supports tables with no user columns, meaning

3795 # there is no row with pg_attribute.attnum > 0. use a left outer 

3796 # join to avoid filtering these tables. 

3797 .outerjoin( 

3798 pg_catalog.pg_attribute, 

3799 sql.and_( 

3800 pg_catalog.pg_class.c.oid 

3801 == pg_catalog.pg_attribute.c.attrelid, 

3802 pg_catalog.pg_attribute.c.attnum > 0, 

3803 ~pg_catalog.pg_attribute.c.attisdropped, 

3804 ), 

3805 ) 

3806 .outerjoin( 

3807 pg_catalog.pg_description, 

3808 sql.and_( 

3809 pg_catalog.pg_description.c.objoid 

3810 == pg_catalog.pg_attribute.c.attrelid, 

3811 pg_catalog.pg_description.c.objsubid 

3812 == pg_catalog.pg_attribute.c.attnum, 

3813 ), 

3814 ) 

3815 .where(self._pg_class_relkind_condition(relkinds)) 

3816 .order_by( 

3817 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum 

3818 ) 

3819 ) 

3820 query = self._pg_class_filter_scope_schema(query, schema, scope=scope) 

3821 if has_filter_names: 

3822 query = query.where( 

3823 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

3824 ) 

3825 return query 

3826 

3827 def get_multi_columns( 

3828 self, connection, schema, filter_names, scope, kind, **kw 

3829 ): 

3830 has_filter_names, params = self._prepare_filter_names(filter_names) 

3831 query = self._columns_query(schema, has_filter_names, scope, kind) 

3832 rows = connection.execute(query, params).mappings() 

3833 

3834 # dictionary keyed by (name,) when the domain is visible on the default

3835 # search path, otherwise by (schema, name)

3836 domains = { 

3837 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d 

3838 for d in self._load_domains( 

3839 connection, schema="*", info_cache=kw.get("info_cache") 

3840 ) 

3841 } 

3842 

3843 # dictionary keyed by (name,) when the enum is visible on the default

3844 # search path, otherwise by (schema, name)

3845 enums = dict( 

3846 ( 

3847 ((rec["name"],), rec) 

3848 if rec["visible"] 

3849 else ((rec["schema"], rec["name"]), rec) 

3850 ) 

3851 for rec in self._load_enums( 

3852 connection, schema="*", info_cache=kw.get("info_cache") 

3853 ) 

3854 ) 

3855 

3856 columns = self._get_columns_info(rows, domains, enums, schema) 

3857 

3858 return columns.items() 
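The two lookup dictionaries built in `get_multi_columns` share one keying rule: a one-element tuple for types visible on the search path, a `(schema, name)` pair otherwise. The sketch below shows that rule on hand-written records. The helper name `build_type_lookup` and the sample records are hypothetical; only the `name`, `schema` and `visible` keys come from the listing.

```python
def build_type_lookup(records):
    """Key each record by (name,) if visible, else by (schema, name)."""
    return {
        ((rec["name"],) if rec["visible"] else (rec["schema"], rec["name"])): rec
        for rec in records
    }


# Hypothetical records in the shape used by the listing above.
records = [
    {"name": "mood", "schema": "public", "visible": True},
    {"name": "status", "schema": "audit", "visible": False},
]
lookup = build_type_lookup(records)
print(sorted(lookup))
```

The tuple keys line up with what a parser of a possibly schema-qualified type name yields: one token for `mood`, two tokens for `audit.status`.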

3859 

3860 _format_type_args_pattern = re.compile(r"\((.*)\)") 

3861 _format_type_args_delim = re.compile(r"\s*,\s*") 

3862 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$") 

3863 

3864 def _reflect_type( 

3865 self, 

3866 format_type: Optional[str], 

3867 domains: Dict[str, ReflectedDomain], 

3868 enums: Dict[str, ReflectedEnum], 

3869 type_description: str, 

3870 ) -> sqltypes.TypeEngine[Any]: 

3871 """ 

3872 Attempts to reconstruct a column type defined in ischema_names based 

3873 on the information available in the format_type. 

3874 

3875 If the `format_type` cannot be associated with a known `ischema_names`, 

3876 it is treated as a reference to a known PostgreSQL named `ENUM` or 

3877 `DOMAIN` type. 

3878 """ 

3879 type_description = type_description or "unknown type" 

3880 if format_type is None: 

3881 util.warn( 

3882 "PostgreSQL format_type() returned NULL for %s" 

3883 % type_description 

3884 ) 

3885 return sqltypes.NULLTYPE 

3886 

3887 attype_args_match = self._format_type_args_pattern.search(format_type) 

3888 if attype_args_match and attype_args_match.group(1): 

3889 attype_args = self._format_type_args_delim.split( 

3890 attype_args_match.group(1) 

3891 ) 

3892 else: 

3893 attype_args = () 

3894 

3895 match_array_dim = self._format_array_spec_pattern.search(format_type) 

3896 # Each "[]" in array specs corresponds to an array dimension 

3897 array_dim = len(match_array_dim.group(1) or "") // 2 

3898 

3899 # Remove all parameters and array specs from format_type to obtain an 

3900 # ischema_name candidate 

3901 attype = self._format_type_args_pattern.sub("", format_type) 

3902 attype = self._format_array_spec_pattern.sub("", attype) 

3903 

3904 schema_type = self.ischema_names.get(attype.lower(), None) 

3905 args, kwargs = (), {} 

3906 

3907 if attype == "numeric": 

3908 if len(attype_args) == 2: 

3909 precision, scale = map(int, attype_args) 

3910 args = (precision, scale) 

3911 

3912 elif attype == "double precision": 

3913 args = (53,) 

3914 

3915 elif attype == "integer": 

3916 args = () 

3917 

3918 elif attype in ("timestamp with time zone", "time with time zone"): 

3919 kwargs["timezone"] = True 

3920 if len(attype_args) == 1: 

3921 kwargs["precision"] = int(attype_args[0]) 

3922 

3923 elif attype in ( 

3924 "timestamp without time zone", 

3925 "time without time zone", 

3926 "time", 

3927 ): 

3928 kwargs["timezone"] = False 

3929 if len(attype_args) == 1: 

3930 kwargs["precision"] = int(attype_args[0]) 

3931 

3932 elif attype == "bit varying": 

3933 kwargs["varying"] = True 

3934 if len(attype_args) == 1: 

3935 charlen = int(attype_args[0]) 

3936 args = (charlen,) 

3937 

3938 # a domain or enum can start with interval, so be mindful of that. 

3939 elif attype == "interval" or attype.startswith("interval "): 

3940 schema_type = INTERVAL 

3941 

3942 field_match = re.match(r"interval (.+)", attype) 

3943 if field_match: 

3944 kwargs["fields"] = field_match.group(1) 

3945 

3946 if len(attype_args) == 1: 

3947 kwargs["precision"] = int(attype_args[0]) 

3948 

3949 else: 

3950 enum_or_domain_key = tuple(util.quoted_token_parser(attype)) 

3951 

3952 if enum_or_domain_key in enums: 

3953 schema_type = ENUM 

3954 enum = enums[enum_or_domain_key] 

3955 

3956 kwargs["name"] = enum["name"] 

3957 

3958 if not enum["visible"]: 

3959 kwargs["schema"] = enum["schema"] 

3960 args = tuple(enum["labels"]) 

3961 elif enum_or_domain_key in domains: 

3962 schema_type = DOMAIN 

3963 domain = domains[enum_or_domain_key] 

3964 

3965 data_type = self._reflect_type( 

3966 domain["type"], 

3967 domains, 

3968 enums, 

3969 type_description="DOMAIN '%s'" % domain["name"], 

3970 ) 

3971 args = (domain["name"], data_type) 

3972 

3973 kwargs["collation"] = domain["collation"] 

3974 kwargs["default"] = domain["default"] 

3975 kwargs["not_null"] = not domain["nullable"] 

3976 kwargs["create_type"] = False 

3977 

3978 if domain["constraints"]: 

3979 # We only support a single constraint 

3980 check_constraint = domain["constraints"][0] 

3981 

3982 kwargs["constraint_name"] = check_constraint["name"] 

3983 kwargs["check"] = check_constraint["check"] 

3984 

3985 if not domain["visible"]: 

3986 kwargs["schema"] = domain["schema"] 

3987 

3988 else: 

3989 try: 

3990 charlen = int(attype_args[0]) 

3991 args = (charlen, *attype_args[1:]) 

3992 except (ValueError, IndexError): 

3993 args = attype_args 

3994 

3995 if not schema_type: 

3996 util.warn( 

3997 "Did not recognize type '%s' of %s" 

3998 % (attype, type_description) 

3999 ) 

4000 return sqltypes.NULLTYPE 

4001 

4002 data_type = schema_type(*args, **kwargs) 

4003 if array_dim >= 1: 

4004 # postgres does not preserve dimensionality or size of array types. 

4005 data_type = _array.ARRAY(data_type) 

4006 

4007 return data_type 
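The first steps of `_reflect_type` are plain string processing and can be run without a database. The sketch below reuses the three regular expressions from the listing to split a `format_type()` string into a base name, its arguments and its array depth. The function name `split_format_type` is hypothetical and the input strings are hand-written examples.

```python
import re

# The same three patterns as in the listing above.
ARGS_PATTERN = re.compile(r"\((.*)\)")
ARGS_DELIM = re.compile(r"\s*,\s*")
ARRAY_SPEC_PATTERN = re.compile(r"((?:\[\])*)$")


def split_format_type(format_type):
    """Return (base_name, args, array_dim) for a format_type() string."""
    args_match = ARGS_PATTERN.search(format_type)
    if args_match and args_match.group(1):
        args = tuple(ARGS_DELIM.split(args_match.group(1)))
    else:
        args = ()

    # Each trailing "[]" is two characters and one array dimension.
    array_match = ARRAY_SPEC_PATTERN.search(format_type)
    array_dim = len(array_match.group(1) or "") // 2

    # Strip arguments and array markers to get the lookup candidate.
    base = ARGS_PATTERN.sub("", format_type)
    base = ARRAY_SPEC_PATTERN.sub("", base)
    return base, args, array_dim


for sample in ("numeric(10,2)", "character varying(30)[]", "integer[][]"):
    print(sample, "->", split_format_type(sample))
```

The arguments stay as strings at this stage; the listing converts them with `int()` only in the branches that expect numbers.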

4008 

4009 def _get_columns_info(self, rows, domains, enums, schema): 

4010 columns = defaultdict(list) 

4011 for row_dict in rows: 

4012 # ensure that each table has an entry, even if it has no columns 

4013 if row_dict["name"] is None: 

4014 columns[(schema, row_dict["table_name"])] = ( 

4015 ReflectionDefaults.columns() 

4016 ) 

4017 continue 

4018 table_cols = columns[(schema, row_dict["table_name"])] 

4019 

4020 coltype = self._reflect_type( 

4021 row_dict["format_type"], 

4022 domains, 

4023 enums, 

4024 type_description="column '%s'" % row_dict["name"], 

4025 ) 

4026 

4027 default = row_dict["default"] 

4028 name = row_dict["name"] 

4029 generated = row_dict["generated"] 

4030 nullable = not row_dict["not_null"] 

4031 

4032 if isinstance(coltype, DOMAIN): 

4033 if not default: 

4034 # domain can override the default value but 

4035 # can't set it to None

4036 if coltype.default is not None: 

4037 default = coltype.default 

4038 

4039 nullable = nullable and not coltype.not_null 

4040 

4041 identity = row_dict["identity_options"] 

4042 

4043 # If the value is a zero byte or blank string, depending on the driver

4044 # (or absent, for older PG versions), this is not a generated column.

4045 # Otherwise, s = stored. (Other values might be added in the 

4046 # future.) 

4047 if generated not in (None, "", b"\x00"): 

4048 computed = dict( 

4049 sqltext=default, persisted=generated in ("s", b"s") 

4050 ) 

4051 default = None 

4052 else: 

4053 computed = None 

4054 

4055 # adjust the default value 

4056 autoincrement = False 

4057 if default is not None: 

4058 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default) 

4059 if match is not None: 

4060 if issubclass(coltype._type_affinity, sqltypes.Integer): 

4061 autoincrement = True 

4062 # the default is related to a Sequence 

4063 if "." not in match.group(2) and schema is not None: 

4064 # unconditionally quote the schema name. this could 

4065 # later be enhanced to obey quoting rules / 

4066 # "quote schema" 

4067 default = ( 

4068 match.group(1) 

4069 + ('"%s"' % schema) 

4070 + "." 

4071 + match.group(2) 

4072 + match.group(3) 

4073 ) 

4074 

4075 column_info = { 

4076 "name": name, 

4077 "type": coltype, 

4078 "nullable": nullable, 

4079 "default": default, 

4080 "autoincrement": autoincrement or identity is not None, 

4081 "comment": row_dict["comment"], 

4082 } 

4083 if computed is not None: 

4084 column_info["computed"] = computed 

4085 if identity is not None: 

4086 column_info["identity"] = identity 

4087 

4088 table_cols.append(column_info) 

4089 

4090 return columns 
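The default-value adjustment in `_get_columns_info` rewrites `nextval()` defaults so that an unqualified sequence name carries the reflected schema. A minimal sketch of just that rewrite, using the same regular expression, follows. The function name `qualify_sequence_default` and the sample defaults are hypothetical.

```python
import re

# Same pattern as in the listing: prefix, sequence name, remainder.
NEXTVAL = re.compile(r"""(nextval\(')([^']+)('.*$)""")


def qualify_sequence_default(default, schema):
    """Prefix an unqualified sequence name with the quoted schema."""
    match = NEXTVAL.search(default)
    if match is None or schema is None or "." in match.group(2):
        # Not a sequence default, no schema given, or already qualified.
        return default
    return (
        match.group(1)
        + ('"%s"' % schema)
        + "."
        + match.group(2)
        + match.group(3)
    )


print(qualify_sequence_default("nextval('some_id_seq'::regclass)", "test_schema"))
```

As the comment in the listing notes, the schema name is quoted unconditionally, so the result may be quoted even when quoting is not strictly required.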

4091 

4092 @lru_cache() 

4093 def _table_oids_query(self, schema, has_filter_names, scope, kind): 

4094 relkinds = self._kind_to_relkinds(kind) 

4095 oid_q = select( 

4096 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname 

4097 ).where(self._pg_class_relkind_condition(relkinds)) 

4098 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope) 

4099 

4100 if has_filter_names: 

4101 oid_q = oid_q.where( 

4102 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4103 ) 

4104 return oid_q 

4105 

4106 @reflection.flexi_cache( 

4107 ("schema", InternalTraversal.dp_string), 

4108 ("filter_names", InternalTraversal.dp_string_list), 

4109 ("kind", InternalTraversal.dp_plain_obj), 

4110 ("scope", InternalTraversal.dp_plain_obj), 

4111 ) 

4112 def _get_table_oids( 

4113 self, connection, schema, filter_names, scope, kind, **kw 

4114 ): 

4115 has_filter_names, params = self._prepare_filter_names(filter_names) 

4116 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind) 

4117 result = connection.execute(oid_q, params) 

4118 return result.all() 

4119 

4120 @util.memoized_property 

4121 def _constraint_query(self): 

4122 if self.server_version_info >= (11, 0): 

4123 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4124 else: 

4125 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4126 

4127 if self.server_version_info >= (15,): 

4128 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4129 else: 

4130 indnullsnotdistinct = sql.false().label("indnullsnotdistinct") 

4131 

4132 con_sq = ( 

4133 select( 

4134 pg_catalog.pg_constraint.c.conrelid, 

4135 pg_catalog.pg_constraint.c.conname, 

4136 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4137 sql.func.generate_subscripts( 

4138 pg_catalog.pg_index.c.indkey, 1 

4139 ).label("ord"), 

4140 indnkeyatts, 

4141 indnullsnotdistinct, 

4142 pg_catalog.pg_description.c.description, 

4143 ) 

4144 .join( 

4145 pg_catalog.pg_index, 

4146 pg_catalog.pg_constraint.c.conindid 

4147 == pg_catalog.pg_index.c.indexrelid, 

4148 ) 

4149 .outerjoin( 

4150 pg_catalog.pg_description, 

4151 pg_catalog.pg_description.c.objoid 

4152 == pg_catalog.pg_constraint.c.oid, 

4153 ) 

4154 .where( 

4155 pg_catalog.pg_constraint.c.contype == bindparam("contype"), 

4156 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")), 

4157 # NOTE: filtering also on pg_index.indrelid for oids does 

4158 # not seem to have a performance effect, but it may be an 

4159 # option if perf problems are reported 

4160 ) 

4161 .subquery("con") 

4162 ) 

4163 

4164 attr_sq = ( 

4165 select( 

4166 con_sq.c.conrelid, 

4167 con_sq.c.conname, 

4168 con_sq.c.description, 

4169 con_sq.c.ord, 

4170 con_sq.c.indnkeyatts, 

4171 con_sq.c.indnullsnotdistinct, 

4172 pg_catalog.pg_attribute.c.attname, 

4173 ) 

4174 .select_from(pg_catalog.pg_attribute) 

4175 .join( 

4176 con_sq, 

4177 sql.and_( 

4178 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum, 

4179 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid, 

4180 ), 

4181 ) 

4182 .where( 

4183 # NOTE: restate the condition here, since pg15 otherwise 

4184 # seems to get confused on psycopg2 sometimes, doing

4185 # a sequential scan of pg_attribute. 

4186 # The condition in the con_sq subquery is not actually needed 

4187 # in pg15, but it may be needed in older versions. Keeping it 

4188 # does not seem to have any impact in any case.

4189 con_sq.c.conrelid.in_(bindparam("oids")) 

4190 ) 

4191 .subquery("attr") 

4192 ) 

4193 

4194 return ( 

4195 select( 

4196 attr_sq.c.conrelid, 

4197 sql.func.array_agg( 

4198 # NOTE: cast since some postgresql derivatives may 

4199 # not support array_agg on the name type 

4200 aggregate_order_by( 

4201 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord 

4202 ) 

4203 ).label("cols"), 

4204 attr_sq.c.conname, 

4205 sql.func.min(attr_sq.c.description).label("description"), 

4206 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"), 

4207 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label( 

4208 "indnullsnotdistinct" 

4209 ), 

4210 ) 

4211 .group_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4212 .order_by(attr_sq.c.conrelid, attr_sq.c.conname) 

4213 ) 

4214 

4215 def _reflect_constraint( 

4216 self, connection, contype, schema, filter_names, scope, kind, **kw 

4217 ): 

4218 # used to reflect primary key and unique constraints

4219 table_oids = self._get_table_oids( 

4220 connection, schema, filter_names, scope, kind, **kw 

4221 ) 

4222 batches = list(table_oids) 

4223 is_unique = contype == "u" 

4224 

4225 while batches: 

4226 batch = batches[0:3000] 

4227 batches[0:3000] = [] 

4228 

4229 result = connection.execute( 

4230 self._constraint_query, 

4231 {"oids": [r[0] for r in batch], "contype": contype}, 

4232 ).mappings() 

4233 

4234 result_by_oid = defaultdict(list) 

4235 for row_dict in result: 

4236 result_by_oid[row_dict["conrelid"]].append(row_dict) 

4237 

4238 for oid, tablename in batch: 

4239 for_oid = result_by_oid.get(oid, ()) 

4240 if for_oid: 

4241 for row in for_oid: 

4242 # See note in get_multi_indexes 

4243 all_cols = row["cols"] 

4244 indnkeyatts = row["indnkeyatts"] 

4245 if len(all_cols) > indnkeyatts: 

4246 inc_cols = all_cols[indnkeyatts:] 

4247 cst_cols = all_cols[:indnkeyatts] 

4248 else: 

4249 inc_cols = [] 

4250 cst_cols = all_cols 

4251 

4252 opts = {} 

4253 if self.server_version_info >= (11,): 

4254 opts["postgresql_include"] = inc_cols 

4255 if is_unique: 

4256 opts["postgresql_nulls_not_distinct"] = row[ 

4257 "indnullsnotdistinct" 

4258 ] 

4259 yield ( 

4260 tablename, 

4261 cst_cols, 

4262 row["conname"], 

4263 row["description"], 

4264 opts, 

4265 ) 

4266 else: 

4267 yield tablename, None, None, None, None 
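Two small pieces of `_reflect_constraint` are independent of the database: the table OIDs are consumed in slices of 3000, and each constraint's column list is split into key columns and `INCLUDE` columns by `indnkeyatts`. The sketch below isolates both. The helper names `iter_batches` and `split_key_and_include` are hypothetical and the inputs are made up for illustration.

```python
def iter_batches(items, size=3000):
    """Yield successive slices of at most ``size`` items."""
    remaining = list(items)
    while remaining:
        batch = remaining[0:size]
        remaining[0:size] = []  # drop the slice that was just taken
        yield batch


def split_key_and_include(all_cols, indnkeyatts):
    """Return (key_columns, include_columns) for one constraint."""
    if len(all_cols) > indnkeyatts:
        return all_cols[:indnkeyatts], all_cols[indnkeyatts:]
    return all_cols, []


print(list(iter_batches(range(7), size=3)))
print(split_key_and_include(["id", "tenant_id", "payload"], 2))
```

Batching keeps the number of bound OIDs per statement limited when many tables are reflected in one call.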

4268 

4269 @reflection.cache 

4270 def get_pk_constraint(self, connection, table_name, schema=None, **kw): 

4271 data = self.get_multi_pk_constraint( 

4272 connection, 

4273 schema=schema, 

4274 filter_names=[table_name], 

4275 scope=ObjectScope.ANY, 

4276 kind=ObjectKind.ANY, 

4277 **kw, 

4278 ) 

4279 return self._value_or_raise(data, table_name, schema) 

4280 

4281 def get_multi_pk_constraint( 

4282 self, connection, schema, filter_names, scope, kind, **kw 

4283 ): 

4284 result = self._reflect_constraint( 

4285 connection, "p", schema, filter_names, scope, kind, **kw 

4286 ) 

4287 

4288 # only a single pk can be present for each table. Return an entry 

4289 # even if a table has no primary key 

4290 default = ReflectionDefaults.pk_constraint 

4291 

4292 def pk_constraint(pk_name, cols, comment, opts): 

4293 info = { 

4294 "constrained_columns": cols, 

4295 "name": pk_name, 

4296 "comment": comment, 

4297 } 

4298 if opts: 

4299 info["dialect_options"] = opts 

4300 return info 

4301 

4302 return ( 

4303 ( 

4304 (schema, table_name), 

4305 ( 

4306 pk_constraint(pk_name, cols, comment, opts) 

4307 if pk_name is not None 

4308 else default() 

4309 ), 

4310 ) 

4311 for table_name, cols, pk_name, comment, opts in result 

4312 ) 

4313 

4314 @reflection.cache 

4315 def get_foreign_keys( 

4316 self, 

4317 connection, 

4318 table_name, 

4319 schema=None, 

4320 postgresql_ignore_search_path=False, 

4321 **kw, 

4322 ): 

4323 data = self.get_multi_foreign_keys( 

4324 connection, 

4325 schema=schema, 

4326 filter_names=[table_name], 

4327 postgresql_ignore_search_path=postgresql_ignore_search_path, 

4328 scope=ObjectScope.ANY, 

4329 kind=ObjectKind.ANY, 

4330 **kw, 

4331 ) 

4332 return self._value_or_raise(data, table_name, schema) 

4333 

4334 @lru_cache() 

4335 def _foreing_key_query(self, schema, has_filter_names, scope, kind): 

4336 pg_class_ref = pg_catalog.pg_class.alias("cls_ref") 

4337 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref") 

4338 relkinds = self._kind_to_relkinds(kind) 

4339 query = ( 

4340 select( 

4341 pg_catalog.pg_class.c.relname, 

4342 pg_catalog.pg_constraint.c.conname, 

4343 # NOTE: avoid calling pg_get_constraintdef when not needed 

4344 # to speed up the query 

4345 sql.case( 

4346 ( 

4347 pg_catalog.pg_constraint.c.oid.is_not(None), 

4348 pg_catalog.pg_get_constraintdef( 

4349 pg_catalog.pg_constraint.c.oid, True 

4350 ), 

4351 ), 

4352 else_=None, 

4353 ), 

4354 pg_namespace_ref.c.nspname, 

4355 pg_catalog.pg_description.c.description, 

4356 ) 

4357 .select_from(pg_catalog.pg_class) 

4358 .outerjoin( 

4359 pg_catalog.pg_constraint, 

4360 sql.and_( 

4361 pg_catalog.pg_class.c.oid 

4362 == pg_catalog.pg_constraint.c.conrelid, 

4363 pg_catalog.pg_constraint.c.contype == "f", 

4364 ), 

4365 ) 

4366 .outerjoin( 

4367 pg_class_ref, 

4368 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid, 

4369 ) 

4370 .outerjoin( 

4371 pg_namespace_ref, 

4372 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid, 

4373 ) 

4374 .outerjoin( 

4375 pg_catalog.pg_description, 

4376 pg_catalog.pg_description.c.objoid 

4377 == pg_catalog.pg_constraint.c.oid, 

4378 ) 

4379 .order_by( 

4380 pg_catalog.pg_class.c.relname, 

4381 pg_catalog.pg_constraint.c.conname, 

4382 ) 

4383 .where(self._pg_class_relkind_condition(relkinds)) 

4384 ) 

4385 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4386 if has_filter_names: 

4387 query = query.where( 

4388 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4389 ) 

4390 return query 

4391 

4392 @util.memoized_property 

4393 def _fk_regex_pattern(self): 

4394 # optionally quoted token 

4395 qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)' 

4396 

4397 # https://www.postgresql.org/docs/current/static/sql-createtable.html 

4398 return re.compile( 

4399 r"FOREIGN KEY \((.*?)\) " 

4400 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501 

4401 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?" 

4402 r"[\s]?(ON UPDATE " 

4403 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?" 

4404 r"[\s]?(ON DELETE " 

4405 r"(CASCADE|RESTRICT|NO ACTION|" 

4406 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?" 

4407 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?" 

4408 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?" 

4409 ) 

4410 

4411 def get_multi_foreign_keys( 

4412 self, 

4413 connection, 

4414 schema, 

4415 filter_names, 

4416 scope, 

4417 kind, 

4418 postgresql_ignore_search_path=False, 

4419 **kw, 

4420 ): 

4421 preparer = self.identifier_preparer 

4422 

4423 has_filter_names, params = self._prepare_filter_names(filter_names) 

4424 query = self._foreing_key_query(schema, has_filter_names, scope, kind) 

4425 result = connection.execute(query, params) 

4426 

4427 FK_REGEX = self._fk_regex_pattern 

4428 

4429 fkeys = defaultdict(list) 

4430 default = ReflectionDefaults.foreign_keys 

4431 for table_name, conname, condef, conschema, comment in result: 

4432 # ensure that each table has an entry, even if it has 

4433 # no foreign keys 

4434 if conname is None: 

4435 fkeys[(schema, table_name)] = default() 

4436 continue 

4437 table_fks = fkeys[(schema, table_name)] 

4438 m = re.search(FK_REGEX, condef).groups() 

4439 

4440 ( 

4441 constrained_columns, 

4442 referred_schema, 

4443 referred_table, 

4444 referred_columns, 

4445 _, 

4446 match, 

4447 _, 

4448 onupdate, 

4449 _, 

4450 ondelete, 

4451 deferrable, 

4452 _, 

4453 initially, 

4454 ) = m 

4455 

4456 if deferrable is not None: 

4457 deferrable = True if deferrable == "DEFERRABLE" else False 

4458 constrained_columns = [ 

4459 preparer._unquote_identifier(x) 

4460 for x in re.split(r"\s*,\s*", constrained_columns) 

4461 ] 

4462 

4463 if postgresql_ignore_search_path: 

4464 # when ignoring search path, we use the actual schema 

4465 # provided it isn't the "default" schema 

4466 if conschema != self.default_schema_name: 

4467 referred_schema = conschema 

4468 else: 

4469 referred_schema = schema 

4470 elif referred_schema: 

4471 # referred_schema is the schema that we regexp'ed from 

4472 # pg_get_constraintdef(). If the schema is in the search 

4473 # path, pg_get_constraintdef() will give us None. 

4474 referred_schema = preparer._unquote_identifier(referred_schema) 

4475 elif schema is not None and schema == conschema: 

4476 # If the actual schema matches the schema of the table 

4477 # we're reflecting, then we will use that. 

4478 referred_schema = schema 

4479 

4480 referred_table = preparer._unquote_identifier(referred_table) 

4481 referred_columns = [ 

4482 preparer._unquote_identifier(x) 

4483 for x in re.split(r"\s*,\s*", referred_columns)

4484 ] 

4485 options = { 

4486 k: v 

4487 for k, v in [ 

4488 ("onupdate", onupdate), 

4489 ("ondelete", ondelete), 

4490 ("initially", initially), 

4491 ("deferrable", deferrable), 

4492 ("match", match), 

4493 ] 

4494 if v is not None and v != "NO ACTION" 

4495 } 

4496 fkey_d = { 

4497 "name": conname, 

4498 "constrained_columns": constrained_columns, 

4499 "referred_schema": referred_schema, 

4500 "referred_table": referred_table, 

4501 "referred_columns": referred_columns, 

4502 "options": options, 

4503 "comment": comment, 

4504 } 

4505 table_fks.append(fkey_d) 

4506 return fkeys.items() 
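The parsing in `get_multi_foreign_keys` can be tried on a constraint definition string without a server. The sketch below rebuilds the regular expression from `_fk_regex_pattern` and applies the same unpacking and option filtering. It rests on assumptions: the `condef` text is hand-written in the style of `pg_get_constraintdef()` output, `unquote` is a simplified stand-in for `preparer._unquote_identifier`, and `parse_fk` is a hypothetical helper name.

```python
import re

QTOKEN = '(?:"[^"]+"|[A-Za-z0-9_]+?)'  # optionally quoted token
FK_REGEX = re.compile(
    r"FOREIGN KEY \((.*?)\) "
    rf"REFERENCES (?:({QTOKEN})\.)?({QTOKEN})\(((?:{QTOKEN}(?: *, *)?)+)\)"
    r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
    r"[\s]?(ON UPDATE "
    r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
    r"[\s]?(ON DELETE "
    r"(CASCADE|RESTRICT|NO ACTION|"
    r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
    r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
    r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
)


def unquote(token):
    """Simplified stand-in for the preparer's identifier unquoting."""
    if len(token) >= 2 and token[0] == '"' and token[-1] == '"':
        return token[1:-1].replace('""', '"')
    return token


def parse_fk(condef):
    found = FK_REGEX.search(condef)
    if found is None:
        raise ValueError("not a foreign key definition: %r" % condef)
    (constrained, ref_schema, ref_table, ref_cols, _, match, _,
     onupdate, _, ondelete, deferrable, _, initially) = found.groups()
    if deferrable is not None:
        deferrable = deferrable == "DEFERRABLE"
    candidates = [
        ("onupdate", onupdate), ("ondelete", ondelete),
        ("initially", initially), ("deferrable", deferrable),
        ("match", match),
    ]
    return {
        "constrained_columns": [
            unquote(x) for x in re.split(r"\s*,\s*", constrained)
        ],
        "referred_schema": unquote(ref_schema) if ref_schema else None,
        "referred_table": unquote(ref_table),
        "referred_columns": [
            unquote(x) for x in re.split(r"\s*,\s*", ref_cols)
        ],
        # Unset options and the default NO ACTION are left out.
        "options": {
            k: v for k, v in candidates
            if v is not None and v != "NO ACTION"
        },
    }


condef = (
    "FOREIGN KEY (user_id, tenant_id) REFERENCES accounts.users(id, tenant_id) "
    "ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED"
)
print(parse_fk(condef))
```

Unlike the listing, this sketch does not apply the search-path rules for `referred_schema`; it only reports what the regular expression captured.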

4507 

4508 @reflection.cache 

4509 def get_indexes(self, connection, table_name, schema=None, **kw): 

4510 data = self.get_multi_indexes( 

4511 connection, 

4512 schema=schema, 

4513 filter_names=[table_name], 

4514 scope=ObjectScope.ANY, 

4515 kind=ObjectKind.ANY, 

4516 **kw, 

4517 ) 

4518 return self._value_or_raise(data, table_name, schema) 

4519 

4520 @util.memoized_property 

4521 def _index_query(self): 

4522 # NOTE: pg_index appears in the FROM clause twice to improve performance,

4523 # since extracting all the index information from `idx_sq` to avoid

4524 # the second pg_index use leads to a worse performing query, in

4525 # particular when querying for a single table (as of pg 17)

4526 # NOTE: repeating the oids clause improves query performance

4527 

4528 # subquery to get the columns 

4529 idx_sq = ( 

4530 select( 

4531 pg_catalog.pg_index.c.indexrelid, 

4532 pg_catalog.pg_index.c.indrelid, 

4533 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"), 

4534 sql.func.unnest(pg_catalog.pg_index.c.indclass).label( 

4535 "att_opclass" 

4536 ), 

4537 sql.func.generate_subscripts( 

4538 pg_catalog.pg_index.c.indkey, 1 

4539 ).label("ord"), 

4540 ) 

4541 .where( 

4542 ~pg_catalog.pg_index.c.indisprimary, 

4543 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4544 ) 

4545 .subquery("idx") 

4546 ) 

4547 

4548 attr_sq = ( 

4549 select( 

4550 idx_sq.c.indexrelid, 

4551 idx_sq.c.indrelid, 

4552 idx_sq.c.ord, 

4553 # NOTE: always using pg_get_indexdef is too slow so just 

4554 # invoke when the element is an expression 

4555 sql.case( 

4556 ( 

4557 idx_sq.c.attnum == 0, 

4558 pg_catalog.pg_get_indexdef( 

4559 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True 

4560 ), 

4561 ), 

4562 # NOTE: need to cast this since attname is of type "name" 

4563 # that's limited to 63 bytes, while pg_get_indexdef 

4564 # returns "text" so its output may get cut 

4565 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT), 

4566 ).label("element"), 

4567 (idx_sq.c.attnum == 0).label("is_expr"), 

4568 pg_catalog.pg_opclass.c.opcname, 

4569 pg_catalog.pg_opclass.c.opcdefault, 

4570 ) 

4571 .select_from(idx_sq) 

4572 .outerjoin( 

4573 # do not remove rows where idx_sq.c.attnum is 0 

4574 pg_catalog.pg_attribute, 

4575 sql.and_( 

4576 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum, 

4577 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid, 

4578 ), 

4579 ) 

4580 .outerjoin( 

4581 pg_catalog.pg_opclass, 

4582 pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass, 

4583 ) 

4584 .where(idx_sq.c.indrelid.in_(bindparam("oids"))) 

4585 .subquery("idx_attr") 

4586 ) 

4587 

4588 cols_sq = ( 

4589 select( 

4590 attr_sq.c.indexrelid, 

4591 sql.func.min(attr_sq.c.indrelid), 

4592 sql.func.array_agg( 

4593 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord) 

4594 ).label("elements"), 

4595 sql.func.array_agg( 

4596 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord) 

4597 ).label("elements_is_expr"), 

4598 sql.func.array_agg( 

4599 aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord) 

4600 ).label("elements_opclass"), 

4601 sql.func.array_agg( 

4602 aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord) 

4603 ).label("elements_opdefault"), 

4604 ) 

4605 .group_by(attr_sq.c.indexrelid) 

4606 .subquery("idx_cols") 

4607 ) 

4608 

4609 if self.server_version_info >= (11, 0): 

4610 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts 

4611 else: 

4612 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts") 

4613 

4614 if self.server_version_info >= (15,): 

4615 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct 

4616 else: 

4617 nulls_not_distinct = sql.false().label("indnullsnotdistinct") 

4618 

4619 return ( 

4620 select( 

4621 pg_catalog.pg_index.c.indrelid, 

4622 pg_catalog.pg_class.c.relname, 

4623 pg_catalog.pg_index.c.indisunique, 

4624 pg_catalog.pg_constraint.c.conrelid.is_not(None).label( 

4625 "has_constraint" 

4626 ), 

4627 pg_catalog.pg_index.c.indoption, 

4628 pg_catalog.pg_class.c.reloptions, 

4629 pg_catalog.pg_am.c.amname, 

4630 # NOTE: pg_get_expr is very fast so this case has almost no 

4631 # performance impact 

4632 sql.case( 

4633 ( 

4634 pg_catalog.pg_index.c.indpred.is_not(None), 

4635 pg_catalog.pg_get_expr( 

4636 pg_catalog.pg_index.c.indpred, 

4637 pg_catalog.pg_index.c.indrelid, 

4638 ), 

4639 ), 

4640 else_=None, 

4641 ).label("filter_definition"), 

4642 indnkeyatts, 

4643 nulls_not_distinct, 

4644 cols_sq.c.elements, 

4645 cols_sq.c.elements_is_expr, 

4646 cols_sq.c.elements_opclass, 

4647 cols_sq.c.elements_opdefault, 

4648 ) 

4649 .select_from(pg_catalog.pg_index) 

4650 .where( 

4651 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")), 

4652 ~pg_catalog.pg_index.c.indisprimary, 

4653 ) 

4654 .join( 

4655 pg_catalog.pg_class, 

4656 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid, 

4657 ) 

4658 .join( 

4659 pg_catalog.pg_am, 

4660 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid, 

4661 ) 

4662 .outerjoin( 

4663 cols_sq, 

4664 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid, 

4665 ) 

4666 .outerjoin( 

4667 pg_catalog.pg_constraint, 

4668 sql.and_( 

4669 pg_catalog.pg_index.c.indrelid 

4670 == pg_catalog.pg_constraint.c.conrelid, 

4671 pg_catalog.pg_index.c.indexrelid 

4672 == pg_catalog.pg_constraint.c.conindid, 

4673 pg_catalog.pg_constraint.c.contype 

4674 == sql.any_(_array.array(("p", "u", "x"))), 

4675 ), 

4676 ) 

4677 .order_by( 

4678 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname 

4679 ) 

4680 ) 

4681 

4682 def get_multi_indexes( 

4683 self, connection, schema, filter_names, scope, kind, **kw 

4684 ): 

4685 table_oids = self._get_table_oids( 

4686 connection, schema, filter_names, scope, kind, **kw 

4687 ) 

4688 

4689 indexes = defaultdict(list) 

4690 default = ReflectionDefaults.indexes 

4691 

4692 batches = list(table_oids) 

4693 

4694 while batches: 

4695 batch = batches[0:3000] 

4696 batches[0:3000] = [] 

4697 

4698 result = connection.execute( 

4699 self._index_query, {"oids": [r[0] for r in batch]} 

4700 ).mappings() 

4701 

4702 result_by_oid = defaultdict(list) 

4703 for row_dict in result: 

4704 result_by_oid[row_dict["indrelid"]].append(row_dict) 

4705 

4706 for oid, table_name in batch: 

4707 if oid not in result_by_oid: 

4708 # ensure that each table has an entry, even if reflection 

4709 # is skipped because it is not supported

4710 indexes[(schema, table_name)] = default() 

4711 continue 

4712 

4713 for row in result_by_oid[oid]: 

4714 index_name = row["relname"] 

4715 

4716 table_indexes = indexes[(schema, table_name)] 

4717 

4718 all_elements = row["elements"] 

4719 all_elements_is_expr = row["elements_is_expr"] 

4720 all_elements_opclass = row["elements_opclass"] 

4721 all_elements_opdefault = row["elements_opdefault"] 

4722 indnkeyatts = row["indnkeyatts"] 

4723 # "The number of key columns in the index, not counting any 

4724 # included columns, which are merely stored and do not 

4725 # participate in the index semantics" 

4726 if len(all_elements) > indnkeyatts: 

4727 # this is a "covering index" which has INCLUDE columns 

4728 # as well as regular index columns 

4729 inc_cols = all_elements[indnkeyatts:] 

4730 idx_elements = all_elements[:indnkeyatts] 

4731 idx_elements_is_expr = all_elements_is_expr[ 

4732 :indnkeyatts 

4733 ] 

4734 # postgresql does not support expressions on included

4735 # columns as of v14: "ERROR: expressions are not 

4736 # supported in included columns". 

4737 assert all( 

4738 not is_expr 

4739 for is_expr in all_elements_is_expr[indnkeyatts:] 

4740 ) 

4741 idx_elements_opclass = all_elements_opclass[ 

4742 :indnkeyatts 

4743 ] 

4744 idx_elements_opdefault = all_elements_opdefault[ 

4745 :indnkeyatts 

4746 ] 

4747 else: 

4748 idx_elements = all_elements 

4749 idx_elements_is_expr = all_elements_is_expr 

4750 inc_cols = [] 

4751 idx_elements_opclass = all_elements_opclass 

4752 idx_elements_opdefault = all_elements_opdefault 

4753 

4754 index = {"name": index_name, "unique": row["indisunique"]} 

4755 if any(idx_elements_is_expr): 

4756 index["column_names"] = [ 

4757 None if is_expr else expr 

4758 for expr, is_expr in zip( 

4759 idx_elements, idx_elements_is_expr 

4760 ) 

4761 ] 

4762 index["expressions"] = idx_elements 

4763 else: 

4764 index["column_names"] = idx_elements 

4765 

4766 dialect_options = {} 

4767 

4768 if not all(idx_elements_opdefault): 

4769 dialect_options["postgresql_ops"] = { 

4770 name: opclass 

4771 for name, opclass, is_default in zip( 

4772 idx_elements, 

4773 idx_elements_opclass, 

4774 idx_elements_opdefault, 

4775 ) 

4776 if not is_default 

4777 } 

4778 

4779 sorting = {} 

4780 for col_index, col_flags in enumerate(row["indoption"]): 

4781 col_sorting = () 

4782 # try to set flags only if they differ from PG 

4783 # defaults... 

4784 if col_flags & 0x01: 

4785 col_sorting += ("desc",) 

4786 if not (col_flags & 0x02): 

4787 col_sorting += ("nulls_last",) 

4788 else: 

4789 if col_flags & 0x02: 

4790 col_sorting += ("nulls_first",) 

4791 if col_sorting: 

4792 sorting[idx_elements[col_index]] = col_sorting 

4793 if sorting: 

4794 index["column_sorting"] = sorting 

4795 if row["has_constraint"]: 

4796 index["duplicates_constraint"] = index_name 

4797 

4798 if row["reloptions"]: 

4799 dialect_options["postgresql_with"] = dict( 

4800 [ 

4801 option.split("=", 1) 

4802 for option in row["reloptions"] 

4803 ] 

4804 ) 

4805 # it *might* be nice to include that this is 'btree' in the 

4806 # reflection info. But we don't want an Index object 

4807 # to have a ``postgresql_using`` in it that is just the 

4808 # default, so for the moment leaving this out. 

4809 amname = row["amname"] 

4810 if amname != "btree": 

4811 dialect_options["postgresql_using"] = row["amname"] 

4812 if row["filter_definition"]: 

4813 dialect_options["postgresql_where"] = row[ 

4814 "filter_definition" 

4815 ] 

4816 if self.server_version_info >= (11,): 

4817 # NOTE: this is legacy, this is part of 

4818 # dialect_options now as of #7382 

4819 index["include_columns"] = inc_cols 

4820 dialect_options["postgresql_include"] = inc_cols 

4821 if row["indnullsnotdistinct"]: 

4822 # the default is False, so ignore it. 

4823 dialect_options["postgresql_nulls_not_distinct"] = row[ 

4824 "indnullsnotdistinct" 

4825 ] 

4826 

4827 if dialect_options: 

4828 index["dialect_options"] = dialect_options 

4829 

4830 table_indexes.append(index) 

4831 return indexes.items() 
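
The per-column `indoption` handling in `get_multi_indexes` above can be read in isolation. The following is a minimal sketch, assuming the bit meanings used in the listing (0x01 for descending order, 0x02 for nulls first); `decode_indoption` is a hypothetical helper name and is not part of SQLAlchemy.

```python
def decode_indoption(col_flags):
    """Return only the sort modifiers that differ from the PostgreSQL defaults.

    Hypothetical helper mirroring the loop in get_multi_indexes.
    """
    col_sorting = ()
    if col_flags & 0x01:
        col_sorting += ("desc",)
        # DESC defaults to NULLS FIRST, so NULLS LAST is the non-default case
        if not (col_flags & 0x02):
            col_sorting += ("nulls_last",)
    elif col_flags & 0x02:
        # ASC defaults to NULLS LAST, so NULLS FIRST is the non-default case
        col_sorting += ("nulls_first",)
    return col_sorting


if __name__ == "__main__":
    for flags in (0, 1, 2, 3):
        print(flags, decode_indoption(flags))
```

An empty tuple means the column uses the default ordering, which is why the listing only records a `column_sorting` entry when the tuple is non-empty.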

4832 

4833 @reflection.cache 

4834 def get_unique_constraints( 

4835 self, connection, table_name, schema=None, **kw 

4836 ): 

4837 data = self.get_multi_unique_constraints( 

4838 connection, 

4839 schema=schema, 

4840 filter_names=[table_name], 

4841 scope=ObjectScope.ANY, 

4842 kind=ObjectKind.ANY, 

4843 **kw, 

4844 ) 

4845 return self._value_or_raise(data, table_name, schema) 

4846 

4847 def get_multi_unique_constraints( 

4848 self, 

4849 connection, 

4850 schema, 

4851 filter_names, 

4852 scope, 

4853 kind, 

4854 **kw, 

4855 ): 

4856 result = self._reflect_constraint( 

4857 connection, "u", schema, filter_names, scope, kind, **kw 

4858 ) 

4859 

4860 # each table can have multiple unique constraints 

4861 uniques = defaultdict(list) 

4862 default = ReflectionDefaults.unique_constraints 

4863 for table_name, cols, con_name, comment, options in result: 

4864 # ensure a list is created for each table. leave it empty if 

4865 # the table has no unique constraint

4866 if con_name is None: 

4867 uniques[(schema, table_name)] = default() 

4868 continue 

4869 

4870 uc_dict = { 

4871 "column_names": cols, 

4872 "name": con_name, 

4873 "comment": comment, 

4874 } 

4875 if options: 

4876 uc_dict["dialect_options"] = options 

4877 

4878 uniques[(schema, table_name)].append(uc_dict) 

4879 return uniques.items() 
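
The grouping pattern in `get_multi_unique_constraints` above can be tried without a database. This is a sketch under stated assumptions: the row shape `(table_name, cols, con_name, comment, options)` is taken from the tuple unpacking in the listing, the default for a table with no constraints is assumed to be an empty list, and `group_unique_constraints` is a hypothetical name.

```python
from collections import defaultdict


def group_unique_constraints(schema, rows):
    """Group constraint rows by (schema, table_name).

    Hypothetical helper; rows are assumed to look like the tuples
    unpacked in get_multi_unique_constraints.
    """
    uniques = defaultdict(list)
    for table_name, cols, con_name, comment, options in rows:
        if con_name is None:
            # The table exists but has no unique constraint: keep an
            # empty entry so the table still appears in the result.
            uniques[(schema, table_name)] = []
            continue
        uc_dict = {"column_names": cols, "name": con_name, "comment": comment}
        if options:
            uc_dict["dialect_options"] = options
        uniques[(schema, table_name)].append(uc_dict)
    return dict(uniques)


if __name__ == "__main__":
    sample = [
        ("a", ["x"], "uq_a_x", None, {}),
        ("b", None, None, None, None),
    ]
    print(group_unique_constraints("public", sample))
```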

4880 

4881 @reflection.cache 

4882 def get_table_comment(self, connection, table_name, schema=None, **kw): 

4883 data = self.get_multi_table_comment( 

4884 connection, 

4885 schema, 

4886 [table_name], 

4887 scope=ObjectScope.ANY, 

4888 kind=ObjectKind.ANY, 

4889 **kw, 

4890 ) 

4891 return self._value_or_raise(data, table_name, schema) 

4892 

4893 @lru_cache() 

4894 def _comment_query(self, schema, has_filter_names, scope, kind): 

4895 relkinds = self._kind_to_relkinds(kind) 

4896 query = ( 

4897 select( 

4898 pg_catalog.pg_class.c.relname, 

4899 pg_catalog.pg_description.c.description, 

4900 ) 

4901 .select_from(pg_catalog.pg_class) 

4902 .outerjoin( 

4903 pg_catalog.pg_description, 

4904 sql.and_( 

4905 pg_catalog.pg_class.c.oid 

4906 == pg_catalog.pg_description.c.objoid, 

4907 pg_catalog.pg_description.c.objsubid == 0, 

4908 pg_catalog.pg_description.c.classoid 

4909 == sql.func.cast("pg_catalog.pg_class", REGCLASS), 

4910 ), 

4911 ) 

4912 .where(self._pg_class_relkind_condition(relkinds)) 

4913 ) 

4914 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4915 if has_filter_names: 

4916 query = query.where( 

4917 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4918 ) 

4919 return query 

4920 

4921 def get_multi_table_comment( 

4922 self, connection, schema, filter_names, scope, kind, **kw 

4923 ): 

4924 has_filter_names, params = self._prepare_filter_names(filter_names) 

4925 query = self._comment_query(schema, has_filter_names, scope, kind) 

4926 result = connection.execute(query, params) 

4927 

4928 default = ReflectionDefaults.table_comment 

4929 return ( 

4930 ( 

4931 (schema, table), 

4932 {"text": comment} if comment is not None else default(), 

4933 ) 

4934 for table, comment in result 

4935 ) 

4936 

4937 @reflection.cache 

4938 def get_check_constraints(self, connection, table_name, schema=None, **kw): 

4939 data = self.get_multi_check_constraints( 

4940 connection, 

4941 schema, 

4942 [table_name], 

4943 scope=ObjectScope.ANY, 

4944 kind=ObjectKind.ANY, 

4945 **kw, 

4946 ) 

4947 return self._value_or_raise(data, table_name, schema) 

4948 

4949 @lru_cache() 

4950 def _check_constraint_query(self, schema, has_filter_names, scope, kind): 

4951 relkinds = self._kind_to_relkinds(kind) 

4952 query = ( 

4953 select( 

4954 pg_catalog.pg_class.c.relname, 

4955 pg_catalog.pg_constraint.c.conname, 

4956 # NOTE: avoid calling pg_get_constraintdef when not needed 

4957 # to speed up the query 

4958 sql.case( 

4959 ( 

4960 pg_catalog.pg_constraint.c.oid.is_not(None), 

4961 pg_catalog.pg_get_constraintdef( 

4962 pg_catalog.pg_constraint.c.oid, True 

4963 ), 

4964 ), 

4965 else_=None, 

4966 ), 

4967 pg_catalog.pg_description.c.description, 

4968 ) 

4969 .select_from(pg_catalog.pg_class) 

4970 .outerjoin( 

4971 pg_catalog.pg_constraint, 

4972 sql.and_( 

4973 pg_catalog.pg_class.c.oid 

4974 == pg_catalog.pg_constraint.c.conrelid, 

4975 pg_catalog.pg_constraint.c.contype == "c", 

4976 ), 

4977 ) 

4978 .outerjoin( 

4979 pg_catalog.pg_description, 

4980 pg_catalog.pg_description.c.objoid 

4981 == pg_catalog.pg_constraint.c.oid, 

4982 ) 

4983 .order_by( 

4984 pg_catalog.pg_class.c.relname, 

4985 pg_catalog.pg_constraint.c.conname, 

4986 ) 

4987 .where(self._pg_class_relkind_condition(relkinds)) 

4988 ) 

4989 query = self._pg_class_filter_scope_schema(query, schema, scope) 

4990 if has_filter_names: 

4991 query = query.where( 

4992 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names")) 

4993 ) 

4994 return query 

4995 

4996 def get_multi_check_constraints( 

4997 self, connection, schema, filter_names, scope, kind, **kw 

4998 ): 

4999 has_filter_names, params = self._prepare_filter_names(filter_names) 

5000 query = self._check_constraint_query( 

5001 schema, has_filter_names, scope, kind 

5002 ) 

5003 result = connection.execute(query, params) 

5004 

5005 check_constraints = defaultdict(list) 

5006 default = ReflectionDefaults.check_constraints 

5007 for table_name, check_name, src, comment in result: 

5008 # only two cases for check_name and src: both null or both defined 

5009 if check_name is None and src is None: 

5010 check_constraints[(schema, table_name)] = default() 

5011 continue 

5012 # samples: 

5013 # "CHECK (((a > 1) AND (a < 5)))" 

5014 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))" 

5015 # "CHECK (((a > 1) AND (a < 5))) NOT VALID" 

5016 # "CHECK (some_boolean_function(a))" 

5017 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)" 

5018 # "CHECK (a NOT NULL) NO INHERIT" 

5019 # "CHECK (a NOT NULL) NO INHERIT NOT VALID" 

5020 

5021 m = re.match( 

5022 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$", 

5023 src, 

5024 flags=re.DOTALL, 

5025 ) 

5026 if not m: 

5027 util.warn("Could not parse CHECK constraint text: %r" % src) 

5028 sqltext = "" 

5029 else: 

5030 sqltext = re.compile( 

5031 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL 

5032 ).sub(r"\1", m.group(1)) 

5033 entry = { 

5034 "name": check_name, 

5035 "sqltext": sqltext, 

5036 "comment": comment, 

5037 } 

5038 if m: 

5039 do = {} 

5040 if " NOT VALID" in m.groups(): 

5041 do["not_valid"] = True 

5042 if " NO INHERIT" in m.groups(): 

5043 do["no_inherit"] = True 

5044 if do: 

5045 entry["dialect_options"] = do 

5046 

5047 check_constraints[(schema, table_name)].append(entry) 

5048 return check_constraints.items() 
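
The two regular expressions in `get_multi_check_constraints` above can be exercised on the sample strings from its comments. The sketch below reuses the same patterns; `parse_check` is a hypothetical wrapper name, and returning `None` on a failed match is an assumption made for the example (the listing warns and uses an empty `sqltext` instead).

```python
import re


def parse_check(src):
    """Split pg_get_constraintdef output into (sqltext, dialect options).

    Hypothetical helper built from the patterns in the listing.
    """
    m = re.match(
        r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$",
        src,
        flags=re.DOTALL,
    )
    if not m:
        return None
    # Strip one extra pair of enclosing parentheses, if present.
    sqltext = re.sub(
        r"^[\s\n]*\((.+)\)[\s\n]*$", r"\1", m.group(1), flags=re.DOTALL
    )
    options = {}
    if " NOT VALID" in m.groups():
        options["not_valid"] = True
    if " NO INHERIT" in m.groups():
        options["no_inherit"] = True
    return sqltext, options


if __name__ == "__main__":
    print(parse_check("CHECK (((a > 1) AND (a < 5))) NOT VALID"))
    print(parse_check("CHECK (some_boolean_function(a))"))
```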

5049 

5050 def _pg_type_filter_schema(self, query, schema): 

5051 if schema is None: 

5052 query = query.where( 

5053 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid), 

5054 # ignore pg_catalog schema 

5055 pg_catalog.pg_namespace.c.nspname != "pg_catalog", 

5056 ) 

5057 elif schema != "*": 

5058 query = query.where(pg_catalog.pg_namespace.c.nspname == schema) 

5059 return query 

5060 

5061 @lru_cache() 

5062 def _enum_query(self, schema): 

5063 lbl_agg_sq = ( 

5064 select( 

5065 pg_catalog.pg_enum.c.enumtypid, 

5066 sql.func.array_agg( 

5067 aggregate_order_by( 

5068 # NOTE: cast since some postgresql derivatives may 

5069 # not support array_agg on the name type 

5070 pg_catalog.pg_enum.c.enumlabel.cast(TEXT), 

5071 pg_catalog.pg_enum.c.enumsortorder, 

5072 ) 

5073 ).label("labels"), 

5074 ) 

5075 .group_by(pg_catalog.pg_enum.c.enumtypid) 

5076 .subquery("lbl_agg") 

5077 ) 

5078 

5079 query = ( 

5080 select( 

5081 pg_catalog.pg_type.c.typname.label("name"), 

5082 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5083 "visible" 

5084 ), 

5085 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5086 lbl_agg_sq.c.labels.label("labels"), 

5087 ) 

5088 .join( 

5089 pg_catalog.pg_namespace, 

5090 pg_catalog.pg_namespace.c.oid 

5091 == pg_catalog.pg_type.c.typnamespace, 

5092 ) 

5093 .outerjoin( 

5094 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid 

5095 ) 

5096 .where(pg_catalog.pg_type.c.typtype == "e") 

5097 .order_by( 

5098 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5099 ) 

5100 ) 

5101 

5102 return self._pg_type_filter_schema(query, schema) 

5103 

5104 @reflection.cache 

5105 def _load_enums(self, connection, schema=None, **kw): 

5106 if not self.supports_native_enum: 

5107 return [] 

5108 

5109 result = connection.execute(self._enum_query(schema)) 

5110 

5111 enums = [] 

5112 for name, visible, schema, labels in result: 

5113 enums.append( 

5114 { 

5115 "name": name, 

5116 "schema": schema, 

5117 "visible": visible, 

5118 "labels": [] if labels is None else labels, 

5119 } 

5120 ) 

5121 return enums 

5122 

5123 @lru_cache() 

5124 def _domain_query(self, schema): 

5125 con_sq = ( 

5126 select( 

5127 pg_catalog.pg_constraint.c.contypid, 

5128 sql.func.array_agg( 

5129 pg_catalog.pg_get_constraintdef( 

5130 pg_catalog.pg_constraint.c.oid, True 

5131 ) 

5132 ).label("condefs"), 

5133 sql.func.array_agg( 

5134 # NOTE: cast since some postgresql derivatives may 

5135 # not support array_agg on the name type 

5136 pg_catalog.pg_constraint.c.conname.cast(TEXT) 

5137 ).label("connames"), 

5138 ) 

5139 # The domain this constraint is on; zero if not a domain constraint 

5140 .where(pg_catalog.pg_constraint.c.contypid != 0) 

5141 .group_by(pg_catalog.pg_constraint.c.contypid) 

5142 .subquery("domain_constraints") 

5143 ) 

5144 

5145 query = ( 

5146 select( 

5147 pg_catalog.pg_type.c.typname.label("name"), 

5148 pg_catalog.format_type( 

5149 pg_catalog.pg_type.c.typbasetype, 

5150 pg_catalog.pg_type.c.typtypmod, 

5151 ).label("attype"), 

5152 (~pg_catalog.pg_type.c.typnotnull).label("nullable"), 

5153 pg_catalog.pg_type.c.typdefault.label("default"), 

5154 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label( 

5155 "visible" 

5156 ), 

5157 pg_catalog.pg_namespace.c.nspname.label("schema"), 

5158 con_sq.c.condefs, 

5159 con_sq.c.connames, 

5160 pg_catalog.pg_collation.c.collname, 

5161 ) 

5162 .join( 

5163 pg_catalog.pg_namespace, 

5164 pg_catalog.pg_namespace.c.oid 

5165 == pg_catalog.pg_type.c.typnamespace, 

5166 ) 

5167 .outerjoin( 

5168 pg_catalog.pg_collation, 

5169 pg_catalog.pg_type.c.typcollation 

5170 == pg_catalog.pg_collation.c.oid, 

5171 ) 

5172 .outerjoin( 

5173 con_sq, 

5174 pg_catalog.pg_type.c.oid == con_sq.c.contypid, 

5175 ) 

5176 .where(pg_catalog.pg_type.c.typtype == "d") 

5177 .order_by( 

5178 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname 

5179 ) 

5180 ) 

5181 return self._pg_type_filter_schema(query, schema) 

5182 

5183 @reflection.cache 

5184 def _load_domains(self, connection, schema=None, **kw): 

5185 result = connection.execute(self._domain_query(schema)) 

5186 

5187 domains: List[ReflectedDomain] = [] 

5188 for domain in result.mappings(): 

5189 # strip (30) from character varying(30) 

5190 attype = re.search(r"([^\(]+)", domain["attype"]).group(1) 

5191 constraints: List[ReflectedDomainConstraint] = [] 

5192 if domain["connames"]: 

5193 # When a domain has multiple CHECK constraints, they will 

5194 # be tested in alphabetical order by name. 

5195 sorted_constraints = sorted( 

5196 zip(domain["connames"], domain["condefs"]), 

5197 key=lambda t: t[0], 

5198 ) 

5199 for name, def_ in sorted_constraints: 

5200 # constraint is in the form "CHECK (expression)" 

5201 # or "NOT NULL". Ignore the "NOT NULL" and 

5202 # remove "CHECK (" and the trailing ")".

5203 if def_.casefold().startswith("check"): 

5204 check = def_[7:-1] 

5205 constraints.append({"name": name, "check": check}) 

5206 domain_rec: ReflectedDomain = { 

5207 "name": domain["name"], 

5208 "schema": domain["schema"], 

5209 "visible": domain["visible"], 

5210 "type": attype, 

5211 "nullable": domain["nullable"], 

5212 "default": domain["default"], 

5213 "constraints": constraints, 

5214 "collation": domain["collname"], 

5215 } 

5216 domains.append(domain_rec) 

5217 

5218 return domains 
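
The string handling in `_load_domains` above has two small steps: dropping the type modifier from `format_type` output, and extracting the expression from each `CHECK (...)` definition after sorting constraints by name. A minimal sketch of both follows; the function names are hypothetical and the inputs are illustrative values, not output captured from a real server.

```python
import re


def strip_type_modifier(attype):
    """Turn 'character varying(30)' into 'character varying'."""
    return re.search(r"([^\(]+)", attype).group(1)


def domain_checks(connames, condefs):
    """Return CHECK constraints sorted by name, ignoring 'NOT NULL'."""
    constraints = []
    for name, def_ in sorted(zip(connames, condefs), key=lambda t: t[0]):
        if def_.casefold().startswith("check"):
            # Drop the leading "CHECK (" (7 characters) and the final ")".
            constraints.append({"name": name, "check": def_[7:-1]})
    return constraints


if __name__ == "__main__":
    print(strip_type_modifier("character varying(30)"))
    print(
        domain_checks(
            ["b_chk", "a_chk", "nn"],
            ["CHECK (VALUE > 0)", "CHECK (VALUE < 10)", "NOT NULL"],
        )
    )
```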

5219 

5220 def _set_backslash_escapes(self, connection): 

5221 # this method is provided as an override hook for descendant 

5222 # dialects (e.g. Redshift), so removing it may break them 

5223 std_string = connection.exec_driver_sql( 

5224 "show standard_conforming_strings" 

5225 ).scalar() 

5226 self._backslash_escapes = std_string == "off"