# dialects/postgresql/base.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors

r"""
.. dialect:: postgresql
    :name: PostgreSQL
    :normal_support: 9.6+
    :best_effort: 9+

.. _postgresql_sequences:

Sequences/SERIAL/IDENTITY
-------------------------

PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
of creating new primary key values for integer-based primary key columns. When
creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
integer-based primary key columns, which generates a sequence and server side
default corresponding to the column.

To specify a specific named sequence to be used for primary key generation,
use the :func:`~sqlalchemy.schema.Sequence` construct::

    Table(
        "sometable",
        metadata,
        Column(
            "id", Integer, Sequence("some_id_seq", start=1), primary_key=True
        ),
    )
When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
having the "last insert identifier" available, a RETURNING clause is added to
the INSERT statement which specifies that the primary key columns should be
returned after the statement completes. The RETURNING functionality only takes
place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
sequence, whether specified explicitly or implicitly via ``SERIAL``, is
executed independently beforehand, with the returned value then used in the
subsequent INSERT. Note that when an
:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
"executemany" semantics, the "last inserted identifier" functionality does not
apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
case.
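
For example, after a single-row INSERT executes, the newly generated primary
key value is available from the
:attr:`_engine.CursorResult.inserted_primary_key` attribute. A minimal sketch
follows, assuming the ``sometable`` definition above is augmented with a
hypothetical ``data`` column, and a reachable PostgreSQL database at this
(also hypothetical) URL::

    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    metadata.create_all(engine)

    with engine.begin() as conn:
        # the value generated by some_id_seq arrives via RETURNING
        result = conn.execute(sometable.insert().values(data="some data"))
        print(result.inserted_primary_key)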


PostgreSQL 10 and above IDENTITY columns
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
of SERIAL. The :class:`_schema.Identity` construct in a
:class:`_schema.Column` can be used to control its behavior::
    from sqlalchemy import Table, Column, MetaData, Integer, String, Identity

    metadata = MetaData()

    data = Table(
        "data",
        metadata,
        Column(
            "id", Integer, Identity(start=42, cycle=True), primary_key=True
        ),
        Column("data", String),
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

    CREATE TABLE data (
        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
        data VARCHAR,
        PRIMARY KEY (id)
    )

.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
    in a :class:`_schema.Column` to specify the option of an autoincrementing
    column.

.. note::

    Previous versions of SQLAlchemy did not have built-in support for rendering
    of IDENTITY, and could use the following compilation hook to replace
    occurrences of SERIAL with IDENTITY::

        from sqlalchemy.schema import CreateColumn
        from sqlalchemy.ext.compiler import compiles


        @compiles(CreateColumn, "postgresql")
        def use_identity(element, compiler, **kw):
            text = compiler.visit_create_column(element, **kw)
            text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
            return text

    Using the above, a table such as::

        t = Table(
            "t", m, Column("id", Integer, primary_key=True), Column("data", String)
        )

    Will generate on the backing database as:

    .. sourcecode:: sql

        CREATE TABLE t (
            id INT GENERATED BY DEFAULT AS IDENTITY,
            data VARCHAR,
            PRIMARY KEY (id)
        )

.. _postgresql_ss_cursors:

Server Side Cursors
-------------------
Server-side cursor support is available for the psycopg2 and asyncpg
dialects and may also be available in others.

Server side cursors are enabled on a per-statement basis by using the
:paramref:`.Connection.execution_options.stream_results` connection execution
option::

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("select * from table")
        )

Note that some kinds of SQL statements may not be supported with
server side cursors; generally, only SQL statements that return rows should be
used with this option.
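
When streaming, rows can also be consumed in fixed-size batches so that only a
portion of the result set is buffered at once; a brief sketch using
:meth:`_engine.Result.partitions`, assuming the ``engine`` from above and a
hypothetical large table::

    from sqlalchemy import text

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("SELECT * FROM some_large_table")
        )
        # fetch 100 rows at a time from the server side cursor
        for partition in result.partitions(100):
            for row in partition:
                ...  # process each row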

.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
    and will be removed in a future release. Please use the
    :paramref:`_engine.Connection.execution_options.stream_results` execution
    option for unbuffered cursor support.

.. seealso::

    :ref:`engine_stream_results`

.. _postgresql_isolation_level:

Transaction Isolation Level
---------------------------

Most SQLAlchemy dialects support setting of transaction isolation level
using the :paramref:`_sa.create_engine.isolation_level` parameter
at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
level via the :paramref:`.Connection.execution_options.isolation_level`
parameter.

For PostgreSQL dialects, this feature works either by making use of the
DBAPI-specific features, such as psycopg2's isolation level flags which will
embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
DBAPI-specific techniques are used, typically an ``.autocommit``
flag on the DBAPI connection object.

To set isolation level using :func:`_sa.create_engine`::

    engine = create_engine(
        "postgresql+pg8000://scott:tiger@localhost/test",
        isolation_level="REPEATABLE READ",
    )

To set using per-connection execution options::

    with engine.connect() as conn:
        conn = conn.execution_options(isolation_level="REPEATABLE READ")
        with conn.begin():
            ...  # work with transaction

There are also more options for isolation level configurations, such as
"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
different isolation level settings. See the discussion at
:ref:`dbapi_autocommit` for background.
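
As a brief sketch of that pattern, :meth:`_engine.Engine.execution_options`
returns a copy of the :class:`_engine.Engine` that shares the same connection
pool, applying the given options to each connection it checks out (the URL
below is hypothetical)::

    from sqlalchemy import create_engine, text

    eng = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")

    # a shallow copy of "eng", sharing the same pool, where connections
    # are set to AUTOCOMMIT
    autocommit_engine = eng.execution_options(isolation_level="AUTOCOMMIT")

    with autocommit_engine.connect() as conn:
        # runs outside of any transaction block
        conn.execute(text("VACUUM"))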

Valid values for ``isolation_level`` on most PostgreSQL dialects include:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. seealso::

    :ref:`dbapi_autocommit`

    :ref:`postgresql_readonly_deferrable`

    :ref:`psycopg2_isolation_level`

    :ref:`pg8000_isolation_level`

.. _postgresql_readonly_deferrable:

Setting READ ONLY / DEFERRABLE
------------------------------

Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
characteristics of the transaction, which is in addition to the isolation level
setting. These two attributes can be established either in conjunction with or
independently of the isolation level by passing the ``postgresql_readonly`` and
``postgresql_deferrable`` flags with
:meth:`_engine.Connection.execution_options`. The example below illustrates
passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
"READ ONLY" and "DEFERRABLE"::

    with engine.connect() as conn:
        conn = conn.execution_options(
            isolation_level="SERIALIZABLE",
            postgresql_readonly=True,
            postgresql_deferrable=True,
        )
        with conn.begin():
            ...  # work with transaction

Note that some DBAPIs such as asyncpg only support "readonly" with
SERIALIZABLE isolation.

.. versionadded:: 1.4 added support for the ``postgresql_readonly``
    and ``postgresql_deferrable`` execution options.

.. _postgresql_reset_on_return:

Temporary Table / Resource Reset for Connection Pooling
-------------------------------------------------------

The :class:`.QueuePool` connection pool implementation used
by the SQLAlchemy :class:`.Engine` object includes
:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
the DBAPI ``.rollback()`` method when connections are returned to the pool.
While this rollback will clear out the immediate state used by the previous
transaction, it does not cover a wider range of session-level state, including
temporary tables as well as other server state such as prepared statement
handles and statement caches. The PostgreSQL database includes a variety
of commands which may be used to reset this state, including
``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.

To install one or more of these commands as the means of performing
reset-on-return, the :meth:`.PoolEvents.reset` event hook may be used, as
demonstrated in the example below. The implementation will end transactions in
progress as well as discard temporary tables using the ``CLOSE``, ``RESET``
and ``DISCARD`` commands; see the PostgreSQL documentation for background on
what each of these statements does.

The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
is set to ``None`` so that the custom scheme can replace the default behavior
completely. The custom hook implementation calls ``.rollback()`` in any case,
as it's usually important that the DBAPI's own tracking of commit/rollback
will remain consistent with the state of the transaction::

    from sqlalchemy import create_engine
    from sqlalchemy import event

    postgresql_engine = create_engine(
        "postgresql+psycopg2://scott:tiger@hostname/dbname",
        # disable default reset-on-return scheme
        pool_reset_on_return=None,
    )


    @event.listens_for(postgresql_engine, "reset")
    def _reset_postgresql(dbapi_connection, connection_record, reset_state):
        if not reset_state.terminate_only:
            cursor = dbapi_connection.cursor()
            cursor.execute("CLOSE ALL")
            cursor.execute("RESET ALL")
            cursor.execute("DISCARD TEMP")
            cursor.close()

        # so that the DBAPI itself knows that the connection has been
        # reset
        dbapi_connection.rollback()

.. versionchanged:: 2.0.0b3 Added additional state arguments to
    the :meth:`.PoolEvents.reset` event and additionally ensured the event
    is invoked for all "reset" occurrences, so that it's appropriate
    as a place for custom "reset" handlers. Previous schemes which
    use the :meth:`.PoolEvents.checkin` handler remain usable as well.

.. seealso::

    :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation

.. _postgresql_alternate_search_path:

Setting Alternate Search Paths on Connect
-----------------------------------------

The PostgreSQL ``search_path`` variable refers to the list of schema names
that will be implicitly referenced when a particular table or other
object is referenced in a SQL statement. As detailed in the next section
:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
the concept of keeping this variable at its default value of ``public``,
however, in order to have it set to any arbitrary name or names automatically
as connections are used, the "SET SESSION search_path" command may be invoked
for all connections in a pool using the following event handler, as discussed
at :ref:`schema_set_default_connections`::

    from sqlalchemy import event
    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")


    @event.listens_for(engine, "connect", insert=True)
    def set_search_path(dbapi_connection, connection_record):
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        cursor = dbapi_connection.cursor()
        cursor.execute("SET SESSION search_path='%s'" % schema_name)
        cursor.close()
        dbapi_connection.autocommit = existing_autocommit

The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
attribute is so that when the ``SET SESSION search_path`` directive is invoked,
it is invoked outside of the scope of any transaction and therefore will not
be reverted when the DBAPI connection has a rollback.

.. seealso::

    :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel`
    documentation

.. _postgresql_schema_reflection:

Remote-Schema Table Introspection and PostgreSQL search_path
------------------------------------------------------------

.. admonition:: Section Best Practices Summarized

    keep the ``search_path`` variable set to its default of ``public``, without
    any other schema names. Ensure the username used to connect **does not**
    match remote schemas, or ensure the ``"$user"`` token is **removed** from
    ``search_path``. For other schema names, name these explicitly
    within :class:`_schema.Table` definitions. Alternatively, the
    ``postgresql_ignore_search_path`` option will cause all reflected
    :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
    attribute set up.

The PostgreSQL dialect can reflect tables from any schema, as outlined in
:ref:`metadata_reflection_schemas`.

In all cases, the first thing SQLAlchemy does when reflecting tables is
to **determine the default schema for the current database connection**.
It does this using the PostgreSQL ``current_schema()``
function, illustrated below using a PostgreSQL client session (i.e. using
the ``psql`` tool):

.. sourcecode:: sql

    test=> select current_schema();
    current_schema
    ----------------
    public
    (1 row)

Above we see that on a plain install of PostgreSQL, the default schema name
is the name ``public``.

However, if your database username **matches the name of a schema**, PostgreSQL's
default is to then **use that name as the default schema**. Below, we log in
using the username ``scott``. When we create a schema named ``scott``, **it
implicitly changes the default schema**:

.. sourcecode:: sql

    test=> select current_schema();
    current_schema
    ----------------
    public
    (1 row)

    test=> create schema scott;
    CREATE SCHEMA
    test=> select current_schema();
    current_schema
    ----------------
    scott
    (1 row)

The behavior of ``current_schema()`` is derived from the
`PostgreSQL search path
<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
variable ``search_path``, which in modern PostgreSQL versions defaults to this:

.. sourcecode:: sql

    test=> show search_path;
    search_path
    -----------------
    "$user", public
    (1 row)

Where above, the ``"$user"`` variable will inject the current username as the
default schema, if one exists. Otherwise, ``public`` is used.

When a :class:`_schema.Table` object is reflected, if it is present in the
schema indicated by the ``current_schema()`` function, **the schema name assigned
to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the
".schema" attribute will be assigned the string name of that schema.

With regards to tables which these :class:`_schema.Table`
objects refer to via foreign key constraint, a decision must be made as to how
the ``.schema`` is represented in those remote tables, in the case where that
remote schema name is also a member of the current ``search_path``.

By default, the PostgreSQL dialect mimics the behavior encouraged by
PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
returns a sample definition for a particular foreign key constraint,
omitting the referenced schema name from that definition when the name is
also in the PostgreSQL schema search path. The interaction below
illustrates this behavior:

.. sourcecode:: sql

    test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
    CREATE TABLE
    test=> CREATE TABLE referring(
    test(> id INTEGER PRIMARY KEY,
    test(> referred_id INTEGER REFERENCES test_schema.referred(id));
    CREATE TABLE
    test=> SET search_path TO public, test_schema;
    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f'
    test-> ;
    pg_get_constraintdef
    ---------------------------------------------------
    FOREIGN KEY (referred_id) REFERENCES referred(id)
    (1 row)

Above, we created a table ``referred`` as a member of the remote schema
``test_schema``, however when we added ``test_schema`` to the
PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
the function.

On the other hand, if we set the search path back to the typical default
of ``public``:

.. sourcecode:: sql

    test=> SET search_path TO public;
    SET

The same query against ``pg_get_constraintdef()`` now returns the fully
schema-qualified name for us:

.. sourcecode:: sql

    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f';
    pg_get_constraintdef
    ---------------------------------------------------------------
    FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
    (1 row)

SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
in order to determine the remote schema name. That is, if our ``search_path``
were set to include ``test_schema``, and we invoked a table
reflection process as follows::

    >>> from sqlalchemy import Table, MetaData, create_engine, text
    >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    >>> with engine.connect() as conn:
    ...     conn.execute(text("SET search_path TO test_schema, public"))
    ...     metadata_obj = MetaData()
    ...     referring = Table("referring", metadata_obj, autoload_with=conn)
    <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>

The above process would deliver to the :attr:`_schema.MetaData.tables`
collection the ``referred`` table named **without** the schema::

    >>> metadata_obj.tables["referred"].schema is None
    True

To alter the behavior of reflection such that the referred schema is
maintained regardless of the ``search_path`` setting, use the
``postgresql_ignore_search_path`` option, which can be specified as a
dialect-specific argument to both :class:`_schema.Table` as well as
:meth:`_schema.MetaData.reflect`::

    >>> with engine.connect() as conn:
    ...     conn.execute(text("SET search_path TO test_schema, public"))
    ...     metadata_obj = MetaData()
    ...     referring = Table(
    ...         "referring",
    ...         metadata_obj,
    ...         autoload_with=conn,
    ...         postgresql_ignore_search_path=True,
    ...     )
    <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>

We will now have ``test_schema.referred`` stored as schema-qualified::

    >>> metadata_obj.tables["test_schema.referred"].schema
    'test_schema'

.. sidebar:: Best Practices for PostgreSQL Schema reflection

    The description of PostgreSQL schema reflection behavior is complex, and
    is the product of many years of dealing with widely varied use cases and
    user preferences. But in fact, there's no need to understand any of it if
    you just stick to the simplest use pattern: leave the ``search_path`` set
    to its default of ``public`` only, never refer to the name ``public`` as
    an explicit schema name otherwise, and refer to all other schema names
    explicitly when building up a :class:`_schema.Table` object. The options
    described here are only for those users who can't, or prefer not to, stay
    within these guidelines.

.. seealso::

    :ref:`reflection_schema_qualified_interaction` - discussion of the issue
    from a backend-agnostic perspective

    `The Schema Search Path
    <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
    - on the PostgreSQL website.

INSERT/UPDATE...RETURNING
-------------------------

The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
for single-row INSERT statements in order to fetch newly generated
primary key identifiers. To specify an explicit ``RETURNING`` clause,
use the :meth:`.UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().returning(table.c.col1, table.c.col2).values(name="foo")
    )
    print(result.fetchall())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
        .values(name="bar")
    )
    print(result.fetchall())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
    )
    print(result.fetchall())

.. _postgresql_insert_on_conflict:

INSERT...ON CONFLICT (Upsert)
-----------------------------

Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
candidate row will only be inserted if that row does not violate any unique
constraints. In the case of a unique constraint violation, a secondary action
can occur which can be either "DO UPDATE", indicating that the data in the
target row should be updated, or "DO NOTHING", which indicates to silently skip
this row.

Conflicts are determined using existing unique constraints and indexes. These
constraints may be identified either using their name as stated in DDL,
or they may be inferred by stating the columns and conditions that comprise
the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
:func:`_postgresql.insert()` function, which provides
the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.postgresql import insert
    >>> insert_stmt = insert(my_table).values(
    ...     id="some_existing_id", data="inserted value"
    ... )
    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(do_nothing_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO NOTHING
    {stop}

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     constraint="pk_my_table", set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s

.. seealso::

    `INSERT .. ON CONFLICT
    <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
    - in the PostgreSQL documentation.

Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict either by naming a
constraint or by column inference:

* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique
  index:

  .. sourcecode:: pycon+sql

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
    {stop}

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     index_elements=[my_table.c.id], set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
  infer an index, a partial index can be inferred by also specifying the
  :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
    >>> stmt = stmt.on_conflict_do_update(
    ...     index_elements=[my_table.c.user_email],
    ...     index_where=my_table.c.user_email.like("%@gmail.com"),
    ...     set_=dict(data=stmt.excluded.data),
    ... )
    >>> print(stmt)
    {printsql}INSERT INTO my_table (data, user_email)
    VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
    WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data

* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
  used to specify an index directly rather than inferring it. This can be
  the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:

  .. sourcecode:: pycon+sql

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     constraint="my_table_idx_1", set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
    {stop}

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     constraint="my_table_pk", set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
    {stop}

* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
  also refer to a SQLAlchemy construct representing a constraint,
  e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
  :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
  if the constraint has a name, it is used directly. Otherwise, if the
  constraint is unnamed, then inference will be used, where the expressions
  and optional WHERE clause of the constraint will be spelled out in the
  construct. This use is especially convenient
  to refer to the named or unnamed primary key of a :class:`_schema.Table`
  using the
  :attr:`_schema.Table.primary_key` attribute:

  .. sourcecode:: pycon+sql

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     constraint=my_table.primary_key, set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

The SET Clause
^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"], set_=dict(data="updated value")
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

.. warning::

    The :meth:`_expression.Insert.on_conflict_do_update`
    method does **not** take into
    account Python-side default UPDATE values or generation functions, e.g.
    those specified using :paramref:`_schema.Column.onupdate`.
    These values will not be exercised for an ON CONFLICT style of UPDATE,
    unless they are manually specified in the
    :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
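
For example, a value normally applied via :paramref:`_schema.Column.onupdate`
can be restated in the ``set_`` dictionary; a sketch, assuming a hypothetical
``updated_at`` column on ``my_table``::

    from sqlalchemy import func

    do_update_stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_=dict(
            data="updated value",
            # restate the "onupdate" value explicitly so the upsert's
            # UPDATE branch applies it
            updated_at=func.now(),
        ),
    )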

Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.postgresql.Insert.excluded` is available as an attribute on
the :class:`_postgresql.Insert` object; this object is a
:class:`_expression.ColumnCollection` alias that contains all columns of the
target table:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )
    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ... )
    >>> print(do_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author)
    VALUES (%(id)s, %(data)s, %(author)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author

Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id="some_id", data="inserted value", author="jlh"
    ... )
    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=["id"],
    ...     set_=dict(data="updated value", author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2),
    ... )
    >>> print(on_update_stmt)
    {printsql}INSERT INTO my_table (id, data, author)
    VALUES (%(id)s, %(data)s, %(author)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
    WHERE my_table.status = %(status_1)s

Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique or exclusion constraint occurs; below
this is illustrated using the
:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO NOTHING

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique or exclusion
constraint violation which occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT DO NOTHING

.. _postgresql_match:

Full Text Search
----------------

PostgreSQL's full text search system is available through the use of the
:data:`.func` namespace, combined with the use of custom operators
via the :meth:`.Operators.bool_op` method. For simple cases with some
degree of cross-backend compatibility, the :meth:`.Operators.match` operator
may also be used.

.. _postgresql_simple_match:

Simple plain text matching with ``match()``
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`.Operators.match` operator provides for cross-compatible simple
text matching. For the PostgreSQL backend, it's hardcoded to generate
an expression using the ``@@`` operator in conjunction with the
``plainto_tsquery()`` PostgreSQL function.

On the PostgreSQL dialect, an expression like the following::

    select(sometable.c.text.match("search string"))

would emit to the database:

.. sourcecode:: sql

    SELECT text @@ plainto_tsquery('search string') FROM table

Above, passing a plain string to :meth:`.Operators.match` will automatically
make use of ``plainto_tsquery()`` to specify the type of tsquery. This
establishes basic database cross-compatibility for :meth:`.Operators.match`
with other backends.

.. versionchanged:: 2.0 The default tsquery generation function used by the
    PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``.

    To render exactly what was rendered in 1.4, use the following form::

        from sqlalchemy import func

        select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string")))

    Which would emit:

    .. sourcecode:: sql

        SELECT text @@ to_tsquery('search string') FROM table

Using PostgreSQL full text functions and operators directly
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

Text search operations beyond the simple use of :meth:`.Operators.match`
may make use of the :data:`.func` namespace to generate PostgreSQL full-text
functions, in combination with :meth:`.Operators.bool_op` to generate
any boolean operator.

For example, the query::

    select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat")))

would generate:

.. sourcecode:: sql

    SELECT to_tsquery('cat') @> to_tsquery('cat & rat')

The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::

    from sqlalchemy.dialects.postgresql import TSVECTOR
    from sqlalchemy import select, cast

    select(cast("some text", TSVECTOR))

produces a statement equivalent to:

.. sourcecode:: sql

    SELECT CAST('some text' AS TSVECTOR) AS anon_1

The ``func`` namespace is augmented by the PostgreSQL dialect to set up
correct argument and return types for most full text search functions.
These functions are used automatically by the :attr:`_sql.func` namespace
assuming the ``sqlalchemy.dialects.postgresql`` package has been imported,
or :func:`_sa.create_engine` has been invoked using a ``postgresql``
dialect. These functions are documented at:

* :class:`_postgresql.to_tsvector`
* :class:`_postgresql.to_tsquery`
* :class:`_postgresql.plainto_tsquery`
* :class:`_postgresql.phraseto_tsquery`
* :class:`_postgresql.websearch_to_tsquery`
* :class:`_postgresql.ts_headline`
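
A short sketch of these typed functions in use, compiled against the
PostgreSQL dialect (the text and configuration values are illustrative)::

    from sqlalchemy import func, select
    from sqlalchemy.dialects import postgresql

    stmt = select(
        func.to_tsvector("english", "a fat cat sat").bool_op("@@")(
            func.websearch_to_tsquery("english", "fat cat")
        )
    )
    # renders to_tsvector(...) @@ websearch_to_tsquery(...)
    # with bound parameters
    print(stmt.compile(dialect=postgresql.dialect()))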

Specifying the "regconfig" with ``match()`` or custom operators
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL's ``plainto_tsquery()`` function accepts an optional
"regconfig" argument that specifies the text search configuration in use,
which among other things allows PostgreSQL to make use of a matching
pre-computed GIN or GiST index when performing the search.
When using :meth:`.Operators.match`, this additional parameter may be
specified using the ``postgresql_regconfig`` parameter, such as::

    select(mytable.c.id).where(
        mytable.c.title.match("somestring", postgresql_regconfig="english")
    )

Which would emit:

.. sourcecode:: sql

    SELECT mytable.id FROM mytable
    WHERE mytable.title @@ plainto_tsquery('english', 'somestring')

When using other PostgreSQL search functions with :data:`.func`, the
"regconfig" parameter may be passed directly as the initial argument::

    select(mytable.c.id).where(
        func.to_tsvector("english", mytable.c.title).bool_op("@@")(
            func.to_tsquery("english", "somestring")
        )
    )

produces a statement equivalent to:

.. sourcecode:: sql

    SELECT mytable.id FROM mytable
    WHERE to_tsvector('english', mytable.title) @@
        to_tsquery('english', 'somestring')

It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
PostgreSQL to ensure that you are generating queries with SQLAlchemy that
take full advantage of any indexes you may have created for full text search.
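
For example, an expression index matching the ``to_tsvector('english', ...)``
query above might be declared as in the following sketch, where the index
name is hypothetical; the ``postgresql_using`` parameter is described at
:ref:`postgresql_indexes`::

    from sqlalchemy import Index, func

    Index(
        "ix_mytable_title_tsv",
        func.to_tsvector("english", mytable.c.title),
        postgresql_using="gin",
    )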

.. seealso::

    `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation

FROM ONLY ...
-------------

The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
table in an inheritance hierarchy. This can be used to produce the
``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
syntaxes. It uses SQLAlchemy's hints mechanism::

    # SELECT ... FROM ONLY ...
    result = connection.execute(
        table.select().with_hint(table, "ONLY", "postgresql")
    )
    print(result.fetchall())

    # UPDATE ONLY ...
    table.update().values(foo="bar").with_hint(
        "ONLY", dialect_name="postgresql"
    )

    # DELETE FROM ONLY ...
    table.delete().with_hint("ONLY", dialect_name="postgresql")

.. _postgresql_indexes:

PostgreSQL-Specific Index Options
---------------------------------

Several extensions to the :class:`.Index` construct are available, specific
to the PostgreSQL dialect.

.. _postgresql_covering_indexes:

Covering Indexes
^^^^^^^^^^^^^^^^

The ``postgresql_include`` option renders INCLUDE(colname) for the given
string names::

    Index("my_index", table.c.x, postgresql_include=["y"])

would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``.

Note that this feature requires PostgreSQL 11 or later.

.. seealso::

    :ref:`postgresql_constraint_options`

.. versionadded:: 1.4

.. _postgresql_partial_indexes:

Partial Indexes
^^^^^^^^^^^^^^^

Partial indexes add criteria to the index definition so that the index is
applied to a subset of rows. These can be specified on :class:`.Index`
using the ``postgresql_where`` keyword argument::

    Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10)

.. _postgresql_operator_classes:

Operator Classes
^^^^^^^^^^^^^^^^

PostgreSQL allows the specification of an *operator class* for each column of
an index (see
https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
The :class:`.Index` construct allows these to be specified via the
``postgresql_ops`` keyword argument::

    Index(
        "my_index",
        my_table.c.id,
        my_table.c.data,
        postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"},
    )

Note that the keys in the ``postgresql_ops`` dictionaries are the
"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
different than the actual name of the column as expressed in the database.

If ``postgresql_ops`` is to be used against a complex SQL expression such
as a function call, the expression must be given a label that is then
identified in the dictionary by name, e.g.::

    Index(
        "my_index",
        my_table.c.id,
        func.lower(my_table.c.data).label("data_lower"),
        postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"},
    )

Operator classes are also supported by the
:class:`_postgresql.ExcludeConstraint` construct using the
:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
details.

.. versionadded:: 1.3.21 added support for operator classes with
    :class:`_postgresql.ExcludeConstraint`.

Index Types
^^^^^^^^^^^

PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
as the ability for users to create their own (see
https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
specified on :class:`.Index` using the ``postgresql_using`` keyword argument::

    Index("my_index", my_table.c.data, postgresql_using="gin")

The value passed to the keyword argument will be simply passed through to the
underlying CREATE INDEX command, so it *must* be a valid index type for your
version of PostgreSQL.

.. _postgresql_index_storage:

Index Storage Parameters
^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL allows storage parameters to be set on indexes. The storage
parameters available depend on the index method used by the index. Storage
parameters can be specified on :class:`.Index` using the ``postgresql_with``
keyword argument::

    Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50})

PostgreSQL also allows the tablespace in which the index is created to be
specified. The tablespace can be given on :class:`.Index` using the
``postgresql_tablespace`` keyword argument::

    Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace")

Note that the same option is available on :class:`_schema.Table` as well.

.. _postgresql_index_concurrently:

Indexes with CONCURRENTLY
^^^^^^^^^^^^^^^^^^^^^^^^^

The PostgreSQL index option CONCURRENTLY is supported by passing the
flag ``postgresql_concurrently`` to the :class:`.Index` construct::

    tbl = Table("testtbl", m, Column("data", Integer))

    idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)

The above index construct will render DDL for CREATE INDEX, assuming
PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as:

.. sourcecode:: sql

    CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)

For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
a connection-less dialect, it will emit:

.. sourcecode:: sql

    DROP INDEX CONCURRENTLY test_idx1

When using CONCURRENTLY, the PostgreSQL database requires that the statement
be invoked outside of a transaction block. The Python DBAPI enforces that
even for a single statement, a transaction is present, so to use this
construct, the DBAPI's "autocommit" mode must be used::

    metadata = MetaData()
    table = Table("foo", metadata, Column("id", String))
    index = Index("foo_idx", table.c.id, postgresql_concurrently=True)

    with engine.connect() as conn:
        with conn.execution_options(isolation_level="AUTOCOMMIT"):
            table.create(conn)

.. seealso::

    :ref:`postgresql_isolation_level`

.. _postgresql_index_reflection:

PostgreSQL Index Reflection
---------------------------

The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
UNIQUE CONSTRAINT construct is used. When inspecting a table using
:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
and the :meth:`_reflection.Inspector.get_unique_constraints`
will report on these
two constructs distinctly; in the case of the index, the key
``duplicates_constraint`` will be present in the index entry if it is
detected as mirroring a constraint. When performing reflection using
``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
in :attr:`_schema.Table.indexes` when it is detected as mirroring a
:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints`
collection.
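
A sketch of observing this distinction with the inspector, assuming a
hypothetical ``mytable`` that declares a UNIQUE CONSTRAINT::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    insp = inspect(engine)

    for idx in insp.get_indexes("mytable"):
        # key present only when the index mirrors a UNIQUE CONSTRAINT
        if "duplicates_constraint" in idx:
            print(idx["name"], "mirrors", idx["duplicates_constraint"])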

Special Reflection Options
--------------------------

The :class:`_reflection.Inspector`
used for the PostgreSQL backend is an instance
of :class:`.PGInspector`, which offers additional methods::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://localhost/test")
    insp = inspect(engine)  # will be a PGInspector

    print(insp.get_enums())

.. autoclass:: PGInspector
    :members:

.. _postgresql_table_options:

PostgreSQL Table Options
------------------------

Several options for CREATE TABLE are supported directly by the PostgreSQL
dialect in conjunction with the :class:`_schema.Table` construct:

* ``INHERITS``::

    Table("some_table", metadata, ..., postgresql_inherits="some_supertable")

    Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))

* ``ON COMMIT``::

    Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS")

* ``PARTITION BY``::

    Table(
        "some_table",
        metadata,
        ...,
        postgresql_partition_by="LIST (part_column)",
    )

  .. versionadded:: 1.2.6

* ``TABLESPACE``::

    Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace")

  The above option is also available on the :class:`.Index` construct.

* ``USING``::

    Table("some_table", metadata, ..., postgresql_using="heap")

  .. versionadded:: 2.0.26

* ``WITH OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=True)

* ``WITHOUT OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=False)

.. seealso::

    `PostgreSQL CREATE TABLE options
    <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
    in the PostgreSQL documentation.

.. _postgresql_constraint_options:

PostgreSQL Constraint Options
-----------------------------

The following option(s) are supported by the PostgreSQL dialect in conjunction
with selected constraint constructs:

* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints
  when the constraint is being added to an existing table via ALTER TABLE,
  and has the effect that existing rows are not scanned during the ALTER
  operation against the constraint being added.

  When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
  that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
  may be specified as an additional keyword argument within the operation
  that creates the constraint, as in the following Alembic example::

      def upgrade():
          op.create_foreign_key(
              "fk_user_address",
              "address",
              "user",
              ["user_id"],
              ["id"],
              postgresql_not_valid=True,
          )

  The keyword is ultimately accepted directly by the
  :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
  and :class:`_schema.ForeignKey` constructs; when using a tool like
  Alembic, dialect-specific keyword arguments are passed through to
  these constructs from the migration operation directives::

      CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)

      ForeignKeyConstraint(
          ["some_id"], ["some_table.some_id"], postgresql_not_valid=True
      )

  .. versionadded:: 1.4.32

  .. seealso::

      `PostgreSQL ALTER TABLE options
      <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
      in the PostgreSQL documentation.

* ``INCLUDE``: This option adds one or more columns as a "payload" to the
  unique index created automatically by PostgreSQL for the constraint.
  For example, the following table definition::

      Table(
          "mytable",
          metadata,
          Column("id", Integer, nullable=False),
          Column("value", Integer, nullable=False),
          UniqueConstraint("id", postgresql_include=["value"]),
      )

  would produce the DDL statement:

  .. sourcecode:: sql

      CREATE TABLE mytable (
          id INTEGER NOT NULL,
          value INTEGER NOT NULL,
          UNIQUE (id) INCLUDE (value)
      )

  Note that this feature requires PostgreSQL 11 or later.

  .. versionadded:: 2.0.41

  .. seealso::

      :ref:`postgresql_covering_indexes`

      `PostgreSQL CREATE TABLE options
      <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
      in the PostgreSQL documentation.

* Column list with foreign key ``ON DELETE SET`` actions: This applies to
  :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`; on the PostgreSQL
  backend, the :paramref:`.ForeignKey.ondelete` parameter will accept a
  string list of column names inside parentheses, following the ``SET NULL``
  or ``SET DEFAULT`` phrases, which will limit the set of columns that are
  subject to the action::

      fktable = Table(
          "fktable",
          metadata,
          Column("tid", Integer),
          Column("id", Integer),
          Column("fk_id_del_set_null", Integer),
          ForeignKeyConstraint(
              columns=["tid", "fk_id_del_set_null"],
              refcolumns=[pktable.c.tid, pktable.c.id],
              ondelete="SET NULL (fk_id_del_set_null)",
          ),
      )

  .. versionadded:: 2.0.40

.. _postgresql_table_valued_overview:

Table values, Table and Column valued functions, Row and Tuple objects
----------------------------------------------------------------------

PostgreSQL makes great use of modern SQL forms such as table-valued functions,
tables and rows as values. These constructs are commonly used as part
of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
datatypes. SQLAlchemy's SQL expression language has native support for
most table-valued and row-valued forms.

.. _postgresql_table_valued:

Table-Valued Functions
^^^^^^^^^^^^^^^^^^^^^^

Many PostgreSQL built-in functions are intended to be used in the FROM clause
of a SELECT statement, and are capable of returning table rows or sets of table
rows. A large portion of PostgreSQL's JSON functions for example such as
``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such
forms. These classes of SQL function calling forms in SQLAlchemy are available
using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
with :class:`_functions.Function` objects generated from the :data:`_sql.func`
namespace.

Examples from PostgreSQL's reference documentation follow below:

* ``json_each()``:

  .. sourcecode:: pycon+sql

    >>> from sqlalchemy import select, func
    >>> stmt = select(
    ...     func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")
    ... )
    >>> print(stmt)
    {printsql}SELECT anon_1.key, anon_1.value
    FROM json_each(:json_each_1) AS anon_1

* ``json_populate_record()``:

  .. sourcecode:: pycon+sql

    >>> from sqlalchemy import select, func, literal_column
    >>> stmt = select(
    ...     func.json_populate_record(
    ...         literal_column("null::myrowtype"), '{"a":1,"b":2}'
    ...     ).table_valued("a", "b", name="x")
    ... )
    >>> print(stmt)
    {printsql}SELECT x.a, x.b
    FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x

* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
  columns in the alias, where we may make use of :func:`_sql.column` elements with
  types to produce them. The :meth:`_functions.FunctionElement.table_valued`
  method produces a :class:`_sql.TableValuedAlias` construct, and the
  :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
  columns specification:

  .. sourcecode:: pycon+sql

    >>> from sqlalchemy import select, func, column, Integer, Text
    >>> stmt = select(
    ...     func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}')
    ...     .table_valued(
    ...         column("a", Integer),
    ...         column("b", Text),
    ...         column("d", Text),
    ...     )
    ...     .render_derived(name="x", with_types=True)
    ... )
    >>> print(stmt)
    {printsql}SELECT x.a, x.b, x.d
    FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)

* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
  ordinal counter to the output of a function and is accepted by a limited set
  of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
  :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
  parameter ``with_ordinality`` for this purpose, which accepts the string name
  that will be applied to the "ordinality" column:

  .. sourcecode:: pycon+sql

    >>> from sqlalchemy import select, func
    >>> stmt = select(
    ...     func.generate_series(4, 1, -1)
    ...     .table_valued("value", with_ordinality="ordinality")
    ...     .render_derived()
    ... )
    >>> print(stmt)
    {printsql}SELECT anon_1.value, anon_1.ordinality
    FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
    WITH ORDINALITY AS anon_1(value, ordinality)

.. versionadded:: 1.4.0b2

.. seealso::

    :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`

.. _postgresql_column_valued:

Column Valued Functions
^^^^^^^^^^^^^^^^^^^^^^^

Similar to the table valued function, a column valued function is present
in the FROM clause, but delivers itself to the columns clause as a single
scalar value. PostgreSQL functions such as ``json_array_elements()``,
``unnest()`` and ``generate_series()`` may use this form. Column valued
functions are available using the
:meth:`_functions.FunctionElement.column_valued` method of
:class:`_functions.FunctionElement`:

* ``json_array_elements()``:

  .. sourcecode:: pycon+sql

    >>> from sqlalchemy import select, func
    >>> stmt = select(
    ...     func.json_array_elements('["one", "two"]').column_valued("x")
    ... )
    >>> print(stmt)
    {printsql}SELECT x
    FROM json_array_elements(:json_array_elements_1) AS x

* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
  :func:`_postgresql.array` construct may be used:

  .. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.postgresql import array
    >>> from sqlalchemy import select, func
    >>> stmt = select(func.unnest(array([1, 2])).column_valued())
    >>> print(stmt)
    {printsql}SELECT anon_1
    FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1

  The function can of course be used against an existing table-bound column
  that's of type :class:`_types.ARRAY`:

  .. sourcecode:: pycon+sql

    >>> from sqlalchemy import table, column, ARRAY, Integer
    >>> from sqlalchemy import select, func
    >>> t = table("t", column("value", ARRAY(Integer)))
    >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
    >>> print(stmt)
    {printsql}SELECT unnested_value
    FROM unnest(t.value) AS unnested_value

.. seealso::

    :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`

Row Types
^^^^^^^^^

Built-in support for rendering a ``ROW`` may be approximated using
``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
:func:`_sql.tuple_` construct:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy import table, column, func, tuple_
    >>> t = table("t", column("id"), column("fk"))
    >>> stmt = (
    ...     t.select()
    ...     .where(tuple_(t.c.id, t.c.fk) > (1, 2))
    ...     .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7))
    ... )
    >>> print(stmt)
    {printsql}SELECT t.id, t.fk
    FROM t
    WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)

.. seealso::

    `PostgreSQL Row Constructors
    <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_

    `PostgreSQL Row Constructor Comparison
    <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_

Table Types passed to Functions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL supports passing a table as an argument to a function, which is
known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
such as :class:`_schema.Table` support this special form using the
:meth:`_sql.FromClause.table_valued` method, which is comparable to the
:meth:`_functions.FunctionElement.table_valued` method except that the collection
of columns is already established by that of the :class:`_sql.FromClause`
itself:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy import table, column, func, select
    >>> a = table("a", column("id"), column("x"), column("y"))
    >>> stmt = select(func.row_to_json(a.table_valued()))
    >>> print(stmt)
    {printsql}SELECT row_to_json(a) AS row_to_json_1
    FROM a

.. versionadded:: 1.4.0b2

""" # noqa: E501
1542
1543from __future__ import annotations
1544
1545from collections import defaultdict
1546from functools import lru_cache
1547import re
1548from typing import Any
1549from typing import cast
1550from typing import Dict
1551from typing import List
1552from typing import Optional
1553from typing import Tuple
1554from typing import TYPE_CHECKING
1555from typing import Union
1556
1557from . import arraylib as _array
1558from . import json as _json
1559from . import pg_catalog
1560from . import ranges as _ranges
1561from .ext import _regconfig_fn
1562from .ext import aggregate_order_by
1563from .hstore import HSTORE
1564from .named_types import CreateDomainType as CreateDomainType # noqa: F401
1565from .named_types import CreateEnumType as CreateEnumType # noqa: F401
1566from .named_types import DOMAIN as DOMAIN # noqa: F401
1567from .named_types import DropDomainType as DropDomainType # noqa: F401
1568from .named_types import DropEnumType as DropEnumType # noqa: F401
1569from .named_types import ENUM as ENUM # noqa: F401
1570from .named_types import NamedType as NamedType # noqa: F401
1571from .types import _DECIMAL_TYPES # noqa: F401
1572from .types import _FLOAT_TYPES # noqa: F401
1573from .types import _INT_TYPES # noqa: F401
1574from .types import BIT as BIT
1575from .types import BYTEA as BYTEA
1576from .types import CIDR as CIDR
1577from .types import CITEXT as CITEXT
1578from .types import INET as INET
1579from .types import INTERVAL as INTERVAL
1580from .types import MACADDR as MACADDR
1581from .types import MACADDR8 as MACADDR8
1582from .types import MONEY as MONEY
1583from .types import OID as OID
1584from .types import PGBit as PGBit # noqa: F401
1585from .types import PGCidr as PGCidr # noqa: F401
1586from .types import PGInet as PGInet # noqa: F401
1587from .types import PGInterval as PGInterval # noqa: F401
1588from .types import PGMacAddr as PGMacAddr # noqa: F401
1589from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401
1590from .types import PGUuid as PGUuid
1591from .types import REGCLASS as REGCLASS
1592from .types import REGCONFIG as REGCONFIG # noqa: F401
1593from .types import TIME as TIME
1594from .types import TIMESTAMP as TIMESTAMP
1595from .types import TSVECTOR as TSVECTOR
1596from ... import exc
1597from ... import schema
1598from ... import select
1599from ... import sql
1600from ... import util
1601from ...engine import characteristics
1602from ...engine import default
1603from ...engine import interfaces
1604from ...engine import ObjectKind
1605from ...engine import ObjectScope
1606from ...engine import reflection
1607from ...engine import URL
1608from ...engine.reflection import ReflectionDefaults
1609from ...sql import bindparam
1610from ...sql import coercions
1611from ...sql import compiler
1612from ...sql import elements
1613from ...sql import expression
1614from ...sql import roles
1615from ...sql import sqltypes
1616from ...sql import util as sql_util
1617from ...sql.compiler import InsertmanyvaluesSentinelOpts
1618from ...sql.visitors import InternalTraversal
1619from ...types import BIGINT
1620from ...types import BOOLEAN
1621from ...types import CHAR
1622from ...types import DATE
1623from ...types import DOUBLE_PRECISION
1624from ...types import FLOAT
1625from ...types import INTEGER
1626from ...types import NUMERIC
1627from ...types import REAL
1628from ...types import SMALLINT
1629from ...types import TEXT
1630from ...types import UUID as UUID
1631from ...types import VARCHAR
1632from ...util.typing import TypedDict
1633
1634IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
1635
1636RESERVED_WORDS = {
1637 "all",
1638 "analyse",
1639 "analyze",
1640 "and",
1641 "any",
1642 "array",
1643 "as",
1644 "asc",
1645 "asymmetric",
1646 "both",
1647 "case",
1648 "cast",
1649 "check",
1650 "collate",
1651 "column",
1652 "constraint",
1653 "create",
1654 "current_catalog",
1655 "current_date",
1656 "current_role",
1657 "current_time",
1658 "current_timestamp",
1659 "current_user",
1660 "default",
1661 "deferrable",
1662 "desc",
1663 "distinct",
1664 "do",
1665 "else",
1666 "end",
1667 "except",
1668 "false",
1669 "fetch",
1670 "for",
1671 "foreign",
1672 "from",
1673 "grant",
1674 "group",
1675 "having",
1676 "in",
1677 "initially",
1678 "intersect",
1679 "into",
1680 "leading",
1681 "limit",
1682 "localtime",
1683 "localtimestamp",
1684 "new",
1685 "not",
1686 "null",
1687 "of",
1688 "off",
1689 "offset",
1690 "old",
1691 "on",
1692 "only",
1693 "or",
1694 "order",
1695 "placing",
1696 "primary",
1697 "references",
1698 "returning",
1699 "select",
1700 "session_user",
1701 "some",
1702 "symmetric",
1703 "table",
1704 "then",
1705 "to",
1706 "trailing",
1707 "true",
1708 "union",
1709 "unique",
1710 "user",
1711 "using",
1712 "variadic",
1713 "when",
1714 "where",
1715 "window",
1716 "with",
1717 "authorization",
1718 "between",
1719 "binary",
1720 "cross",
1721 "current_schema",
1722 "freeze",
1723 "full",
1724 "ilike",
1725 "inner",
1726 "is",
1727 "isnull",
1728 "join",
1729 "left",
1730 "like",
1731 "natural",
1732 "notnull",
1733 "outer",
1734 "over",
1735 "overlaps",
1736 "right",
1737 "similar",
1738 "verbose",
1739}
1740
1741
1742colspecs = {
1743 sqltypes.ARRAY: _array.ARRAY,
1744 sqltypes.Interval: INTERVAL,
1745 sqltypes.Enum: ENUM,
1746 sqltypes.JSON.JSONPathType: _json.JSONPATH,
1747 sqltypes.JSON: _json.JSON,
1748 sqltypes.Uuid: PGUuid,
1749}
1750
1751
1752ischema_names = {
1753 "_array": _array.ARRAY,
1754 "hstore": HSTORE,
1755 "json": _json.JSON,
1756 "jsonb": _json.JSONB,
1757 "int4range": _ranges.INT4RANGE,
1758 "int8range": _ranges.INT8RANGE,
1759 "numrange": _ranges.NUMRANGE,
1760 "daterange": _ranges.DATERANGE,
1761 "tsrange": _ranges.TSRANGE,
1762 "tstzrange": _ranges.TSTZRANGE,
1763 "int4multirange": _ranges.INT4MULTIRANGE,
1764 "int8multirange": _ranges.INT8MULTIRANGE,
1765 "nummultirange": _ranges.NUMMULTIRANGE,
1766 "datemultirange": _ranges.DATEMULTIRANGE,
1767 "tsmultirange": _ranges.TSMULTIRANGE,
1768 "tstzmultirange": _ranges.TSTZMULTIRANGE,
1769 "integer": INTEGER,
1770 "bigint": BIGINT,
1771 "smallint": SMALLINT,
1772 "character varying": VARCHAR,
1773 "character": CHAR,
1774 '"char"': sqltypes.String,
1775 "name": sqltypes.String,
1776 "text": TEXT,
1777 "numeric": NUMERIC,
1778 "float": FLOAT,
1779 "real": REAL,
1780 "inet": INET,
1781 "cidr": CIDR,
1782 "citext": CITEXT,
1783 "uuid": UUID,
1784 "bit": BIT,
1785 "bit varying": BIT,
1786 "macaddr": MACADDR,
1787 "macaddr8": MACADDR8,
1788 "money": MONEY,
1789 "oid": OID,
1790 "regclass": REGCLASS,
1791 "double precision": DOUBLE_PRECISION,
1792 "timestamp": TIMESTAMP,
1793 "timestamp with time zone": TIMESTAMP,
1794 "timestamp without time zone": TIMESTAMP,
1795 "time with time zone": TIME,
1796 "time without time zone": TIME,
1797 "date": DATE,
1798 "time": TIME,
1799 "bytea": BYTEA,
1800 "boolean": BOOLEAN,
1801 "interval": INTERVAL,
1802 "tsvector": TSVECTOR,
1803}
1804
1805
1806class PGCompiler(compiler.SQLCompiler):
1807 def visit_to_tsvector_func(self, element, **kw):
1808 return self._assert_pg_ts_ext(element, **kw)
1809
1810 def visit_to_tsquery_func(self, element, **kw):
1811 return self._assert_pg_ts_ext(element, **kw)
1812
1813 def visit_plainto_tsquery_func(self, element, **kw):
1814 return self._assert_pg_ts_ext(element, **kw)
1815
1816 def visit_phraseto_tsquery_func(self, element, **kw):
1817 return self._assert_pg_ts_ext(element, **kw)
1818
1819 def visit_websearch_to_tsquery_func(self, element, **kw):
1820 return self._assert_pg_ts_ext(element, **kw)
1821
1822 def visit_ts_headline_func(self, element, **kw):
1823 return self._assert_pg_ts_ext(element, **kw)
1824
1825 def _assert_pg_ts_ext(self, element, **kw):
1826 if not isinstance(element, _regconfig_fn):
1827 # other options here include trying to rewrite the function
1828 # with the correct types. however, that means we have to
1829 # "un-SQL-ize" the first argument, which can't work in a
1830 # generalized way. Also, parent compiler class has already added
1831 # the incorrect return type to the result map. So let's just
1832 # make sure the function we want is used up front.
1833
1834 raise exc.CompileError(
1835 f'Can\'t compile "{element.name}()" full text search '
1836 f"function construct that does not originate from the "
1837 f'"sqlalchemy.dialects.postgresql" package. '
1838 f'Please ensure "import sqlalchemy.dialects.postgresql" is '
1839 f"called before constructing "
1840 f'"sqlalchemy.func.{element.name}()" to ensure registration '
1841 f"of the correct argument and return types."
1842 )
1843
1844 return f"{element.name}{self.function_argspec(element, **kw)}"
1845
1846 def render_bind_cast(self, type_, dbapi_type, sqltext):
1847 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length:
1848 # use VARCHAR with no length for VARCHAR cast.
1849 # see #9511
1850 dbapi_type = sqltypes.STRINGTYPE
1851 return f"""{sqltext}::{
1852 self.dialect.type_compiler_instance.process(
1853 dbapi_type, identifier_preparer=self.preparer
1854 )
1855 }"""
1856
1857 def visit_array(self, element, **kw):
1858 if not element.clauses and not element.type.item_type._isnull:
1859 return "ARRAY[]::%s" % element.type.compile(self.dialect)
1860 return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
1861
1862 def visit_slice(self, element, **kw):
1863 return "%s:%s" % (
1864 self.process(element.start, **kw),
1865 self.process(element.stop, **kw),
1866 )
1867
1868 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
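        # PostgreSQL spells bitwise XOR as "#"; the more common "^" is
        # reserved for exponentiation on this backend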
1869 return self._generate_generic_binary(binary, " # ", **kw)
1870
1871 def visit_json_getitem_op_binary(
1872 self, binary, operator, _cast_applied=False, **kw
1873 ):
1874 if (
1875 not _cast_applied
1876 and binary.type._type_affinity is not sqltypes.JSON
1877 ):
1878 kw["_cast_applied"] = True
1879 return self.process(sql.cast(binary, binary.type), **kw)
1880
1881 kw["eager_grouping"] = True
1882
1883 return self._generate_generic_binary(
1884 binary, " -> " if not _cast_applied else " ->> ", **kw
1885 )
1886
1887 def visit_json_path_getitem_op_binary(
1888 self, binary, operator, _cast_applied=False, **kw
1889 ):
1890 if (
1891 not _cast_applied
1892 and binary.type._type_affinity is not sqltypes.JSON
1893 ):
1894 kw["_cast_applied"] = True
1895 return self.process(sql.cast(binary, binary.type), **kw)
1896
1897 kw["eager_grouping"] = True
1898 return self._generate_generic_binary(
1899 binary, " #> " if not _cast_applied else " #>> ", **kw
1900 )
1901
1902 def visit_getitem_binary(self, binary, operator, **kw):
1903 return "%s[%s]" % (
1904 self.process(binary.left, **kw),
1905 self.process(binary.right, **kw),
1906 )
1907
1908 def visit_aggregate_order_by(self, element, **kw):
1909 return "%s ORDER BY %s" % (
1910 self.process(element.target, **kw),
1911 self.process(element.order_by, **kw),
1912 )
1913
1914 def visit_match_op_binary(self, binary, operator, **kw):
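        # renders MATCH as PostgreSQL full text search, e.g. (illustrative)
        # t.c.x.match("terms") becomes "x @@ plainto_tsquery(%(x_1)s)"; a
        # "postgresql_regconfig" modifier is passed through as the first
        # argument to plainto_tsquery()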
1915 if "postgresql_regconfig" in binary.modifiers:
1916 regconfig = self.render_literal_value(
1917 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
1918 )
1919 if regconfig:
1920 return "%s @@ plainto_tsquery(%s, %s)" % (
1921 self.process(binary.left, **kw),
1922 regconfig,
1923 self.process(binary.right, **kw),
1924 )
1925 return "%s @@ plainto_tsquery(%s)" % (
1926 self.process(binary.left, **kw),
1927 self.process(binary.right, **kw),
1928 )
1929
1930 def visit_ilike_case_insensitive_operand(self, element, **kw):
1931 return element.element._compiler_dispatch(self, **kw)
1932
1933 def visit_ilike_op_binary(self, binary, operator, **kw):
1934 escape = binary.modifiers.get("escape", None)
1935
1936 return "%s ILIKE %s" % (
1937 self.process(binary.left, **kw),
1938 self.process(binary.right, **kw),
1939 ) + (
1940 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
1941 if escape is not None
1942 else ""
1943 )
1944
1945 def visit_not_ilike_op_binary(self, binary, operator, **kw):
1946 escape = binary.modifiers.get("escape", None)
1947 return "%s NOT ILIKE %s" % (
1948 self.process(binary.left, **kw),
1949 self.process(binary.right, **kw),
1950 ) + (
1951 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
1952 if escape is not None
1953 else ""
1954 )
1955
1956 def _regexp_match(self, base_op, binary, operator, kw):
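        # shared renderer for the ~ / !~ family: no flags renders the bare
        # operator, the single flag "i" selects the case-insensitive variant
        # (~* / !~*), and any other flags are prepended to the pattern as an
        # inline group, e.g. (illustrative) flags="im" renders
        # "x ~ CONCAT('(?', 'im', ')', y)"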
1957 flags = binary.modifiers["flags"]
1958 if flags is None:
1959 return self._generate_generic_binary(
1960 binary, " %s " % base_op, **kw
1961 )
1962 if flags == "i":
1963 return self._generate_generic_binary(
1964 binary, " %s* " % base_op, **kw
1965 )
1966 return "%s %s CONCAT('(?', %s, ')', %s)" % (
1967 self.process(binary.left, **kw),
1968 base_op,
1969 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1970 self.process(binary.right, **kw),
1971 )
1972
1973 def visit_regexp_match_op_binary(self, binary, operator, **kw):
1974 return self._regexp_match("~", binary, operator, kw)
1975
1976 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
1977 return self._regexp_match("!~", binary, operator, kw)
1978
1979 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
1980 string = self.process(binary.left, **kw)
1981 pattern_replace = self.process(binary.right, **kw)
1982 flags = binary.modifiers["flags"]
1983 if flags is None:
1984 return "REGEXP_REPLACE(%s, %s)" % (
1985 string,
1986 pattern_replace,
1987 )
1988 else:
1989 return "REGEXP_REPLACE(%s, %s, %s)" % (
1990 string,
1991 pattern_replace,
1992 self.render_literal_value(flags, sqltypes.STRINGTYPE),
1993 )
1994
1995 def visit_empty_set_expr(self, element_types, **kw):
1996 # cast the empty set to the type we are comparing against. if
1997 # we are comparing against the null type, pick an arbitrary
1998 # datatype for the empty set
1999 return "SELECT %s WHERE 1!=1" % (
2000 ", ".join(
2001 "CAST(NULL AS %s)"
2002 % self.dialect.type_compiler_instance.process(
2003 INTEGER() if type_._isnull else type_
2004 )
2005 for type_ in element_types or [INTEGER()]
2006 ),
2007 )
2008
2009 def render_literal_value(self, value, type_):
2010 value = super().render_literal_value(value, type_)
2011
2012 if self.dialect._backslash_escapes:
2013 value = value.replace("\\", "\\\\")
2014 return value
2015
2016 def visit_aggregate_strings_func(self, fn, **kw):
2017 return "string_agg%s" % self.function_argspec(fn)
2018
2019 def visit_sequence(self, seq, **kw):
2020 return "nextval('%s')" % self.preparer.format_sequence(seq)
2021
2022 def limit_clause(self, select, **kw):
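        # renders LIMIT / OFFSET; when only an OFFSET is present, a
        # "LIMIT ALL" is emitted ahead of it, e.g. (illustrative)
        # select(t).offset(5) renders "LIMIT ALL OFFSET %(param_1)s"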
2023 text = ""
2024 if select._limit_clause is not None:
2025 text += " \n LIMIT " + self.process(select._limit_clause, **kw)
2026 if select._offset_clause is not None:
2027 if select._limit_clause is None:
2028 text += "\n LIMIT ALL"
2029 text += " OFFSET " + self.process(select._offset_clause, **kw)
2030 return text
2031
2032 def format_from_hint_text(self, sqltext, table, hint, iscrud):
2033 if hint.upper() != "ONLY":
2034 raise exc.CompileError("Unrecognized hint: %r" % hint)
2035 return "ONLY " + sqltext
2036
2037 def get_select_precolumns(self, select, **kw):
2038 # Do not call super().get_select_precolumns because
2039 # it will warn/raise when distinct on is present
2040 if select._distinct or select._distinct_on:
2041 if select._distinct_on:
2042 return (
2043 "DISTINCT ON ("
2044 + ", ".join(
2045 [
2046 self.process(col, **kw)
2047 for col in select._distinct_on
2048 ]
2049 )
2050 + ") "
2051 )
2052 else:
2053 return "DISTINCT "
2054 else:
2055 return ""
2056
2057 def for_update_clause(self, select, **kw):
2058 if select._for_update_arg.read:
2059 if select._for_update_arg.key_share:
2060 tmp = " FOR KEY SHARE"
2061 else:
2062 tmp = " FOR SHARE"
2063 elif select._for_update_arg.key_share:
2064 tmp = " FOR NO KEY UPDATE"
2065 else:
2066 tmp = " FOR UPDATE"
2067
2068 if select._for_update_arg.of:
2069 tables = util.OrderedSet()
2070 for c in select._for_update_arg.of:
2071 tables.update(sql_util.surface_selectables_only(c))
2072
2073 of_kw = dict(kw)
2074 of_kw.update(ashint=True, use_schema=False)
2075 tmp += " OF " + ", ".join(
2076 self.process(table, **of_kw) for table in tables
2077 )
2078
2079 if select._for_update_arg.nowait:
2080 tmp += " NOWAIT"
2081 if select._for_update_arg.skip_locked:
2082 tmp += " SKIP LOCKED"
2083
2084 return tmp
2085
2086 def visit_substring_func(self, func, **kw):
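        # renders the SQL standard form, e.g. (illustrative)
        # func.substring(t.c.x, 2, 5) becomes
        # "SUBSTRING(x FROM %(substring_1)s FOR %(substring_2)s)"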
2087 s = self.process(func.clauses.clauses[0], **kw)
2088 start = self.process(func.clauses.clauses[1], **kw)
2089 if len(func.clauses.clauses) > 2:
2090 length = self.process(func.clauses.clauses[2], **kw)
2091 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
2092 else:
2093 return "SUBSTRING(%s FROM %s)" % (s, start)
2094
2095 def _on_conflict_target(self, clause, **kw):
2096 if clause.constraint_target is not None:
2097 # target may be a name of an Index, UniqueConstraint or
2098 # ExcludeConstraint. While there is a separate
2099 # "max_identifier_length" for indexes, PostgreSQL uses the same
2100 # length for all objects so we can use
2101 # truncate_and_render_constraint_name
2102 target_text = (
2103 "ON CONSTRAINT %s"
2104 % self.preparer.truncate_and_render_constraint_name(
2105 clause.constraint_target
2106 )
2107 )
2108 elif clause.inferred_target_elements is not None:
2109 target_text = "(%s)" % ", ".join(
2110 (
2111 self.preparer.quote(c)
2112 if isinstance(c, str)
2113 else self.process(c, include_table=False, use_schema=False)
2114 )
2115 for c in clause.inferred_target_elements
2116 )
2117 if clause.inferred_target_whereclause is not None:
2118 target_text += " WHERE %s" % self.process(
2119 clause.inferred_target_whereclause,
2120 include_table=False,
2121 use_schema=False,
2122 )
2123 else:
2124 target_text = ""
2125
2126 return target_text
2127
2128 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
2129 target_text = self._on_conflict_target(on_conflict, **kw)
2130
2131 if target_text:
2132 return "ON CONFLICT %s DO NOTHING" % target_text
2133 else:
2134 return "ON CONFLICT DO NOTHING"
2135
2136 def visit_on_conflict_do_update(self, on_conflict, **kw):
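        # e.g. (illustrative)
        # insert(t).on_conflict_do_update(
        #     index_elements=["id"], set_=dict(data="x")
        # )
        # renders "ON CONFLICT (id) DO UPDATE SET data = %(param_1)s"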
2137 clause = on_conflict
2138
2139 target_text = self._on_conflict_target(on_conflict, **kw)
2140
2141 action_set_ops = []
2142
2143 set_parameters = dict(clause.update_values_to_set)
        # create a list of column assignment clauses
2145
2146 insert_statement = self.stack[-1]["selectable"]
2147 cols = insert_statement.table.c
2148 for c in cols:
2149 col_key = c.key
2150
2151 if col_key in set_parameters:
2152 value = set_parameters.pop(col_key)
2153 elif c in set_parameters:
2154 value = set_parameters.pop(c)
2155 else:
2156 continue
2157
2158 # TODO: this coercion should be up front. we can't cache
2159 # SQL constructs with non-bound literals buried in them
2160 if coercions._is_literal(value):
2161 value = elements.BindParameter(None, value, type_=c.type)
2162
2163 else:
2164 if (
2165 isinstance(value, elements.BindParameter)
2166 and value.type._isnull
2167 ):
2168 value = value._clone()
2169 value.type = c.type
2170 value_text = self.process(value.self_group(), use_schema=False)
2171
2172 key_text = self.preparer.quote(c.name)
2173 action_set_ops.append("%s = %s" % (key_text, value_text))
2174
2175 # check for names that don't match columns
2176 if set_parameters:
2177 util.warn(
2178 "Additional column names not matching "
2179 "any column keys in table '%s': %s"
2180 % (
2181 self.current_executable.table.name,
2182 (", ".join("'%s'" % c for c in set_parameters)),
2183 )
2184 )
2185 for k, v in set_parameters.items():
2186 key_text = (
2187 self.preparer.quote(k)
2188 if isinstance(k, str)
2189 else self.process(k, use_schema=False)
2190 )
2191 value_text = self.process(
2192 coercions.expect(roles.ExpressionElementRole, v),
2193 use_schema=False,
2194 )
2195 action_set_ops.append("%s = %s" % (key_text, value_text))
2196
2197 action_text = ", ".join(action_set_ops)
2198 if clause.update_whereclause is not None:
2199 action_text += " WHERE %s" % self.process(
2200 clause.update_whereclause, include_table=True, use_schema=False
2201 )
2202
2203 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
2204
2205 def update_from_clause(
2206 self, update_stmt, from_table, extra_froms, from_hints, **kw
2207 ):
2208 kw["asfrom"] = True
2209 return "FROM " + ", ".join(
2210 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2211 for t in extra_froms
2212 )
2213
2214 def delete_extra_from_clause(
2215 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2216 ):
2217 """Render the DELETE .. USING clause specific to PostgreSQL."""
2218 kw["asfrom"] = True
2219 return "USING " + ", ".join(
2220 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2221 for t in extra_froms
2222 )
2223
2224 def fetch_clause(self, select, **kw):
        # pg requires parens for non-literal clauses. They're also required
        # around bind parameters if a ::type cast is used by the driver
        # (asyncpg), so it's easiest to just always add them
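        # e.g. (illustrative) select(t).fetch(10, with_ties=True) renders
        # "FETCH FIRST (%(param_1)s) ROWS WITH TIES"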
2228 text = ""
2229 if select._offset_clause is not None:
2230 text += "\n OFFSET (%s) ROWS" % self.process(
2231 select._offset_clause, **kw
2232 )
2233 if select._fetch_clause is not None:
2234 text += "\n FETCH FIRST (%s)%s ROWS %s" % (
2235 self.process(select._fetch_clause, **kw),
2236 " PERCENT" if select._fetch_clause_options["percent"] else "",
2237 (
2238 "WITH TIES"
2239 if select._fetch_clause_options["with_ties"]
2240 else "ONLY"
2241 ),
2242 )
2243 return text
2244
2245
2246class PGDDLCompiler(compiler.DDLCompiler):
2247 def get_column_specification(self, column, **kwargs):
2248 colspec = self.preparer.format_column(column)
2249 impl_type = column.type.dialect_impl(self.dialect)
2250 if isinstance(impl_type, sqltypes.TypeDecorator):
2251 impl_type = impl_type.impl
2252
2253 has_identity = (
2254 column.identity is not None
2255 and self.dialect.supports_identity_columns
2256 )
2257
2258 if (
2259 column.primary_key
2260 and column is column.table._autoincrement_column
2261 and (
2262 self.dialect.supports_smallserial
2263 or not isinstance(impl_type, sqltypes.SmallInteger)
2264 )
2265 and not has_identity
2266 and (
2267 column.default is None
2268 or (
2269 isinstance(column.default, schema.Sequence)
2270 and column.default.optional
2271 )
2272 )
2273 ):
2274 if isinstance(impl_type, sqltypes.BigInteger):
2275 colspec += " BIGSERIAL"
2276 elif isinstance(impl_type, sqltypes.SmallInteger):
2277 colspec += " SMALLSERIAL"
2278 else:
2279 colspec += " SERIAL"
2280 else:
2281 colspec += " " + self.dialect.type_compiler_instance.process(
2282 column.type,
2283 type_expression=column,
2284 identifier_preparer=self.preparer,
2285 )
2286 default = self.get_column_default_string(column)
2287 if default is not None:
2288 colspec += " DEFAULT " + default
2289
2290 if column.computed is not None:
2291 colspec += " " + self.process(column.computed)
2292 if has_identity:
2293 colspec += " " + self.process(column.identity)
2294
2295 if not column.nullable and not has_identity:
2296 colspec += " NOT NULL"
2297 elif column.nullable and has_identity:
2298 colspec += " NULL"
2299 return colspec
2300
2301 def _define_constraint_validity(self, constraint):
2302 not_valid = constraint.dialect_options["postgresql"]["not_valid"]
2303 return " NOT VALID" if not_valid else ""
2304
2305 def _define_include(self, obj):
2306 includeclause = obj.dialect_options["postgresql"]["include"]
2307 if not includeclause:
2308 return ""
2309 inclusions = [
2310 obj.table.c[col] if isinstance(col, str) else col
2311 for col in includeclause
2312 ]
2313 return " INCLUDE (%s)" % ", ".join(
2314 [self.preparer.quote(c.name) for c in inclusions]
2315 )
2316
2317 def visit_check_constraint(self, constraint, **kw):
2318 if constraint._type_bound:
2319 typ = list(constraint.columns)[0].type
2320 if (
2321 isinstance(typ, sqltypes.ARRAY)
2322 and isinstance(typ.item_type, sqltypes.Enum)
2323 and not typ.item_type.native_enum
2324 ):
2325 raise exc.CompileError(
2326 "PostgreSQL dialect cannot produce the CHECK constraint "
2327 "for ARRAY of non-native ENUM; please specify "
2328 "create_constraint=False on this Enum datatype."
2329 )
2330
2331 text = super().visit_check_constraint(constraint)
2332 text += self._define_constraint_validity(constraint)
2333 return text
2334
2335 def visit_foreign_key_constraint(self, constraint, **kw):
2336 text = super().visit_foreign_key_constraint(constraint)
2337 text += self._define_constraint_validity(constraint)
2338 return text
2339
2340 def visit_primary_key_constraint(self, constraint, **kw):
2341 text = super().visit_primary_key_constraint(constraint)
2342 text += self._define_include(constraint)
2343 return text
2344
2345 def visit_unique_constraint(self, constraint, **kw):
2346 text = super().visit_unique_constraint(constraint)
2347 text += self._define_include(constraint)
2348 return text
2349
2350 @util.memoized_property
2351 def _fk_ondelete_pattern(self):
2352 return re.compile(
2353 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?"
2354 r"|NO ACTION)$",
2355 re.I,
2356 )
2357
2358 def define_constraint_ondelete_cascade(self, constraint):
2359 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
2360 constraint.ondelete, self._fk_ondelete_pattern
2361 )
2362
2363 def visit_create_enum_type(self, create, **kw):
2364 type_ = create.element
2365
2366 return "CREATE TYPE %s AS ENUM (%s)" % (
2367 self.preparer.format_type(type_),
2368 ", ".join(
2369 self.sql_compiler.process(sql.literal(e), literal_binds=True)
2370 for e in type_.enums
2371 ),
2372 )
2373
2374 def visit_drop_enum_type(self, drop, **kw):
2375 type_ = drop.element
2376
2377 return "DROP TYPE %s" % (self.preparer.format_type(type_))
2378
2379 def visit_create_domain_type(self, create, **kw):
2380 domain: DOMAIN = create.element
2381
2382 options = []
2383 if domain.collation is not None:
2384 options.append(f"COLLATE {self.preparer.quote(domain.collation)}")
2385 if domain.default is not None:
2386 default = self.render_default_string(domain.default)
2387 options.append(f"DEFAULT {default}")
2388 if domain.constraint_name is not None:
2389 name = self.preparer.truncate_and_render_constraint_name(
2390 domain.constraint_name
2391 )
2392 options.append(f"CONSTRAINT {name}")
2393 if domain.not_null:
2394 options.append("NOT NULL")
2395 if domain.check is not None:
2396 check = self.sql_compiler.process(
2397 domain.check, include_table=False, literal_binds=True
2398 )
2399 options.append(f"CHECK ({check})")
2400
2401 return (
2402 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS "
2403 f"{self.type_compiler.process(domain.data_type)} "
2404 f"{' '.join(options)}"
2405 )
2406
2407 def visit_drop_domain_type(self, drop, **kw):
2408 domain = drop.element
2409 return f"DROP DOMAIN {self.preparer.format_type(domain)}"
2410
2411 def visit_create_index(self, create, **kw):
2412 preparer = self.preparer
2413 index = create.element
2414 self._verify_index_table(index)
2415 text = "CREATE "
2416 if index.unique:
2417 text += "UNIQUE "
2418
2419 text += "INDEX "
2420
2421 if self.dialect._supports_create_index_concurrently:
2422 concurrently = index.dialect_options["postgresql"]["concurrently"]
2423 if concurrently:
2424 text += "CONCURRENTLY "
2425
2426 if create.if_not_exists:
2427 text += "IF NOT EXISTS "
2428
2429 text += "%s ON %s " % (
2430 self._prepared_index_name(index, include_schema=False),
2431 preparer.format_table(index.table),
2432 )
2433
2434 using = index.dialect_options["postgresql"]["using"]
2435 if using:
2436 text += (
2437 "USING %s "
2438 % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
2439 )
2440
2441 ops = index.dialect_options["postgresql"]["ops"]
2442 text += "(%s)" % (
2443 ", ".join(
2444 [
2445 self.sql_compiler.process(
2446 (
2447 expr.self_group()
2448 if not isinstance(expr, expression.ColumnClause)
2449 else expr
2450 ),
2451 include_table=False,
2452 literal_binds=True,
2453 )
2454 + (
2455 (" " + ops[expr.key])
2456 if hasattr(expr, "key") and expr.key in ops
2457 else ""
2458 )
2459 for expr in index.expressions
2460 ]
2461 )
2462 )
2463
2464 text += self._define_include(index)
2465
2466 nulls_not_distinct = index.dialect_options["postgresql"][
2467 "nulls_not_distinct"
2468 ]
2469 if nulls_not_distinct is True:
2470 text += " NULLS NOT DISTINCT"
2471 elif nulls_not_distinct is False:
2472 text += " NULLS DISTINCT"
2473
2474 withclause = index.dialect_options["postgresql"]["with"]
2475 if withclause:
2476 text += " WITH (%s)" % (
2477 ", ".join(
2478 [
2479 "%s = %s" % storage_parameter
2480 for storage_parameter in withclause.items()
2481 ]
2482 )
2483 )
2484
2485 tablespace_name = index.dialect_options["postgresql"]["tablespace"]
2486 if tablespace_name:
2487 text += " TABLESPACE %s" % preparer.quote(tablespace_name)
2488
2489 whereclause = index.dialect_options["postgresql"]["where"]
2490 if whereclause is not None:
2491 whereclause = coercions.expect(
2492 roles.DDLExpressionRole, whereclause
2493 )
2494
2495 where_compiled = self.sql_compiler.process(
2496 whereclause, include_table=False, literal_binds=True
2497 )
2498 text += " WHERE " + where_compiled
2499
2500 return text
2501
2502 def define_unique_constraint_distinct(self, constraint, **kw):
2503 nulls_not_distinct = constraint.dialect_options["postgresql"][
2504 "nulls_not_distinct"
2505 ]
2506 if nulls_not_distinct is True:
2507 nulls_not_distinct_param = "NULLS NOT DISTINCT "
2508 elif nulls_not_distinct is False:
2509 nulls_not_distinct_param = "NULLS DISTINCT "
2510 else:
2511 nulls_not_distinct_param = ""
2512 return nulls_not_distinct_param
2513
2514 def visit_drop_index(self, drop, **kw):
2515 index = drop.element
2516
2517 text = "\nDROP INDEX "
2518
2519 if self.dialect._supports_drop_index_concurrently:
2520 concurrently = index.dialect_options["postgresql"]["concurrently"]
2521 if concurrently:
2522 text += "CONCURRENTLY "
2523
2524 if drop.if_exists:
2525 text += "IF EXISTS "
2526
2527 text += self._prepared_index_name(index, include_schema=True)
2528 return text
2529
2530 def visit_exclude_constraint(self, constraint, **kw):
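        # e.g. (illustrative) ExcludeConstraint((column("room"), "="),
        # using="gist") renders "EXCLUDE USING gist (room WITH =)"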
2531 text = ""
2532 if constraint.name is not None:
2533 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2534 constraint
2535 )
2536 elements = []
2537 kw["include_table"] = False
2538 kw["literal_binds"] = True
2539 for expr, name, op in constraint._render_exprs:
2540 exclude_element = self.sql_compiler.process(expr, **kw) + (
2541 (" " + constraint.ops[expr.key])
2542 if hasattr(expr, "key") and expr.key in constraint.ops
2543 else ""
2544 )
2545
2546 elements.append("%s WITH %s" % (exclude_element, op))
2547 text += "EXCLUDE USING %s (%s)" % (
2548 self.preparer.validate_sql_phrase(
2549 constraint.using, IDX_USING
2550 ).lower(),
2551 ", ".join(elements),
2552 )
2553 if constraint.where is not None:
2554 text += " WHERE (%s)" % self.sql_compiler.process(
2555 constraint.where, literal_binds=True
2556 )
2557 text += self.define_constraint_deferrability(constraint)
2558 return text
2559
2560 def post_create_table(self, table):
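        # assembles the dialect-specific clauses that follow the column list
        # of CREATE TABLE, e.g. (illustrative)
        # postgresql_partition_by="LIST (part)" contributes
        # "\n PARTITION BY LIST (part)"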
2561 table_opts = []
2562 pg_opts = table.dialect_options["postgresql"]
2563
2564 inherits = pg_opts.get("inherits")
2565 if inherits is not None:
2566 if not isinstance(inherits, (list, tuple)):
2567 inherits = (inherits,)
2568 table_opts.append(
2569 "\n INHERITS ( "
2570 + ", ".join(self.preparer.quote(name) for name in inherits)
2571 + " )"
2572 )
2573
2574 if pg_opts["partition_by"]:
2575 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
2576
2577 if pg_opts["using"]:
2578 table_opts.append("\n USING %s" % pg_opts["using"])
2579
2580 if pg_opts["with_oids"] is True:
2581 table_opts.append("\n WITH OIDS")
2582 elif pg_opts["with_oids"] is False:
2583 table_opts.append("\n WITHOUT OIDS")
2584
2585 if pg_opts["on_commit"]:
2586 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
2587 table_opts.append("\n ON COMMIT %s" % on_commit_options)
2588
2589 if pg_opts["tablespace"]:
2590 tablespace_name = pg_opts["tablespace"]
2591 table_opts.append(
2592 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
2593 )
2594
2595 return "".join(table_opts)
2596
2597 def visit_computed_column(self, generated, **kw):
2598 if generated.persisted is False:
2599 raise exc.CompileError(
2600 "PostrgreSQL computed columns do not support 'virtual' "
2601 "persistence; set the 'persisted' flag to None or True for "
2602 "PostgreSQL support."
2603 )
2604
2605 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
2606 generated.sqltext, include_table=False, literal_binds=True
2607 )
2608
2609 def visit_create_sequence(self, create, **kw):
2610 prefix = None
2611 if create.element.data_type is not None:
2612 prefix = " AS %s" % self.type_compiler.process(
2613 create.element.data_type
2614 )
2615
2616 return super().visit_create_sequence(create, prefix=prefix, **kw)
2617
2618 def _can_comment_on_constraint(self, ddl_instance):
2619 constraint = ddl_instance.element
2620 if constraint.name is None:
2621 raise exc.CompileError(
2622 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2623 "it has no name"
2624 )
2625 if constraint.table is None:
2626 raise exc.CompileError(
2627 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2628 "it has no associated table"
2629 )
2630
2631 def visit_set_constraint_comment(self, create, **kw):
2632 self._can_comment_on_constraint(create)
2633 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
2634 self.preparer.format_constraint(create.element),
2635 self.preparer.format_table(create.element.table),
2636 self.sql_compiler.render_literal_value(
2637 create.element.comment, sqltypes.String()
2638 ),
2639 )
2640
2641 def visit_drop_constraint_comment(self, drop, **kw):
2642 self._can_comment_on_constraint(drop)
2643 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % (
2644 self.preparer.format_constraint(drop.element),
2645 self.preparer.format_table(drop.element.table),
2646 )
2647
2648
2649class PGTypeCompiler(compiler.GenericTypeCompiler):
2650 def visit_TSVECTOR(self, type_, **kw):
2651 return "TSVECTOR"
2652
2653 def visit_TSQUERY(self, type_, **kw):
2654 return "TSQUERY"
2655
2656 def visit_INET(self, type_, **kw):
2657 return "INET"
2658
2659 def visit_CIDR(self, type_, **kw):
2660 return "CIDR"
2661
2662 def visit_CITEXT(self, type_, **kw):
2663 return "CITEXT"
2664
2665 def visit_MACADDR(self, type_, **kw):
2666 return "MACADDR"
2667
2668 def visit_MACADDR8(self, type_, **kw):
2669 return "MACADDR8"
2670
2671 def visit_MONEY(self, type_, **kw):
2672 return "MONEY"
2673
2674 def visit_OID(self, type_, **kw):
2675 return "OID"
2676
2677 def visit_REGCONFIG(self, type_, **kw):
2678 return "REGCONFIG"
2679
2680 def visit_REGCLASS(self, type_, **kw):
2681 return "REGCLASS"
2682
2683 def visit_FLOAT(self, type_, **kw):
2684 if not type_.precision:
2685 return "FLOAT"
2686 else:
2687 return "FLOAT(%(precision)s)" % {"precision": type_.precision}
2688
2689 def visit_double(self, type_, **kw):
        return self.visit_DOUBLE_PRECISION(type_, **kw)
2691
2692 def visit_BIGINT(self, type_, **kw):
2693 return "BIGINT"
2694
2695 def visit_HSTORE(self, type_, **kw):
2696 return "HSTORE"
2697
2698 def visit_JSON(self, type_, **kw):
2699 return "JSON"
2700
2701 def visit_JSONB(self, type_, **kw):
2702 return "JSONB"
2703
2704 def visit_INT4MULTIRANGE(self, type_, **kw):
2705 return "INT4MULTIRANGE"
2706
2707 def visit_INT8MULTIRANGE(self, type_, **kw):
2708 return "INT8MULTIRANGE"
2709
2710 def visit_NUMMULTIRANGE(self, type_, **kw):
2711 return "NUMMULTIRANGE"
2712
2713 def visit_DATEMULTIRANGE(self, type_, **kw):
2714 return "DATEMULTIRANGE"
2715
2716 def visit_TSMULTIRANGE(self, type_, **kw):
2717 return "TSMULTIRANGE"
2718
2719 def visit_TSTZMULTIRANGE(self, type_, **kw):
2720 return "TSTZMULTIRANGE"
2721
2722 def visit_INT4RANGE(self, type_, **kw):
2723 return "INT4RANGE"
2724
2725 def visit_INT8RANGE(self, type_, **kw):
2726 return "INT8RANGE"
2727
2728 def visit_NUMRANGE(self, type_, **kw):
2729 return "NUMRANGE"
2730
2731 def visit_DATERANGE(self, type_, **kw):
2732 return "DATERANGE"
2733
2734 def visit_TSRANGE(self, type_, **kw):
2735 return "TSRANGE"
2736
2737 def visit_TSTZRANGE(self, type_, **kw):
2738 return "TSTZRANGE"
2739
2740 def visit_json_int_index(self, type_, **kw):
2741 return "INT"
2742
2743 def visit_json_str_index(self, type_, **kw):
2744 return "TEXT"
2745
2746 def visit_datetime(self, type_, **kw):
2747 return self.visit_TIMESTAMP(type_, **kw)
2748
2749 def visit_enum(self, type_, **kw):
2750 if not type_.native_enum or not self.dialect.supports_native_enum:
2751 return super().visit_enum(type_, **kw)
2752 else:
2753 return self.visit_ENUM(type_, **kw)
2754
2755 def visit_ENUM(self, type_, identifier_preparer=None, **kw):
2756 if identifier_preparer is None:
2757 identifier_preparer = self.dialect.identifier_preparer
2758 return identifier_preparer.format_type(type_)
2759
2760 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw):
2761 if identifier_preparer is None:
2762 identifier_preparer = self.dialect.identifier_preparer
2763 return identifier_preparer.format_type(type_)
2764
2765 def visit_TIMESTAMP(self, type_, **kw):
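        # e.g. (illustrative) TIMESTAMP(precision=6, timezone=True) renders
        # "TIMESTAMP(6) WITH TIME ZONE"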
2766 return "TIMESTAMP%s %s" % (
2767 (
2768 "(%d)" % type_.precision
2769 if getattr(type_, "precision", None) is not None
2770 else ""
2771 ),
2772 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2773 )
2774
2775 def visit_TIME(self, type_, **kw):
2776 return "TIME%s %s" % (
2777 (
2778 "(%d)" % type_.precision
2779 if getattr(type_, "precision", None) is not None
2780 else ""
2781 ),
2782 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2783 )
2784
2785 def visit_INTERVAL(self, type_, **kw):
2786 text = "INTERVAL"
2787 if type_.fields is not None:
2788 text += " " + type_.fields
2789 if type_.precision is not None:
2790 text += " (%d)" % type_.precision
2791 return text
2792
2793 def visit_BIT(self, type_, **kw):
2794 if type_.varying:
2795 compiled = "BIT VARYING"
2796 if type_.length is not None:
2797 compiled += "(%d)" % type_.length
2798 else:
2799 compiled = "BIT(%d)" % type_.length
2800 return compiled
2801
2802 def visit_uuid(self, type_, **kw):
2803 if type_.native_uuid:
2804 return self.visit_UUID(type_, **kw)
2805 else:
2806 return super().visit_uuid(type_, **kw)
2807
2808 def visit_UUID(self, type_, **kw):
2809 return "UUID"
2810
2811 def visit_large_binary(self, type_, **kw):
2812 return self.visit_BYTEA(type_, **kw)
2813
2814 def visit_BYTEA(self, type_, **kw):
2815 return "BYTEA"
2816
2817 def visit_ARRAY(self, type_, **kw):
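        # appends one "[]" per dimension to the inner type, splicing ahead
        # of any COLLATE clause, e.g. (illustrative)
        # ARRAY(Text(collation="en_US"), dimensions=2) renders
        # 'TEXT[][] COLLATE "en_US"'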
2818 inner = self.process(type_.item_type, **kw)
2819 return re.sub(
2820 r"((?: COLLATE.*)?)$",
2821 (
2822 r"%s\1"
2823 % (
2824 "[]"
2825 * (type_.dimensions if type_.dimensions is not None else 1)
2826 )
2827 ),
2828 inner,
2829 count=1,
2830 )
2831
2832 def visit_json_path(self, type_, **kw):
2833 return self.visit_JSONPATH(type_, **kw)
2834
2835 def visit_JSONPATH(self, type_, **kw):
2836 return "JSONPATH"
2837
2838
2839class PGIdentifierPreparer(compiler.IdentifierPreparer):
2840 reserved_words = RESERVED_WORDS
2841
2842 def _unquote_identifier(self, value):
2843 if value[0] == self.initial_quote:
2844 value = value[1:-1].replace(
2845 self.escape_to_quote, self.escape_quote
2846 )
2847 return value
2848
2849 def format_type(self, type_, use_schema=True):
2850 if not type_.name:
2851 raise exc.CompileError(
2852 f"PostgreSQL {type_.__class__.__name__} type requires a name."
2853 )
2854
2855 name = self.quote(type_.name)
2856 effective_schema = self.schema_for_object(type_)
2857
2858 if (
2859 not self.omit_schema
2860 and use_schema
2861 and effective_schema is not None
2862 ):
2863 name = f"{self.quote_schema(effective_schema)}.{name}"
2864 return name
2865
2866
2867class ReflectedNamedType(TypedDict):
2868 """Represents a reflected named type."""
2869
2870 name: str
2871 """Name of the type."""
2872 schema: str
2873 """The schema of the type."""
2874 visible: bool
2875 """Indicates if this type is in the current search path."""
2876
2877
2878class ReflectedDomainConstraint(TypedDict):
2879 """Represents a reflect check constraint of a domain."""
2880
2881 name: str
2882 """Name of the constraint."""
2883 check: str
2884 """The check constraint text."""
2885
2886
2887class ReflectedDomain(ReflectedNamedType):
2888 """Represents a reflected enum."""
2889
2890 type: str
2891 """The string name of the underlying data type of the domain."""
2892 nullable: bool
2893 """Indicates if the domain allows null or not."""
2894 default: Optional[str]
2895 """The string representation of the default value of this domain
2896 or ``None`` if none present.
2897 """
2898 constraints: List[ReflectedDomainConstraint]
2899 """The constraints defined in the domain, if any.
2900 The constraint are in order of evaluation by postgresql.
2901 """
2902 collation: Optional[str]
2903 """The collation for the domain."""
2904
2905
2906class ReflectedEnum(ReflectedNamedType):
2907 """Represents a reflected enum."""
2908
2909 labels: List[str]
2910 """The labels that compose the enum."""
2911
2912
2913class PGInspector(reflection.Inspector):
2914 dialect: PGDialect
2915
2916 def get_table_oid(
2917 self, table_name: str, schema: Optional[str] = None
2918 ) -> int:
2919 """Return the OID for the given table name.
2920
2921 :param table_name: string name of the table. For special quoting,
2922 use :class:`.quoted_name`.
2923
2924 :param schema: string schema name; if omitted, uses the default schema
2925 of the database connection. For special quoting,
2926 use :class:`.quoted_name`.
2927
2928 """
2929
2930 with self._operation_context() as conn:
2931 return self.dialect.get_table_oid(
2932 conn, table_name, schema, info_cache=self.info_cache
2933 )
2934
2935 def get_domains(
2936 self, schema: Optional[str] = None
2937 ) -> List[ReflectedDomain]:
2938 """Return a list of DOMAIN objects.
2939
2940 Each member is a dictionary containing these fields:
2941
2942 * name - name of the domain
2943 * schema - the schema name for the domain.
2944 * visible - boolean, whether or not this domain is visible
2945 in the default search path.
2946 * type - the type defined by this domain.
2947 * nullable - Indicates if this domain can be ``NULL``.
2948 * default - The default value of the domain or ``None`` if the
2949 domain has no default.
        * constraints - A list of dicts with the constraints defined by this
          domain. Each element contains two keys: ``name`` of the
          constraint and ``check`` with the constraint text.
2953
2954 :param schema: schema name. If None, the default schema
2955 (typically 'public') is used. May also be set to ``'*'`` to
         indicate that domains for all schemas should be loaded.
2957
2958 .. versionadded:: 2.0
2959
2960 """
2961 with self._operation_context() as conn:
2962 return self.dialect._load_domains(
2963 conn, schema, info_cache=self.info_cache
2964 )
2965
2966 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]:
2967 """Return a list of ENUM objects.
2968
2969 Each member is a dictionary containing these fields:
2970
2971 * name - name of the enum
2972 * schema - the schema name for the enum.
2973 * visible - boolean, whether or not this enum is visible
2974 in the default search path.
2975 * labels - a list of string labels that apply to the enum.
2976
2977 :param schema: schema name. If None, the default schema
2978 (typically 'public') is used. May also be set to ``'*'`` to
         indicate that enums for all schemas should be loaded.
2980
2981 """
2982 with self._operation_context() as conn:
2983 return self.dialect._load_enums(
2984 conn, schema, info_cache=self.info_cache
2985 )
2986
2987 def get_foreign_table_names(
2988 self, schema: Optional[str] = None
2989 ) -> List[str]:
2990 """Return a list of FOREIGN TABLE names.
2991
2992 Behavior is similar to that of
2993 :meth:`_reflection.Inspector.get_table_names`,
2994 except that the list is limited to those tables that report a
2995 ``relkind`` value of ``f``.
2996
2997 """
2998 with self._operation_context() as conn:
2999 return self.dialect._get_foreign_table_names(
3000 conn, schema, info_cache=self.info_cache
3001 )
3002
3003 def has_type(
3004 self, type_name: str, schema: Optional[str] = None, **kw: Any
3005 ) -> bool:
3006 """Return if the database has the specified type in the provided
3007 schema.
3008
3009 :param type_name: the type to check.
3010 :param schema: schema name. If None, the default schema
3011 (typically 'public') is used. May also be set to ``'*'`` to
3012 check in all schemas.
3013
3014 .. versionadded:: 2.0
3015
3016 """
3017 with self._operation_context() as conn:
3018 return self.dialect.has_type(
3019 conn, type_name, schema, info_cache=self.info_cache
3020 )
3021
3022
3023class PGExecutionContext(default.DefaultExecutionContext):
3024 def fire_sequence(self, seq, type_):
3025 return self._execute_scalar(
3026 (
3027 "select nextval('%s')"
3028 % self.identifier_preparer.format_sequence(seq)
3029 ),
3030 type_,
3031 )
3032
3033 def get_insert_default(self, column):
3034 if column.primary_key and column is column.table._autoincrement_column:
3035 if column.server_default and column.server_default.has_argument:
3036 # pre-execute passive defaults on primary key columns
3037 return self._execute_scalar(
3038 "select %s" % column.server_default.arg, column.type
3039 )
3040
3041 elif column.default is None or (
3042 column.default.is_sequence and column.default.optional
3043 ):
                    # execute the sequence associated with a SERIAL primary
                    # key column. for non-primary-key SERIAL, the ID is just
                    # generated server side.
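                    # the implicit sequence name follows the PostgreSQL
                    # convention "<table>_<column>_seq", with each part
                    # truncated so the whole name fits the 63 character
                    # identifier limit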
3047
3048 try:
3049 seq_name = column._postgresql_seq_name
3050 except AttributeError:
3051 tab = column.table.name
3052 col = column.name
3053 tab = tab[0 : 29 + max(0, (29 - len(col)))]
3054 col = col[0 : 29 + max(0, (29 - len(tab)))]
3055 name = "%s_%s_seq" % (tab, col)
3056 column._postgresql_seq_name = seq_name = name
3057
3058 if column.table is not None:
3059 effective_schema = self.connection.schema_for_object(
3060 column.table
3061 )
3062 else:
3063 effective_schema = None
3064
                if effective_schema is not None:
                    stmt = 'select nextval(\'"%s"."%s"\')' % (
                        effective_schema,
                        seq_name,
                    )
                else:
                    stmt = "select nextval('\"%s\"')" % (seq_name,)

                return self._execute_scalar(stmt, column.type)
3074
3075 return super().get_insert_default(column)
3076
3077
3078class PGReadOnlyConnectionCharacteristic(
3079 characteristics.ConnectionCharacteristic
3080):
3081 transactional = True
3082
3083 def reset_characteristic(self, dialect, dbapi_conn):
3084 dialect.set_readonly(dbapi_conn, False)
3085
3086 def set_characteristic(self, dialect, dbapi_conn, value):
3087 dialect.set_readonly(dbapi_conn, value)
3088
3089 def get_characteristic(self, dialect, dbapi_conn):
3090 return dialect.get_readonly(dbapi_conn)
3091
3092
3093class PGDeferrableConnectionCharacteristic(
3094 characteristics.ConnectionCharacteristic
3095):
3096 transactional = True
3097
3098 def reset_characteristic(self, dialect, dbapi_conn):
3099 dialect.set_deferrable(dbapi_conn, False)
3100
3101 def set_characteristic(self, dialect, dbapi_conn, value):
3102 dialect.set_deferrable(dbapi_conn, value)
3103
3104 def get_characteristic(self, dialect, dbapi_conn):
3105 return dialect.get_deferrable(dbapi_conn)
3106
3107
3108class PGDialect(default.DefaultDialect):
3109 name = "postgresql"
3110 supports_statement_cache = True
3111 supports_alter = True
3112 max_identifier_length = 63
3113 supports_sane_rowcount = True
3114
3115 bind_typing = interfaces.BindTyping.RENDER_CASTS
3116
3117 supports_native_enum = True
3118 supports_native_boolean = True
3119 supports_native_uuid = True
3120 supports_smallserial = True
3121
3122 supports_sequences = True
3123 sequences_optional = True
3124 preexecute_autoincrement_sequences = True
3125 postfetch_lastrowid = False
3126 use_insertmanyvalues = True
3127
3128 returns_native_bytes = True
3129
3130 insertmanyvalues_implicit_sentinel = (
3131 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
3132 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
3133 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
3134 )
3135
3136 supports_comments = True
3137 supports_constraint_comments = True
3138 supports_default_values = True
3139
3140 supports_default_metavalue = True
3141
3142 supports_empty_insert = False
3143 supports_multivalues_insert = True
3144
3145 supports_identity_columns = True
3146
3147 default_paramstyle = "pyformat"
3148 ischema_names = ischema_names
3149 colspecs = colspecs
3150
3151 statement_compiler = PGCompiler
3152 ddl_compiler = PGDDLCompiler
3153 type_compiler_cls = PGTypeCompiler
3154 preparer = PGIdentifierPreparer
3155 execution_ctx_cls = PGExecutionContext
3156 inspector = PGInspector
3157
3158 update_returning = True
3159 delete_returning = True
3160 insert_returning = True
3161 update_returning_multifrom = True
3162 delete_returning_multifrom = True
3163
3164 connection_characteristics = (
3165 default.DefaultDialect.connection_characteristics
3166 )
3167 connection_characteristics = connection_characteristics.union(
3168 {
3169 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
3170 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
3171 }
3172 )
3173
3174 construct_arguments = [
3175 (
3176 schema.Index,
3177 {
3178 "using": False,
3179 "include": None,
3180 "where": None,
3181 "ops": {},
3182 "concurrently": False,
3183 "with": {},
3184 "tablespace": None,
3185 "nulls_not_distinct": None,
3186 },
3187 ),
3188 (
3189 schema.Table,
3190 {
3191 "ignore_search_path": False,
3192 "tablespace": None,
3193 "partition_by": None,
3194 "with_oids": None,
3195 "on_commit": None,
3196 "inherits": None,
3197 "using": None,
3198 },
3199 ),
3200 (
3201 schema.CheckConstraint,
3202 {
3203 "not_valid": False,
3204 },
3205 ),
3206 (
3207 schema.ForeignKeyConstraint,
3208 {
3209 "not_valid": False,
3210 },
3211 ),
3212 (
3213 schema.PrimaryKeyConstraint,
3214 {"include": None},
3215 ),
3216 (
3217 schema.UniqueConstraint,
3218 {
3219 "include": None,
3220 "nulls_not_distinct": None,
3221 },
3222 ),
3223 ]
3224
3225 reflection_options = ("postgresql_ignore_search_path",)
3226
3227 _backslash_escapes = True
3228 _supports_create_index_concurrently = True
3229 _supports_drop_index_concurrently = True
3230
3231 def __init__(
3232 self,
3233 native_inet_types=None,
3234 json_serializer=None,
3235 json_deserializer=None,
3236 **kwargs,
3237 ):
3238 default.DefaultDialect.__init__(self, **kwargs)
3239
3240 self._native_inet_types = native_inet_types
3241 self._json_deserializer = json_deserializer
3242 self._json_serializer = json_serializer
3243
3244 def initialize(self, connection):
3245 super().initialize(connection)
3246
3247 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
3248 self.supports_smallserial = self.server_version_info >= (9, 2)
3249
3250 self._set_backslash_escapes(connection)
3251
3252 self._supports_drop_index_concurrently = self.server_version_info >= (
3253 9,
3254 2,
3255 )
3256 self.supports_identity_columns = self.server_version_info >= (10,)
3257
3258 def get_isolation_level_values(self, dbapi_conn):
3259 # note the generic dialect doesn't have AUTOCOMMIT, however
3260 # all postgresql dialects should include AUTOCOMMIT.
3261 return (
3262 "SERIALIZABLE",
3263 "READ UNCOMMITTED",
3264 "READ COMMITTED",
3265 "REPEATABLE READ",
3266 )
3267
3268 def set_isolation_level(self, dbapi_connection, level):
3269 cursor = dbapi_connection.cursor()
3270 cursor.execute(
3271 "SET SESSION CHARACTERISTICS AS TRANSACTION "
3272 f"ISOLATION LEVEL {level}"
3273 )
3274 cursor.execute("COMMIT")
3275 cursor.close()
3276
3277 def get_isolation_level(self, dbapi_connection):
3278 cursor = dbapi_connection.cursor()
3279 cursor.execute("show transaction isolation level")
3280 val = cursor.fetchone()[0]
3281 cursor.close()
3282 return val.upper()
3283
3284 def set_readonly(self, connection, value):
3285 raise NotImplementedError()
3286
3287 def get_readonly(self, connection):
3288 raise NotImplementedError()
3289
3290 def set_deferrable(self, connection, value):
3291 raise NotImplementedError()
3292
3293 def get_deferrable(self, connection):
3294 raise NotImplementedError()
3295
3296 def _split_multihost_from_url(self, url: URL) -> Union[
3297 Tuple[None, None],
3298 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]],
3299 ]:
3300 hosts: Optional[Tuple[Optional[str], ...]] = None
3301 ports_str: Union[str, Tuple[Optional[str], ...], None] = None
3302
3303 integrated_multihost = False
3304
3305 if "host" in url.query:
3306 if isinstance(url.query["host"], (list, tuple)):
3307 integrated_multihost = True
3308 hosts, ports_str = zip(
3309 *[
3310 token.split(":") if ":" in token else (token, None)
3311 for token in url.query["host"]
3312 ]
3313 )
3314
3315 elif isinstance(url.query["host"], str):
3316 hosts = tuple(url.query["host"].split(","))
3317
3318 if (
3319 "port" not in url.query
3320 and len(hosts) == 1
3321 and ":" in hosts[0]
3322 ):
3323 # internet host is alphanumeric plus dots or hyphens.
3324 # this is essentially rfc1123, which refers to rfc952.
3325 # https://stackoverflow.com/questions/3523028/
3326 # valid-characters-of-a-hostname
3327 host_port_match = re.match(
3328 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0]
3329 )
3330 if host_port_match:
3331 integrated_multihost = True
3332 h, p = host_port_match.group(1, 2)
3333 if TYPE_CHECKING:
3334 assert isinstance(h, str)
3335 assert isinstance(p, str)
3336 hosts = (h,)
3337 ports_str = cast(
3338 "Tuple[Optional[str], ...]", (p,) if p else (None,)
3339 )
3340
3341 if "port" in url.query:
3342 if integrated_multihost:
3343 raise exc.ArgumentError(
3344 "Can't mix 'multihost' formats together; use "
3345 '"host=h1,h2,h3&port=p1,p2,p3" or '
3346 '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
3347 )
3348 if isinstance(url.query["port"], (list, tuple)):
3349 ports_str = url.query["port"]
3350 elif isinstance(url.query["port"], str):
3351 ports_str = tuple(url.query["port"].split(","))
3352
3353 ports: Optional[Tuple[Optional[int], ...]] = None
3354
3355 if ports_str:
3356 try:
3357 ports = tuple(int(x) if x else None for x in ports_str)
3358 except ValueError:
3359 raise exc.ArgumentError(
3360 f"Received non-integer port arguments: {ports_str}"
3361 ) from None
3362
3363 if ports and (
3364 (not hosts and len(ports) > 1)
3365 or (
3366 hosts
3367 and ports
3368 and len(hosts) != len(ports)
3369 and (len(hosts) > 1 or len(ports) > 1)
3370 )
3371 ):
3372 raise exc.ArgumentError("number of hosts and ports don't match")
3373
3374 if hosts is not None:
3375 if ports is None:
3376 ports = tuple(None for _ in hosts)
3377
3378 return hosts, ports # type: ignore
3379
3380 def do_begin_twophase(self, connection, xid):
3381 self.do_begin(connection.connection)
3382
3383 def do_prepare_twophase(self, connection, xid):
3384 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
3385
3386 def do_rollback_twophase(
3387 self, connection, xid, is_prepared=True, recover=False
3388 ):
3389 if is_prepared:
3390 if recover:
                # FIXME: ugly hack to get out of transaction
                # context when committing recoverable transactions.
                # Must find a way to make the dbapi not
                # open a transaction.
3395 connection.exec_driver_sql("ROLLBACK")
3396 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
3397 connection.exec_driver_sql("BEGIN")
3398 self.do_rollback(connection.connection)
3399 else:
3400 self.do_rollback(connection.connection)
3401
3402 def do_commit_twophase(
3403 self, connection, xid, is_prepared=True, recover=False
3404 ):
3405 if is_prepared:
3406 if recover:
3407 connection.exec_driver_sql("ROLLBACK")
3408 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
3409 connection.exec_driver_sql("BEGIN")
3410 self.do_rollback(connection.connection)
3411 else:
3412 self.do_commit(connection.connection)
3413
3414 def do_recover_twophase(self, connection):
3415 return connection.scalars(
3416 sql.text("SELECT gid FROM pg_prepared_xacts")
3417 ).all()
3418
3419 def _get_default_schema_name(self, connection):
3420 return connection.exec_driver_sql("select current_schema()").scalar()
3421
3422 @reflection.cache
3423 def has_schema(self, connection, schema, **kw):
3424 query = select(pg_catalog.pg_namespace.c.nspname).where(
3425 pg_catalog.pg_namespace.c.nspname == schema
3426 )
3427 return bool(connection.scalar(query))
3428
3429 def _pg_class_filter_scope_schema(
3430 self, query, schema, scope, pg_class_table=None
3431 ):
3432 if pg_class_table is None:
3433 pg_class_table = pg_catalog.pg_class
3434 query = query.join(
3435 pg_catalog.pg_namespace,
3436 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace,
3437 )
3438
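        # pg_class.relpersistence is 'p' for permanent, 'u' for unlogged,
        # 't' for temporary relations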
3439 if scope is ObjectScope.DEFAULT:
3440 query = query.where(pg_class_table.c.relpersistence != "t")
3441 elif scope is ObjectScope.TEMPORARY:
3442 query = query.where(pg_class_table.c.relpersistence == "t")
3443
3444 if schema is None:
3445 query = query.where(
3446 pg_catalog.pg_table_is_visible(pg_class_table.c.oid),
3447 # ignore pg_catalog schema
3448 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
3449 )
3450 else:
3451 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
3452 return query
3453
3454 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None):
3455 if pg_class_table is None:
3456 pg_class_table = pg_catalog.pg_class
        # uses the ANY form instead of IN, otherwise postgresql complains
        # that 'IN could not convert type character to "char"'
3459 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds))
3460
3461 @lru_cache()
3462 def _has_table_query(self, schema):
3463 query = select(pg_catalog.pg_class.c.relname).where(
3464 pg_catalog.pg_class.c.relname == bindparam("table_name"),
3465 self._pg_class_relkind_condition(
3466 pg_catalog.RELKINDS_ALL_TABLE_LIKE
3467 ),
3468 )
3469 return self._pg_class_filter_scope_schema(
3470 query, schema, scope=ObjectScope.ANY
3471 )
3472
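    # NOTE: the query built by _has_table_query is cached per schema via
    # lru_cache(); the table name itself travels as the "table_name"
    # bindparam, so a single cached query serves all lookups in a schema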
3473 @reflection.cache
3474 def has_table(self, connection, table_name, schema=None, **kw):
3475 self._ensure_has_table_connection(connection)
3476 query = self._has_table_query(schema)
3477 return bool(connection.scalar(query, {"table_name": table_name}))
3478
3479 @reflection.cache
3480 def has_sequence(self, connection, sequence_name, schema=None, **kw):
3481 query = select(pg_catalog.pg_class.c.relname).where(
3482 pg_catalog.pg_class.c.relkind == "S",
3483 pg_catalog.pg_class.c.relname == sequence_name,
3484 )
3485 query = self._pg_class_filter_scope_schema(
3486 query, schema, scope=ObjectScope.ANY
3487 )
3488 return bool(connection.scalar(query))
3489
3490 @reflection.cache
3491 def has_type(self, connection, type_name, schema=None, **kw):
3492 query = (
3493 select(pg_catalog.pg_type.c.typname)
3494 .join(
3495 pg_catalog.pg_namespace,
3496 pg_catalog.pg_namespace.c.oid
3497 == pg_catalog.pg_type.c.typnamespace,
3498 )
3499 .where(pg_catalog.pg_type.c.typname == type_name)
3500 )
3501 if schema is None:
3502 query = query.where(
3503 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
3504 # ignore pg_catalog schema
3505 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
3506 )
3507 elif schema != "*":
3508 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
3509
3510 return bool(connection.scalar(query))
3511
3512 def _get_server_version_info(self, connection):
3513 v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
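        # version() typically returns a string such as
        # "PostgreSQL 14.4 on x86_64-pc-linux-gnu, compiled by gcc ...";
        # pre-release builds may report e.g. "PostgreSQL 16devel", which
        # the regex below also tolerates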
3514 m = re.match(
3515 r".*(?:PostgreSQL|EnterpriseDB) "
3516 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
3517 v,
3518 )
3519 if not m:
3520 raise AssertionError(
3521 "Could not determine version from string '%s'" % v
3522 )
3523 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
3524
3525 @reflection.cache
3526 def get_table_oid(self, connection, table_name, schema=None, **kw):
3527 """Fetch the oid for schema.table_name."""
3528 query = select(pg_catalog.pg_class.c.oid).where(
3529 pg_catalog.pg_class.c.relname == table_name,
3530 self._pg_class_relkind_condition(
3531 pg_catalog.RELKINDS_ALL_TABLE_LIKE
3532 ),
3533 )
3534 query = self._pg_class_filter_scope_schema(
3535 query, schema, scope=ObjectScope.ANY
3536 )
3537 table_oid = connection.scalar(query)
3538 if table_oid is None:
3539 raise exc.NoSuchTableError(
3540 f"{schema}.{table_name}" if schema else table_name
3541 )
3542 return table_oid
3543
3544 @reflection.cache
3545 def get_schema_names(self, connection, **kw):
3546 query = (
3547 select(pg_catalog.pg_namespace.c.nspname)
3548 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%"))
3549 .order_by(pg_catalog.pg_namespace.c.nspname)
3550 )
3551 return connection.scalars(query).all()
3552
3553 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope):
3554 query = select(pg_catalog.pg_class.c.relname).where(
3555 self._pg_class_relkind_condition(relkinds)
3556 )
3557 query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
3558 return connection.scalars(query).all()
3559
3560 @reflection.cache
3561 def get_table_names(self, connection, schema=None, **kw):
3562 return self._get_relnames_for_relkinds(
3563 connection,
3564 schema,
3565 pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
3566 scope=ObjectScope.DEFAULT,
3567 )
3568
3569 @reflection.cache
3570 def get_temp_table_names(self, connection, **kw):
3571 return self._get_relnames_for_relkinds(
3572 connection,
3573 schema=None,
3574 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
3575 scope=ObjectScope.TEMPORARY,
3576 )
3577
3578 @reflection.cache
3579 def _get_foreign_table_names(self, connection, schema=None, **kw):
3580 return self._get_relnames_for_relkinds(
3581 connection, schema, relkinds=("f",), scope=ObjectScope.ANY
3582 )
3583
3584 @reflection.cache
3585 def get_view_names(self, connection, schema=None, **kw):
3586 return self._get_relnames_for_relkinds(
3587 connection,
3588 schema,
3589 pg_catalog.RELKINDS_VIEW,
3590 scope=ObjectScope.DEFAULT,
3591 )
3592
3593 @reflection.cache
3594 def get_materialized_view_names(self, connection, schema=None, **kw):
3595 return self._get_relnames_for_relkinds(
3596 connection,
3597 schema,
3598 pg_catalog.RELKINDS_MAT_VIEW,
3599 scope=ObjectScope.DEFAULT,
3600 )
3601
3602 @reflection.cache
3603 def get_temp_view_names(self, connection, schema=None, **kw):
3604 return self._get_relnames_for_relkinds(
3605 connection,
3606 schema,
            # NOTE: do not include temp materialized views; these do not
            # seem to be a thing, at least up to version 14
3609 pg_catalog.RELKINDS_VIEW,
3610 scope=ObjectScope.TEMPORARY,
3611 )
3612
3613 @reflection.cache
3614 def get_sequence_names(self, connection, schema=None, **kw):
3615 return self._get_relnames_for_relkinds(
3616 connection, schema, relkinds=("S",), scope=ObjectScope.ANY
3617 )
3618
3619 @reflection.cache
3620 def get_view_definition(self, connection, view_name, schema=None, **kw):
3621 query = (
3622 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid))
3623 .select_from(pg_catalog.pg_class)
3624 .where(
3625 pg_catalog.pg_class.c.relname == view_name,
3626 self._pg_class_relkind_condition(
3627 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW
3628 ),
3629 )
3630 )
3631 query = self._pg_class_filter_scope_schema(
3632 query, schema, scope=ObjectScope.ANY
3633 )
3634 res = connection.scalar(query)
3635 if res is None:
3636 raise exc.NoSuchTableError(
3637 f"{schema}.{view_name}" if schema else view_name
3638 )
3639 else:
3640 return res
3641
3642 def _value_or_raise(self, data, table, schema):
3643 try:
3644 return dict(data)[(schema, table)]
3645 except KeyError:
3646 raise exc.NoSuchTableError(
3647 f"{schema}.{table}" if schema else table
3648 ) from None
3649
3650 def _prepare_filter_names(self, filter_names):
3651 if filter_names:
3652 return True, {"filter_names": filter_names}
3653 else:
3654 return False, {}
3655
3656 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]:
3657 if kind is ObjectKind.ANY:
3658 return pg_catalog.RELKINDS_ALL_TABLE_LIKE
3659 relkinds = ()
3660 if ObjectKind.TABLE in kind:
3661 relkinds += pg_catalog.RELKINDS_TABLE
3662 if ObjectKind.VIEW in kind:
3663 relkinds += pg_catalog.RELKINDS_VIEW
3664 if ObjectKind.MATERIALIZED_VIEW in kind:
3665 relkinds += pg_catalog.RELKINDS_MAT_VIEW
3666 return relkinds
3667
3668 @reflection.cache
3669 def get_columns(self, connection, table_name, schema=None, **kw):
3670 data = self.get_multi_columns(
3671 connection,
3672 schema=schema,
3673 filter_names=[table_name],
3674 scope=ObjectScope.ANY,
3675 kind=ObjectKind.ANY,
3676 **kw,
3677 )
3678 return self._value_or_raise(data, table_name, schema)
3679
3680 @lru_cache()
3681 def _columns_query(self, schema, has_filter_names, scope, kind):
        # NOTE: using scalar subqueries for the default and identity
        # options is faster than trying to use outer joins for them
3684 generated = (
3685 pg_catalog.pg_attribute.c.attgenerated.label("generated")
3686 if self.server_version_info >= (12,)
3687 else sql.null().label("generated")
3688 )
3689 if self.server_version_info >= (10,):
3690 # join lateral performs worse (~2x slower) than a scalar_subquery
3691 identity = (
3692 select(
3693 sql.func.json_build_object(
3694 "always",
3695 pg_catalog.pg_attribute.c.attidentity == "a",
3696 "start",
3697 pg_catalog.pg_sequence.c.seqstart,
3698 "increment",
3699 pg_catalog.pg_sequence.c.seqincrement,
3700 "minvalue",
3701 pg_catalog.pg_sequence.c.seqmin,
3702 "maxvalue",
3703 pg_catalog.pg_sequence.c.seqmax,
3704 "cache",
3705 pg_catalog.pg_sequence.c.seqcache,
3706 "cycle",
3707 pg_catalog.pg_sequence.c.seqcycle,
3708 type_=sqltypes.JSON(),
3709 )
3710 )
3711 .select_from(pg_catalog.pg_sequence)
3712 .where(
                    # attidentity != '' is required, otherwise it would
                    # also reflect serial columns as identity
3715 pg_catalog.pg_attribute.c.attidentity != "",
3716 pg_catalog.pg_sequence.c.seqrelid
3717 == sql.cast(
3718 sql.cast(
3719 pg_catalog.pg_get_serial_sequence(
3720 sql.cast(
3721 sql.cast(
3722 pg_catalog.pg_attribute.c.attrelid,
3723 REGCLASS,
3724 ),
3725 TEXT,
3726 ),
3727 pg_catalog.pg_attribute.c.attname,
3728 ),
3729 REGCLASS,
3730 ),
3731 OID,
3732 ),
3733 )
3734 .correlate(pg_catalog.pg_attribute)
3735 .scalar_subquery()
3736 .label("identity_options")
3737 )
3738 else:
3739 identity = sql.null().label("identity_options")
3740
3741 # join lateral performs the same as scalar_subquery here
3742 default = (
3743 select(
3744 pg_catalog.pg_get_expr(
3745 pg_catalog.pg_attrdef.c.adbin,
3746 pg_catalog.pg_attrdef.c.adrelid,
3747 )
3748 )
3749 .select_from(pg_catalog.pg_attrdef)
3750 .where(
3751 pg_catalog.pg_attrdef.c.adrelid
3752 == pg_catalog.pg_attribute.c.attrelid,
3753 pg_catalog.pg_attrdef.c.adnum
3754 == pg_catalog.pg_attribute.c.attnum,
3755 pg_catalog.pg_attribute.c.atthasdef,
3756 )
3757 .correlate(pg_catalog.pg_attribute)
3758 .scalar_subquery()
3759 .label("default")
3760 )
3761 relkinds = self._kind_to_relkinds(kind)
3762 query = (
3763 select(
3764 pg_catalog.pg_attribute.c.attname.label("name"),
3765 pg_catalog.format_type(
3766 pg_catalog.pg_attribute.c.atttypid,
3767 pg_catalog.pg_attribute.c.atttypmod,
3768 ).label("format_type"),
3769 default,
3770 pg_catalog.pg_attribute.c.attnotnull.label("not_null"),
3771 pg_catalog.pg_class.c.relname.label("table_name"),
3772 pg_catalog.pg_description.c.description.label("comment"),
3773 generated,
3774 identity,
3775 )
3776 .select_from(pg_catalog.pg_class)
            # NOTE: postgresql supports tables with no user columns, for
            # which there is no row with pg_attribute.attnum > 0. use a left
            # outer join to avoid filtering out these tables.
3780 .outerjoin(
3781 pg_catalog.pg_attribute,
3782 sql.and_(
3783 pg_catalog.pg_class.c.oid
3784 == pg_catalog.pg_attribute.c.attrelid,
3785 pg_catalog.pg_attribute.c.attnum > 0,
3786 ~pg_catalog.pg_attribute.c.attisdropped,
3787 ),
3788 )
3789 .outerjoin(
3790 pg_catalog.pg_description,
3791 sql.and_(
3792 pg_catalog.pg_description.c.objoid
3793 == pg_catalog.pg_attribute.c.attrelid,
3794 pg_catalog.pg_description.c.objsubid
3795 == pg_catalog.pg_attribute.c.attnum,
3796 ),
3797 )
3798 .where(self._pg_class_relkind_condition(relkinds))
3799 .order_by(
3800 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum
3801 )
3802 )
3803 query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
3804 if has_filter_names:
3805 query = query.where(
3806 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
3807 )
3808 return query
3809
3810 def get_multi_columns(
3811 self, connection, schema, filter_names, scope, kind, **kw
3812 ):
3813 has_filter_names, params = self._prepare_filter_names(filter_names)
3814 query = self._columns_query(schema, has_filter_names, scope, kind)
3815 rows = connection.execute(query, params).mappings()
3816
        # dictionary keyed with (name,) when the domain is in the default
        # search path, (schema, name) otherwise
3819 domains = {
3820 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d
3821 for d in self._load_domains(
3822 connection, schema="*", info_cache=kw.get("info_cache")
3823 )
3824 }
3825
        # dictionary keyed with (name,) when the enum is in the default
        # search path, (schema, name) otherwise
        enums = {
            (
                (rec["name"],)
                if rec["visible"]
                else (rec["schema"], rec["name"])
            ): rec
            for rec in self._load_enums(
                connection, schema="*", info_cache=kw.get("info_cache")
            )
        }
3838
3839 columns = self._get_columns_info(rows, domains, enums, schema)
3840
3841 return columns.items()
3842
3843 _format_type_args_pattern = re.compile(r"\((.*)\)")
3844 _format_type_args_delim = re.compile(r"\s*,\s*")
3845 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$")
3846
3847 def _reflect_type(
3848 self,
3849 format_type: Optional[str],
3850 domains: Dict[str, ReflectedDomain],
3851 enums: Dict[str, ReflectedEnum],
3852 type_description: str,
3853 ) -> sqltypes.TypeEngine[Any]:
        """
        Attempts to reconstruct a column type defined in ischema_names based
        on the information available in the format_type.

        If the `format_type` cannot be associated with a known
        `ischema_names` entry, it is treated as a reference to a known
        PostgreSQL named `ENUM` or `DOMAIN` type.
        """
3862 type_description = type_description or "unknown type"
3863 if format_type is None:
3864 util.warn(
3865 "PostgreSQL format_type() returned NULL for %s"
3866 % type_description
3867 )
3868 return sqltypes.NULLTYPE
3869
3870 attype_args_match = self._format_type_args_pattern.search(format_type)
3871 if attype_args_match and attype_args_match.group(1):
3872 attype_args = self._format_type_args_delim.split(
3873 attype_args_match.group(1)
3874 )
3875 else:
3876 attype_args = ()
3877
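        # e.g. a format_type of "character varying(30)[][]" yields
        # attype_args ("30",) above and an array_dim of 2 below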
3878 match_array_dim = self._format_array_spec_pattern.search(format_type)
3879 # Each "[]" in array specs corresponds to an array dimension
3880 array_dim = len(match_array_dim.group(1) or "") // 2
3881
3882 # Remove all parameters and array specs from format_type to obtain an
3883 # ischema_name candidate
3884 attype = self._format_type_args_pattern.sub("", format_type)
3885 attype = self._format_array_spec_pattern.sub("", attype)
3886
3887 schema_type = self.ischema_names.get(attype.lower(), None)
3888 args, kwargs = (), {}
3889
3890 if attype == "numeric":
3891 if len(attype_args) == 2:
3892 precision, scale = map(int, attype_args)
3893 args = (precision, scale)
3894
3895 elif attype == "double precision":
3896 args = (53,)
3897
3898 elif attype == "integer":
3899 args = ()
3900
3901 elif attype in ("timestamp with time zone", "time with time zone"):
3902 kwargs["timezone"] = True
3903 if len(attype_args) == 1:
3904 kwargs["precision"] = int(attype_args[0])
3905
3906 elif attype in (
3907 "timestamp without time zone",
3908 "time without time zone",
3909 "time",
3910 ):
3911 kwargs["timezone"] = False
3912 if len(attype_args) == 1:
3913 kwargs["precision"] = int(attype_args[0])
3914
3915 elif attype == "bit varying":
3916 kwargs["varying"] = True
3917 if len(attype_args) == 1:
3918 charlen = int(attype_args[0])
3919 args = (charlen,)
3920
3921 elif attype.startswith("interval"):
3922 schema_type = INTERVAL
3923
3924 field_match = re.match(r"interval (.+)", attype)
3925 if field_match:
3926 kwargs["fields"] = field_match.group(1)
3927
3928 if len(attype_args) == 1:
3929 kwargs["precision"] = int(attype_args[0])
3930
3931 else:
3932 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
3933
3934 if enum_or_domain_key in enums:
3935 schema_type = ENUM
3936 enum = enums[enum_or_domain_key]
3937
3938 kwargs["name"] = enum["name"]
3939
3940 if not enum["visible"]:
3941 kwargs["schema"] = enum["schema"]
3942 args = tuple(enum["labels"])
3943 elif enum_or_domain_key in domains:
3944 schema_type = DOMAIN
3945 domain = domains[enum_or_domain_key]
3946
3947 data_type = self._reflect_type(
3948 domain["type"],
3949 domains,
3950 enums,
3951 type_description="DOMAIN '%s'" % domain["name"],
3952 )
3953 args = (domain["name"], data_type)
3954
3955 kwargs["collation"] = domain["collation"]
3956 kwargs["default"] = domain["default"]
3957 kwargs["not_null"] = not domain["nullable"]
3958 kwargs["create_type"] = False
3959
3960 if domain["constraints"]:
3961 # We only support a single constraint
3962 check_constraint = domain["constraints"][0]
3963
3964 kwargs["constraint_name"] = check_constraint["name"]
3965 kwargs["check"] = check_constraint["check"]
3966
3967 if not domain["visible"]:
3968 kwargs["schema"] = domain["schema"]
3969
3970 else:
3971 try:
3972 charlen = int(attype_args[0])
3973 args = (charlen, *attype_args[1:])
3974 except (ValueError, IndexError):
3975 args = attype_args
3976
3977 if not schema_type:
3978 util.warn(
3979 "Did not recognize type '%s' of %s"
3980 % (attype, type_description)
3981 )
3982 return sqltypes.NULLTYPE
3983
3984 data_type = schema_type(*args, **kwargs)
3985 if array_dim >= 1:
3986 # postgres does not preserve dimensionality or size of array types.
3987 data_type = _array.ARRAY(data_type)
3988
3989 return data_type
3990
3991 def _get_columns_info(self, rows, domains, enums, schema):
3992 columns = defaultdict(list)
3993 for row_dict in rows:
3994 # ensure that each table has an entry, even if it has no columns
3995 if row_dict["name"] is None:
3996 columns[(schema, row_dict["table_name"])] = (
3997 ReflectionDefaults.columns()
3998 )
3999 continue
4000 table_cols = columns[(schema, row_dict["table_name"])]
4001
4002 coltype = self._reflect_type(
4003 row_dict["format_type"],
4004 domains,
4005 enums,
4006 type_description="column '%s'" % row_dict["name"],
4007 )
4008
4009 default = row_dict["default"]
4010 name = row_dict["name"]
4011 generated = row_dict["generated"]
4012 nullable = not row_dict["not_null"]
4013
4014 if isinstance(coltype, DOMAIN):
4015 if not default:
                    # a domain can override the default value but
                    # can't set it to None
4018 if coltype.default is not None:
4019 default = coltype.default
4020
4021 nullable = nullable and not coltype.not_null
4022
4023 identity = row_dict["identity_options"]
4024
            # attgenerated is a zero byte or a blank string, depending on
            # the driver (and is absent entirely on older PG versions),
            # when the column is not generated; otherwise "s" means stored.
            # (other values might be added in the future)
4029 if generated not in (None, "", b"\x00"):
4030 computed = dict(
4031 sqltext=default, persisted=generated in ("s", b"s")
4032 )
4033 default = None
4034 else:
4035 computed = None
4036
4037 # adjust the default value
4038 autoincrement = False
4039 if default is not None:
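                # defaults generated from a sequence look like
                # "nextval('some_table_id_seq'::regclass)"; capture the
                # sequence name so it can be schema-qualified below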
4040 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
4041 if match is not None:
4042 if issubclass(coltype._type_affinity, sqltypes.Integer):
4043 autoincrement = True
4044 # the default is related to a Sequence
4045 if "." not in match.group(2) and schema is not None:
                        # unconditionally quote the schema name. this could
                        # later be enhanced to obey quoting rules / the
                        # "quote schema" setting
4049 default = (
4050 match.group(1)
4051 + ('"%s"' % schema)
4052 + "."
4053 + match.group(2)
4054 + match.group(3)
4055 )
4056
4057 column_info = {
4058 "name": name,
4059 "type": coltype,
4060 "nullable": nullable,
4061 "default": default,
4062 "autoincrement": autoincrement or identity is not None,
4063 "comment": row_dict["comment"],
4064 }
4065 if computed is not None:
4066 column_info["computed"] = computed
4067 if identity is not None:
4068 column_info["identity"] = identity
4069
4070 table_cols.append(column_info)
4071
4072 return columns
4073
4074 @lru_cache()
4075 def _table_oids_query(self, schema, has_filter_names, scope, kind):
4076 relkinds = self._kind_to_relkinds(kind)
4077 oid_q = select(
4078 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname
4079 ).where(self._pg_class_relkind_condition(relkinds))
4080 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope)
4081
4082 if has_filter_names:
4083 oid_q = oid_q.where(
4084 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4085 )
4086 return oid_q
4087
4088 @reflection.flexi_cache(
4089 ("schema", InternalTraversal.dp_string),
4090 ("filter_names", InternalTraversal.dp_string_list),
4091 ("kind", InternalTraversal.dp_plain_obj),
4092 ("scope", InternalTraversal.dp_plain_obj),
4093 )
4094 def _get_table_oids(
4095 self, connection, schema, filter_names, scope, kind, **kw
4096 ):
4097 has_filter_names, params = self._prepare_filter_names(filter_names)
4098 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind)
4099 result = connection.execute(oid_q, params)
4100 return result.all()
4101
4102 @util.memoized_property
4103 def _constraint_query(self):
4104 if self.server_version_info >= (11, 0):
4105 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
4106 else:
4107 indnkeyatts = sql.null().label("indnkeyatts")
4108
4109 if self.server_version_info >= (15,):
4110 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct
4111 else:
4112 indnullsnotdistinct = sql.false().label("indnullsnotdistinct")
4113
4114 con_sq = (
4115 select(
4116 pg_catalog.pg_constraint.c.conrelid,
4117 pg_catalog.pg_constraint.c.conname,
4118 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
4119 sql.func.generate_subscripts(
4120 pg_catalog.pg_index.c.indkey, 1
4121 ).label("ord"),
4122 indnkeyatts,
4123 indnullsnotdistinct,
4124 pg_catalog.pg_description.c.description,
4125 )
4126 .join(
4127 pg_catalog.pg_index,
4128 pg_catalog.pg_constraint.c.conindid
4129 == pg_catalog.pg_index.c.indexrelid,
4130 )
4131 .outerjoin(
4132 pg_catalog.pg_description,
4133 pg_catalog.pg_description.c.objoid
4134 == pg_catalog.pg_constraint.c.oid,
4135 )
4136 .where(
4137 pg_catalog.pg_constraint.c.contype == bindparam("contype"),
4138 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")),
4139 # NOTE: filtering also on pg_index.indrelid for oids does
4140 # not seem to have a performance effect, but it may be an
4141 # option if perf problems are reported
4142 )
4143 .subquery("con")
4144 )
4145
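        # resolve the (constraint, attnum) pairs unnested above to column
        # names by joining pg_attribute, preserving the constraint column
        # order via "ord"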
4146 attr_sq = (
4147 select(
4148 con_sq.c.conrelid,
4149 con_sq.c.conname,
4150 con_sq.c.description,
4151 con_sq.c.ord,
4152 con_sq.c.indnkeyatts,
4153 con_sq.c.indnullsnotdistinct,
4154 pg_catalog.pg_attribute.c.attname,
4155 )
4156 .select_from(pg_catalog.pg_attribute)
4157 .join(
4158 con_sq,
4159 sql.and_(
4160 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum,
4161 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid,
4162 ),
4163 )
4164 .where(
                # NOTE: restate the condition here, since pg15 otherwise
                # seems to get confused on psycopg2 sometimes, doing
                # a sequential scan of pg_attribute.
                # the condition in the con_sq subquery is not actually
                # needed in pg15, but it may be needed in older versions.
                # keeping it does not seem to have any impact in any case.
4171 con_sq.c.conrelid.in_(bindparam("oids"))
4172 )
4173 .subquery("attr")
4174 )
4175
4176 return (
4177 select(
4178 attr_sq.c.conrelid,
4179 sql.func.array_agg(
4180 # NOTE: cast since some postgresql derivatives may
4181 # not support array_agg on the name type
4182 aggregate_order_by(
4183 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord
4184 )
4185 ).label("cols"),
4186 attr_sq.c.conname,
4187 sql.func.min(attr_sq.c.description).label("description"),
4188 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"),
4189 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label(
4190 "indnullsnotdistinct"
4191 ),
4192 )
4193 .group_by(attr_sq.c.conrelid, attr_sq.c.conname)
4194 .order_by(attr_sq.c.conrelid, attr_sq.c.conname)
4195 )
4196
4197 def _reflect_constraint(
4198 self, connection, contype, schema, filter_names, scope, kind, **kw
4199 ):
4200 # used to reflect primary and unique constraint
4201 table_oids = self._get_table_oids(
4202 connection, schema, filter_names, scope, kind, **kw
4203 )
4204 batches = list(table_oids)
4205 is_unique = contype == "u"
4206
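        # run the constraint query in batches of 3000 table oids at a time,
        # keeping the bound "oids" parameter to a manageable size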
4207 while batches:
4208 batch = batches[0:3000]
4209 batches[0:3000] = []
4210
4211 result = connection.execute(
4212 self._constraint_query,
4213 {"oids": [r[0] for r in batch], "contype": contype},
4214 ).mappings()
4215
4216 result_by_oid = defaultdict(list)
4217 for row_dict in result:
4218 result_by_oid[row_dict["conrelid"]].append(row_dict)
4219
4220 for oid, tablename in batch:
4221 for_oid = result_by_oid.get(oid, ())
4222 if for_oid:
4223 for row in for_oid:
4224 # See note in get_multi_indexes
4225 all_cols = row["cols"]
4226 indnkeyatts = row["indnkeyatts"]
4227 if (
4228 indnkeyatts is not None
4229 and len(all_cols) > indnkeyatts
4230 ):
4231 inc_cols = all_cols[indnkeyatts:]
4232 cst_cols = all_cols[:indnkeyatts]
4233 else:
4234 inc_cols = []
4235 cst_cols = all_cols
4236
4237 opts = {}
4238 if self.server_version_info >= (11,):
4239 opts["postgresql_include"] = inc_cols
4240 if is_unique:
4241 opts["postgresql_nulls_not_distinct"] = row[
4242 "indnullsnotdistinct"
4243 ]
4244 yield (
4245 tablename,
4246 cst_cols,
4247 row["conname"],
4248 row["description"],
4249 opts,
4250 )
4251 else:
4252 yield tablename, None, None, None, None
4253
4254 @reflection.cache
4255 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
4256 data = self.get_multi_pk_constraint(
4257 connection,
4258 schema=schema,
4259 filter_names=[table_name],
4260 scope=ObjectScope.ANY,
4261 kind=ObjectKind.ANY,
4262 **kw,
4263 )
4264 return self._value_or_raise(data, table_name, schema)
4265
4266 def get_multi_pk_constraint(
4267 self, connection, schema, filter_names, scope, kind, **kw
4268 ):
4269 result = self._reflect_constraint(
4270 connection, "p", schema, filter_names, scope, kind, **kw
4271 )
4272
4273 # only a single pk can be present for each table. Return an entry
4274 # even if a table has no primary key
4275 default = ReflectionDefaults.pk_constraint
4276
4277 def pk_constraint(pk_name, cols, comment, opts):
4278 info = {
4279 "constrained_columns": cols,
4280 "name": pk_name,
4281 "comment": comment,
4282 }
4283 if opts:
4284 info["dialect_options"] = opts
4285 return info
4286
4287 return (
4288 (
4289 (schema, table_name),
4290 (
4291 pk_constraint(pk_name, cols, comment, opts)
4292 if pk_name is not None
4293 else default()
4294 ),
4295 )
4296 for table_name, cols, pk_name, comment, opts in result
4297 )
4298
4299 @reflection.cache
4300 def get_foreign_keys(
4301 self,
4302 connection,
4303 table_name,
4304 schema=None,
4305 postgresql_ignore_search_path=False,
4306 **kw,
4307 ):
4308 data = self.get_multi_foreign_keys(
4309 connection,
4310 schema=schema,
4311 filter_names=[table_name],
4312 postgresql_ignore_search_path=postgresql_ignore_search_path,
4313 scope=ObjectScope.ANY,
4314 kind=ObjectKind.ANY,
4315 **kw,
4316 )
4317 return self._value_or_raise(data, table_name, schema)
4318
4319 @lru_cache()
    def _foreign_key_query(self, schema, has_filter_names, scope, kind):
4321 pg_class_ref = pg_catalog.pg_class.alias("cls_ref")
4322 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref")
4323 relkinds = self._kind_to_relkinds(kind)
4324 query = (
4325 select(
4326 pg_catalog.pg_class.c.relname,
4327 pg_catalog.pg_constraint.c.conname,
4328 # NOTE: avoid calling pg_get_constraintdef when not needed
4329 # to speed up the query
4330 sql.case(
4331 (
4332 pg_catalog.pg_constraint.c.oid.is_not(None),
4333 pg_catalog.pg_get_constraintdef(
4334 pg_catalog.pg_constraint.c.oid, True
4335 ),
4336 ),
4337 else_=None,
4338 ),
4339 pg_namespace_ref.c.nspname,
4340 pg_catalog.pg_description.c.description,
4341 )
4342 .select_from(pg_catalog.pg_class)
4343 .outerjoin(
4344 pg_catalog.pg_constraint,
4345 sql.and_(
4346 pg_catalog.pg_class.c.oid
4347 == pg_catalog.pg_constraint.c.conrelid,
4348 pg_catalog.pg_constraint.c.contype == "f",
4349 ),
4350 )
4351 .outerjoin(
4352 pg_class_ref,
4353 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid,
4354 )
4355 .outerjoin(
4356 pg_namespace_ref,
4357 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid,
4358 )
4359 .outerjoin(
4360 pg_catalog.pg_description,
4361 pg_catalog.pg_description.c.objoid
4362 == pg_catalog.pg_constraint.c.oid,
4363 )
4364 .order_by(
4365 pg_catalog.pg_class.c.relname,
4366 pg_catalog.pg_constraint.c.conname,
4367 )
4368 .where(self._pg_class_relkind_condition(relkinds))
4369 )
4370 query = self._pg_class_filter_scope_schema(query, schema, scope)
4371 if has_filter_names:
4372 query = query.where(
4373 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4374 )
4375 return query
4376
4377 @util.memoized_property
4378 def _fk_regex_pattern(self):
4379 # optionally quoted token
4380 qtoken = '(?:"[^"]+"|[A-Za-z0-9_]+?)'
4381
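        # matches constraint text as rendered by pg_get_constraintdef(),
        # e.g. FOREIGN KEY (a, b) REFERENCES other_schema.tbl(x, y)
        # MATCH FULL ON UPDATE CASCADE ON DELETE SET NULL
        # DEFERRABLE INITIALLY DEFERRED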
4382 # https://www.postgresql.org/docs/current/static/sql-createtable.html
4383 return re.compile(
4384 r"FOREIGN KEY \((.*?)\) "
4385 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501
4386 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
4387 r"[\s]?(ON UPDATE "
4388 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
4389 r"[\s]?(ON DELETE "
4390 r"(CASCADE|RESTRICT|NO ACTION|"
4391 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
4392 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
4393 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
4394 )
4395
4396 def get_multi_foreign_keys(
4397 self,
4398 connection,
4399 schema,
4400 filter_names,
4401 scope,
4402 kind,
4403 postgresql_ignore_search_path=False,
4404 **kw,
4405 ):
4406 preparer = self.identifier_preparer
4407
4408 has_filter_names, params = self._prepare_filter_names(filter_names)
        query = self._foreign_key_query(schema, has_filter_names, scope, kind)
4410 result = connection.execute(query, params)
4411
4412 FK_REGEX = self._fk_regex_pattern
4413
4414 fkeys = defaultdict(list)
4415 default = ReflectionDefaults.foreign_keys
4416 for table_name, conname, condef, conschema, comment in result:
4417 # ensure that each table has an entry, even if it has
4418 # no foreign keys
4419 if conname is None:
4420 fkeys[(schema, table_name)] = default()
4421 continue
4422 table_fks = fkeys[(schema, table_name)]
4423 m = re.search(FK_REGEX, condef).groups()
4424
4425 (
4426 constrained_columns,
4427 referred_schema,
4428 referred_table,
4429 referred_columns,
4430 _,
4431 match,
4432 _,
4433 onupdate,
4434 _,
4435 ondelete,
4436 deferrable,
4437 _,
4438 initially,
4439 ) = m
4440
4441 if deferrable is not None:
                deferrable = deferrable == "DEFERRABLE"
4443 constrained_columns = [
4444 preparer._unquote_identifier(x)
4445 for x in re.split(r"\s*,\s*", constrained_columns)
4446 ]
4447
4448 if postgresql_ignore_search_path:
4449 # when ignoring search path, we use the actual schema
4450 # provided it isn't the "default" schema
4451 if conschema != self.default_schema_name:
4452 referred_schema = conschema
4453 else:
4454 referred_schema = schema
4455 elif referred_schema:
4456 # referred_schema is the schema that we regexp'ed from
4457 # pg_get_constraintdef(). If the schema is in the search
4458 # path, pg_get_constraintdef() will give us None.
4459 referred_schema = preparer._unquote_identifier(referred_schema)
4460 elif schema is not None and schema == conschema:
4461 # If the actual schema matches the schema of the table
4462 # we're reflecting, then we will use that.
4463 referred_schema = schema
4464
4465 referred_table = preparer._unquote_identifier(referred_table)
4466 referred_columns = [
4467 preparer._unquote_identifier(x)
                for x in re.split(r"\s*,\s*", referred_columns)
4469 ]
4470 options = {
4471 k: v
4472 for k, v in [
4473 ("onupdate", onupdate),
4474 ("ondelete", ondelete),
4475 ("initially", initially),
4476 ("deferrable", deferrable),
4477 ("match", match),
4478 ]
4479 if v is not None and v != "NO ACTION"
4480 }
4481 fkey_d = {
4482 "name": conname,
4483 "constrained_columns": constrained_columns,
4484 "referred_schema": referred_schema,
4485 "referred_table": referred_table,
4486 "referred_columns": referred_columns,
4487 "options": options,
4488 "comment": comment,
4489 }
4490 table_fks.append(fkey_d)
4491 return fkeys.items()
4492
4493 @reflection.cache
4494 def get_indexes(self, connection, table_name, schema=None, **kw):
4495 data = self.get_multi_indexes(
4496 connection,
4497 schema=schema,
4498 filter_names=[table_name],
4499 scope=ObjectScope.ANY,
4500 kind=ObjectKind.ANY,
4501 **kw,
4502 )
4503 return self._value_or_raise(data, table_name, schema)
4504
4505 @util.memoized_property
4506 def _index_query(self):
        # NOTE: pg_index appears in the FROM clause twice to improve
        # performance, since extracting all the index information from
        # `idx_sq` to avoid the second use of pg_index leads to a worse
        # performing query, in particular when querying for a single
        # table (as of pg 17)
        # NOTE: repeating the oids clause improves query performance
4512
4513 # subquery to get the columns
4514 idx_sq = (
4515 select(
4516 pg_catalog.pg_index.c.indexrelid,
4517 pg_catalog.pg_index.c.indrelid,
4518 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
4519 sql.func.generate_subscripts(
4520 pg_catalog.pg_index.c.indkey, 1
4521 ).label("ord"),
4522 )
4523 .where(
4524 ~pg_catalog.pg_index.c.indisprimary,
4525 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
4526 )
4527 .subquery("idx")
4528 )
4529
4530 attr_sq = (
4531 select(
4532 idx_sq.c.indexrelid,
4533 idx_sq.c.indrelid,
4534 idx_sq.c.ord,
                # NOTE: always using pg_get_indexdef is too slow, so only
                # invoke it when the element is an expression
4537 sql.case(
4538 (
4539 idx_sq.c.attnum == 0,
4540 pg_catalog.pg_get_indexdef(
4541 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True
4542 ),
4543 ),
                    # NOTE: need to cast this since attname is of type
                    # "name", which is limited to 63 bytes, while
                    # pg_get_indexdef returns "text", so its output could
                    # otherwise get cut
4547 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT),
4548 ).label("element"),
4549 (idx_sq.c.attnum == 0).label("is_expr"),
4550 )
4551 .select_from(idx_sq)
4552 .outerjoin(
4553 # do not remove rows where idx_sq.c.attnum is 0
4554 pg_catalog.pg_attribute,
4555 sql.and_(
4556 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum,
4557 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid,
4558 ),
4559 )
4560 .where(idx_sq.c.indrelid.in_(bindparam("oids")))
4561 .subquery("idx_attr")
4562 )
4563
4564 cols_sq = (
4565 select(
4566 attr_sq.c.indexrelid,
4567 sql.func.min(attr_sq.c.indrelid),
4568 sql.func.array_agg(
4569 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord)
4570 ).label("elements"),
4571 sql.func.array_agg(
4572 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord)
4573 ).label("elements_is_expr"),
4574 )
4575 .group_by(attr_sq.c.indexrelid)
4576 .subquery("idx_cols")
4577 )
4578
4579 if self.server_version_info >= (11, 0):
4580 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
4581 else:
4582 indnkeyatts = sql.null().label("indnkeyatts")
4583
4584 if self.server_version_info >= (15,):
4585 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct
4586 else:
4587 nulls_not_distinct = sql.false().label("indnullsnotdistinct")
4588
4589 return (
4590 select(
4591 pg_catalog.pg_index.c.indrelid,
4592 pg_catalog.pg_class.c.relname,
4593 pg_catalog.pg_index.c.indisunique,
4594 pg_catalog.pg_constraint.c.conrelid.is_not(None).label(
4595 "has_constraint"
4596 ),
4597 pg_catalog.pg_index.c.indoption,
4598 pg_catalog.pg_class.c.reloptions,
4599 pg_catalog.pg_am.c.amname,
4600 # NOTE: pg_get_expr is very fast so this case has almost no
4601 # performance impact
4602 sql.case(
4603 (
4604 pg_catalog.pg_index.c.indpred.is_not(None),
4605 pg_catalog.pg_get_expr(
4606 pg_catalog.pg_index.c.indpred,
4607 pg_catalog.pg_index.c.indrelid,
4608 ),
4609 ),
4610 else_=None,
4611 ).label("filter_definition"),
4612 indnkeyatts,
4613 nulls_not_distinct,
4614 cols_sq.c.elements,
4615 cols_sq.c.elements_is_expr,
4616 )
4617 .select_from(pg_catalog.pg_index)
4618 .where(
4619 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
4620 ~pg_catalog.pg_index.c.indisprimary,
4621 )
4622 .join(
4623 pg_catalog.pg_class,
4624 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid,
4625 )
4626 .join(
4627 pg_catalog.pg_am,
4628 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid,
4629 )
4630 .outerjoin(
4631 cols_sq,
4632 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid,
4633 )
4634 .outerjoin(
4635 pg_catalog.pg_constraint,
4636 sql.and_(
4637 pg_catalog.pg_index.c.indrelid
4638 == pg_catalog.pg_constraint.c.conrelid,
4639 pg_catalog.pg_index.c.indexrelid
4640 == pg_catalog.pg_constraint.c.conindid,
4641 pg_catalog.pg_constraint.c.contype
4642 == sql.any_(_array.array(("p", "u", "x"))),
4643 ),
4644 )
4645 .order_by(
4646 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname
4647 )
4648 )
4649
4650 def get_multi_indexes(
4651 self, connection, schema, filter_names, scope, kind, **kw
4652 ):
4653 table_oids = self._get_table_oids(
4654 connection, schema, filter_names, scope, kind, **kw
4655 )
4656
4657 indexes = defaultdict(list)
4658 default = ReflectionDefaults.indexes
4659
4660 batches = list(table_oids)
4661
4662 while batches:
4663 batch = batches[0:3000]
4664 batches[0:3000] = []
4665
4666 result = connection.execute(
4667 self._index_query, {"oids": [r[0] for r in batch]}
4668 ).mappings()
4669
4670 result_by_oid = defaultdict(list)
4671 for row_dict in result:
4672 result_by_oid[row_dict["indrelid"]].append(row_dict)
4673
4674 for oid, table_name in batch:
4675 if oid not in result_by_oid:
                    # ensure that each table has an entry, even if it has
                    # no indexes
4678 indexes[(schema, table_name)] = default()
4679 continue
4680
4681 for row in result_by_oid[oid]:
4682 index_name = row["relname"]
4683
4684 table_indexes = indexes[(schema, table_name)]
4685
4686 all_elements = row["elements"]
4687 all_elements_is_expr = row["elements_is_expr"]
4688 indnkeyatts = row["indnkeyatts"]
4689 # "The number of key columns in the index, not counting any
4690 # included columns, which are merely stored and do not
4691 # participate in the index semantics"
4692 if (
4693 indnkeyatts is not None
4694 and len(all_elements) > indnkeyatts
4695 ):
4696 # this is a "covering index" which has INCLUDE columns
4697 # as well as regular index columns
4698 inc_cols = all_elements[indnkeyatts:]
4699 idx_elements = all_elements[:indnkeyatts]
4700 idx_elements_is_expr = all_elements_is_expr[
4701 :indnkeyatts
4702 ]
4703 # postgresql does not support expression on included
4704 # columns as of v14: "ERROR: expressions are not
4705 # supported in included columns".
4706 assert all(
4707 not is_expr
4708 for is_expr in all_elements_is_expr[indnkeyatts:]
4709 )
4710 else:
4711 idx_elements = all_elements
4712 idx_elements_is_expr = all_elements_is_expr
4713 inc_cols = []
4714
4715 index = {"name": index_name, "unique": row["indisunique"]}
4716 if any(idx_elements_is_expr):
4717 index["column_names"] = [
4718 None if is_expr else expr
4719 for expr, is_expr in zip(
4720 idx_elements, idx_elements_is_expr
4721 )
4722 ]
4723 index["expressions"] = idx_elements
4724 else:
4725 index["column_names"] = idx_elements
4726
4727 sorting = {}
4728 for col_index, col_flags in enumerate(row["indoption"]):
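                        # pg_index.indoption flags: 0x01 indicates DESC,
                        # 0x02 indicates NULLS FIRST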
4729 col_sorting = ()
4730 # try to set flags only if they differ from PG
4731 # defaults...
4732 if col_flags & 0x01:
4733 col_sorting += ("desc",)
4734 if not (col_flags & 0x02):
4735 col_sorting += ("nulls_last",)
4736 else:
4737 if col_flags & 0x02:
4738 col_sorting += ("nulls_first",)
4739 if col_sorting:
4740 sorting[idx_elements[col_index]] = col_sorting
4741 if sorting:
4742 index["column_sorting"] = sorting
4743 if row["has_constraint"]:
4744 index["duplicates_constraint"] = index_name
4745
4746 dialect_options = {}
4747 if row["reloptions"]:
4748 dialect_options["postgresql_with"] = dict(
4749 [
4750 option.split("=", 1)
4751 for option in row["reloptions"]
4752 ]
4753 )
4754 # it *might* be nice to include that this is 'btree' in the
4755 # reflection info. But we don't want an Index object
4756 # to have a ``postgresql_using`` in it that is just the
4757 # default, so for the moment leaving this out.
4758 amname = row["amname"]
4759 if amname != "btree":
4760 dialect_options["postgresql_using"] = row["amname"]
4761 if row["filter_definition"]:
4762 dialect_options["postgresql_where"] = row[
4763 "filter_definition"
4764 ]
4765 if self.server_version_info >= (11,):
4766 # NOTE: this is legacy, this is part of
4767 # dialect_options now as of #7382
4768 index["include_columns"] = inc_cols
4769 dialect_options["postgresql_include"] = inc_cols
4770 if row["indnullsnotdistinct"]:
4771 # the default is False, so ignore it.
4772 dialect_options["postgresql_nulls_not_distinct"] = row[
4773 "indnullsnotdistinct"
4774 ]
4775
4776 if dialect_options:
4777 index["dialect_options"] = dialect_options
4778
4779 table_indexes.append(index)
4780 return indexes.items()
4781
4782 @reflection.cache
4783 def get_unique_constraints(
4784 self, connection, table_name, schema=None, **kw
4785 ):
4786 data = self.get_multi_unique_constraints(
4787 connection,
4788 schema=schema,
4789 filter_names=[table_name],
4790 scope=ObjectScope.ANY,
4791 kind=ObjectKind.ANY,
4792 **kw,
4793 )
4794 return self._value_or_raise(data, table_name, schema)
4795
4796 def get_multi_unique_constraints(
4797 self,
4798 connection,
4799 schema,
4800 filter_names,
4801 scope,
4802 kind,
4803 **kw,
4804 ):
4805 result = self._reflect_constraint(
4806 connection, "u", schema, filter_names, scope, kind, **kw
4807 )
4808
4809 # each table can have multiple unique constraints
4810 uniques = defaultdict(list)
4811 default = ReflectionDefaults.unique_constraints
4812 for table_name, cols, con_name, comment, options in result:
            # ensure a list is created for each table. leave it empty if
            # the table has no unique constraint
4815 if con_name is None:
4816 uniques[(schema, table_name)] = default()
4817 continue
4818
4819 uc_dict = {
4820 "column_names": cols,
4821 "name": con_name,
4822 "comment": comment,
4823 }
4824 if options:
4825 uc_dict["dialect_options"] = options
4826
4827 uniques[(schema, table_name)].append(uc_dict)
4828 return uniques.items()
4829
4830 @reflection.cache
4831 def get_table_comment(self, connection, table_name, schema=None, **kw):
4832 data = self.get_multi_table_comment(
4833 connection,
4834 schema,
4835 [table_name],
4836 scope=ObjectScope.ANY,
4837 kind=ObjectKind.ANY,
4838 **kw,
4839 )
4840 return self._value_or_raise(data, table_name, schema)
4841
4842 @lru_cache()
4843 def _comment_query(self, schema, has_filter_names, scope, kind):
4844 relkinds = self._kind_to_relkinds(kind)
4845 query = (
4846 select(
4847 pg_catalog.pg_class.c.relname,
4848 pg_catalog.pg_description.c.description,
4849 )
4850 .select_from(pg_catalog.pg_class)
4851 .outerjoin(
4852 pg_catalog.pg_description,
4853 sql.and_(
4854 pg_catalog.pg_class.c.oid
4855 == pg_catalog.pg_description.c.objoid,
4856 pg_catalog.pg_description.c.objsubid == 0,
4857 pg_catalog.pg_description.c.classoid
4858 == sql.func.cast("pg_catalog.pg_class", REGCLASS),
4859 ),
4860 )
4861 .where(self._pg_class_relkind_condition(relkinds))
4862 )
4863 query = self._pg_class_filter_scope_schema(query, schema, scope)
4864 if has_filter_names:
4865 query = query.where(
4866 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4867 )
4868 return query
4869
4870 def get_multi_table_comment(
4871 self, connection, schema, filter_names, scope, kind, **kw
4872 ):
4873 has_filter_names, params = self._prepare_filter_names(filter_names)
4874 query = self._comment_query(schema, has_filter_names, scope, kind)
4875 result = connection.execute(query, params)
4876
4877 default = ReflectionDefaults.table_comment
4878 return (
4879 (
4880 (schema, table),
4881 {"text": comment} if comment is not None else default(),
4882 )
4883 for table, comment in result
4884 )
4885
4886 @reflection.cache
4887 def get_check_constraints(self, connection, table_name, schema=None, **kw):
4888 data = self.get_multi_check_constraints(
4889 connection,
4890 schema,
4891 [table_name],
4892 scope=ObjectScope.ANY,
4893 kind=ObjectKind.ANY,
4894 **kw,
4895 )
4896 return self._value_or_raise(data, table_name, schema)
4897
4898 @lru_cache()
4899 def _check_constraint_query(self, schema, has_filter_names, scope, kind):
4900 relkinds = self._kind_to_relkinds(kind)
4901 query = (
4902 select(
4903 pg_catalog.pg_class.c.relname,
4904 pg_catalog.pg_constraint.c.conname,
4905 # NOTE: avoid calling pg_get_constraintdef when not needed
4906 # to speed up the query
4907 sql.case(
4908 (
4909 pg_catalog.pg_constraint.c.oid.is_not(None),
4910 pg_catalog.pg_get_constraintdef(
4911 pg_catalog.pg_constraint.c.oid, True
4912 ),
4913 ),
4914 else_=None,
4915 ),
4916 pg_catalog.pg_description.c.description,
4917 )
4918 .select_from(pg_catalog.pg_class)
4919 .outerjoin(
4920 pg_catalog.pg_constraint,
4921 sql.and_(
4922 pg_catalog.pg_class.c.oid
4923 == pg_catalog.pg_constraint.c.conrelid,
4924 pg_catalog.pg_constraint.c.contype == "c",
4925 ),
4926 )
4927 .outerjoin(
4928 pg_catalog.pg_description,
4929 pg_catalog.pg_description.c.objoid
4930 == pg_catalog.pg_constraint.c.oid,
4931 )
4932 .order_by(
4933 pg_catalog.pg_class.c.relname,
4934 pg_catalog.pg_constraint.c.conname,
4935 )
4936 .where(self._pg_class_relkind_condition(relkinds))
4937 )
4938 query = self._pg_class_filter_scope_schema(query, schema, scope)
4939 if has_filter_names:
4940 query = query.where(
4941 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4942 )
4943 return query
4944
4945 def get_multi_check_constraints(
4946 self, connection, schema, filter_names, scope, kind, **kw
4947 ):
4948 has_filter_names, params = self._prepare_filter_names(filter_names)
4949 query = self._check_constraint_query(
4950 schema, has_filter_names, scope, kind
4951 )
4952 result = connection.execute(query, params)
4953
4954 check_constraints = defaultdict(list)
4955 default = ReflectionDefaults.check_constraints
4956 for table_name, check_name, src, comment in result:
4957 # only two cases for check_name and src: both null or both defined
4958 if check_name is None and src is None:
4959 check_constraints[(schema, table_name)] = default()
4960 continue
4961 # samples:
4962 # "CHECK (((a > 1) AND (a < 5)))"
4963 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
4964 # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
4965 # "CHECK (some_boolean_function(a))"
4966 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
4967 # "CHECK (a NOT NULL) NO INHERIT"
4968 # "CHECK (a NOT NULL) NO INHERIT NOT VALID"
4969
4970 m = re.match(
4971 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$",
4972 src,
4973 flags=re.DOTALL,
4974 )
4975 if not m:
4976 util.warn("Could not parse CHECK constraint text: %r" % src)
4977 sqltext = ""
4978 else:
4979 sqltext = re.compile(
4980 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
4981 ).sub(r"\1", m.group(1))
4982 entry = {
4983 "name": check_name,
4984 "sqltext": sqltext,
4985 "comment": comment,
4986 }
4987 if m:
4988 do = {}
4989 if " NOT VALID" in m.groups():
4990 do["not_valid"] = True
4991 if " NO INHERIT" in m.groups():
4992 do["no_inherit"] = True
4993 if do:
4994 entry["dialect_options"] = do
4995
4996 check_constraints[(schema, table_name)].append(entry)
4997 return check_constraints.items()
4998
4999 def _pg_type_filter_schema(self, query, schema):
5000 if schema is None:
5001 query = query.where(
5002 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
5003 # ignore pg_catalog schema
5004 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
5005 )
5006 elif schema != "*":
5007 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
5008 return query
5009
5010 @lru_cache()
5011 def _enum_query(self, schema):
5012 lbl_agg_sq = (
5013 select(
5014 pg_catalog.pg_enum.c.enumtypid,
5015 sql.func.array_agg(
5016 aggregate_order_by(
5017 # NOTE: cast since some postgresql derivatives may
5018 # not support array_agg on the name type
5019 pg_catalog.pg_enum.c.enumlabel.cast(TEXT),
5020 pg_catalog.pg_enum.c.enumsortorder,
5021 )
5022 ).label("labels"),
5023 )
5024 .group_by(pg_catalog.pg_enum.c.enumtypid)
5025 .subquery("lbl_agg")
5026 )
5027
5028 query = (
5029 select(
5030 pg_catalog.pg_type.c.typname.label("name"),
5031 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
5032 "visible"
5033 ),
5034 pg_catalog.pg_namespace.c.nspname.label("schema"),
5035 lbl_agg_sq.c.labels.label("labels"),
5036 )
5037 .join(
5038 pg_catalog.pg_namespace,
5039 pg_catalog.pg_namespace.c.oid
5040 == pg_catalog.pg_type.c.typnamespace,
5041 )
5042 .outerjoin(
5043 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid
5044 )
5045 .where(pg_catalog.pg_type.c.typtype == "e")
5046 .order_by(
5047 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
5048 )
5049 )
5050
5051 return self._pg_type_filter_schema(query, schema)
5052
5053 @reflection.cache
5054 def _load_enums(self, connection, schema=None, **kw):
5055 if not self.supports_native_enum:
5056 return []
5057
5058 result = connection.execute(self._enum_query(schema))
5059
5060 enums = []
5061 for name, visible, schema, labels in result:
5062 enums.append(
5063 {
5064 "name": name,
5065 "schema": schema,
5066 "visible": visible,
5067 "labels": [] if labels is None else labels,
5068 }
5069 )
5070 return enums
5071
5072 @lru_cache()
5073 def _domain_query(self, schema):
5074 con_sq = (
5075 select(
5076 pg_catalog.pg_constraint.c.contypid,
5077 sql.func.array_agg(
5078 pg_catalog.pg_get_constraintdef(
5079 pg_catalog.pg_constraint.c.oid, True
5080 )
5081 ).label("condefs"),
5082 sql.func.array_agg(
5083 # NOTE: cast since some postgresql derivatives may
5084 # not support array_agg on the name type
5085 pg_catalog.pg_constraint.c.conname.cast(TEXT)
5086 ).label("connames"),
5087 )
5088 # The domain this constraint is on; zero if not a domain constraint
5089 .where(pg_catalog.pg_constraint.c.contypid != 0)
5090 .group_by(pg_catalog.pg_constraint.c.contypid)
5091 .subquery("domain_constraints")
5092 )
5093
5094 query = (
5095 select(
5096 pg_catalog.pg_type.c.typname.label("name"),
5097 pg_catalog.format_type(
5098 pg_catalog.pg_type.c.typbasetype,
5099 pg_catalog.pg_type.c.typtypmod,
5100 ).label("attype"),
5101 (~pg_catalog.pg_type.c.typnotnull).label("nullable"),
5102 pg_catalog.pg_type.c.typdefault.label("default"),
5103 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
5104 "visible"
5105 ),
5106 pg_catalog.pg_namespace.c.nspname.label("schema"),
5107 con_sq.c.condefs,
5108 con_sq.c.connames,
5109 pg_catalog.pg_collation.c.collname,
5110 )
5111 .join(
5112 pg_catalog.pg_namespace,
5113 pg_catalog.pg_namespace.c.oid
5114 == pg_catalog.pg_type.c.typnamespace,
5115 )
5116 .outerjoin(
5117 pg_catalog.pg_collation,
5118 pg_catalog.pg_type.c.typcollation
5119 == pg_catalog.pg_collation.c.oid,
5120 )
5121 .outerjoin(
5122 con_sq,
5123 pg_catalog.pg_type.c.oid == con_sq.c.contypid,
5124 )
5125 .where(pg_catalog.pg_type.c.typtype == "d")
5126 .order_by(
5127 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
5128 )
5129 )
5130 return self._pg_type_filter_schema(query, schema)
5131
5132 @reflection.cache
5133 def _load_domains(self, connection, schema=None, **kw):
5134 result = connection.execute(self._domain_query(schema))
5135
5136 domains: List[ReflectedDomain] = []
5137 for domain in result.mappings():
5138 # strip (30) from character varying(30)
5139 attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
5140 constraints: List[ReflectedDomainConstraint] = []
5141 if domain["connames"]:
5142 # When a domain has multiple CHECK constraints, they will
5143 # be tested in alphabetical order by name.
5144 sorted_constraints = sorted(
5145 zip(domain["connames"], domain["condefs"]),
5146 key=lambda t: t[0],
5147 )
5148 for name, def_ in sorted_constraints:
5149 # constraint is in the form "CHECK (expression)"
                # or "NOT NULL". ignore the "NOT NULL" ones and
                # remove "CHECK (" and the trailing ")".
5152 if def_.casefold().startswith("check"):
5153 check = def_[7:-1]
5154 constraints.append({"name": name, "check": check})
5155 domain_rec: ReflectedDomain = {
5156 "name": domain["name"],
5157 "schema": domain["schema"],
5158 "visible": domain["visible"],
5159 "type": attype,
5160 "nullable": domain["nullable"],
5161 "default": domain["default"],
5162 "constraints": constraints,
5163 "collation": domain["collname"],
5164 }
5165 domains.append(domain_rec)
5166
5167 return domains
5168
5169 def _set_backslash_escapes(self, connection):
5170 # this method is provided as an override hook for descendant
5171 # dialects (e.g. Redshift), so removing it may break them
5172 std_string = connection.exec_driver_sql(
5173 "show standard_conforming_strings"
5174 ).scalar()
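        # when standard_conforming_strings is "off", backslashes inside
        # string literals act as escape characters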
5175 self._backslash_escapes = std_string == "off"