# dialects/postgresql/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors

r"""
.. dialect:: postgresql
    :name: PostgreSQL
    :normal_support: 9.6+
    :best_effort: 9+

.. _postgresql_sequences:

Sequences/SERIAL/IDENTITY
-------------------------

PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
of creating new primary key values for integer-based primary key columns. When
creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
integer-based primary key columns, which generates a sequence and server side
default corresponding to the column.
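
As an illustration, the DDL for such a column may be previewed by compiling a
:class:`.CreateTable` construct against the PostgreSQL dialect; a minimal
sketch, using a hypothetical table name::

    from sqlalchemy import Column, Integer, MetaData, Table
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.schema import CreateTable

    metadata = MetaData()
    sometable = Table("sometable", metadata, Column("id", Integer, primary_key=True))

    # renders "id SERIAL NOT NULL" along with the PRIMARY KEY clause
    print(CreateTable(sometable).compile(dialect=postgresql.dialect()))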

To specify a specific named sequence to be used for primary key generation,
use the :func:`~sqlalchemy.schema.Sequence` construct::

    Table(
        "sometable",
        metadata,
        Column(
            "id", Integer, Sequence("some_id_seq", start=1), primary_key=True
        ),
    )

When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
having the "last insert identifier" available, a RETURNING clause is added to
the INSERT statement which specifies the primary key columns should be
returned after the statement completes. The RETURNING functionality only takes
place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
sequence, whether specified explicitly or implicitly via ``SERIAL``, is
executed independently beforehand, with the returned value used in the
subsequent INSERT. Note that when an
:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
"executemany" semantics, the "last inserted identifier" functionality does not
apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
case.
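
For example, after a single-row INSERT is executed on a
:class:`_engine.Connection`, the newly generated primary key is available from
the :attr:`_engine.CursorResult.inserted_primary_key` attribute; a minimal
sketch, reusing the ``sometable`` definition above and assuming an
:class:`_engine.Engine` named ``engine``::

    with engine.connect() as conn:
        result = conn.execute(sometable.insert())
        # the newly generated primary key, fetched via RETURNING
        # (or via sequence pre-execution on very old PostgreSQL versions)
        print(result.inserted_primary_key)
        conn.commit()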


PostgreSQL 10 and above IDENTITY columns
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
of SERIAL. The :class:`_schema.Identity` construct in a
:class:`_schema.Column` can be used to control its behavior::

    from sqlalchemy import Table, Column, MetaData, Integer, Identity, String

    metadata = MetaData()

    data = Table(
        "data",
        metadata,
        Column(
            "id", Integer, Identity(start=42, cycle=True), primary_key=True
        ),
        Column("data", String),
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

    CREATE TABLE data (
        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
        data VARCHAR,
        PRIMARY KEY (id)
    )

.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
   in a :class:`_schema.Column` to specify the option of an autoincrementing
   column.

.. note::

    Previous versions of SQLAlchemy did not have built-in support for rendering
    of IDENTITY, and could use the following compilation hook to replace
    occurrences of SERIAL with IDENTITY::

        from sqlalchemy.schema import CreateColumn
        from sqlalchemy.ext.compiler import compiles


        @compiles(CreateColumn, "postgresql")
        def use_identity(element, compiler, **kw):
            text = compiler.visit_create_column(element, **kw)
            text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
            return text

    Using the above, a table such as::

        t = Table(
            "t", m, Column("id", Integer, primary_key=True), Column("data", String)
        )

    will generate on the backing database as:

    .. sourcecode:: sql

        CREATE TABLE t (
            id INT GENERATED BY DEFAULT AS IDENTITY,
            data VARCHAR,
            PRIMARY KEY (id)
        )
.. _postgresql_ss_cursors:

Server Side Cursors
-------------------

Server-side cursor support is available for the psycopg2 and asyncpg
dialects, and may also be available in others.

Server side cursors are enabled on a per-statement basis by using the
:paramref:`.Connection.execution_options.stream_results` connection execution
option::

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("select * from table")
        )

Note that some kinds of SQL statements may not be supported with
server side cursors; generally, only SQL statements that return rows should be
used with this option.
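
When streaming, rows are typically consumed in batches rather than all at
once; a minimal sketch using :meth:`_engine.Result.partitions`, where the
table name is hypothetical::

    with engine.connect() as conn:
        result = conn.execution_options(
            stream_results=True, max_row_buffer=100
        ).execute(text("select * from some_large_table"))
        for partition in result.partitions(100):
            # each partition is a list of up to 100 rows
            for row in partition:
                print(row)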

.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
   and will be removed in a future release. Please use the
   :paramref:`_engine.Connection.execution_options.stream_results` execution
   option for unbuffered cursor support.

.. seealso::

    :ref:`engine_stream_results`

.. _postgresql_isolation_level:

Transaction Isolation Level
---------------------------

Most SQLAlchemy dialects support setting of transaction isolation level
using the :paramref:`_sa.create_engine.isolation_level` parameter
at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
level via the :paramref:`.Connection.execution_options.isolation_level`
parameter.

For PostgreSQL dialects, this feature works either by making use of the
DBAPI-specific features, such as psycopg2's isolation level flags which will
embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
DBAPI-specific techniques are used, typically an ``.autocommit``
flag on the DBAPI connection object.

To set isolation level using :func:`_sa.create_engine`::

    engine = create_engine(
        "postgresql+pg8000://scott:tiger@localhost/test",
        isolation_level="REPEATABLE READ",
    )

To set using per-connection execution options::

    with engine.connect() as conn:
        conn = conn.execution_options(isolation_level="REPEATABLE READ")
        with conn.begin():
            ...  # work with transaction

There are also more options for isolation level configurations, such as
"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
different isolation level settings. See the discussion at
:ref:`dbapi_autocommit` for background.
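
As one example of such a configuration, a sketch of an "autocommit"
sub-engine that shares the parent :class:`_engine.Engine`'s connection pool
while applying its own isolation level::

    eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

    # a sub-engine sharing eng's connection pool, with AUTOCOMMIT
    # isolation applied to the connections it checks out
    autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

    with autocommit_engine.connect() as conn:
        # VACUUM cannot run inside a transaction block
        conn.execute(text("VACUUM"))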

Valid values for ``isolation_level`` on most PostgreSQL dialects include:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. seealso::

    :ref:`dbapi_autocommit`

    :ref:`postgresql_readonly_deferrable`

    :ref:`psycopg2_isolation_level`

    :ref:`pg8000_isolation_level`

.. _postgresql_readonly_deferrable:

Setting READ ONLY / DEFERRABLE
------------------------------

Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
characteristics of the transaction, which is in addition to the isolation level
setting. These two attributes can be established either in conjunction with or
independently of the isolation level by passing the ``postgresql_readonly`` and
``postgresql_deferrable`` flags with
:meth:`_engine.Connection.execution_options`. The example below illustrates
passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
"READ ONLY" and "DEFERRABLE"::

    with engine.connect() as conn:
        conn = conn.execution_options(
            isolation_level="SERIALIZABLE",
            postgresql_readonly=True,
            postgresql_deferrable=True,
        )
        with conn.begin():
            ...  # work with transaction

Note that some DBAPIs such as asyncpg only support "readonly" with
SERIALIZABLE isolation.

.. versionadded:: 1.4 added support for the ``postgresql_readonly``
   and ``postgresql_deferrable`` execution options.

.. _postgresql_reset_on_return:

Temporary Table / Resource Reset for Connection Pooling
-------------------------------------------------------

The :class:`.QueuePool` connection pool implementation used
by the SQLAlchemy :class:`.Engine` object includes
:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
the DBAPI ``.rollback()`` method when connections are returned to the pool.
While this rollback will clear out the immediate state used by the previous
transaction, it does not cover a wider range of session-level state, including
temporary tables as well as other server state such as prepared statement
handles and statement caches. The PostgreSQL database includes a variety
of commands which may be used to reset this state, including
``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.

To install one or more of these commands as the means of performing
reset-on-return, the :meth:`.PoolEvents.reset` event hook may be used, as
demonstrated in the example below. The implementation
will end transactions in progress as well as discard temporary tables
using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL
documentation for background on what each of these statements does.

The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
is set to ``None`` so that the custom scheme can replace the default behavior
completely. The custom hook implementation calls ``.rollback()`` in any case,
as it's usually important that the DBAPI's own tracking of commit/rollback
will remain consistent with the state of the transaction::

    from sqlalchemy import create_engine
    from sqlalchemy import event

    postgresql_engine = create_engine(
        "postgresql+psycopg2://scott:tiger@hostname/dbname",
        # disable default reset-on-return scheme
        pool_reset_on_return=None,
    )


    @event.listens_for(postgresql_engine, "reset")
    def _reset_postgresql(dbapi_connection, connection_record, reset_state):
        if not reset_state.terminate_only:
            # psycopg2 connections don't provide an .execute() method;
            # emit the reset statements using a cursor
            cursor = dbapi_connection.cursor()
            cursor.execute("CLOSE ALL")
            cursor.execute("RESET ALL")
            cursor.execute("DISCARD TEMP")
            cursor.close()

        # so that the DBAPI itself knows that the connection has been
        # reset
        dbapi_connection.rollback()

.. versionchanged:: 2.0.0b3 Added additional state arguments to
   the :meth:`.PoolEvents.reset` event and additionally ensured the event
   is invoked for all "reset" occurrences, so that it's appropriate
   as a place for custom "reset" handlers. Previous schemes which
   use the :meth:`.PoolEvents.checkin` handler remain usable as well.

.. seealso::

    :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation

.. _postgresql_alternate_search_path:

Setting Alternate Search Paths on Connect
------------------------------------------

The PostgreSQL ``search_path`` variable refers to the list of schema names
that will be implicitly referenced when a particular table or other
object is referenced in a SQL statement. As detailed in the next section
:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
the concept of keeping this variable at its default value of ``public``.
However, in order to have it set to any arbitrary name or names automatically
when connections are used, the "SET SESSION search_path" command may be
invoked for all connections in a pool using the following event handler, as
discussed at :ref:`schema_set_default_connections`::

    from sqlalchemy import event
    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")


    @event.listens_for(engine, "connect", insert=True)
    def set_search_path(dbapi_connection, connection_record):
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        cursor = dbapi_connection.cursor()
        # "schema_name" is a placeholder for the schema to be set
        cursor.execute("SET SESSION search_path='%s'" % schema_name)
        cursor.close()
        dbapi_connection.autocommit = existing_autocommit

The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
attribute is so that when the ``SET SESSION search_path`` directive is invoked,
it is invoked outside of the scope of any transaction and therefore will not
be reverted when the DBAPI connection has a rollback.
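
For the psycopg2 and psycopg drivers, an alternative that avoids the event
handler entirely is to pass ``search_path`` using libpq's ``options``
connection parameter via :paramref:`_sa.create_engine.connect_args`; a
sketch, where the schema name is hypothetical::

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@host/dbname",
        connect_args={"options": "-csearch_path=test_schema"},
    )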

.. seealso::

    :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation

.. _postgresql_schema_reflection:

Remote-Schema Table Introspection and PostgreSQL search_path
------------------------------------------------------------

.. admonition:: Section Best Practices Summarized

    Keep the ``search_path`` variable set to its default of ``public``, without
    any other schema names. Ensure the username used to connect **does not**
    match remote schemas, or ensure the ``"$user"`` token is **removed** from
    ``search_path``. For other schema names, name these explicitly
    within :class:`_schema.Table` definitions. Alternatively, the
    ``postgresql_ignore_search_path`` option will cause all reflected
    :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
    attribute set up.

The PostgreSQL dialect can reflect tables from any schema, as outlined in
:ref:`metadata_reflection_schemas`.

In all cases, the first thing SQLAlchemy does when reflecting tables is
to **determine the default schema for the current database connection**.
It does this using the PostgreSQL ``current_schema()``
function, illustrated below using a PostgreSQL client session (i.e. using
the ``psql`` tool):

.. sourcecode:: sql

    test=> select current_schema();
    current_schema
    ----------------
    public
    (1 row)

Above we see that on a plain install of PostgreSQL, the default schema name
is the name ``public``.
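
The same check may be performed from SQLAlchemy; a minimal sketch::

    from sqlalchemy import create_engine, text

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    with engine.connect() as conn:
        print(conn.execute(text("select current_schema()")).scalar())

    # the dialect determines the same value upon first connect
    print(engine.dialect.default_schema_name)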

However, if your database username **matches the name of a schema**, PostgreSQL's
default is to then **use that name as the default schema**. Below, we log in
using the username ``scott``. When we create a schema named ``scott``, **it
implicitly changes the default schema**:

.. sourcecode:: sql

    test=> select current_schema();
    current_schema
    ----------------
    public
    (1 row)

    test=> create schema scott;
    CREATE SCHEMA
    test=> select current_schema();
    current_schema
    ----------------
    scott
    (1 row)

The behavior of ``current_schema()`` is derived from the
`PostgreSQL search path
<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
variable ``search_path``, which in modern PostgreSQL versions defaults to this:

.. sourcecode:: sql

    test=> show search_path;
    search_path
    -----------------
    "$user", public
    (1 row)

Where above, the ``"$user"`` variable will inject the current username as the
default schema, if one exists. Otherwise, ``public`` is used.

When a :class:`_schema.Table` object is reflected, if it is present in the
schema indicated by the ``current_schema()`` function, **the schema name assigned
to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the
".schema" attribute will be assigned the string name of that schema.

With regards to tables which these :class:`_schema.Table`
objects refer to via foreign key constraint, a decision must be made as to how
the ``.schema`` is represented in those remote tables, in the case where that
remote schema name is also a member of the current ``search_path``.

By default, the PostgreSQL dialect mimics the behavior encouraged by
PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
returns a sample definition for a particular foreign key constraint,
omitting the referenced schema name from that definition when the name is
also in the PostgreSQL schema search path. The interaction below
illustrates this behavior:

.. sourcecode:: sql

    test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
    CREATE TABLE
    test=> CREATE TABLE referring(
    test(> id INTEGER PRIMARY KEY,
    test(> referred_id INTEGER REFERENCES test_schema.referred(id));
    CREATE TABLE
    test=> SET search_path TO public, test_schema;
    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f'
    test-> ;
    pg_get_constraintdef
    ---------------------------------------------------
    FOREIGN KEY (referred_id) REFERENCES referred(id)
    (1 row)

Above, we created a table ``referred`` as a member of the remote schema
``test_schema``, however when we added ``test_schema`` to the
PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
the function.

On the other hand, if we set the search path back to the typical default
of ``public``:

.. sourcecode:: sql

    test=> SET search_path TO public;
    SET

The same query against ``pg_get_constraintdef()`` now returns the fully
schema-qualified name for us:

.. sourcecode:: sql

    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f';
    pg_get_constraintdef
    ---------------------------------------------------------------
    FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
    (1 row)

SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
in order to determine the remote schema name. That is, if our ``search_path``
were set to include ``test_schema``, and we invoked a table
reflection process as follows::

    >>> from sqlalchemy import Table, MetaData, create_engine, text
    >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    >>> with engine.connect() as conn:
    ...     conn.execute(text("SET search_path TO test_schema, public"))
    ...     metadata_obj = MetaData()
    ...     referring = Table("referring", metadata_obj, autoload_with=conn)
    <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>

The above process would deliver to the :attr:`_schema.MetaData.tables`
collection the ``referred`` table named **without** the schema::

    >>> metadata_obj.tables["referred"].schema is None
    True

To alter the behavior of reflection such that the referred schema is
maintained regardless of the ``search_path`` setting, use the
``postgresql_ignore_search_path`` option, which can be specified as a
dialect-specific argument to both :class:`_schema.Table` as well as
:meth:`_schema.MetaData.reflect`::

    >>> with engine.connect() as conn:
    ...     conn.execute(text("SET search_path TO test_schema, public"))
    ...     metadata_obj = MetaData()
    ...     referring = Table(
    ...         "referring",
    ...         metadata_obj,
    ...         autoload_with=conn,
    ...         postgresql_ignore_search_path=True,
    ...     )
    <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>

We will now have ``test_schema.referred`` stored as schema-qualified::

    >>> metadata_obj.tables["test_schema.referred"].schema
    'test_schema'

.. sidebar:: Best Practices for PostgreSQL Schema reflection

    The description of PostgreSQL schema reflection behavior is complex, and
    is the product of many years of dealing with widely varied use cases and
    user preferences. But in fact, there's no need to understand any of it if
    you just stick to the simplest use pattern: leave the ``search_path`` set
    to its default of ``public`` only, never refer to the name ``public`` as
    an explicit schema name otherwise, and refer to all other schema names
    explicitly when building up a :class:`_schema.Table` object. The options
    described here are only for those users who can't, or prefer not to, stay
    within these guidelines.

.. seealso::

    :ref:`reflection_schema_qualified_interaction` - discussion of the issue
    from a backend-agnostic perspective

    `The Schema Search Path
    <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
    - on the PostgreSQL website.

INSERT/UPDATE...RETURNING
-------------------------

The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
for single-row INSERT statements in order to fetch newly generated
primary key identifiers. To specify an explicit ``RETURNING`` clause,
use the :meth:`._UpdateBase.returning` method on a per-statement basis
(the examples assume a :class:`_engine.Connection` named ``connection``)::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().returning(table.c.col1, table.c.col2).values(name="foo")
    )
    print(result.fetchall())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
        .values(name="bar")
    )
    print(result.fetchall())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
    )
    print(result.fetchall())

.. _postgresql_insert_on_conflict:

INSERT...ON CONFLICT (Upsert)
------------------------------

Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
candidate row will only be inserted if that row does not violate any unique
constraints. In the case of a unique constraint violation, a secondary action
can occur which can be either "DO UPDATE", indicating that the data in the
target row should be updated, or "DO NOTHING", which indicates to silently skip
this row.

Conflicts are determined using existing unique constraints and indexes. These
constraints may be identified either using their name as stated in DDL,
or they may be inferred by stating the columns and conditions that comprise
the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
:func:`_postgresql.insert()` function, which provides
the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.postgresql import insert
    >>> insert_stmt = insert(my_table).values(
    ...     id="some_existing_id", data="inserted value"
    ... )
    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO NOTHING
    {stop}

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     constraint="pk_my_table", set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s

.. seealso::

    `INSERT .. ON CONFLICT
    <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
    - in the PostgreSQL documentation.

Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using either the
named constraint or by column inference:

* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique
  index:

  .. sourcecode:: pycon+sql

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     index_elements=["id"], set_=dict(data="updated value")
      ... )
      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
      {stop}

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.id], set_=dict(data="updated value")
      ... )
      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
  infer an index, a partial index can be inferred by also specifying the
  :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
      >>> stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like("%@gmail.com"),
      ...     set_=dict(data=stmt.excluded.data),
      ... )
      >>> print(stmt)
      {printsql}INSERT INTO my_table (data, user_email)
      VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
      WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data

* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
  used to specify an index directly rather than inferring it. This can be
  the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:

  .. sourcecode:: pycon+sql

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     constraint="my_table_idx_1", set_=dict(data="updated value")
      ... )
      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
      {stop}

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     constraint="my_table_pk", set_=dict(data="updated value")
      ... )
      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
      {stop}

* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
  also refer to a SQLAlchemy construct representing a constraint,
  e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
  :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
  if the constraint has a name, it is used directly. Otherwise, if the
  constraint is unnamed, then inference will be used, where the expressions
  and optional WHERE clause of the constraint will be spelled out in the
  construct. This use is especially convenient
  to refer to the named or unnamed primary key of a :class:`_schema.Table`
  using the
  :attr:`_schema.Table.primary_key` attribute:

  .. sourcecode:: pycon+sql

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     constraint=my_table.primary_key, set_=dict(data="updated value")
      ... )
      >>> print(do_update_stmt)
      {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

The SET Clause
^^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

.. warning::

    The :meth:`_expression.Insert.on_conflict_do_update`
    method does **not** take into
    account Python-side default UPDATE values or generation functions, e.g.
    those specified using :paramref:`_schema.Column.onupdate`.
    These values will not be exercised for an ON CONFLICT style of UPDATE,
    unless they are manually specified in the
    :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.

Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.postgresql.Insert.excluded` is available as an attribute on
the :class:`_postgresql.Insert` object; this object is a
:class:`_expression.ColumnCollection` alias that contains all columns of the
target table:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )
    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author)
    VALUES (%(id)s, %(data)s, %(author)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author

Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )
    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2),
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author)
    VALUES (%(id)s, %(data)s, %(author)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
    WHERE my_table.status = %(status_1)s

Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique or exclusion constraint occurs; below
this is illustrated using the
:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO NOTHING

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique or exclusion
constraint violation which occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT DO NOTHING

.. _postgresql_match:

Full Text Search
----------------

PostgreSQL's full text search system is available through the use of the
:data:`.func` namespace, combined with the use of custom operators
via the :meth:`.Operators.bool_op` method. For simple cases with some
degree of cross-backend compatibility, the :meth:`.Operators.match` operator
may also be used.

.. _postgresql_simple_match:

Simple plain text matching with ``match()``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`.Operators.match` operator provides for cross-compatible simple
text matching. For the PostgreSQL backend, it's hardcoded to generate
an expression using the ``@@`` operator in conjunction with the
``plainto_tsquery()`` PostgreSQL function.

On the PostgreSQL dialect, an expression like the following::

    select(sometable.c.text.match("search string"))

would emit to the database:

.. sourcecode:: sql

    SELECT text @@ plainto_tsquery('search string') FROM table

Above, passing a plain string to :meth:`.Operators.match` will automatically
make use of ``plainto_tsquery()`` to specify the type of tsquery. This
establishes basic database cross-compatibility for :meth:`.Operators.match`
with other backends.

.. versionchanged:: 2.0 The default tsquery generation function used by the
   PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``.

   To render exactly what was rendered in 1.4, use the following form::

       from sqlalchemy import func

       select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string")))

   which would emit:

   .. sourcecode:: sql

       SELECT text @@ to_tsquery('search string') FROM table

Using PostgreSQL full text functions and operators directly
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Text search operations beyond the simple use of :meth:`.Operators.match`
may make use of the :data:`.func` namespace to generate PostgreSQL full-text
functions, in combination with :meth:`.Operators.bool_op` to generate
any boolean operator.

For example, the query::

    select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat")))

would generate:

.. sourcecode:: sql

    SELECT to_tsquery('cat') @> to_tsquery('cat & rat')

The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::

    from sqlalchemy.dialects.postgresql import TSVECTOR
    from sqlalchemy import select, cast

    select(cast("some text", TSVECTOR))

produces a statement equivalent to:

.. sourcecode:: sql

    SELECT CAST('some text' AS TSVECTOR) AS anon_1

The ``func`` namespace is augmented by the PostgreSQL dialect to set up
correct argument and return types for most full text search functions.
These functions are used automatically by the :attr:`_sql.func` namespace
assuming the ``sqlalchemy.dialects.postgresql`` package has been imported,
or :func:`_sa.create_engine` has been invoked using a ``postgresql``
dialect. These functions are documented at:

* :class:`_postgresql.to_tsvector`
* :class:`_postgresql.to_tsquery`
* :class:`_postgresql.plainto_tsquery`
* :class:`_postgresql.phraseto_tsquery`
* :class:`_postgresql.websearch_to_tsquery`
* :class:`_postgresql.ts_headline`
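
For example, a sketch combining :class:`_postgresql.ts_headline` with
:class:`_postgresql.websearch_to_tsquery`, where the ``documents`` table is
hypothetical::

    from sqlalchemy import select, func

    stmt = select(
        func.ts_headline(
            "english",
            documents.c.body,
            func.websearch_to_tsquery("english", "full text search"),
        )
    )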

Specifying the "regconfig" with ``match()`` or custom operators
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL's ``plainto_tsquery()`` function accepts an optional
"regconfig" argument that is used to instruct PostgreSQL to use a
particular pre-computed GIN or GiST index in order to perform the search.
When using :meth:`.Operators.match`, this additional parameter may be
specified using the ``postgresql_regconfig`` parameter, such as::

    select(mytable.c.id).where(
        mytable.c.title.match("somestring", postgresql_regconfig="english")
    )

which would emit:

.. sourcecode:: sql

    SELECT mytable.id FROM mytable
    WHERE mytable.title @@ plainto_tsquery('english', 'somestring')

When using other PostgreSQL search functions with :data:`.func`, the
"regconfig" parameter may be passed directly as the initial argument::

    select(mytable.c.id).where(
        func.to_tsvector("english", mytable.c.title).bool_op("@@")(
            func.to_tsquery("english", "somestring")
        )
    )

produces a statement equivalent to:

.. sourcecode:: sql

    SELECT mytable.id FROM mytable
    WHERE to_tsvector('english', mytable.title) @@
        to_tsquery('english', 'somestring')

It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
PostgreSQL to ensure that you are generating queries with SQLAlchemy that
take full advantage of any indexes you may have created for full text search.

.. seealso::

    `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation


FROM ONLY ...
-------------

The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
table in an inheritance hierarchy. This can be used to produce the
``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
syntaxes. It uses SQLAlchemy's hints mechanism (the examples assume a
:class:`_engine.Connection` named ``connection``)::

    # SELECT ... FROM ONLY ...
    result = connection.execute(
        table.select().with_hint(table, "ONLY", "postgresql")
    )
    print(result.fetchall())

    # UPDATE ONLY ...
    table.update().values(foo="bar").with_hint("ONLY", dialect_name="postgresql")

    # DELETE FROM ONLY ...
    table.delete().with_hint("ONLY", dialect_name="postgresql")

.. _postgresql_indexes:

PostgreSQL-Specific Index Options
---------------------------------

Several extensions to the :class:`.Index` construct are available, specific
to the PostgreSQL dialect.

.. _postgresql_covering_indexes:

Covering Indexes
^^^^^^^^^^^^^^^^

The ``postgresql_include`` option renders INCLUDE(colname) for the given
string names::

    Index("my_index", table.c.x, postgresql_include=["y"])

would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``.

Note that this feature requires PostgreSQL 11 or later.

.. seealso::

    :ref:`postgresql_constraint_options`

.. versionadded:: 1.4

.. _postgresql_partial_indexes:

Partial Indexes
^^^^^^^^^^^^^^^

Partial indexes add criterion to the index definition so that the index is
applied to a subset of rows. These can be specified on :class:`.Index`
using the ``postgresql_where`` keyword argument::

    Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10)

.. _postgresql_operator_classes:

Operator Classes
^^^^^^^^^^^^^^^^

PostgreSQL allows the specification of an *operator class* for each column of
an index (see
https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
The :class:`.Index` construct allows these to be specified via the
``postgresql_ops`` keyword argument::

    Index(
        "my_index",
        my_table.c.id,
        my_table.c.data,
        postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"},
    )

Note that the keys in the ``postgresql_ops`` dictionaries are the
"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
different than the actual name of the column as expressed in the database.

If ``postgresql_ops`` is to be used against a complex SQL expression such
as a function call, then to apply to the column it must be given a label
that is identified in the dictionary by name, e.g.::

    Index(
        "my_index",
        my_table.c.id,
        func.lower(my_table.c.data).label("data_lower"),
        postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"},
    )

Operator classes are also supported by the
:class:`_postgresql.ExcludeConstraint` construct using the
:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
details.

.. versionadded:: 1.3.21 added support for operator classes with
   :class:`_postgresql.ExcludeConstraint`.


Index Types
^^^^^^^^^^^

PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
as the ability for users to create their own (see
https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
specified on :class:`.Index` using the ``postgresql_using`` keyword argument::

    Index("my_index", my_table.c.data, postgresql_using="gin")

The value passed to the keyword argument will be simply passed through to the
underlying CREATE INDEX command, so it *must* be a valid index type for your
version of PostgreSQL.

.. _postgresql_index_storage:

Index Storage Parameters
^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL allows storage parameters to be set on indexes. The storage
parameters available depend on the index method used by the index. Storage
parameters can be specified on :class:`.Index` using the ``postgresql_with``
keyword argument::

    Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50})

PostgreSQL also allows defining the tablespace in which to create the index.
The tablespace can be specified on :class:`.Index` using the
``postgresql_tablespace`` keyword argument::

    Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace")

Note that the same option is available on :class:`_schema.Table` as well.

.. _postgresql_index_concurrently:

Indexes with CONCURRENTLY
^^^^^^^^^^^^^^^^^^^^^^^^^

The PostgreSQL index option CONCURRENTLY is supported by passing the
flag ``postgresql_concurrently`` to the :class:`.Index` construct::

    tbl = Table("testtbl", m, Column("data", Integer))

    idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)

The above index construct will render DDL for CREATE INDEX as follows,
assuming PostgreSQL 8.2 or higher is detected, or for a connection-less
dialect:

.. sourcecode:: sql

    CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)

For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected, or for
a connection-less dialect, it will emit:

.. sourcecode:: sql

    DROP INDEX CONCURRENTLY test_idx1

When using CONCURRENTLY, the PostgreSQL database requires that the statement
be invoked outside of a transaction block. The Python DBAPI enforces that
even for a single statement, a transaction is present, so to use this
construct, the DBAPI's "autocommit" mode must be used::

    metadata = MetaData()
    table = Table("foo", metadata, Column("id", String))
    index = Index("foo_idx", table.c.id, postgresql_concurrently=True)

    with engine.connect() as conn:
        with conn.execution_options(isolation_level="AUTOCOMMIT"):
            table.create(conn)

.. seealso::

    :ref:`postgresql_isolation_level`

.. _postgresql_index_reflection:

PostgreSQL Index Reflection
---------------------------

The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
UNIQUE CONSTRAINT construct is used. When inspecting a table using
:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
and the :meth:`_reflection.Inspector.get_unique_constraints`
will report on these
two constructs distinctly; in the case of the index, the key
``duplicates_constraint`` will be present in the index entry if it is
detected as mirroring a constraint. When performing reflection using
``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
in :attr:`_schema.Table.indexes` when it is detected as mirroring a
:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints`
collection.
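
A sketch illustrating how the ``duplicates_constraint`` key may be checked
when using the inspector, where the table name is hypothetical::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    insp = inspect(engine)
    for idx in insp.get_indexes("mytable"):
        if "duplicates_constraint" in idx:
            # this index mirrors a UNIQUE constraint of the same name
            print(idx["name"], "mirrors", idx["duplicates_constraint"])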

Special Reflection Options
--------------------------

The :class:`_reflection.Inspector`
used for the PostgreSQL backend is an instance
of :class:`.PGInspector`, which offers additional methods::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://localhost/test")
    insp = inspect(engine)  # will be a PGInspector

    print(insp.get_enums())

.. autoclass:: PGInspector
    :members:

.. _postgresql_table_options:

PostgreSQL Table Options
------------------------

Several options for CREATE TABLE are supported directly by the PostgreSQL
dialect in conjunction with the :class:`_schema.Table` construct:

* ``INHERITS``::

    Table("some_table", metadata, ..., postgresql_inherits="some_supertable")

    Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))

* ``ON COMMIT``::

    Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS")

* ``PARTITION BY``::

    Table(
        "some_table",
        metadata,
        ...,
        postgresql_partition_by="LIST (part_column)",
    )

  .. versionadded:: 1.2.6

* ``TABLESPACE``::

    Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace")

  The above option is also available on the :class:`.Index` construct.

* ``USING``::

    Table("some_table", metadata, ..., postgresql_using="heap")

  .. versionadded:: 2.0.26

* ``WITH OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=True)

* ``WITHOUT OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=False)

.. seealso::

    `PostgreSQL CREATE TABLE options
    <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
    in the PostgreSQL documentation.

.. _postgresql_constraint_options:

PostgreSQL Constraint Options
-----------------------------

The following option(s) are supported by the PostgreSQL dialect in conjunction
with selected constraint constructs:

* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints
  when the constraint is being added to an existing table via ALTER TABLE,
  and has the effect that existing rows are not scanned during the ALTER
  operation against the constraint being added.

  When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
  that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
  may be specified as an additional keyword argument within the operation
  that creates the constraint, as in the following Alembic example::

      def upgrade():
          op.create_foreign_key(
              "fk_user_address",
              "address",
              "user",
              ["user_id"],
              ["id"],
              postgresql_not_valid=True,
          )

  The keyword is ultimately accepted directly by the
  :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
  and :class:`_schema.ForeignKey` constructs; when using a tool like
  Alembic, dialect-specific keyword arguments are passed through to
  these constructs from the migration operation directives::

      CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)

      ForeignKeyConstraint(
          ["some_id"], ["some_table.some_id"], postgresql_not_valid=True
      )

  .. versionadded:: 1.4.32

  .. seealso::

      `PostgreSQL ALTER TABLE options
      <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
      in the PostgreSQL documentation.

* ``INCLUDE``: This option adds one or more columns as a "payload" to the
  unique index created automatically by PostgreSQL for the constraint.
  For example, the following table definition::

      Table(
          "mytable",
          metadata,
          Column("id", Integer, nullable=False),
          Column("value", Integer, nullable=False),
          UniqueConstraint("id", postgresql_include=["value"]),
      )

  would produce the DDL statement

  .. sourcecode:: sql

      CREATE TABLE mytable (
          id INTEGER NOT NULL,
          value INTEGER NOT NULL,
          UNIQUE (id) INCLUDE (value)
      )

  Note that this feature requires PostgreSQL 11 or later.

  .. versionadded:: 2.0.41

  .. seealso::

      :ref:`postgresql_covering_indexes`

  .. seealso::

      `PostgreSQL CREATE TABLE options
      <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
      in the PostgreSQL documentation.

* Column list with foreign key ``ON DELETE SET`` actions: This applies to
  :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`; on the PostgreSQL
  backend, the :paramref:`.ForeignKey.ondelete` parameter will accept a string
  list of column names inside parentheses, following the ``SET NULL`` or
  ``SET DEFAULT`` phrases, which will limit the set of columns that are
  subject to the action::

      fktable = Table(
          "fktable",
          metadata,
          Column("tid", Integer),
          Column("id", Integer),
          Column("fk_id_del_set_null", Integer),
          ForeignKeyConstraint(
              columns=["tid", "fk_id_del_set_null"],
              refcolumns=[pktable.c.tid, pktable.c.id],
              ondelete="SET NULL (fk_id_del_set_null)",
          ),
      )

  .. versionadded:: 2.0.40

.. _postgresql_table_valued_overview:

Table values, Table and Column valued functions, Row and Tuple objects
-----------------------------------------------------------------------

PostgreSQL makes great use of modern SQL forms such as table-valued functions,
tables and rows as values. These constructs are commonly used as part
of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
datatypes. SQLAlchemy's SQL expression language has native support for
most table-valued and row-valued forms.

.. _postgresql_table_valued:

Table-Valued Functions
^^^^^^^^^^^^^^^^^^^^^^^

Many PostgreSQL built-in functions are intended to be used in the FROM clause
of a SELECT statement, and are capable of returning table rows or sets of table
rows. A large portion of PostgreSQL's JSON functions for example such as
``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such
forms. These classes of SQL function calling forms in SQLAlchemy are available
using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
with :class:`_functions.Function` objects generated from the :data:`_sql.func`
namespace.

Examples from PostgreSQL's reference documentation follow below:

* ``json_each()``:

  .. sourcecode:: pycon+sql

      >>> from sqlalchemy import select, func
      >>> stmt = select(
      ...     func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")
      ... )
      >>> print(stmt)
      {printsql}SELECT anon_1.key, anon_1.value
      FROM json_each(:json_each_1) AS anon_1

* ``json_populate_record()``:

  .. sourcecode:: pycon+sql

      >>> from sqlalchemy import select, func, literal_column
      >>> stmt = select(
      ...     func.json_populate_record(
      ...         literal_column("null::myrowtype"), '{"a":1,"b":2}'
      ...     ).table_valued("a", "b", name="x")
      ... )
      >>> print(stmt)
      {printsql}SELECT x.a, x.b
      FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x

* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
  columns in the alias, where we may make use of :func:`_sql.column` elements with
  types to produce them. The :meth:`_functions.FunctionElement.table_valued`
  method produces a :class:`_sql.TableValuedAlias` construct, and the
  :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
  columns specification:

  .. sourcecode:: pycon+sql

      >>> from sqlalchemy import select, func, column, Integer, Text
      >>> stmt = select(
      ...     func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}')
      ...     .table_valued(
      ...         column("a", Integer),
      ...         column("b", Text),
      ...         column("d", Text),
      ...     )
      ...     .render_derived(name="x", with_types=True)
      ... )
      >>> print(stmt)
      {printsql}SELECT x.a, x.b, x.d
      FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)

* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
  ordinal counter to the output of a function and is accepted by a limited set
  of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
  :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
  parameter ``with_ordinality`` for this purpose, which accepts the string name
  that will be applied to the "ordinality" column:

  .. sourcecode:: pycon+sql

      >>> from sqlalchemy import select, func
      >>> stmt = select(
      ...     func.generate_series(4, 1, -1)
      ...     .table_valued("value", with_ordinality="ordinality")
      ...     .render_derived()
      ... )
      >>> print(stmt)
      {printsql}SELECT anon_1.value, anon_1.ordinality
      FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
      WITH ORDINALITY AS anon_1(value, ordinality)

.. versionadded:: 1.4.0b2

.. seealso::

    :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`

.. _postgresql_column_valued:

Column Valued Functions
^^^^^^^^^^^^^^^^^^^^^^^

Similar to the table valued function, a column valued function is present
in the FROM clause, but delivers itself to the columns clause as a single
scalar value. PostgreSQL functions such as ``json_array_elements()``,
``unnest()`` and ``generate_series()`` may use this form. Column valued
functions are available using the
:meth:`_functions.FunctionElement.column_valued` method of
:class:`_functions.FunctionElement`:

* ``json_array_elements()``:

  .. sourcecode:: pycon+sql

      >>> from sqlalchemy import select, func
      >>> stmt = select(
      ...     func.json_array_elements('["one", "two"]').column_valued("x")
      ... )
      >>> print(stmt)
      {printsql}SELECT x
      FROM json_array_elements(:json_array_elements_1) AS x

* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
  :func:`_postgresql.array` construct may be used:

  .. sourcecode:: pycon+sql

      >>> from sqlalchemy.dialects.postgresql import array
      >>> from sqlalchemy import select, func
      >>> stmt = select(func.unnest(array([1, 2])).column_valued())
      >>> print(stmt)
      {printsql}SELECT anon_1
      FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1

  The function can of course be used against an existing table-bound column
  that's of type :class:`_types.ARRAY`:

  .. sourcecode:: pycon+sql

      >>> from sqlalchemy import table, column, ARRAY, Integer
      >>> from sqlalchemy import select, func
      >>> t = table("t", column("value", ARRAY(Integer)))
      >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
      >>> print(stmt)
      {printsql}SELECT unnested_value
      FROM unnest(t.value) AS unnested_value

.. seealso::

    :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`


Row Types
^^^^^^^^^

Built-in support for rendering a ``ROW`` may be approximated using
``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
:func:`_sql.tuple_` construct:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy import table, column, func, tuple_
    >>> t = table("t", column("id"), column("fk"))
    >>> stmt = (
    ...     t.select()
    ...     .where(tuple_(t.c.id, t.c.fk) > (1, 2))
    ...     .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7))
    ... )
    >>> print(stmt)
    {printsql}SELECT t.id, t.fk
    FROM t
    WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)

.. seealso::

    `PostgreSQL Row Constructors
    <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_

    `PostgreSQL Row Constructor Comparison
    <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_

Table Types passed to Functions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL supports passing a table as an argument to a function, which is
known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
such as :class:`_schema.Table` support this special form using the
:meth:`_sql.FromClause.table_valued` method, which is comparable to the
:meth:`_functions.FunctionElement.table_valued` method except that the collection
of columns is already established by that of the :class:`_sql.FromClause`
itself:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy import table, column, func, select
    >>> a = table("a", column("id"), column("x"), column("y"))
    >>> stmt = select(func.row_to_json(a.table_valued()))
    >>> print(stmt)
    {printsql}SELECT row_to_json(a) AS row_to_json_1
    FROM a

.. versionadded:: 1.4.0b2



"""  # noqa: E501
1542
1543from __future__ import annotations
1544
1545from collections import defaultdict
1546from functools import lru_cache
1547import re
1548from typing import Any
1549from typing import cast
1550from typing import Dict
1551from typing import List
1552from typing import Optional
1553from typing import Tuple
1554from typing import TYPE_CHECKING
1555from typing import Union
1556
1557from . import arraylib as _array
1558from . import json as _json
1559from . import pg_catalog
1560from . import ranges as _ranges
1561from .ext import _regconfig_fn
1562from .ext import aggregate_order_by
1563from .hstore import HSTORE
1564from .named_types import CreateDomainType as CreateDomainType # noqa: F401
1565from .named_types import CreateEnumType as CreateEnumType # noqa: F401
1566from .named_types import DOMAIN as DOMAIN # noqa: F401
1567from .named_types import DropDomainType as DropDomainType # noqa: F401
1568from .named_types import DropEnumType as DropEnumType # noqa: F401
1569from .named_types import ENUM as ENUM # noqa: F401
1570from .named_types import NamedType as NamedType # noqa: F401
1571from .types import _DECIMAL_TYPES # noqa: F401
1572from .types import _FLOAT_TYPES # noqa: F401
1573from .types import _INT_TYPES # noqa: F401
1574from .types import BIT as BIT
1575from .types import BYTEA as BYTEA
1576from .types import CIDR as CIDR
1577from .types import CITEXT as CITEXT
1578from .types import INET as INET
1579from .types import INTERVAL as INTERVAL
1580from .types import MACADDR as MACADDR
1581from .types import MACADDR8 as MACADDR8
1582from .types import MONEY as MONEY
1583from .types import OID as OID
1584from .types import PGBit as PGBit # noqa: F401
1585from .types import PGCidr as PGCidr # noqa: F401
1586from .types import PGInet as PGInet # noqa: F401
1587from .types import PGInterval as PGInterval # noqa: F401
1588from .types import PGMacAddr as PGMacAddr # noqa: F401
1589from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401
1590from .types import PGUuid as PGUuid
1591from .types import REGCLASS as REGCLASS
1592from .types import REGCONFIG as REGCONFIG # noqa: F401
1593from .types import TIME as TIME
1594from .types import TIMESTAMP as TIMESTAMP
1595from .types import TSVECTOR as TSVECTOR
1596from ... import exc
1597from ... import schema
1598from ... import select
1599from ... import sql
1600from ... import util
1601from ...engine import characteristics
1602from ...engine import default
1603from ...engine import interfaces
1604from ...engine import ObjectKind
1605from ...engine import ObjectScope
1606from ...engine import reflection
1607from ...engine import URL
1608from ...engine.reflection import ReflectionDefaults
1609from ...sql import bindparam
1610from ...sql import coercions
1611from ...sql import compiler
1612from ...sql import elements
1613from ...sql import expression
1614from ...sql import roles
1615from ...sql import sqltypes
1616from ...sql import util as sql_util
1617from ...sql.compiler import InsertmanyvaluesSentinelOpts
1618from ...sql.visitors import InternalTraversal
1619from ...types import BIGINT
1620from ...types import BOOLEAN
1621from ...types import CHAR
1622from ...types import DATE
1623from ...types import DOUBLE_PRECISION
1624from ...types import FLOAT
1625from ...types import INTEGER
1626from ...types import NUMERIC
1627from ...types import REAL
1628from ...types import SMALLINT
1629from ...types import TEXT
1630from ...types import UUID as UUID
1631from ...types import VARCHAR
1632from ...util.typing import TypedDict
1633
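# validates the index access method word ("btree", "hash", "gin", ...)
# given to CREATE INDEX / EXCLUDE USING clauses via validate_sql_phrase()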
1634IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
1635
1636RESERVED_WORDS = {
1637 "all",
1638 "analyse",
1639 "analyze",
1640 "and",
1641 "any",
1642 "array",
1643 "as",
1644 "asc",
1645 "asymmetric",
1646 "both",
1647 "case",
1648 "cast",
1649 "check",
1650 "collate",
1651 "column",
1652 "constraint",
1653 "create",
1654 "current_catalog",
1655 "current_date",
1656 "current_role",
1657 "current_time",
1658 "current_timestamp",
1659 "current_user",
1660 "default",
1661 "deferrable",
1662 "desc",
1663 "distinct",
1664 "do",
1665 "else",
1666 "end",
1667 "except",
1668 "false",
1669 "fetch",
1670 "for",
1671 "foreign",
1672 "from",
1673 "grant",
1674 "group",
1675 "having",
1676 "in",
1677 "initially",
1678 "intersect",
1679 "into",
1680 "leading",
1681 "limit",
1682 "localtime",
1683 "localtimestamp",
1684 "new",
1685 "not",
1686 "null",
1687 "of",
1688 "off",
1689 "offset",
1690 "old",
1691 "on",
1692 "only",
1693 "or",
1694 "order",
1695 "placing",
1696 "primary",
1697 "references",
1698 "returning",
1699 "select",
1700 "session_user",
1701 "some",
1702 "symmetric",
1703 "table",
1704 "then",
1705 "to",
1706 "trailing",
1707 "true",
1708 "union",
1709 "unique",
1710 "user",
1711 "using",
1712 "variadic",
1713 "when",
1714 "where",
1715 "window",
1716 "with",
1717 "authorization",
1718 "between",
1719 "binary",
1720 "cross",
1721 "current_schema",
1722 "freeze",
1723 "full",
1724 "ilike",
1725 "inner",
1726 "is",
1727 "isnull",
1728 "join",
1729 "left",
1730 "like",
1731 "natural",
1732 "notnull",
1733 "outer",
1734 "over",
1735 "overlaps",
1736 "right",
1737 "similar",
1738 "verbose",
1739}
1740
1741
1742colspecs = {
1743 sqltypes.ARRAY: _array.ARRAY,
1744 sqltypes.Interval: INTERVAL,
1745 sqltypes.Enum: ENUM,
1746 sqltypes.JSON.JSONPathType: _json.JSONPATH,
1747 sqltypes.JSON: _json.JSON,
1748 sqltypes.Uuid: PGUuid,
1749}
1750
1751
1752ischema_names = {
1753 "_array": _array.ARRAY,
1754 "hstore": HSTORE,
1755 "json": _json.JSON,
1756 "jsonb": _json.JSONB,
1757 "int4range": _ranges.INT4RANGE,
1758 "int8range": _ranges.INT8RANGE,
1759 "numrange": _ranges.NUMRANGE,
1760 "daterange": _ranges.DATERANGE,
1761 "tsrange": _ranges.TSRANGE,
1762 "tstzrange": _ranges.TSTZRANGE,
1763 "int4multirange": _ranges.INT4MULTIRANGE,
1764 "int8multirange": _ranges.INT8MULTIRANGE,
1765 "nummultirange": _ranges.NUMMULTIRANGE,
1766 "datemultirange": _ranges.DATEMULTIRANGE,
1767 "tsmultirange": _ranges.TSMULTIRANGE,
1768 "tstzmultirange": _ranges.TSTZMULTIRANGE,
1769 "integer": INTEGER,
1770 "bigint": BIGINT,
1771 "smallint": SMALLINT,
1772 "character varying": VARCHAR,
1773 "character": CHAR,
1774 '"char"': sqltypes.String,
1775 "name": sqltypes.String,
1776 "text": TEXT,
1777 "numeric": NUMERIC,
1778 "float": FLOAT,
1779 "real": REAL,
1780 "inet": INET,
1781 "cidr": CIDR,
1782 "citext": CITEXT,
1783 "uuid": UUID,
1784 "bit": BIT,
1785 "bit varying": BIT,
1786 "macaddr": MACADDR,
1787 "macaddr8": MACADDR8,
1788 "money": MONEY,
1789 "oid": OID,
1790 "regclass": REGCLASS,
1791 "double precision": DOUBLE_PRECISION,
1792 "timestamp": TIMESTAMP,
1793 "timestamp with time zone": TIMESTAMP,
1794 "timestamp without time zone": TIMESTAMP,
1795 "time with time zone": TIME,
1796 "time without time zone": TIME,
1797 "date": DATE,
1798 "time": TIME,
1799 "bytea": BYTEA,
1800 "boolean": BOOLEAN,
1801 "interval": INTERVAL,
1802 "tsvector": TSVECTOR,
1803}
1804
1805
1806class PGCompiler(compiler.SQLCompiler):
1807 def visit_to_tsvector_func(self, element, **kw):
1808 return self._assert_pg_ts_ext(element, **kw)
1809
1810 def visit_to_tsquery_func(self, element, **kw):
1811 return self._assert_pg_ts_ext(element, **kw)
1812
1813 def visit_plainto_tsquery_func(self, element, **kw):
1814 return self._assert_pg_ts_ext(element, **kw)
1815
1816 def visit_phraseto_tsquery_func(self, element, **kw):
1817 return self._assert_pg_ts_ext(element, **kw)
1818
1819 def visit_websearch_to_tsquery_func(self, element, **kw):
1820 return self._assert_pg_ts_ext(element, **kw)
1821
1822 def visit_ts_headline_func(self, element, **kw):
1823 return self._assert_pg_ts_ext(element, **kw)
1824
1825 def _assert_pg_ts_ext(self, element, **kw):
1826 if not isinstance(element, _regconfig_fn):
1827 # other options here include trying to rewrite the function
1828 # with the correct types. however, that means we have to
1829 # "un-SQL-ize" the first argument, which can't work in a
1830 # generalized way. Also, parent compiler class has already added
1831 # the incorrect return type to the result map. So let's just
1832 # make sure the function we want is used up front.
1833
1834 raise exc.CompileError(
1835 f'Can\'t compile "{element.name}()" full text search '
1836 f"function construct that does not originate from the "
1837 f'"sqlalchemy.dialects.postgresql" package. '
1838 f'Please ensure "import sqlalchemy.dialects.postgresql" is '
1839 f"called before constructing "
1840 f'"sqlalchemy.func.{element.name}()" to ensure registration '
1841 f"of the correct argument and return types."
1842 )
1843
1844 return f"{element.name}{self.function_argspec(element, **kw)}"
1845
1846 def render_bind_cast(self, type_, dbapi_type, sqltext):
1847 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length:
1848 # use VARCHAR with no length for VARCHAR cast.
1849 # see #9511
1850 dbapi_type = sqltypes.STRINGTYPE
1851 return f"""{sqltext}::{
1852 self.dialect.type_compiler_instance.process(
1853 dbapi_type, identifier_preparer=self.preparer
1854 )
1855 }"""
1856
1857 def visit_array(self, element, **kw):
1858 if not element.clauses and not element.type.item_type._isnull:
1859 return "ARRAY[]::%s" % element.type.compile(self.dialect)
1860 return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
1861
1862 def visit_slice(self, element, **kw):
1863 return "%s:%s" % (
1864 self.process(element.start, **kw),
1865 self.process(element.stop, **kw),
1866 )
1867
1868 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
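        # PostgreSQL spells bitwise XOR as "#"; "^" denotes exponentiation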
1869 return self._generate_generic_binary(binary, " # ", **kw)
1870
1871 def visit_json_getitem_op_binary(
1872 self, binary, operator, _cast_applied=False, **kw
1873 ):
1874 if (
1875 not _cast_applied
1876 and binary.type._type_affinity is not sqltypes.JSON
1877 ):
1878 kw["_cast_applied"] = True
1879 return self.process(sql.cast(binary, binary.type), **kw)
1880
1881 kw["eager_grouping"] = True
1882
1883 if (
1884 not _cast_applied
1885 and isinstance(binary.left.type, _json.JSONB)
1886 and self.dialect._supports_jsonb_subscripting
1887 ):
1888 # for pg14+JSONB use subscript notation: col['key'] instead
1889 # of col -> 'key'
1890 return "%s[%s]" % (
1891 self.process(binary.left, **kw),
1892 self.process(binary.right, **kw),
1893 )
1894 else:
1895 # Fall back to arrow notation for older versions or when cast
1896 # is applied
1897 return self._generate_generic_binary(
1898 binary, " -> " if not _cast_applied else " ->> ", **kw
1899 )
1900
1901 def visit_json_path_getitem_op_binary(
1902 self, binary, operator, _cast_applied=False, **kw
1903 ):
1904 if (
1905 not _cast_applied
1906 and binary.type._type_affinity is not sqltypes.JSON
1907 ):
1908 kw["_cast_applied"] = True
1909 return self.process(sql.cast(binary, binary.type), **kw)
1910
1911 kw["eager_grouping"] = True
1912 return self._generate_generic_binary(
1913 binary, " #> " if not _cast_applied else " #>> ", **kw
1914 )
1915
1916 def visit_getitem_binary(self, binary, operator, **kw):
1917 return "%s[%s]" % (
1918 self.process(binary.left, **kw),
1919 self.process(binary.right, **kw),
1920 )
1921
1922 def visit_aggregate_order_by(self, element, **kw):
1923 return "%s ORDER BY %s" % (
1924 self.process(element.target, **kw),
1925 self.process(element.order_by, **kw),
1926 )
1927
1928 def visit_match_op_binary(self, binary, operator, **kw):
1929 if "postgresql_regconfig" in binary.modifiers:
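            # a text search configuration was passed via the
            # "postgresql_regconfig" operator modifier; render it as the
            # first argument to plainto_tsquery()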
1930 regconfig = self.render_literal_value(
1931 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
1932 )
1933 if regconfig:
1934 return "%s @@ plainto_tsquery(%s, %s)" % (
1935 self.process(binary.left, **kw),
1936 regconfig,
1937 self.process(binary.right, **kw),
1938 )
1939 return "%s @@ plainto_tsquery(%s)" % (
1940 self.process(binary.left, **kw),
1941 self.process(binary.right, **kw),
1942 )
1943
1944 def visit_ilike_case_insensitive_operand(self, element, **kw):
1945 return element.element._compiler_dispatch(self, **kw)
1946
1947 def visit_ilike_op_binary(self, binary, operator, **kw):
1948 escape = binary.modifiers.get("escape", None)
1949
1950 return "%s ILIKE %s" % (
1951 self.process(binary.left, **kw),
1952 self.process(binary.right, **kw),
1953 ) + (
1954 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
1955 if escape is not None
1956 else ""
1957 )
1958
1959 def visit_not_ilike_op_binary(self, binary, operator, **kw):
1960 escape = binary.modifiers.get("escape", None)
1961 return "%s NOT ILIKE %s" % (
1962 self.process(binary.left, **kw),
1963 self.process(binary.right, **kw),
1964 ) + (
1965 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
1966 if escape is not None
1967 else ""
1968 )
1969
1970 def _regexp_match(self, base_op, binary, operator, kw):
1971 flags = binary.modifiers["flags"]
1972 if flags is None:
1973 return self._generate_generic_binary(
1974 binary, " %s " % base_op, **kw
1975 )
1976 if flags == "i":
1977 return self._generate_generic_binary(
1978 binary, " %s* " % base_op, **kw
1979 )
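        # other flag combinations: prepend an inline "(?flags)" option group
        # so the flags apply to the entire pattern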
1980 return "%s %s CONCAT('(?', %s, ')', %s)" % (
1981 self.process(binary.left, **kw),
1982 base_op,
1983 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1984 self.process(binary.right, **kw),
1985 )
1986
1987 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1988 return self._regexp_match("~", binary, operator, kw)
1989
1990 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1991 return self._regexp_match("!~", binary, operator, kw)
1992
1993 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
1994 string = self.process(binary.left, **kw)
1995 pattern_replace = self.process(binary.right, **kw)
1996 flags = binary.modifiers["flags"]
1997 if flags is None:
1998 return "REGEXP_REPLACE(%s, %s)" % (
1999 string,
2000 pattern_replace,
2001 )
2002 else:
2003 return "REGEXP_REPLACE(%s, %s, %s)" % (
2004 string,
2005 pattern_replace,
2006 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2007 )
2008
2009 def visit_empty_set_expr(self, element_types, **kw):
2010 # cast the empty set to the type we are comparing against. if
2011 # we are comparing against the null type, pick an arbitrary
2012 # datatype for the empty set
2013 return "SELECT %s WHERE 1!=1" % (
2014 ", ".join(
2015 "CAST(NULL AS %s)"
2016 % self.dialect.type_compiler_instance.process(
2017 INTEGER() if type_._isnull else type_
2018 )
2019 for type_ in element_types or [INTEGER()]
2020 ),
2021 )
2022
2023 def render_literal_value(self, value, type_):
2024 value = super().render_literal_value(value, type_)
2025
2026 if self.dialect._backslash_escapes:
2027 value = value.replace("\\", "\\\\")
2028 return value
2029
2030 def visit_aggregate_strings_func(self, fn, **kw):
2031 return "string_agg%s" % self.function_argspec(fn)
2032
2033 def visit_sequence(self, seq, **kw):
2034 return "nextval('%s')" % self.preparer.format_sequence(seq)
2035
2036 def limit_clause(self, select, **kw):
2037 text = ""
2038 if select._limit_clause is not None:
2039 text += " \n LIMIT " + self.process(select._limit_clause, **kw)
2040 if select._offset_clause is not None:
2041 if select._limit_clause is None:
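                # "LIMIT ALL" is equivalent to omitting the LIMIT clause;
                # rendered here when only OFFSET is present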
2042 text += "\n LIMIT ALL"
2043 text += " OFFSET " + self.process(select._offset_clause, **kw)
2044 return text
2045
2046 def format_from_hint_text(self, sqltext, table, hint, iscrud):
2047 if hint.upper() != "ONLY":
2048 raise exc.CompileError("Unrecognized hint: %r" % hint)
2049 return "ONLY " + sqltext
2050
2051 def get_select_precolumns(self, select, **kw):
        # Do not call super().get_select_precolumns because
        # it will warn/raise when DISTINCT ON is present
2054 if select._distinct or select._distinct_on:
2055 if select._distinct_on:
2056 return (
2057 "DISTINCT ON ("
2058 + ", ".join(
2059 [
2060 self.process(col, **kw)
2061 for col in select._distinct_on
2062 ]
2063 )
2064 + ") "
2065 )
2066 else:
2067 return "DISTINCT "
2068 else:
2069 return ""
2070
2071 def for_update_clause(self, select, **kw):
2072 if select._for_update_arg.read:
2073 if select._for_update_arg.key_share:
2074 tmp = " FOR KEY SHARE"
2075 else:
2076 tmp = " FOR SHARE"
2077 elif select._for_update_arg.key_share:
2078 tmp = " FOR NO KEY UPDATE"
2079 else:
2080 tmp = " FOR UPDATE"
2081
2082 if select._for_update_arg.of:
2083 tables = util.OrderedSet()
2084 for c in select._for_update_arg.of:
2085 tables.update(sql_util.surface_selectables_only(c))
2086
2087 of_kw = dict(kw)
2088 of_kw.update(ashint=True, use_schema=False)
2089 tmp += " OF " + ", ".join(
2090 self.process(table, **of_kw) for table in tables
2091 )
2092
2093 if select._for_update_arg.nowait:
2094 tmp += " NOWAIT"
2095 if select._for_update_arg.skip_locked:
2096 tmp += " SKIP LOCKED"
2097
2098 return tmp
2099
2100 def visit_substring_func(self, func, **kw):
2101 s = self.process(func.clauses.clauses[0], **kw)
2102 start = self.process(func.clauses.clauses[1], **kw)
2103 if len(func.clauses.clauses) > 2:
2104 length = self.process(func.clauses.clauses[2], **kw)
2105 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
2106 else:
2107 return "SUBSTRING(%s FROM %s)" % (s, start)
2108
2109 def _on_conflict_target(self, clause, **kw):
2110 if clause.constraint_target is not None:
2111 # target may be a name of an Index, UniqueConstraint or
2112 # ExcludeConstraint. While there is a separate
2113 # "max_identifier_length" for indexes, PostgreSQL uses the same
2114 # length for all objects so we can use
2115 # truncate_and_render_constraint_name
2116 target_text = (
2117 "ON CONSTRAINT %s"
2118 % self.preparer.truncate_and_render_constraint_name(
2119 clause.constraint_target
2120 )
2121 )
2122 elif clause.inferred_target_elements is not None:
2123 target_text = "(%s)" % ", ".join(
2124 (
2125 self.preparer.quote(c)
2126 if isinstance(c, str)
2127 else self.process(c, include_table=False, use_schema=False)
2128 )
2129 for c in clause.inferred_target_elements
2130 )
2131 if clause.inferred_target_whereclause is not None:
2132 target_text += " WHERE %s" % self.process(
2133 clause.inferred_target_whereclause,
2134 include_table=False,
2135 use_schema=False,
2136 )
2137 else:
2138 target_text = ""
2139
2140 return target_text
2141
2142 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
2143 target_text = self._on_conflict_target(on_conflict, **kw)
2144
2145 if target_text:
2146 return "ON CONFLICT %s DO NOTHING" % target_text
2147 else:
2148 return "ON CONFLICT DO NOTHING"
2149
2150 def visit_on_conflict_do_update(self, on_conflict, **kw):
2151 clause = on_conflict
2152
2153 target_text = self._on_conflict_target(on_conflict, **kw)
2154
2155 action_set_ops = []
2156
2157 set_parameters = dict(clause.update_values_to_set)
        # build the list of "col = value" SET clause assignments,
        # matching the given parameters to table columns in order
2159
2160 insert_statement = self.stack[-1]["selectable"]
2161 cols = insert_statement.table.c
2162 for c in cols:
2163 col_key = c.key
2164
2165 if col_key in set_parameters:
2166 value = set_parameters.pop(col_key)
2167 elif c in set_parameters:
2168 value = set_parameters.pop(c)
2169 else:
2170 continue
2171
2172 # TODO: this coercion should be up front. we can't cache
2173 # SQL constructs with non-bound literals buried in them
2174 if coercions._is_literal(value):
2175 value = elements.BindParameter(None, value, type_=c.type)
2176
2177 else:
2178 if (
2179 isinstance(value, elements.BindParameter)
2180 and value.type._isnull
2181 ):
2182 value = value._clone()
2183 value.type = c.type
2184 value_text = self.process(value.self_group(), use_schema=False)
2185
2186 key_text = self.preparer.quote(c.name)
2187 action_set_ops.append("%s = %s" % (key_text, value_text))
2188
2189 # check for names that don't match columns
2190 if set_parameters:
2191 util.warn(
2192 "Additional column names not matching "
2193 "any column keys in table '%s': %s"
2194 % (
2195 self.current_executable.table.name,
2196 (", ".join("'%s'" % c for c in set_parameters)),
2197 )
2198 )
2199 for k, v in set_parameters.items():
2200 key_text = (
2201 self.preparer.quote(k)
2202 if isinstance(k, str)
2203 else self.process(k, use_schema=False)
2204 )
2205 value_text = self.process(
2206 coercions.expect(roles.ExpressionElementRole, v),
2207 use_schema=False,
2208 )
2209 action_set_ops.append("%s = %s" % (key_text, value_text))
2210
2211 action_text = ", ".join(action_set_ops)
2212 if clause.update_whereclause is not None:
2213 action_text += " WHERE %s" % self.process(
2214 clause.update_whereclause, include_table=True, use_schema=False
2215 )
2216
2217 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
2218
2219 def update_from_clause(
2220 self, update_stmt, from_table, extra_froms, from_hints, **kw
2221 ):
2222 kw["asfrom"] = True
2223 return "FROM " + ", ".join(
2224 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2225 for t in extra_froms
2226 )
2227
2228 def delete_extra_from_clause(
2229 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2230 ):
2231 """Render the DELETE .. USING clause specific to PostgreSQL."""
2232 kw["asfrom"] = True
2233 return "USING " + ", ".join(
2234 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2235 for t in extra_froms
2236 )
2237
2238 def fetch_clause(self, select, **kw):
        # pg requires parens for non-literal clauses. They're also required
        # for bind parameters if a ::type cast is used by the driver
        # (asyncpg), so it's easiest to just always add them
2242 text = ""
2243 if select._offset_clause is not None:
2244 text += "\n OFFSET (%s) ROWS" % self.process(
2245 select._offset_clause, **kw
2246 )
2247 if select._fetch_clause is not None:
2248 text += "\n FETCH FIRST (%s)%s ROWS %s" % (
2249 self.process(select._fetch_clause, **kw),
2250 " PERCENT" if select._fetch_clause_options["percent"] else "",
2251 (
2252 "WITH TIES"
2253 if select._fetch_clause_options["with_ties"]
2254 else "ONLY"
2255 ),
2256 )
2257 return text
2258
2259
2260class PGDDLCompiler(compiler.DDLCompiler):
2261 def get_column_specification(self, column, **kwargs):
2262 colspec = self.preparer.format_column(column)
2263 impl_type = column.type.dialect_impl(self.dialect)
2264 if isinstance(impl_type, sqltypes.TypeDecorator):
2265 impl_type = impl_type.impl
2266
2267 has_identity = (
2268 column.identity is not None
2269 and self.dialect.supports_identity_columns
2270 )
2271
2272 if (
2273 column.primary_key
2274 and column is column.table._autoincrement_column
2275 and (
2276 self.dialect.supports_smallserial
2277 or not isinstance(impl_type, sqltypes.SmallInteger)
2278 )
2279 and not has_identity
2280 and (
2281 column.default is None
2282 or (
2283 isinstance(column.default, schema.Sequence)
2284 and column.default.optional
2285 )
2286 )
2287 ):
2288 if isinstance(impl_type, sqltypes.BigInteger):
2289 colspec += " BIGSERIAL"
2290 elif isinstance(impl_type, sqltypes.SmallInteger):
2291 colspec += " SMALLSERIAL"
2292 else:
2293 colspec += " SERIAL"
2294 else:
2295 colspec += " " + self.dialect.type_compiler_instance.process(
2296 column.type,
2297 type_expression=column,
2298 identifier_preparer=self.preparer,
2299 )
2300 default = self.get_column_default_string(column)
2301 if default is not None:
2302 colspec += " DEFAULT " + default
2303
2304 if column.computed is not None:
2305 colspec += " " + self.process(column.computed)
2306 if has_identity:
2307 colspec += " " + self.process(column.identity)
2308
2309 if not column.nullable and not has_identity:
2310 colspec += " NOT NULL"
2311 elif column.nullable and has_identity:
2312 colspec += " NULL"
2313 return colspec
2314
2315 def _define_constraint_validity(self, constraint):
2316 not_valid = constraint.dialect_options["postgresql"]["not_valid"]
2317 return " NOT VALID" if not_valid else ""
2318
2319 def _define_include(self, obj):
2320 includeclause = obj.dialect_options["postgresql"]["include"]
2321 if not includeclause:
2322 return ""
2323 inclusions = [
2324 obj.table.c[col] if isinstance(col, str) else col
2325 for col in includeclause
2326 ]
2327 return " INCLUDE (%s)" % ", ".join(
2328 [self.preparer.quote(c.name) for c in inclusions]
2329 )
2330
2331 def visit_check_constraint(self, constraint, **kw):
2332 if constraint._type_bound:
2333 typ = list(constraint.columns)[0].type
2334 if (
2335 isinstance(typ, sqltypes.ARRAY)
2336 and isinstance(typ.item_type, sqltypes.Enum)
2337 and not typ.item_type.native_enum
2338 ):
2339 raise exc.CompileError(
2340 "PostgreSQL dialect cannot produce the CHECK constraint "
2341 "for ARRAY of non-native ENUM; please specify "
2342 "create_constraint=False on this Enum datatype."
2343 )
2344
2345 text = super().visit_check_constraint(constraint)
2346 text += self._define_constraint_validity(constraint)
2347 return text
2348
2349 def visit_foreign_key_constraint(self, constraint, **kw):
2350 text = super().visit_foreign_key_constraint(constraint)
2351 text += self._define_constraint_validity(constraint)
2352 return text
2353
2354 def visit_primary_key_constraint(self, constraint, **kw):
2355 text = super().visit_primary_key_constraint(constraint)
2356 text += self._define_include(constraint)
2357 return text
2358
2359 def visit_unique_constraint(self, constraint, **kw):
2360 text = super().visit_unique_constraint(constraint)
2361 text += self._define_include(constraint)
2362 return text
2363
2364 @util.memoized_property
2365 def _fk_ondelete_pattern(self):
2366 return re.compile(
2367 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?"
2368 r"|NO ACTION)$",
2369 re.I,
2370 )
2371
2372 def define_constraint_ondelete_cascade(self, constraint):
2373 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
2374 constraint.ondelete, self._fk_ondelete_pattern
2375 )
2376
2377 def visit_create_enum_type(self, create, **kw):
2378 type_ = create.element
2379
2380 return "CREATE TYPE %s AS ENUM (%s)" % (
2381 self.preparer.format_type(type_),
2382 ", ".join(
2383 self.sql_compiler.process(sql.literal(e), literal_binds=True)
2384 for e in type_.enums
2385 ),
2386 )
2387
2388 def visit_drop_enum_type(self, drop, **kw):
2389 type_ = drop.element
2390
2391 return "DROP TYPE %s" % (self.preparer.format_type(type_))
2392
2393 def visit_create_domain_type(self, create, **kw):
2394 domain: DOMAIN = create.element
2395
2396 options = []
2397 if domain.collation is not None:
2398 options.append(f"COLLATE {self.preparer.quote(domain.collation)}")
2399 if domain.default is not None:
2400 default = self.render_default_string(domain.default)
2401 options.append(f"DEFAULT {default}")
2402 if domain.constraint_name is not None:
2403 name = self.preparer.truncate_and_render_constraint_name(
2404 domain.constraint_name
2405 )
2406 options.append(f"CONSTRAINT {name}")
2407 if domain.not_null:
2408 options.append("NOT NULL")
2409 if domain.check is not None:
2410 check = self.sql_compiler.process(
2411 domain.check, include_table=False, literal_binds=True
2412 )
2413 options.append(f"CHECK ({check})")
2414
2415 return (
2416 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS "
2417 f"{self.type_compiler.process(domain.data_type)} "
2418 f"{' '.join(options)}"
2419 )
2420
2421 def visit_drop_domain_type(self, drop, **kw):
2422 domain = drop.element
2423 return f"DROP DOMAIN {self.preparer.format_type(domain)}"
2424
2425 def visit_create_index(self, create, **kw):
2426 preparer = self.preparer
2427 index = create.element
2428 self._verify_index_table(index)
2429 text = "CREATE "
2430 if index.unique:
2431 text += "UNIQUE "
2432
2433 text += "INDEX "
2434
2435 if self.dialect._supports_create_index_concurrently:
2436 concurrently = index.dialect_options["postgresql"]["concurrently"]
2437 if concurrently:
2438 text += "CONCURRENTLY "
2439
2440 if create.if_not_exists:
2441 text += "IF NOT EXISTS "
2442
2443 text += "%s ON %s " % (
2444 self._prepared_index_name(index, include_schema=False),
2445 preparer.format_table(index.table),
2446 )
2447
2448 using = index.dialect_options["postgresql"]["using"]
2449 if using:
2450 text += (
2451 "USING %s "
2452 % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
2453 )
2454
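        # render each index expression, appending an operator class when one
        # is keyed to that expression in the "postgresql_ops" dialect option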
2455 ops = index.dialect_options["postgresql"]["ops"]
2456 text += "(%s)" % (
2457 ", ".join(
2458 [
2459 self.sql_compiler.process(
2460 (
2461 expr.self_group()
2462 if not isinstance(expr, expression.ColumnClause)
2463 else expr
2464 ),
2465 include_table=False,
2466 literal_binds=True,
2467 )
2468 + (
2469 (" " + ops[expr.key])
2470 if hasattr(expr, "key") and expr.key in ops
2471 else ""
2472 )
2473 for expr in index.expressions
2474 ]
2475 )
2476 )
2477
2478 text += self._define_include(index)
2479
2480 nulls_not_distinct = index.dialect_options["postgresql"][
2481 "nulls_not_distinct"
2482 ]
2483 if nulls_not_distinct is True:
2484 text += " NULLS NOT DISTINCT"
2485 elif nulls_not_distinct is False:
2486 text += " NULLS DISTINCT"
2487
2488 withclause = index.dialect_options["postgresql"]["with"]
2489 if withclause:
2490 text += " WITH (%s)" % (
2491 ", ".join(
2492 [
2493 "%s = %s" % storage_parameter
2494 for storage_parameter in withclause.items()
2495 ]
2496 )
2497 )
2498
2499 tablespace_name = index.dialect_options["postgresql"]["tablespace"]
2500 if tablespace_name:
2501 text += " TABLESPACE %s" % preparer.quote(tablespace_name)
2502
2503 whereclause = index.dialect_options["postgresql"]["where"]
2504 if whereclause is not None:
2505 whereclause = coercions.expect(
2506 roles.DDLExpressionRole, whereclause
2507 )
2508
2509 where_compiled = self.sql_compiler.process(
2510 whereclause, include_table=False, literal_binds=True
2511 )
2512 text += " WHERE " + where_compiled
2513
2514 return text
2515
2516 def define_unique_constraint_distinct(self, constraint, **kw):
2517 nulls_not_distinct = constraint.dialect_options["postgresql"][
2518 "nulls_not_distinct"
2519 ]
2520 if nulls_not_distinct is True:
2521 nulls_not_distinct_param = "NULLS NOT DISTINCT "
2522 elif nulls_not_distinct is False:
2523 nulls_not_distinct_param = "NULLS DISTINCT "
2524 else:
2525 nulls_not_distinct_param = ""
2526 return nulls_not_distinct_param
2527
2528 def visit_drop_index(self, drop, **kw):
2529 index = drop.element
2530
2531 text = "\nDROP INDEX "
2532
2533 if self.dialect._supports_drop_index_concurrently:
2534 concurrently = index.dialect_options["postgresql"]["concurrently"]
2535 if concurrently:
2536 text += "CONCURRENTLY "
2537
2538 if drop.if_exists:
2539 text += "IF EXISTS "
2540
2541 text += self._prepared_index_name(index, include_schema=True)
2542 return text
2543
2544 def visit_exclude_constraint(self, constraint, **kw):
2545 text = ""
2546 if constraint.name is not None:
2547 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2548 constraint
2549 )
2550 elements = []
2551 kw["include_table"] = False
2552 kw["literal_binds"] = True
2553 for expr, name, op in constraint._render_exprs:
2554 exclude_element = self.sql_compiler.process(expr, **kw) + (
2555 (" " + constraint.ops[expr.key])
2556 if hasattr(expr, "key") and expr.key in constraint.ops
2557 else ""
2558 )
2559
2560 elements.append("%s WITH %s" % (exclude_element, op))
2561 text += "EXCLUDE USING %s (%s)" % (
2562 self.preparer.validate_sql_phrase(
2563 constraint.using, IDX_USING
2564 ).lower(),
2565 ", ".join(elements),
2566 )
2567 if constraint.where is not None:
2568 text += " WHERE (%s)" % self.sql_compiler.process(
2569 constraint.where, literal_binds=True
2570 )
2571 text += self.define_constraint_deferrability(constraint)
2572 return text
2573
2574 def post_create_table(self, table):
2575 table_opts = []
2576 pg_opts = table.dialect_options["postgresql"]
2577
2578 inherits = pg_opts.get("inherits")
2579 if inherits is not None:
2580 if not isinstance(inherits, (list, tuple)):
2581 inherits = (inherits,)
2582 table_opts.append(
2583 "\n INHERITS ( "
2584 + ", ".join(self.preparer.quote(name) for name in inherits)
2585 + " )"
2586 )
2587
2588 if pg_opts["partition_by"]:
2589 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
2590
2591 if pg_opts["using"]:
2592 table_opts.append("\n USING %s" % pg_opts["using"])
2593
2594 if pg_opts["with_oids"] is True:
2595 table_opts.append("\n WITH OIDS")
2596 elif pg_opts["with_oids"] is False:
2597 table_opts.append("\n WITHOUT OIDS")
2598
2599 if pg_opts["on_commit"]:
2600 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
2601 table_opts.append("\n ON COMMIT %s" % on_commit_options)
2602
2603 if pg_opts["tablespace"]:
2604 tablespace_name = pg_opts["tablespace"]
2605 table_opts.append(
2606 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
2607 )
2608
2609 return "".join(table_opts)
2610
2611 def visit_computed_column(self, generated, **kw):
2612 if generated.persisted is False:
2613 raise exc.CompileError(
                "PostgreSQL computed columns do not support 'virtual' "
2615 "persistence; set the 'persisted' flag to None or True for "
2616 "PostgreSQL support."
2617 )
2618
2619 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
2620 generated.sqltext, include_table=False, literal_binds=True
2621 )
2622
2623 def visit_create_sequence(self, create, **kw):
2624 prefix = None
2625 if create.element.data_type is not None:
2626 prefix = " AS %s" % self.type_compiler.process(
2627 create.element.data_type
2628 )
2629
2630 return super().visit_create_sequence(create, prefix=prefix, **kw)
2631
2632 def _can_comment_on_constraint(self, ddl_instance):
2633 constraint = ddl_instance.element
2634 if constraint.name is None:
2635 raise exc.CompileError(
2636 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2637 "it has no name"
2638 )
2639 if constraint.table is None:
2640 raise exc.CompileError(
2641 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2642 "it has no associated table"
2643 )
2644
2645 def visit_set_constraint_comment(self, create, **kw):
2646 self._can_comment_on_constraint(create)
2647 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
2648 self.preparer.format_constraint(create.element),
2649 self.preparer.format_table(create.element.table),
2650 self.sql_compiler.render_literal_value(
2651 create.element.comment, sqltypes.String()
2652 ),
2653 )
2654
2655 def visit_drop_constraint_comment(self, drop, **kw):
2656 self._can_comment_on_constraint(drop)
2657 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % (
2658 self.preparer.format_constraint(drop.element),
2659 self.preparer.format_table(drop.element.table),
2660 )
2661
2662
2663class PGTypeCompiler(compiler.GenericTypeCompiler):
2664 def visit_TSVECTOR(self, type_, **kw):
2665 return "TSVECTOR"
2666
2667 def visit_TSQUERY(self, type_, **kw):
2668 return "TSQUERY"
2669
2670 def visit_INET(self, type_, **kw):
2671 return "INET"
2672
2673 def visit_CIDR(self, type_, **kw):
2674 return "CIDR"
2675
2676 def visit_CITEXT(self, type_, **kw):
2677 return "CITEXT"
2678
2679 def visit_MACADDR(self, type_, **kw):
2680 return "MACADDR"
2681
2682 def visit_MACADDR8(self, type_, **kw):
2683 return "MACADDR8"
2684
2685 def visit_MONEY(self, type_, **kw):
2686 return "MONEY"
2687
2688 def visit_OID(self, type_, **kw):
2689 return "OID"
2690
2691 def visit_REGCONFIG(self, type_, **kw):
2692 return "REGCONFIG"
2693
2694 def visit_REGCLASS(self, type_, **kw):
2695 return "REGCLASS"
2696
2697 def visit_FLOAT(self, type_, **kw):
2698 if not type_.precision:
2699 return "FLOAT"
2700 else:
2701 return "FLOAT(%(precision)s)" % {"precision": type_.precision}
2702
2703 def visit_double(self, type_, **kw):
        return self.visit_DOUBLE_PRECISION(type_, **kw)
2705
2706 def visit_BIGINT(self, type_, **kw):
2707 return "BIGINT"
2708
2709 def visit_HSTORE(self, type_, **kw):
2710 return "HSTORE"
2711
2712 def visit_JSON(self, type_, **kw):
2713 return "JSON"
2714
2715 def visit_JSONB(self, type_, **kw):
2716 return "JSONB"
2717
2718 def visit_INT4MULTIRANGE(self, type_, **kw):
2719 return "INT4MULTIRANGE"
2720
2721 def visit_INT8MULTIRANGE(self, type_, **kw):
2722 return "INT8MULTIRANGE"
2723
2724 def visit_NUMMULTIRANGE(self, type_, **kw):
2725 return "NUMMULTIRANGE"
2726
2727 def visit_DATEMULTIRANGE(self, type_, **kw):
2728 return "DATEMULTIRANGE"
2729
2730 def visit_TSMULTIRANGE(self, type_, **kw):
2731 return "TSMULTIRANGE"
2732
2733 def visit_TSTZMULTIRANGE(self, type_, **kw):
2734 return "TSTZMULTIRANGE"
2735
2736 def visit_INT4RANGE(self, type_, **kw):
2737 return "INT4RANGE"
2738
2739 def visit_INT8RANGE(self, type_, **kw):
2740 return "INT8RANGE"
2741
2742 def visit_NUMRANGE(self, type_, **kw):
2743 return "NUMRANGE"
2744
2745 def visit_DATERANGE(self, type_, **kw):
2746 return "DATERANGE"
2747
2748 def visit_TSRANGE(self, type_, **kw):
2749 return "TSRANGE"
2750
2751 def visit_TSTZRANGE(self, type_, **kw):
2752 return "TSTZRANGE"
2753
2754 def visit_json_int_index(self, type_, **kw):
2755 return "INT"
2756
2757 def visit_json_str_index(self, type_, **kw):
2758 return "TEXT"
2759
2760 def visit_datetime(self, type_, **kw):
2761 return self.visit_TIMESTAMP(type_, **kw)
2762
2763 def visit_enum(self, type_, **kw):
2764 if not type_.native_enum or not self.dialect.supports_native_enum:
2765 return super().visit_enum(type_, **kw)
2766 else:
2767 return self.visit_ENUM(type_, **kw)
2768
2769 def visit_ENUM(self, type_, identifier_preparer=None, **kw):
2770 if identifier_preparer is None:
2771 identifier_preparer = self.dialect.identifier_preparer
2772 return identifier_preparer.format_type(type_)
2773
2774 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw):
2775 if identifier_preparer is None:
2776 identifier_preparer = self.dialect.identifier_preparer
2777 return identifier_preparer.format_type(type_)
2778
2779 def visit_TIMESTAMP(self, type_, **kw):
2780 return "TIMESTAMP%s %s" % (
2781 (
2782 "(%d)" % type_.precision
2783 if getattr(type_, "precision", None) is not None
2784 else ""
2785 ),
2786 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2787 )
2788
2789 def visit_TIME(self, type_, **kw):
2790 return "TIME%s %s" % (
2791 (
2792 "(%d)" % type_.precision
2793 if getattr(type_, "precision", None) is not None
2794 else ""
2795 ),
2796 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2797 )
2798
2799 def visit_INTERVAL(self, type_, **kw):
2800 text = "INTERVAL"
2801 if type_.fields is not None:
2802 text += " " + type_.fields
2803 if type_.precision is not None:
2804 text += " (%d)" % type_.precision
2805 return text
2806
2807 def visit_BIT(self, type_, **kw):
2808 if type_.varying:
2809 compiled = "BIT VARYING"
2810 if type_.length is not None:
2811 compiled += "(%d)" % type_.length
2812 else:
2813 compiled = "BIT(%d)" % type_.length
2814 return compiled
2815
2816 def visit_uuid(self, type_, **kw):
2817 if type_.native_uuid:
2818 return self.visit_UUID(type_, **kw)
2819 else:
2820 return super().visit_uuid(type_, **kw)
2821
2822 def visit_UUID(self, type_, **kw):
2823 return "UUID"
2824
2825 def visit_large_binary(self, type_, **kw):
2826 return self.visit_BYTEA(type_, **kw)
2827
2828 def visit_BYTEA(self, type_, **kw):
2829 return "BYTEA"
2830
2831 def visit_ARRAY(self, type_, **kw):
2832 inner = self.process(type_.item_type, **kw)
2833 return re.sub(
2834 r"((?: COLLATE.*)?)$",
2835 (
2836 r"%s\1"
2837 % (
2838 "[]"
2839 * (type_.dimensions if type_.dimensions is not None else 1)
2840 )
2841 ),
2842 inner,
2843 count=1,
2844 )
2845
2846 def visit_json_path(self, type_, **kw):
2847 return self.visit_JSONPATH(type_, **kw)
2848
2849 def visit_JSONPATH(self, type_, **kw):
2850 return "JSONPATH"
2851
2852
2853class PGIdentifierPreparer(compiler.IdentifierPreparer):
2854 reserved_words = RESERVED_WORDS
2855
2856 def _unquote_identifier(self, value):
2857 if value[0] == self.initial_quote:
2858 value = value[1:-1].replace(
2859 self.escape_to_quote, self.escape_quote
2860 )
2861 return value
2862
2863 def format_type(self, type_, use_schema=True):
2864 if not type_.name:
2865 raise exc.CompileError(
2866 f"PostgreSQL {type_.__class__.__name__} type requires a name."
2867 )
2868
2869 name = self.quote(type_.name)
2870 effective_schema = self.schema_for_object(type_)
2871
2872 if (
2873 not self.omit_schema
2874 and use_schema
2875 and effective_schema is not None
2876 ):
2877 name = f"{self.quote_schema(effective_schema)}.{name}"
2878 return name
2879
2880
2881class ReflectedNamedType(TypedDict):
2882 """Represents a reflected named type."""
2883
2884 name: str
2885 """Name of the type."""
2886 schema: str
2887 """The schema of the type."""
2888 visible: bool
2889 """Indicates if this type is in the current search path."""
2890
2891
2892class ReflectedDomainConstraint(TypedDict):
    """Represents a reflected CHECK constraint of a domain."""
2894
2895 name: str
2896 """Name of the constraint."""
2897 check: str
2898 """The check constraint text."""
2899
2900
2901class ReflectedDomain(ReflectedNamedType):
    """Represents a reflected domain."""
2903
2904 type: str
2905 """The string name of the underlying data type of the domain."""
2906 nullable: bool
2907 """Indicates if the domain allows null or not."""
2908 default: Optional[str]
2909 """The string representation of the default value of this domain
2910 or ``None`` if none present.
2911 """
2912 constraints: List[ReflectedDomainConstraint]
2913 """The constraints defined in the domain, if any.
    The constraints are listed in order of evaluation by PostgreSQL.
2915 """
2916 collation: Optional[str]
2917 """The collation for the domain."""
2918
2919
2920class ReflectedEnum(ReflectedNamedType):
2921 """Represents a reflected enum."""
2922
2923 labels: List[str]
2924 """The labels that compose the enum."""
2925
2926
2927class PGInspector(reflection.Inspector):
2928 dialect: PGDialect
2929
2930 def get_table_oid(
2931 self, table_name: str, schema: Optional[str] = None
2932 ) -> int:
2933 """Return the OID for the given table name.
2934
2935 :param table_name: string name of the table. For special quoting,
2936 use :class:`.quoted_name`.
2937
2938 :param schema: string schema name; if omitted, uses the default schema
2939 of the database connection. For special quoting,
2940 use :class:`.quoted_name`.
2941
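        E.g., a minimal sketch, assuming a table named ``some_table``
        exists in the default schema::

            from sqlalchemy import create_engine, inspect

            engine = create_engine("postgresql://scott:tiger@localhost/test")
            insp = inspect(engine)
            table_oid = insp.get_table_oid("some_table")
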
2942 """
2943
2944 with self._operation_context() as conn:
2945 return self.dialect.get_table_oid(
2946 conn, table_name, schema, info_cache=self.info_cache
2947 )
2948
2949 def get_domains(
2950 self, schema: Optional[str] = None
2951 ) -> List[ReflectedDomain]:
2952 """Return a list of DOMAIN objects.
2953
2954 Each member is a dictionary containing these fields:
2955
2956 * name - name of the domain
2957 * schema - the schema name for the domain.
2958 * visible - boolean, whether or not this domain is visible
2959 in the default search path.
2960 * type - the type defined by this domain.
2961 * nullable - Indicates if this domain can be ``NULL``.
2962 * default - The default value of the domain or ``None`` if the
2963 domain has no default.
        * constraints - A list of dicts with the constraints defined by this
          domain. Each element contains two keys: ``name`` of the
          constraint and ``check`` with the constraint text.
2967
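        A minimal sketch, assuming a domain created e.g. with
        ``CREATE DOMAIN positive_int AS integer CHECK (VALUE > 0)``::

            from sqlalchemy import create_engine, inspect

            engine = create_engine("postgresql://scott:tiger@localhost/test")
            insp = inspect(engine)
            for domain in insp.get_domains():
                print(domain["name"], domain["type"], domain["constraints"])
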
2968 :param schema: schema name. If None, the default schema
2969 (typically 'public') is used. May also be set to ``'*'`` to
         indicate that domains for all schemas should be loaded.
2971
2972 .. versionadded:: 2.0
2973
2974 """
2975 with self._operation_context() as conn:
2976 return self.dialect._load_domains(
2977 conn, schema, info_cache=self.info_cache
2978 )
2979
2980 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]:
2981 """Return a list of ENUM objects.
2982
2983 Each member is a dictionary containing these fields:
2984
2985 * name - name of the enum
2986 * schema - the schema name for the enum.
2987 * visible - boolean, whether or not this enum is visible
2988 in the default search path.
2989 * labels - a list of string labels that apply to the enum.
2990
2991 :param schema: schema name. If None, the default schema
2992 (typically 'public') is used. May also be set to ``'*'`` to
         indicate that enums for all schemas should be loaded.
2994
2995 """
2996 with self._operation_context() as conn:
2997 return self.dialect._load_enums(
2998 conn, schema, info_cache=self.info_cache
2999 )
3000
3001 def get_foreign_table_names(
3002 self, schema: Optional[str] = None
3003 ) -> List[str]:
3004 """Return a list of FOREIGN TABLE names.
3005
3006 Behavior is similar to that of
3007 :meth:`_reflection.Inspector.get_table_names`,
3008 except that the list is limited to those tables that report a
3009 ``relkind`` value of ``f``.
3010
3011 """
3012 with self._operation_context() as conn:
3013 return self.dialect._get_foreign_table_names(
3014 conn, schema, info_cache=self.info_cache
3015 )
3016
3017 def has_type(
3018 self, type_name: str, schema: Optional[str] = None, **kw: Any
3019 ) -> bool:
3020 """Return if the database has the specified type in the provided
3021 schema.
3022
3023 :param type_name: the type to check.
3024 :param schema: schema name. If None, the default schema
3025 (typically 'public') is used. May also be set to ``'*'`` to
3026 check in all schemas.
3027
3028 .. versionadded:: 2.0
3029
3030 """
3031 with self._operation_context() as conn:
3032 return self.dialect.has_type(
3033 conn, type_name, schema, info_cache=self.info_cache
3034 )
3035
3036
3037class PGExecutionContext(default.DefaultExecutionContext):
3038 def fire_sequence(self, seq, type_):
3039 return self._execute_scalar(
3040 (
3041 "select nextval('%s')"
3042 % self.identifier_preparer.format_sequence(seq)
3043 ),
3044 type_,
3045 )
3046
3047 def get_insert_default(self, column):
3048 if column.primary_key and column is column.table._autoincrement_column:
3049 if column.server_default and column.server_default.has_argument:
3050 # pre-execute passive defaults on primary key columns
3051 return self._execute_scalar(
3052 "select %s" % column.server_default.arg, column.type
3053 )
3054
3055 elif column.default is None or (
3056 column.default.is_sequence and column.default.optional
3057 ):
                # execute the sequence associated with a SERIAL primary
                # key column. for non-primary-key SERIAL, the value is
                # simply generated server side.
3061
3062 try:
3063 seq_name = column._postgresql_seq_name
3064 except AttributeError:
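                    # derive the implicit SERIAL sequence name
                    # "<table>_<column>_seq", truncating each token so that
                    # the full name fits within PostgreSQL's 63-character
                    # identifier limit (29 + 29 + len("_") + len("_seq"))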
3065 tab = column.table.name
3066 col = column.name
3067 tab = tab[0 : 29 + max(0, (29 - len(col)))]
3068 col = col[0 : 29 + max(0, (29 - len(tab)))]
3069 name = "%s_%s_seq" % (tab, col)
3070 column._postgresql_seq_name = seq_name = name
3071
3072 if column.table is not None:
3073 effective_schema = self.connection.schema_for_object(
3074 column.table
3075 )
3076 else:
3077 effective_schema = None
3078
                if effective_schema is not None:
                    seq_exec = 'select nextval(\'"%s"."%s"\')' % (
                        effective_schema,
                        seq_name,
                    )
                else:
                    seq_exec = "select nextval('\"%s\"')" % (seq_name,)

                return self._execute_scalar(seq_exec, column.type)
3088
3089 return super().get_insert_default(column)
3090
3091
3092class PGReadOnlyConnectionCharacteristic(
3093 characteristics.ConnectionCharacteristic
3094):
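    """Tracks the "postgresql_readonly" execution option.

    Applies the value to the DBAPI connection via dialect.set_readonly() /
    dialect.get_readonly(), which concrete DBAPI dialects are expected to
    implement.
    """
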
3095 transactional = True
3096
3097 def reset_characteristic(self, dialect, dbapi_conn):
3098 dialect.set_readonly(dbapi_conn, False)
3099
3100 def set_characteristic(self, dialect, dbapi_conn, value):
3101 dialect.set_readonly(dbapi_conn, value)
3102
3103 def get_characteristic(self, dialect, dbapi_conn):
3104 return dialect.get_readonly(dbapi_conn)
3105
3106
3107class PGDeferrableConnectionCharacteristic(
3108 characteristics.ConnectionCharacteristic
3109):
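    """Tracks the "postgresql_deferrable" execution option.

    Applies the value to the DBAPI connection via dialect.set_deferrable() /
    dialect.get_deferrable(), which concrete DBAPI dialects are expected to
    implement.
    """
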
3110 transactional = True
3111
3112 def reset_characteristic(self, dialect, dbapi_conn):
3113 dialect.set_deferrable(dbapi_conn, False)
3114
3115 def set_characteristic(self, dialect, dbapi_conn, value):
3116 dialect.set_deferrable(dbapi_conn, value)
3117
3118 def get_characteristic(self, dialect, dbapi_conn):
3119 return dialect.get_deferrable(dbapi_conn)
3120
3121
3122class PGDialect(default.DefaultDialect):
3123 name = "postgresql"
3124 supports_statement_cache = True
3125 supports_alter = True
3126 max_identifier_length = 63
3127 supports_sane_rowcount = True
3128
3129 bind_typing = interfaces.BindTyping.RENDER_CASTS
3130
3131 supports_native_enum = True
3132 supports_native_boolean = True
3133 supports_native_uuid = True
3134 supports_smallserial = True
3135
3136 supports_sequences = True
3137 sequences_optional = True
3138 preexecute_autoincrement_sequences = True
3139 postfetch_lastrowid = False
3140 use_insertmanyvalues = True
3141
3142 returns_native_bytes = True
3143
3144 insertmanyvalues_implicit_sentinel = (
3145 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
3146 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
3147 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
3148 )
3149
3150 supports_comments = True
3151 supports_constraint_comments = True
3152 supports_default_values = True
3153
3154 supports_default_metavalue = True
3155
3156 supports_empty_insert = False
3157 supports_multivalues_insert = True
3158
3159 supports_identity_columns = True
3160
3161 default_paramstyle = "pyformat"
3162 ischema_names = ischema_names
3163 colspecs = colspecs
3164
3165 statement_compiler = PGCompiler
3166 ddl_compiler = PGDDLCompiler
3167 type_compiler_cls = PGTypeCompiler
3168 preparer = PGIdentifierPreparer
3169 execution_ctx_cls = PGExecutionContext
3170 inspector = PGInspector
3171
3172 update_returning = True
3173 delete_returning = True
3174 insert_returning = True
3175 update_returning_multifrom = True
3176 delete_returning_multifrom = True
3177
3178 connection_characteristics = (
3179 default.DefaultDialect.connection_characteristics
3180 )
3181 connection_characteristics = connection_characteristics.union(
3182 {
3183 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
3184 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
3185 }
3186 )
3187
3188 construct_arguments = [
3189 (
3190 schema.Index,
3191 {
3192 "using": False,
3193 "include": None,
3194 "where": None,
3195 "ops": {},
3196 "concurrently": False,
3197 "with": {},
3198 "tablespace": None,
3199 "nulls_not_distinct": None,
3200 },
3201 ),
3202 (
3203 schema.Table,
3204 {
3205 "ignore_search_path": False,
3206 "tablespace": None,
3207 "partition_by": None,
3208 "with_oids": None,
3209 "on_commit": None,
3210 "inherits": None,
3211 "using": None,
3212 },
3213 ),
3214 (
3215 schema.CheckConstraint,
3216 {
3217 "not_valid": False,
3218 },
3219 ),
3220 (
3221 schema.ForeignKeyConstraint,
3222 {
3223 "not_valid": False,
3224 },
3225 ),
3226 (
3227 schema.PrimaryKeyConstraint,
3228 {"include": None},
3229 ),
3230 (
3231 schema.UniqueConstraint,
3232 {
3233 "include": None,
3234 "nulls_not_distinct": None,
3235 },
3236 ),
3237 ]
3238
3239 reflection_options = ("postgresql_ignore_search_path",)
3240
3241 _backslash_escapes = True
3242 _supports_create_index_concurrently = True
3243 _supports_drop_index_concurrently = True
3244 _supports_jsonb_subscripting = True
3245
3246 def __init__(
3247 self,
3248 native_inet_types=None,
3249 json_serializer=None,
3250 json_deserializer=None,
3251 **kwargs,
3252 ):
3253 default.DefaultDialect.__init__(self, **kwargs)
3254
3255 self._native_inet_types = native_inet_types
3256 self._json_deserializer = json_deserializer
3257 self._json_serializer = json_serializer
3258
3259 def initialize(self, connection):
3260 super().initialize(connection)
3261
3262 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
3263 self.supports_smallserial = self.server_version_info >= (9, 2)
3264
3265 self._set_backslash_escapes(connection)
3266
3267 self._supports_drop_index_concurrently = self.server_version_info >= (
3268 9,
3269 2,
3270 )
3271 self.supports_identity_columns = self.server_version_info >= (10,)
3272
3273 self._supports_jsonb_subscripting = self.server_version_info >= (14,)
3274
3275 def get_isolation_level_values(self, dbapi_conn):
3276 # note the generic dialect doesn't have AUTOCOMMIT, however
3277 # all postgresql dialects should include AUTOCOMMIT.
3278 return (
3279 "SERIALIZABLE",
3280 "READ UNCOMMITTED",
3281 "READ COMMITTED",
3282 "REPEATABLE READ",
3283 )
3284
3285 def set_isolation_level(self, dbapi_connection, level):
3286 cursor = dbapi_connection.cursor()
3287 cursor.execute(
3288 "SET SESSION CHARACTERISTICS AS TRANSACTION "
3289 f"ISOLATION LEVEL {level}"
3290 )
3291 cursor.execute("COMMIT")
3292 cursor.close()
3293
3294 def get_isolation_level(self, dbapi_connection):
3295 cursor = dbapi_connection.cursor()
3296 cursor.execute("show transaction isolation level")
3297 val = cursor.fetchone()[0]
3298 cursor.close()
3299 return val.upper()
3300
3301 def set_readonly(self, connection, value):
3302 raise NotImplementedError()
3303
3304 def get_readonly(self, connection):
3305 raise NotImplementedError()
3306
3307 def set_deferrable(self, connection, value):
3308 raise NotImplementedError()
3309
3310 def get_deferrable(self, connection):
3311 raise NotImplementedError()
3312
3313 def _split_multihost_from_url(self, url: URL) -> Union[
3314 Tuple[None, None],
3315 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]],
3316 ]:
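        """Parse multihost/multiport combinations from the URL query string.

        Accepts either the "host=h1:p1&host=h2:p2" or the
        "host=h1,h2&port=p1,p2" style, but not both at once; returns
        parallel tuples of host names and port numbers, or (None, None)
        when no host is given in the query string.
        """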
3317 hosts: Optional[Tuple[Optional[str], ...]] = None
3318 ports_str: Union[str, Tuple[Optional[str], ...], None] = None
3319
3320 integrated_multihost = False
3321
3322 if "host" in url.query:
3323 if isinstance(url.query["host"], (list, tuple)):
3324 integrated_multihost = True
3325 hosts, ports_str = zip(
3326 *[
3327 token.split(":") if ":" in token else (token, None)
3328 for token in url.query["host"]
3329 ]
3330 )
3331
3332 elif isinstance(url.query["host"], str):
3333 hosts = tuple(url.query["host"].split(","))
3334
3335 if (
3336 "port" not in url.query
3337 and len(hosts) == 1
3338 and ":" in hosts[0]
3339 ):
3340 # internet host is alphanumeric plus dots or hyphens.
3341 # this is essentially rfc1123, which refers to rfc952.
3342 # https://stackoverflow.com/questions/3523028/
3343 # valid-characters-of-a-hostname
3344 host_port_match = re.match(
3345 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0]
3346 )
3347 if host_port_match:
3348 integrated_multihost = True
3349 h, p = host_port_match.group(1, 2)
3350 if TYPE_CHECKING:
3351 assert isinstance(h, str)
3352 assert isinstance(p, str)
3353 hosts = (h,)
3354 ports_str = cast(
3355 "Tuple[Optional[str], ...]", (p,) if p else (None,)
3356 )
3357
3358 if "port" in url.query:
3359 if integrated_multihost:
3360 raise exc.ArgumentError(
3361 "Can't mix 'multihost' formats together; use "
3362 '"host=h1,h2,h3&port=p1,p2,p3" or '
3363 '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
3364 )
3365 if isinstance(url.query["port"], (list, tuple)):
3366 ports_str = url.query["port"]
3367 elif isinstance(url.query["port"], str):
3368 ports_str = tuple(url.query["port"].split(","))
3369
3370 ports: Optional[Tuple[Optional[int], ...]] = None
3371
3372 if ports_str:
3373 try:
3374 ports = tuple(int(x) if x else None for x in ports_str)
3375 except ValueError:
3376 raise exc.ArgumentError(
3377 f"Received non-integer port arguments: {ports_str}"
3378 ) from None
3379
3380 if ports and (
3381 (not hosts and len(ports) > 1)
3382 or (
3383 hosts
3384 and ports
3385 and len(hosts) != len(ports)
3386 and (len(hosts) > 1 or len(ports) > 1)
3387 )
3388 ):
3389 raise exc.ArgumentError("number of hosts and ports don't match")
3390
3391 if hosts is not None:
3392 if ports is None:
3393 ports = tuple(None for _ in hosts)
3394
3395 return hosts, ports # type: ignore
3396
3397 def do_begin_twophase(self, connection, xid):
3398 self.do_begin(connection.connection)
3399
3400 def do_prepare_twophase(self, connection, xid):
3401 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
3402
3403 def do_rollback_twophase(
3404 self, connection, xid, is_prepared=True, recover=False
3405 ):
3406 if is_prepared:
3407 if recover:
3408 # FIXME: ugly hack to get out of transaction
3409 # context when committing recoverable transactions
3410 # Must find out a way how to make the dbapi not
3411 # open a transaction.
                connection.exec_driver_sql("ROLLBACK")
                connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
                connection.exec_driver_sql("BEGIN")
                self.do_rollback(connection.connection)
            else:
                self.do_rollback(connection.connection)

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        if is_prepared:
            if recover:
                connection.exec_driver_sql("ROLLBACK")
                connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
                connection.exec_driver_sql("BEGIN")
                self.do_rollback(connection.connection)
            else:
                self.do_commit(connection.connection)

    def do_recover_twophase(self, connection):
        return connection.scalars(
            sql.text("SELECT gid FROM pg_prepared_xacts")
        ).all()

    def _get_default_schema_name(self, connection):
        return connection.exec_driver_sql("select current_schema()").scalar()

    @reflection.cache
    def has_schema(self, connection, schema, **kw):
        query = select(pg_catalog.pg_namespace.c.nspname).where(
            pg_catalog.pg_namespace.c.nspname == schema
        )
        return bool(connection.scalar(query))

    def _pg_class_filter_scope_schema(
        self, query, schema, scope, pg_class_table=None
    ):
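        """Join a pg_class-based query to pg_namespace and filter it by
        object scope and schema.

        DEFAULT scope excludes temporary relations and TEMPORARY selects
        only those; a ``None`` schema restricts the query to objects
        visible in the search path, excluding ``pg_catalog``.
        """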
        if pg_class_table is None:
            pg_class_table = pg_catalog.pg_class
        query = query.join(
            pg_catalog.pg_namespace,
            pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace,
        )

        if scope is ObjectScope.DEFAULT:
            query = query.where(pg_class_table.c.relpersistence != "t")
        elif scope is ObjectScope.TEMPORARY:
            query = query.where(pg_class_table.c.relpersistence == "t")

        if schema is None:
            query = query.where(
                pg_catalog.pg_table_is_visible(pg_class_table.c.oid),
                # ignore pg_catalog schema
                pg_catalog.pg_namespace.c.nspname != "pg_catalog",
            )
        else:
            query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
        return query

    def _pg_class_relkind_condition(self, relkinds, pg_class_table=None):
        if pg_class_table is None:
            pg_class_table = pg_catalog.pg_class
        # use the ANY form instead of IN, otherwise postgresql complains
        # that 'IN could not convert type character to "char"'
        return pg_class_table.c.relkind == sql.any_(_array.array(relkinds))

    @lru_cache()
    def _has_table_query(self, schema):
        query = select(pg_catalog.pg_class.c.relname).where(
            pg_catalog.pg_class.c.relname == bindparam("table_name"),
            self._pg_class_relkind_condition(
                pg_catalog.RELKINDS_ALL_TABLE_LIKE
            ),
        )
        return self._pg_class_filter_scope_schema(
            query, schema, scope=ObjectScope.ANY
        )

    @reflection.cache
    def has_table(self, connection, table_name, schema=None, **kw):
        self._ensure_has_table_connection(connection)
        query = self._has_table_query(schema)
        return bool(connection.scalar(query, {"table_name": table_name}))

    @reflection.cache
    def has_sequence(self, connection, sequence_name, schema=None, **kw):
        query = select(pg_catalog.pg_class.c.relname).where(
            pg_catalog.pg_class.c.relkind == "S",
            pg_catalog.pg_class.c.relname == sequence_name,
        )
        query = self._pg_class_filter_scope_schema(
            query, schema, scope=ObjectScope.ANY
        )
        return bool(connection.scalar(query))

    @reflection.cache
    def has_type(self, connection, type_name, schema=None, **kw):
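        # a schema of "*" searches all schemas; None restricts the search
        # to types visible in the search path, excluding pg_catalog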
        query = (
            select(pg_catalog.pg_type.c.typname)
            .join(
                pg_catalog.pg_namespace,
                pg_catalog.pg_namespace.c.oid
                == pg_catalog.pg_type.c.typnamespace,
            )
            .where(pg_catalog.pg_type.c.typname == type_name)
        )
        if schema is None:
            query = query.where(
                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
                # ignore pg_catalog schema
                pg_catalog.pg_namespace.c.nspname != "pg_catalog",
            )
        elif schema != "*":
            query = query.where(pg_catalog.pg_namespace.c.nspname == schema)

        return bool(connection.scalar(query))

    def _get_server_version_info(self, connection):
        v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
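        # typical inputs look like "PostgreSQL 14.5 on x86_64-pc-linux-gnu",
        # yielding (14, 5); trailing "devel"/"beta" suffixes are tolerated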
        m = re.match(
            r".*(?:PostgreSQL|EnterpriseDB) "
            r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
            v,
        )
        if not m:
            raise AssertionError(
                "Could not determine version from string '%s'" % v
            )
        return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])

    @reflection.cache
    def get_table_oid(self, connection, table_name, schema=None, **kw):
        """Fetch the oid for schema.table_name."""
        query = select(pg_catalog.pg_class.c.oid).where(
            pg_catalog.pg_class.c.relname == table_name,
            self._pg_class_relkind_condition(
                pg_catalog.RELKINDS_ALL_TABLE_LIKE
            ),
        )
        query = self._pg_class_filter_scope_schema(
            query, schema, scope=ObjectScope.ANY
        )
        table_oid = connection.scalar(query)
        if table_oid is None:
            raise exc.NoSuchTableError(
                f"{schema}.{table_name}" if schema else table_name
            )
        return table_oid

    @reflection.cache
    def get_schema_names(self, connection, **kw):
        query = (
            select(pg_catalog.pg_namespace.c.nspname)
            .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%"))
            .order_by(pg_catalog.pg_namespace.c.nspname)
        )
        return connection.scalars(query).all()

    def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope):
        query = select(pg_catalog.pg_class.c.relname).where(
            self._pg_class_relkind_condition(relkinds)
        )
        query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
        return connection.scalars(query).all()

    @reflection.cache
    def get_table_names(self, connection, schema=None, **kw):
        return self._get_relnames_for_relkinds(
            connection,
            schema,
            pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
            scope=ObjectScope.DEFAULT,
        )

    @reflection.cache
    def get_temp_table_names(self, connection, **kw):
        return self._get_relnames_for_relkinds(
            connection,
            schema=None,
            relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
            scope=ObjectScope.TEMPORARY,
        )

    @reflection.cache
    def _get_foreign_table_names(self, connection, schema=None, **kw):
        return self._get_relnames_for_relkinds(
            connection, schema, relkinds=("f",), scope=ObjectScope.ANY
        )

    @reflection.cache
    def get_view_names(self, connection, schema=None, **kw):
        return self._get_relnames_for_relkinds(
            connection,
            schema,
            pg_catalog.RELKINDS_VIEW,
            scope=ObjectScope.DEFAULT,
        )

    @reflection.cache
    def get_materialized_view_names(self, connection, schema=None, **kw):
        return self._get_relnames_for_relkinds(
            connection,
            schema,
            pg_catalog.RELKINDS_MAT_VIEW,
            scope=ObjectScope.DEFAULT,
        )

    @reflection.cache
    def get_temp_view_names(self, connection, schema=None, **kw):
        return self._get_relnames_for_relkinds(
            connection,
            schema,
            # NOTE: do not include temp materialized views (they do not
            # seem to be a thing, at least up to version 14)
            pg_catalog.RELKINDS_VIEW,
            scope=ObjectScope.TEMPORARY,
        )

    @reflection.cache
    def get_sequence_names(self, connection, schema=None, **kw):
        return self._get_relnames_for_relkinds(
            connection, schema, relkinds=("S",), scope=ObjectScope.ANY
        )

    @reflection.cache
    def get_view_definition(self, connection, view_name, schema=None, **kw):
        query = (
            select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid))
            .select_from(pg_catalog.pg_class)
            .where(
                pg_catalog.pg_class.c.relname == view_name,
                self._pg_class_relkind_condition(
                    pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW
                ),
            )
        )
        query = self._pg_class_filter_scope_schema(
            query, schema, scope=ObjectScope.ANY
        )
        res = connection.scalar(query)
        if res is None:
            raise exc.NoSuchTableError(
                f"{schema}.{view_name}" if schema else view_name
            )
        else:
            return res

    def _value_or_raise(self, data, table, schema):
        try:
            return dict(data)[(schema, table)]
        except KeyError:
            raise exc.NoSuchTableError(
                f"{schema}.{table}" if schema else table
            ) from None

    def _prepare_filter_names(self, filter_names):
        if filter_names:
            return True, {"filter_names": filter_names}
        else:
            return False, {}

    def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]:
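        """Translate ObjectKind flags into the matching set of
        pg_class.relkind codes."""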
        if kind is ObjectKind.ANY:
            return pg_catalog.RELKINDS_ALL_TABLE_LIKE
        relkinds = ()
        if ObjectKind.TABLE in kind:
            relkinds += pg_catalog.RELKINDS_TABLE
        if ObjectKind.VIEW in kind:
            relkinds += pg_catalog.RELKINDS_VIEW
        if ObjectKind.MATERIALIZED_VIEW in kind:
            relkinds += pg_catalog.RELKINDS_MAT_VIEW
        return relkinds

    @reflection.cache
    def get_columns(self, connection, table_name, schema=None, **kw):
        data = self.get_multi_columns(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @lru_cache()
    def _columns_query(self, schema, has_filter_names, scope, kind):
        # NOTE: the query using scalar subqueries for the default and
        # identity options is faster than trying to use outer joins for them
        generated = (
            pg_catalog.pg_attribute.c.attgenerated.label("generated")
            if self.server_version_info >= (12,)
            else sql.null().label("generated")
        )
        if self.server_version_info >= (10,):
            # join lateral performs worse (~2x slower) than a scalar_subquery
            identity = (
                select(
                    sql.func.json_build_object(
                        "always",
                        pg_catalog.pg_attribute.c.attidentity == "a",
                        "start",
                        pg_catalog.pg_sequence.c.seqstart,
                        "increment",
                        pg_catalog.pg_sequence.c.seqincrement,
                        "minvalue",
                        pg_catalog.pg_sequence.c.seqmin,
                        "maxvalue",
                        pg_catalog.pg_sequence.c.seqmax,
                        "cache",
                        pg_catalog.pg_sequence.c.seqcache,
                        "cycle",
                        pg_catalog.pg_sequence.c.seqcycle,
                        type_=sqltypes.JSON(),
                    )
                )
                .select_from(pg_catalog.pg_sequence)
                .where(
                    # attidentity != '' is required, otherwise serial
                    # columns would also be reflected as identity.
                    pg_catalog.pg_attribute.c.attidentity != "",
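                    # resolve the sequence backing the identity column:
                    # pg_get_serial_sequence() takes the table name as text
                    # (hence attrelid cast to regclass, then to text) and the
                    # column name; its result is cast back through regclass
                    # to oid for comparison with pg_sequence.seqrelid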
                    pg_catalog.pg_sequence.c.seqrelid
                    == sql.cast(
                        sql.cast(
                            pg_catalog.pg_get_serial_sequence(
                                sql.cast(
                                    sql.cast(
                                        pg_catalog.pg_attribute.c.attrelid,
                                        REGCLASS,
                                    ),
                                    TEXT,
                                ),
                                pg_catalog.pg_attribute.c.attname,
                            ),
                            REGCLASS,
                        ),
                        OID,
                    ),
                )
                .correlate(pg_catalog.pg_attribute)
                .scalar_subquery()
                .label("identity_options")
            )
        else:
            identity = sql.null().label("identity_options")

        # join lateral performs the same as scalar_subquery here
        default = (
            select(
                pg_catalog.pg_get_expr(
                    pg_catalog.pg_attrdef.c.adbin,
                    pg_catalog.pg_attrdef.c.adrelid,
                )
            )
            .select_from(pg_catalog.pg_attrdef)
            .where(
                pg_catalog.pg_attrdef.c.adrelid
                == pg_catalog.pg_attribute.c.attrelid,
                pg_catalog.pg_attrdef.c.adnum
                == pg_catalog.pg_attribute.c.attnum,
                pg_catalog.pg_attribute.c.atthasdef,
            )
            .correlate(pg_catalog.pg_attribute)
            .scalar_subquery()
            .label("default")
        )
        relkinds = self._kind_to_relkinds(kind)
        query = (
            select(
                pg_catalog.pg_attribute.c.attname.label("name"),
                pg_catalog.format_type(
                    pg_catalog.pg_attribute.c.atttypid,
                    pg_catalog.pg_attribute.c.atttypmod,
                ).label("format_type"),
                default,
                pg_catalog.pg_attribute.c.attnotnull.label("not_null"),
                pg_catalog.pg_class.c.relname.label("table_name"),
                pg_catalog.pg_description.c.description.label("comment"),
                generated,
                identity,
            )
            .select_from(pg_catalog.pg_class)
            # NOTE: postgresql supports tables with no user columns, meaning
            # there is no row with pg_attribute.attnum > 0. use a left outer
            # join to avoid filtering out these tables.
            .outerjoin(
                pg_catalog.pg_attribute,
                sql.and_(
                    pg_catalog.pg_class.c.oid
                    == pg_catalog.pg_attribute.c.attrelid,
                    pg_catalog.pg_attribute.c.attnum > 0,
                    ~pg_catalog.pg_attribute.c.attisdropped,
                ),
            )
            .outerjoin(
                pg_catalog.pg_description,
                sql.and_(
                    pg_catalog.pg_description.c.objoid
                    == pg_catalog.pg_attribute.c.attrelid,
                    pg_catalog.pg_description.c.objsubid
                    == pg_catalog.pg_attribute.c.attnum,
                ),
            )
            .where(self._pg_class_relkind_condition(relkinds))
            .order_by(
                pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum
            )
        )
        query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
        if has_filter_names:
            query = query.where(
                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
            )
        return query

    def get_multi_columns(
        self, connection, schema, filter_names, scope, kind, **kw
    ):
        has_filter_names, params = self._prepare_filter_names(filter_names)
        query = self._columns_query(schema, has_filter_names, scope, kind)
        rows = connection.execute(query, params).mappings()

        # dictionary keyed by (name,) for domains visible in the default
        # search path, or by (schema, name) otherwise
        domains = {
            ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d
            for d in self._load_domains(
                connection, schema="*", info_cache=kw.get("info_cache")
            )
        }

        # dictionary keyed by (name,) for enums visible in the default
        # search path, or by (schema, name) otherwise
        enums = dict(
            (
                ((rec["name"],), rec)
                if rec["visible"]
                else ((rec["schema"], rec["name"]), rec)
            )
            for rec in self._load_enums(
                connection, schema="*", info_cache=kw.get("info_cache")
            )
        )

        columns = self._get_columns_info(rows, domains, enums, schema)

        return columns.items()

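    # regexes used by _reflect_type to pick apart format_type() output;
    # e.g. for "numeric(10,2)[]" the args pattern captures "10,2" and the
    # array spec pattern captures the trailing "[]"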
    _format_type_args_pattern = re.compile(r"\((.*)\)")
    _format_type_args_delim = re.compile(r"\s*,\s*")
    _format_array_spec_pattern = re.compile(r"((?:\[\])*)$")

    def _reflect_type(
        self,
        format_type: Optional[str],
        domains: Dict[str, ReflectedDomain],
        enums: Dict[str, ReflectedEnum],
        type_description: str,
    ) -> sqltypes.TypeEngine[Any]:
3871 """
3872 Attempts to reconstruct a column type defined in ischema_names based
3873 on the information available in the format_type.
3874
3875 If the `format_type` cannot be associated with a known `ischema_names`,
3876 it is treated as a reference to a known PostgreSQL named `ENUM` or
3877 `DOMAIN` type.
3878 """
        type_description = type_description or "unknown type"
        if format_type is None:
            util.warn(
                "PostgreSQL format_type() returned NULL for %s"
                % type_description
            )
            return sqltypes.NULLTYPE

        attype_args_match = self._format_type_args_pattern.search(format_type)
        if attype_args_match and attype_args_match.group(1):
            attype_args = self._format_type_args_delim.split(
                attype_args_match.group(1)
            )
        else:
            attype_args = ()

        match_array_dim = self._format_array_spec_pattern.search(format_type)
        # Each "[]" in array specs corresponds to an array dimension
        array_dim = len(match_array_dim.group(1) or "") // 2

        # Remove all parameters and array specs from format_type to obtain an
        # ischema_name candidate
        attype = self._format_type_args_pattern.sub("", format_type)
        attype = self._format_array_spec_pattern.sub("", attype)

        schema_type = self.ischema_names.get(attype.lower(), None)
        args, kwargs = (), {}

        if attype == "numeric":
            if len(attype_args) == 2:
                precision, scale = map(int, attype_args)
                args = (precision, scale)

        elif attype == "double precision":
            args = (53,)

        elif attype == "integer":
            args = ()

        elif attype in ("timestamp with time zone", "time with time zone"):
            kwargs["timezone"] = True
            if len(attype_args) == 1:
                kwargs["precision"] = int(attype_args[0])

        elif attype in (
            "timestamp without time zone",
            "time without time zone",
            "time",
        ):
            kwargs["timezone"] = False
            if len(attype_args) == 1:
                kwargs["precision"] = int(attype_args[0])

        elif attype == "bit varying":
            kwargs["varying"] = True
            if len(attype_args) == 1:
                charlen = int(attype_args[0])
                args = (charlen,)

        # a domain or enum can start with interval, so be mindful of that.
        elif attype == "interval" or attype.startswith("interval "):
            schema_type = INTERVAL

            field_match = re.match(r"interval (.+)", attype)
            if field_match:
                kwargs["fields"] = field_match.group(1)

            if len(attype_args) == 1:
                kwargs["precision"] = int(attype_args[0])

        else:
            enum_or_domain_key = tuple(util.quoted_token_parser(attype))

            if enum_or_domain_key in enums:
                schema_type = ENUM
                enum = enums[enum_or_domain_key]

                kwargs["name"] = enum["name"]

                if not enum["visible"]:
                    kwargs["schema"] = enum["schema"]
                args = tuple(enum["labels"])
            elif enum_or_domain_key in domains:
                schema_type = DOMAIN
                domain = domains[enum_or_domain_key]

                data_type = self._reflect_type(
                    domain["type"],
                    domains,
                    enums,
                    type_description="DOMAIN '%s'" % domain["name"],
                )
                args = (domain["name"], data_type)

                kwargs["collation"] = domain["collation"]
                kwargs["default"] = domain["default"]
                kwargs["not_null"] = not domain["nullable"]
                kwargs["create_type"] = False

                if domain["constraints"]:
                    # We only support a single constraint
                    check_constraint = domain["constraints"][0]

                    kwargs["constraint_name"] = check_constraint["name"]
                    kwargs["check"] = check_constraint["check"]

                if not domain["visible"]:
                    kwargs["schema"] = domain["schema"]

            else:
                try:
                    charlen = int(attype_args[0])
                    args = (charlen, *attype_args[1:])
                except (ValueError, IndexError):
                    args = attype_args

        if not schema_type:
            util.warn(
                "Did not recognize type '%s' of %s"
                % (attype, type_description)
            )
            return sqltypes.NULLTYPE

        data_type = schema_type(*args, **kwargs)
        if array_dim >= 1:
            # PostgreSQL does not preserve dimensionality or size of
            # array types.
            data_type = _array.ARRAY(data_type)

        return data_type

    def _get_columns_info(self, rows, domains, enums, schema):
        columns = defaultdict(list)
        for row_dict in rows:
            # ensure that each table has an entry, even if it has no columns
            if row_dict["name"] is None:
                columns[(schema, row_dict["table_name"])] = (
                    ReflectionDefaults.columns()
                )
                continue
            table_cols = columns[(schema, row_dict["table_name"])]

            coltype = self._reflect_type(
                row_dict["format_type"],
                domains,
                enums,
                type_description="column '%s'" % row_dict["name"],
            )

            default = row_dict["default"]
            name = row_dict["name"]
            generated = row_dict["generated"]
            nullable = not row_dict["not_null"]

            if isinstance(coltype, DOMAIN):
                if not default:
                    # a domain can override the default value but
                    # can't set it to None
                    if coltype.default is not None:
                        default = coltype.default

                nullable = nullable and not coltype.not_null

            identity = row_dict["identity_options"]

            # "generated" is a zero byte or a blank string, depending on the
            # driver (and is absent entirely for older PG versions), when
            # the column is not generated; otherwise "s" means stored.
            # (Other values might be added in the future.)
            if generated not in (None, "", b"\x00"):
                computed = dict(
                    sqltext=default, persisted=generated in ("s", b"s")
                )
                default = None
            else:
                computed = None

            # adjust the default value
            autoincrement = False
            if default is not None:
                match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
                if match is not None:
                    if issubclass(coltype._type_affinity, sqltypes.Integer):
                        autoincrement = True
                    # the default is related to a Sequence
                    if "." not in match.group(2) and schema is not None:
                        # unconditionally quote the schema name. this could
                        # later be enhanced to obey quoting rules /
                        # "quote schema"
                        default = (
                            match.group(1)
                            + ('"%s"' % schema)
                            + "."
                            + match.group(2)
                            + match.group(3)
                        )

            column_info = {
                "name": name,
                "type": coltype,
                "nullable": nullable,
                "default": default,
                "autoincrement": autoincrement or identity is not None,
                "comment": row_dict["comment"],
            }
            if computed is not None:
                column_info["computed"] = computed
            if identity is not None:
                column_info["identity"] = identity

            table_cols.append(column_info)

        return columns

    @lru_cache()
    def _table_oids_query(self, schema, has_filter_names, scope, kind):
        relkinds = self._kind_to_relkinds(kind)
        oid_q = select(
            pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname
        ).where(self._pg_class_relkind_condition(relkinds))
        oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope)

        if has_filter_names:
            oid_q = oid_q.where(
                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
            )
        return oid_q

    @reflection.flexi_cache(
        ("schema", InternalTraversal.dp_string),
        ("filter_names", InternalTraversal.dp_string_list),
        ("kind", InternalTraversal.dp_plain_obj),
        ("scope", InternalTraversal.dp_plain_obj),
    )
    def _get_table_oids(
        self, connection, schema, filter_names, scope, kind, **kw
    ):
        has_filter_names, params = self._prepare_filter_names(filter_names)
        oid_q = self._table_oids_query(schema, has_filter_names, scope, kind)
        result = connection.execute(oid_q, params)
        return result.all()

    @util.memoized_property
    def _constraint_query(self):
        if self.server_version_info >= (11, 0):
            indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
        else:
            indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")

        if self.server_version_info >= (15,):
            indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct
        else:
            indnullsnotdistinct = sql.false().label("indnullsnotdistinct")

        con_sq = (
            select(
                pg_catalog.pg_constraint.c.conrelid,
                pg_catalog.pg_constraint.c.conname,
                sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
                sql.func.generate_subscripts(
                    pg_catalog.pg_index.c.indkey, 1
                ).label("ord"),
                indnkeyatts,
                indnullsnotdistinct,
                pg_catalog.pg_description.c.description,
            )
            .join(
                pg_catalog.pg_index,
                pg_catalog.pg_constraint.c.conindid
                == pg_catalog.pg_index.c.indexrelid,
            )
            .outerjoin(
                pg_catalog.pg_description,
                pg_catalog.pg_description.c.objoid
                == pg_catalog.pg_constraint.c.oid,
            )
            .where(
                pg_catalog.pg_constraint.c.contype == bindparam("contype"),
                pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")),
                # NOTE: filtering also on pg_index.indrelid for oids does
                # not seem to have a performance effect, but it may be an
                # option if perf problems are reported
            )
            .subquery("con")
        )

        attr_sq = (
            select(
                con_sq.c.conrelid,
                con_sq.c.conname,
                con_sq.c.description,
                con_sq.c.ord,
                con_sq.c.indnkeyatts,
                con_sq.c.indnullsnotdistinct,
                pg_catalog.pg_attribute.c.attname,
            )
            .select_from(pg_catalog.pg_attribute)
            .join(
                con_sq,
                sql.and_(
                    pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum,
                    pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid,
                ),
            )
            .where(
                # NOTE: restate the condition here, since pg15 otherwise
                # sometimes seems to get confused on psycopg2, doing
                # a sequential scan of pg_attribute.
                # The condition in the con_sq subquery is not actually needed
                # in pg15, but it may be needed in older versions. Keeping it
                # does not seem to have any impact in any case.
                con_sq.c.conrelid.in_(bindparam("oids"))
            )
            .subquery("attr")
        )

        return (
            select(
                attr_sq.c.conrelid,
                sql.func.array_agg(
                    # NOTE: cast since some postgresql derivatives may
                    # not support array_agg on the name type
                    aggregate_order_by(
                        attr_sq.c.attname.cast(TEXT), attr_sq.c.ord
                    )
                ).label("cols"),
                attr_sq.c.conname,
                sql.func.min(attr_sq.c.description).label("description"),
                sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"),
                sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label(
                    "indnullsnotdistinct"
                ),
            )
            .group_by(attr_sq.c.conrelid, attr_sq.c.conname)
            .order_by(attr_sq.c.conrelid, attr_sq.c.conname)
        )

    def _reflect_constraint(
        self, connection, contype, schema, filter_names, scope, kind, **kw
    ):
        # used to reflect primary and unique constraints
        table_oids = self._get_table_oids(
            connection, schema, filter_names, scope, kind, **kw
        )
        batches = list(table_oids)
        is_unique = contype == "u"

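        # run the constraint query in batches of 3000 oids at a time, to
        # keep the number of bound parameters in the IN clause bounded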
        while batches:
            batch = batches[0:3000]
            batches[0:3000] = []

            result = connection.execute(
                self._constraint_query,
                {"oids": [r[0] for r in batch], "contype": contype},
            ).mappings()

            result_by_oid = defaultdict(list)
            for row_dict in result:
                result_by_oid[row_dict["conrelid"]].append(row_dict)

            for oid, tablename in batch:
                for_oid = result_by_oid.get(oid, ())
                if for_oid:
                    for row in for_oid:
                        # See note in get_multi_indexes
                        all_cols = row["cols"]
                        indnkeyatts = row["indnkeyatts"]
                        if len(all_cols) > indnkeyatts:
                            inc_cols = all_cols[indnkeyatts:]
                            cst_cols = all_cols[:indnkeyatts]
                        else:
                            inc_cols = []
                            cst_cols = all_cols

                        opts = {}
                        if self.server_version_info >= (11,):
                            opts["postgresql_include"] = inc_cols
                        if is_unique:
                            opts["postgresql_nulls_not_distinct"] = row[
                                "indnullsnotdistinct"
                            ]
                        yield (
                            tablename,
                            cst_cols,
                            row["conname"],
                            row["description"],
                            opts,
                        )
                else:
                    yield tablename, None, None, None, None

    @reflection.cache
    def get_pk_constraint(self, connection, table_name, schema=None, **kw):
        data = self.get_multi_pk_constraint(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    def get_multi_pk_constraint(
        self, connection, schema, filter_names, scope, kind, **kw
    ):
        result = self._reflect_constraint(
            connection, "p", schema, filter_names, scope, kind, **kw
        )

        # only a single pk can be present for each table. Return an entry
        # even if a table has no primary key
        default = ReflectionDefaults.pk_constraint

        def pk_constraint(pk_name, cols, comment, opts):
            info = {
                "constrained_columns": cols,
                "name": pk_name,
                "comment": comment,
            }
            if opts:
                info["dialect_options"] = opts
            return info

        return (
            (
                (schema, table_name),
                (
                    pk_constraint(pk_name, cols, comment, opts)
                    if pk_name is not None
                    else default()
                ),
            )
            for table_name, cols, pk_name, comment, opts in result
        )

    @reflection.cache
    def get_foreign_keys(
        self,
        connection,
        table_name,
        schema=None,
        postgresql_ignore_search_path=False,
        **kw,
    ):
        data = self.get_multi_foreign_keys(
            connection,
            schema=schema,
            filter_names=[table_name],
            postgresql_ignore_search_path=postgresql_ignore_search_path,
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @lru_cache()
    def _foreing_key_query(self, schema, has_filter_names, scope, kind):
        pg_class_ref = pg_catalog.pg_class.alias("cls_ref")
        pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref")
        relkinds = self._kind_to_relkinds(kind)
        query = (
            select(
                pg_catalog.pg_class.c.relname,
                pg_catalog.pg_constraint.c.conname,
                # NOTE: avoid calling pg_get_constraintdef when not needed
                # to speed up the query
                sql.case(
                    (
                        pg_catalog.pg_constraint.c.oid.is_not(None),
                        pg_catalog.pg_get_constraintdef(
                            pg_catalog.pg_constraint.c.oid, True
                        ),
                    ),
                    else_=None,
                ),
                pg_namespace_ref.c.nspname,
                pg_catalog.pg_description.c.description,
            )
            .select_from(pg_catalog.pg_class)
            .outerjoin(
                pg_catalog.pg_constraint,
                sql.and_(
                    pg_catalog.pg_class.c.oid
                    == pg_catalog.pg_constraint.c.conrelid,
                    pg_catalog.pg_constraint.c.contype == "f",
                ),
            )
            .outerjoin(
                pg_class_ref,
                pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid,
            )
            .outerjoin(
                pg_namespace_ref,
                pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid,
            )
            .outerjoin(
                pg_catalog.pg_description,
                pg_catalog.pg_description.c.objoid
                == pg_catalog.pg_constraint.c.oid,
            )
            .order_by(
                pg_catalog.pg_class.c.relname,
                pg_catalog.pg_constraint.c.conname,
            )
            .where(self._pg_class_relkind_condition(relkinds))
        )
        query = self._pg_class_filter_scope_schema(query, schema, scope)
        if has_filter_names:
            query = query.where(
                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
            )
        return query

    @util.memoized_property
    def _fk_regex_pattern(self):
        # optionally quoted token
        qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)'

        # https://www.postgresql.org/docs/current/static/sql-createtable.html
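        # an example of the text this pattern parses, as rendered by
        # pg_get_constraintdef():
        #   FOREIGN KEY (a, b) REFERENCES other_schema.other_table(x, y)
        #   ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED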
        return re.compile(
            r"FOREIGN KEY \((.*?)\) "
            rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)"  # noqa: E501
            r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
            r"[\s]?(ON UPDATE "
            r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
            r"[\s]?(ON DELETE "
            r"(CASCADE|RESTRICT|NO ACTION|"
            r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
            r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
            r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
        )

    def get_multi_foreign_keys(
        self,
        connection,
        schema,
        filter_names,
        scope,
        kind,
        postgresql_ignore_search_path=False,
        **kw,
    ):
        preparer = self.identifier_preparer

        has_filter_names, params = self._prepare_filter_names(filter_names)
        query = self._foreing_key_query(schema, has_filter_names, scope, kind)
        result = connection.execute(query, params)

        FK_REGEX = self._fk_regex_pattern

        fkeys = defaultdict(list)
        default = ReflectionDefaults.foreign_keys
        for table_name, conname, condef, conschema, comment in result:
            # ensure that each table has an entry, even if it has
            # no foreign keys
            if conname is None:
                fkeys[(schema, table_name)] = default()
                continue
            table_fks = fkeys[(schema, table_name)]
            m = re.search(FK_REGEX, condef).groups()

            (
                constrained_columns,
                referred_schema,
                referred_table,
                referred_columns,
                _,
                match,
                _,
                onupdate,
                _,
                ondelete,
                deferrable,
                _,
                initially,
            ) = m

            if deferrable is not None:
                deferrable = deferrable == "DEFERRABLE"
            constrained_columns = [
                preparer._unquote_identifier(x)
                for x in re.split(r"\s*,\s*", constrained_columns)
            ]

            if postgresql_ignore_search_path:
                # when ignoring search path, we use the actual schema
                # provided it isn't the "default" schema
                if conschema != self.default_schema_name:
                    referred_schema = conschema
                else:
                    referred_schema = schema
            elif referred_schema:
                # referred_schema is the schema that we regexp'ed from
                # pg_get_constraintdef(). If the schema is in the search
                # path, pg_get_constraintdef() will give us None.
                referred_schema = preparer._unquote_identifier(referred_schema)
            elif schema is not None and schema == conschema:
                # If the actual schema matches the schema of the table
                # we're reflecting, then we will use that.
                referred_schema = schema

            referred_table = preparer._unquote_identifier(referred_table)
            referred_columns = [
                preparer._unquote_identifier(x)
                for x in re.split(r"\s*,\s*", referred_columns)
            ]
            options = {
                k: v
                for k, v in [
                    ("onupdate", onupdate),
                    ("ondelete", ondelete),
                    ("initially", initially),
                    ("deferrable", deferrable),
                    ("match", match),
                ]
                if v is not None and v != "NO ACTION"
            }
            fkey_d = {
                "name": conname,
                "constrained_columns": constrained_columns,
                "referred_schema": referred_schema,
                "referred_table": referred_table,
                "referred_columns": referred_columns,
                "options": options,
                "comment": comment,
            }
            table_fks.append(fkey_d)
        return fkeys.items()

    @reflection.cache
    def get_indexes(self, connection, table_name, schema=None, **kw):
        data = self.get_multi_indexes(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @util.memoized_property
    def _index_query(self):
        # NOTE: pg_index is used as a FROM source twice to improve
        # performance, since extracting all the index information from
        # `idx_sq`, so as to avoid the second use of pg_index, leads to a
        # worse performing query, in particular when querying for a single
        # table (as of pg 17)
        # NOTE: repeating the oids clause improves query performance

        # subquery to get the columns
        idx_sq = (
            select(
                pg_catalog.pg_index.c.indexrelid,
                pg_catalog.pg_index.c.indrelid,
                sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
                sql.func.unnest(pg_catalog.pg_index.c.indclass).label(
                    "att_opclass"
                ),
                sql.func.generate_subscripts(
                    pg_catalog.pg_index.c.indkey, 1
                ).label("ord"),
            )
            .where(
                ~pg_catalog.pg_index.c.indisprimary,
                pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
            )
            .subquery("idx")
        )

        attr_sq = (
            select(
                idx_sq.c.indexrelid,
                idx_sq.c.indrelid,
                idx_sq.c.ord,
                # NOTE: always using pg_get_indexdef is too slow, so only
                # invoke it when the element is an expression
                sql.case(
                    (
                        idx_sq.c.attnum == 0,
                        pg_catalog.pg_get_indexdef(
                            idx_sq.c.indexrelid, idx_sq.c.ord + 1, True
                        ),
                    ),
                    # NOTE: the cast is needed since attname is of type
                    # "name", which is limited to 63 bytes, while
                    # pg_get_indexdef returns "text", whose output may
                    # otherwise get cut short
                    else_=pg_catalog.pg_attribute.c.attname.cast(TEXT),
                ).label("element"),
                (idx_sq.c.attnum == 0).label("is_expr"),
                pg_catalog.pg_opclass.c.opcname,
                pg_catalog.pg_opclass.c.opcdefault,
            )
            .select_from(idx_sq)
            .outerjoin(
                # do not remove rows where idx_sq.c.attnum is 0
                pg_catalog.pg_attribute,
                sql.and_(
                    pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum,
                    pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid,
                ),
            )
            .outerjoin(
                pg_catalog.pg_opclass,
                pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass,
            )
            .where(idx_sq.c.indrelid.in_(bindparam("oids")))
            .subquery("idx_attr")
        )

        cols_sq = (
            select(
                attr_sq.c.indexrelid,
                sql.func.min(attr_sq.c.indrelid),
                sql.func.array_agg(
                    aggregate_order_by(attr_sq.c.element, attr_sq.c.ord)
                ).label("elements"),
                sql.func.array_agg(
                    aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord)
                ).label("elements_is_expr"),
                sql.func.array_agg(
                    aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord)
                ).label("elements_opclass"),
                sql.func.array_agg(
                    aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord)
                ).label("elements_opdefault"),
            )
            .group_by(attr_sq.c.indexrelid)
            .subquery("idx_cols")
        )

        if self.server_version_info >= (11, 0):
            indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
        else:
            indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")

        if self.server_version_info >= (15,):
            nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct
        else:
            nulls_not_distinct = sql.false().label("indnullsnotdistinct")

        return (
            select(
                pg_catalog.pg_index.c.indrelid,
                pg_catalog.pg_class.c.relname,
                pg_catalog.pg_index.c.indisunique,
                pg_catalog.pg_constraint.c.conrelid.is_not(None).label(
                    "has_constraint"
                ),
                pg_catalog.pg_index.c.indoption,
                pg_catalog.pg_class.c.reloptions,
                pg_catalog.pg_am.c.amname,
                # NOTE: pg_get_expr is very fast so this case has almost no
                # performance impact
                sql.case(
                    (
                        pg_catalog.pg_index.c.indpred.is_not(None),
                        pg_catalog.pg_get_expr(
                            pg_catalog.pg_index.c.indpred,
                            pg_catalog.pg_index.c.indrelid,
                        ),
                    ),
                    else_=None,
                ).label("filter_definition"),
                indnkeyatts,
                nulls_not_distinct,
                cols_sq.c.elements,
                cols_sq.c.elements_is_expr,
                cols_sq.c.elements_opclass,
                cols_sq.c.elements_opdefault,
            )
            .select_from(pg_catalog.pg_index)
            .where(
                pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
                ~pg_catalog.pg_index.c.indisprimary,
            )
            .join(
                pg_catalog.pg_class,
                pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid,
            )
            .join(
                pg_catalog.pg_am,
                pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid,
            )
            .outerjoin(
                cols_sq,
                pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid,
            )
            .outerjoin(
                pg_catalog.pg_constraint,
                sql.and_(
                    pg_catalog.pg_index.c.indrelid
                    == pg_catalog.pg_constraint.c.conrelid,
                    pg_catalog.pg_index.c.indexrelid
                    == pg_catalog.pg_constraint.c.conindid,
                    pg_catalog.pg_constraint.c.contype
                    == sql.any_(_array.array(("p", "u", "x"))),
                ),
            )
            .order_by(
                pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname
            )
        )

    def get_multi_indexes(
        self, connection, schema, filter_names, scope, kind, **kw
    ):
        table_oids = self._get_table_oids(
            connection, schema, filter_names, scope, kind, **kw
        )

        indexes = defaultdict(list)
        default = ReflectionDefaults.indexes

        batches = list(table_oids)

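        # query the indexes in batches of 3000 oids at a time, keeping
        # the number of bound parameters in the IN clause bounded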
        while batches:
            batch = batches[0:3000]
            batches[0:3000] = []

            result = connection.execute(
                self._index_query, {"oids": [r[0] for r in batch]}
            ).mappings()

            result_by_oid = defaultdict(list)
            for row_dict in result:
                result_by_oid[row_dict["indrelid"]].append(row_dict)

            for oid, table_name in batch:
                if oid not in result_by_oid:
                    # ensure that each table has an entry, even if it has
                    # no indexes or index reflection is skipped because
                    # it's not supported
                    indexes[(schema, table_name)] = default()
                    continue

                for row in result_by_oid[oid]:
                    index_name = row["relname"]

                    table_indexes = indexes[(schema, table_name)]

                    all_elements = row["elements"]
                    all_elements_is_expr = row["elements_is_expr"]
                    all_elements_opclass = row["elements_opclass"]
                    all_elements_opdefault = row["elements_opdefault"]
                    indnkeyatts = row["indnkeyatts"]
                    # "The number of key columns in the index, not counting
                    # any included columns, which are merely stored and do
                    # not participate in the index semantics"
                    if len(all_elements) > indnkeyatts:
                        # this is a "covering index" which has INCLUDE
                        # columns as well as regular index columns
                        inc_cols = all_elements[indnkeyatts:]
                        idx_elements = all_elements[:indnkeyatts]
                        idx_elements_is_expr = all_elements_is_expr[
                            :indnkeyatts
                        ]
                        # postgresql does not support expressions on
                        # included columns as of v14: "ERROR: expressions
                        # are not supported in included columns".
                        assert all(
                            not is_expr
                            for is_expr in all_elements_is_expr[indnkeyatts:]
                        )
                        idx_elements_opclass = all_elements_opclass[
                            :indnkeyatts
                        ]
                        idx_elements_opdefault = all_elements_opdefault[
                            :indnkeyatts
                        ]
                    else:
                        idx_elements = all_elements
                        idx_elements_is_expr = all_elements_is_expr
                        inc_cols = []
                        idx_elements_opclass = all_elements_opclass
                        idx_elements_opdefault = all_elements_opdefault

                    index = {"name": index_name, "unique": row["indisunique"]}
                    if any(idx_elements_is_expr):
                        index["column_names"] = [
                            None if is_expr else expr
                            for expr, is_expr in zip(
                                idx_elements, idx_elements_is_expr
                            )
                        ]
                        index["expressions"] = idx_elements
                    else:
                        index["column_names"] = idx_elements

                    dialect_options = {}

                    if not all(idx_elements_opdefault):
                        dialect_options["postgresql_ops"] = {
                            name: opclass
                            for name, opclass, is_default in zip(
                                idx_elements,
                                idx_elements_opclass,
                                idx_elements_opdefault,
                            )
                            if not is_default
                        }

                    sorting = {}
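                    # decode the per-column bit flags of pg_index.indoption;
                    # per PostgreSQL's catalog headers, 0x01 is
                    # INDOPTION_DESC and 0x02 is INDOPTION_NULLS_FIRST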
                    for col_index, col_flags in enumerate(row["indoption"]):
                        col_sorting = ()
                        # try to set flags only if they differ from PG
                        # defaults...
                        if col_flags & 0x01:
                            col_sorting += ("desc",)
                            if not (col_flags & 0x02):
                                col_sorting += ("nulls_last",)
                        else:
                            if col_flags & 0x02:
                                col_sorting += ("nulls_first",)
                        if col_sorting:
                            sorting[idx_elements[col_index]] = col_sorting
                    if sorting:
                        index["column_sorting"] = sorting
                    if row["has_constraint"]:
                        index["duplicates_constraint"] = index_name

                    if row["reloptions"]:
                        dialect_options["postgresql_with"] = dict(
                            [
                                option.split("=", 1)
                                for option in row["reloptions"]
                            ]
                        )
                    # it *might* be nice to include that this is 'btree' in
                    # the reflection info. But we don't want an Index object
                    # to have a ``postgresql_using`` in it that is just the
                    # default, so for the moment leaving this out.
                    amname = row["amname"]
                    if amname != "btree":
                        dialect_options["postgresql_using"] = row["amname"]
                    if row["filter_definition"]:
                        dialect_options["postgresql_where"] = row[
                            "filter_definition"
                        ]
                    if self.server_version_info >= (11,):
                        # NOTE: this is legacy, this is part of
                        # dialect_options now as of #7382
                        index["include_columns"] = inc_cols
                        dialect_options["postgresql_include"] = inc_cols
                    if row["indnullsnotdistinct"]:
                        # the default is False, so ignore it.
                        dialect_options["postgresql_nulls_not_distinct"] = row[
                            "indnullsnotdistinct"
                        ]

                    if dialect_options:
                        index["dialect_options"] = dialect_options

                    table_indexes.append(index)
        return indexes.items()

    @reflection.cache
    def get_unique_constraints(
        self, connection, table_name, schema=None, **kw
    ):
        data = self.get_multi_unique_constraints(
            connection,
            schema=schema,
            filter_names=[table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    def get_multi_unique_constraints(
        self,
        connection,
        schema,
        filter_names,
        scope,
        kind,
        **kw,
    ):
        result = self._reflect_constraint(
            connection, "u", schema, filter_names, scope, kind, **kw
        )

        # each table can have multiple unique constraints
        uniques = defaultdict(list)
        default = ReflectionDefaults.unique_constraints
        for table_name, cols, con_name, comment, options in result:
            # ensure a list is created for each table. leave it empty if
            # the table has no unique constraints
            if con_name is None:
                uniques[(schema, table_name)] = default()
                continue

            uc_dict = {
                "column_names": cols,
                "name": con_name,
                "comment": comment,
            }
            if options:
                uc_dict["dialect_options"] = options

            uniques[(schema, table_name)].append(uc_dict)
        return uniques.items()

    @reflection.cache
    def get_table_comment(self, connection, table_name, schema=None, **kw):
        data = self.get_multi_table_comment(
            connection,
            schema,
            [table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @lru_cache()
    def _comment_query(self, schema, has_filter_names, scope, kind):
        relkinds = self._kind_to_relkinds(kind)
        query = (
            select(
                pg_catalog.pg_class.c.relname,
                pg_catalog.pg_description.c.description,
            )
            .select_from(pg_catalog.pg_class)
            .outerjoin(
                pg_catalog.pg_description,
                sql.and_(
                    pg_catalog.pg_class.c.oid
                    == pg_catalog.pg_description.c.objoid,
                    pg_catalog.pg_description.c.objsubid == 0,
                    pg_catalog.pg_description.c.classoid
                    == sql.func.cast("pg_catalog.pg_class", REGCLASS),
                ),
            )
            .where(self._pg_class_relkind_condition(relkinds))
        )
        query = self._pg_class_filter_scope_schema(query, schema, scope)
        if has_filter_names:
            query = query.where(
                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
            )
        return query

    def get_multi_table_comment(
        self, connection, schema, filter_names, scope, kind, **kw
    ):
        has_filter_names, params = self._prepare_filter_names(filter_names)
        query = self._comment_query(schema, has_filter_names, scope, kind)
        result = connection.execute(query, params)

        default = ReflectionDefaults.table_comment
        return (
            (
                (schema, table),
                {"text": comment} if comment is not None else default(),
            )
            for table, comment in result
        )

    @reflection.cache
    def get_check_constraints(self, connection, table_name, schema=None, **kw):
        data = self.get_multi_check_constraints(
            connection,
            schema,
            [table_name],
            scope=ObjectScope.ANY,
            kind=ObjectKind.ANY,
            **kw,
        )
        return self._value_or_raise(data, table_name, schema)

    @lru_cache()
    def _check_constraint_query(self, schema, has_filter_names, scope, kind):
        relkinds = self._kind_to_relkinds(kind)
        query = (
            select(
                pg_catalog.pg_class.c.relname,
                pg_catalog.pg_constraint.c.conname,
                # NOTE: avoid calling pg_get_constraintdef when not needed
                # to speed up the query
                sql.case(
                    (
                        pg_catalog.pg_constraint.c.oid.is_not(None),
                        pg_catalog.pg_get_constraintdef(
                            pg_catalog.pg_constraint.c.oid, True
                        ),
                    ),
                    else_=None,
                ),
                pg_catalog.pg_description.c.description,
            )
            .select_from(pg_catalog.pg_class)
            .outerjoin(
                pg_catalog.pg_constraint,
                sql.and_(
                    pg_catalog.pg_class.c.oid
                    == pg_catalog.pg_constraint.c.conrelid,
                    pg_catalog.pg_constraint.c.contype == "c",
                ),
            )
            .outerjoin(
                pg_catalog.pg_description,
                pg_catalog.pg_description.c.objoid
                == pg_catalog.pg_constraint.c.oid,
            )
            .order_by(
                pg_catalog.pg_class.c.relname,
                pg_catalog.pg_constraint.c.conname,
            )
            .where(self._pg_class_relkind_condition(relkinds))
        )
        query = self._pg_class_filter_scope_schema(query, schema, scope)
        if has_filter_names:
            query = query.where(
                pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
            )
        return query

    def get_multi_check_constraints(
        self, connection, schema, filter_names, scope, kind, **kw
    ):
        has_filter_names, params = self._prepare_filter_names(filter_names)
        query = self._check_constraint_query(
            schema, has_filter_names, scope, kind
        )
        result = connection.execute(query, params)

        check_constraints = defaultdict(list)
        default = ReflectionDefaults.check_constraints
        for table_name, check_name, src, comment in result:
            # only two cases for check_name and src: both null or both defined
            if check_name is None and src is None:
                check_constraints[(schema, table_name)] = default()
                continue
            # samples:
            # "CHECK (((a > 1) AND (a < 5)))"
            # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
            # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
            # "CHECK (some_boolean_function(a))"
            # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
            # "CHECK (a NOT NULL) NO INHERIT"
            # "CHECK (a NOT NULL) NO INHERIT NOT VALID"

            m = re.match(
                r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$",
                src,
                flags=re.DOTALL,
            )
            if not m:
                util.warn("Could not parse CHECK constraint text: %r" % src)
                sqltext = ""
            else:
                sqltext = re.compile(
                    r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
                ).sub(r"\1", m.group(1))
            entry = {
                "name": check_name,
                "sqltext": sqltext,
                "comment": comment,
            }
            if m:
                do = {}
                if " NOT VALID" in m.groups():
                    do["not_valid"] = True
                if " NO INHERIT" in m.groups():
                    do["no_inherit"] = True
                if do:
                    entry["dialect_options"] = do

            check_constraints[(schema, table_name)].append(entry)
        return check_constraints.items()

    def _pg_type_filter_schema(self, query, schema):
        if schema is None:
            query = query.where(
                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
                # ignore pg_catalog schema
                pg_catalog.pg_namespace.c.nspname != "pg_catalog",
            )
        elif schema != "*":
            query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
        return query

    @lru_cache()
    def _enum_query(self, schema):
        lbl_agg_sq = (
            select(
                pg_catalog.pg_enum.c.enumtypid,
                sql.func.array_agg(
                    aggregate_order_by(
                        # NOTE: cast since some postgresql derivatives may
                        # not support array_agg on the name type
                        pg_catalog.pg_enum.c.enumlabel.cast(TEXT),
                        pg_catalog.pg_enum.c.enumsortorder,
                    )
                ).label("labels"),
            )
            .group_by(pg_catalog.pg_enum.c.enumtypid)
            .subquery("lbl_agg")
        )

        query = (
            select(
                pg_catalog.pg_type.c.typname.label("name"),
                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
                    "visible"
                ),
                pg_catalog.pg_namespace.c.nspname.label("schema"),
                lbl_agg_sq.c.labels.label("labels"),
            )
            .join(
                pg_catalog.pg_namespace,
                pg_catalog.pg_namespace.c.oid
                == pg_catalog.pg_type.c.typnamespace,
            )
            .outerjoin(
                lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid
            )
            .where(pg_catalog.pg_type.c.typtype == "e")
            .order_by(
                pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
            )
        )

        return self._pg_type_filter_schema(query, schema)

    @reflection.cache
    def _load_enums(self, connection, schema=None, **kw):
        if not self.supports_native_enum:
            return []

        result = connection.execute(self._enum_query(schema))

        enums = []
        for name, visible, schema, labels in result:
            enums.append(
                {
                    "name": name,
                    "schema": schema,
                    "visible": visible,
                    "labels": [] if labels is None else labels,
                }
            )
        return enums

    @lru_cache()
    def _domain_query(self, schema):
        con_sq = (
            select(
                pg_catalog.pg_constraint.c.contypid,
                sql.func.array_agg(
                    pg_catalog.pg_get_constraintdef(
                        pg_catalog.pg_constraint.c.oid, True
                    )
                ).label("condefs"),
                sql.func.array_agg(
                    # NOTE: cast since some postgresql derivatives may
                    # not support array_agg on the name type
                    pg_catalog.pg_constraint.c.conname.cast(TEXT)
                ).label("connames"),
            )
            # The domain this constraint is on; zero if not a domain constraint
            .where(pg_catalog.pg_constraint.c.contypid != 0)
            .group_by(pg_catalog.pg_constraint.c.contypid)
            .subquery("domain_constraints")
        )

        query = (
            select(
                pg_catalog.pg_type.c.typname.label("name"),
                pg_catalog.format_type(
                    pg_catalog.pg_type.c.typbasetype,
                    pg_catalog.pg_type.c.typtypmod,
                ).label("attype"),
                (~pg_catalog.pg_type.c.typnotnull).label("nullable"),
                pg_catalog.pg_type.c.typdefault.label("default"),
                pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
                    "visible"
                ),
                pg_catalog.pg_namespace.c.nspname.label("schema"),
                con_sq.c.condefs,
                con_sq.c.connames,
                pg_catalog.pg_collation.c.collname,
            )
            .join(
                pg_catalog.pg_namespace,
                pg_catalog.pg_namespace.c.oid
                == pg_catalog.pg_type.c.typnamespace,
            )
            .outerjoin(
                pg_catalog.pg_collation,
                pg_catalog.pg_type.c.typcollation
                == pg_catalog.pg_collation.c.oid,
            )
            .outerjoin(
                con_sq,
                pg_catalog.pg_type.c.oid == con_sq.c.contypid,
            )
            .where(pg_catalog.pg_type.c.typtype == "d")
            .order_by(
                pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
            )
        )
        return self._pg_type_filter_schema(query, schema)

    @reflection.cache
    def _load_domains(self, connection, schema=None, **kw):
        result = connection.execute(self._domain_query(schema))

        domains: List[ReflectedDomain] = []
        for domain in result.mappings():
            # strip (30) from character varying(30)
            attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
            constraints: List[ReflectedDomainConstraint] = []
            if domain["connames"]:
                # When a domain has multiple CHECK constraints, they will
                # be tested in alphabetical order by name.
                sorted_constraints = sorted(
                    zip(domain["connames"], domain["condefs"]),
                    key=lambda t: t[0],
                )
                for name, def_ in sorted_constraints:
                    # a constraint is of the form "CHECK (expression)"
                    # or "NOT NULL". Ignore the "NOT NULL" ones and
                    # strip "CHECK (" and the trailing ")".
                    if def_.casefold().startswith("check"):
                        check = def_[7:-1]
                        constraints.append({"name": name, "check": check})
            domain_rec: ReflectedDomain = {
                "name": domain["name"],
                "schema": domain["schema"],
                "visible": domain["visible"],
                "type": attype,
                "nullable": domain["nullable"],
                "default": domain["default"],
                "constraints": constraints,
                "collation": domain["collname"],
            }
            domains.append(domain_rec)

        return domains

    def _set_backslash_escapes(self, connection):
        # this method is provided as an override hook for descendant
        # dialects (e.g. Redshift), so removing it may break them
        std_string = connection.exec_driver_sql(
            "show standard_conforming_strings"
        ).scalar()
        self._backslash_escapes = std_string == "off"