1# dialects/postgresql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: postgresql
11 :name: PostgreSQL
12 :normal_support: 9.6+
13 :best_effort: 9+
14
15.. _postgresql_sequences:
16
17Sequences/SERIAL/IDENTITY
18-------------------------
19
20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
21of creating new primary key values for integer-based primary key columns. When
22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
23integer-based primary key columns, which generates a sequence and server side
24default corresponding to the column.
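
For example, a table such as::

    Table("some_table", metadata, Column("id", Integer, primary_key=True))

would typically render as:

.. sourcecode:: sql

    CREATE TABLE some_table (
        id SERIAL NOT NULL,
        PRIMARY KEY (id)
    )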
25
To specify a named sequence to be used for primary key generation,
use the :func:`~sqlalchemy.schema.Sequence` construct::
28
29 Table(
30 "sometable",
31 metadata,
32 Column(
33 "id", Integer, Sequence("some_id_seq", start=1), primary_key=True
34 ),
35 )
36
37When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
38having the "last insert identifier" available, a RETURNING clause is added to
39the INSERT statement which specifies the primary key columns should be
40returned after the statement completes. The RETURNING functionality only takes
41place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
42sequence, whether specified explicitly or implicitly via ``SERIAL``, is
executed independently beforehand, with the returned value used in the
subsequent INSERT. Note that when an
45:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
46"executemany" semantics, the "last inserted identifier" functionality does not
47apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
48case.
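
As a brief illustration, a minimal sketch using the ``sometable`` definition
from the example above, and assuming an :class:`_engine.Engine` named
``engine``; for a single-row INSERT, the newly generated primary key is
available from :attr:`_engine.CursorResult.inserted_primary_key`::

    with engine.begin() as conn:
        result = conn.execute(sometable.insert())
        # the primary key value(s) of the inserted row, e.g. (1,)
        print(result.inserted_primary_key)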
49
50
51PostgreSQL 10 and above IDENTITY columns
52^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
53
54PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
55of SERIAL. The :class:`_schema.Identity` construct in a
56:class:`_schema.Column` can be used to control its behavior::
57
    from sqlalchemy import Table, Column, MetaData, Integer, String, Identity
59
60 metadata = MetaData()
61
62 data = Table(
63 "data",
64 metadata,
65 Column(
66 "id", Integer, Identity(start=42, cycle=True), primary_key=True
67 ),
68 Column("data", String),
69 )
70
71The CREATE TABLE for the above :class:`_schema.Table` object would be:
72
73.. sourcecode:: sql
74
75 CREATE TABLE data (
76 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
77 data VARCHAR,
78 PRIMARY KEY (id)
79 )
80
81.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
82 in a :class:`_schema.Column` to specify the option of an autoincrementing
83 column.
84
85.. note::
86
87 Previous versions of SQLAlchemy did not have built-in support for rendering
88 of IDENTITY, and could use the following compilation hook to replace
89 occurrences of SERIAL with IDENTITY::
90
91 from sqlalchemy.schema import CreateColumn
92 from sqlalchemy.ext.compiler import compiles
93
94
95 @compiles(CreateColumn, "postgresql")
96 def use_identity(element, compiler, **kw):
97 text = compiler.visit_create_column(element, **kw)
98 text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
99 return text
100
101 Using the above, a table such as::
102
103 t = Table(
104 "t", m, Column("id", Integer, primary_key=True), Column("data", String)
105 )
106
    will generate on the backing database as:
108
109 .. sourcecode:: sql
110
111 CREATE TABLE t (
112 id INT GENERATED BY DEFAULT AS IDENTITY,
113 data VARCHAR,
114 PRIMARY KEY (id)
115 )
116
117.. _postgresql_ss_cursors:
118
119Server Side Cursors
120-------------------
121
Server-side cursor support is available for the psycopg2 and asyncpg
dialects, and may also be available in others.
124
125Server side cursors are enabled on a per-statement basis by using the
126:paramref:`.Connection.execution_options.stream_results` connection execution
127option::
128
129 with engine.connect() as conn:
130 result = conn.execution_options(stream_results=True).execute(
131 text("select * from table")
132 )
133
134Note that some kinds of SQL statements may not be supported with
135server side cursors; generally, only SQL statements that return rows should be
136used with this option.
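
When streaming, rows may also be fetched in batches rather than one at a
time; a minimal sketch using the :meth:`_engine.Result.partitions` method,
assuming a table named ``mytable``::

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("select * from mytable")
        )
        for partition in result.partitions(100):
            # fetches at most 100 rows from the server-side cursor at a time
            for row in partition:
                print(row)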
137
.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
    and will be removed in a future release. Please use the
    :paramref:`_engine.Connection.execution_options.stream_results` execution
    option for unbuffered cursor support.
142
143.. seealso::
144
145 :ref:`engine_stream_results`
146
147.. _postgresql_isolation_level:
148
149Transaction Isolation Level
150---------------------------
151
152Most SQLAlchemy dialects support setting of transaction isolation level
153using the :paramref:`_sa.create_engine.isolation_level` parameter
154at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
155level via the :paramref:`.Connection.execution_options.isolation_level`
156parameter.
157
158For PostgreSQL dialects, this feature works either by making use of the
159DBAPI-specific features, such as psycopg2's isolation level flags which will
160embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
161DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
162TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
163emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
DBAPI-specific techniques are used, typically an ``.autocommit``
flag on the DBAPI connection object.
166
167To set isolation level using :func:`_sa.create_engine`::
168
169 engine = create_engine(
170 "postgresql+pg8000://scott:tiger@localhost/test",
171 isolation_level="REPEATABLE READ",
172 )
173
174To set using per-connection execution options::
175
176 with engine.connect() as conn:
177 conn = conn.execution_options(isolation_level="REPEATABLE READ")
178 with conn.begin():
179 ... # work with transaction
180
There are also more options for isolation level configuration, such as
"sub-engine" objects linked to a main :class:`_engine.Engine`, each of which
applies a different isolation level setting. See the discussion at
184:ref:`dbapi_autocommit` for background.
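
A minimal sketch of the "sub-engine" approach, where
:meth:`_engine.Engine.execution_options` produces a copy of the
:class:`_engine.Engine` that shares the same connection pool but applies the
given isolation level to its connections::

    from sqlalchemy import text

    autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")

    # connections checked out from this engine will use autocommit
    with autocommit_engine.connect() as conn:
        conn.execute(text("select 1"))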
185
186Valid values for ``isolation_level`` on most PostgreSQL dialects include:
187
188* ``READ COMMITTED``
189* ``READ UNCOMMITTED``
190* ``REPEATABLE READ``
191* ``SERIALIZABLE``
192* ``AUTOCOMMIT``
193
194.. seealso::
195
196 :ref:`dbapi_autocommit`
197
198 :ref:`postgresql_readonly_deferrable`
199
200 :ref:`psycopg2_isolation_level`
201
202 :ref:`pg8000_isolation_level`
203
204.. _postgresql_readonly_deferrable:
205
206Setting READ ONLY / DEFERRABLE
207------------------------------
208
209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
210characteristics of the transaction, which is in addition to the isolation level
211setting. These two attributes can be established either in conjunction with or
212independently of the isolation level by passing the ``postgresql_readonly`` and
213``postgresql_deferrable`` flags with
214:meth:`_engine.Connection.execution_options`. The example below illustrates
215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
216"READ ONLY" and "DEFERRABLE"::
217
218 with engine.connect() as conn:
219 conn = conn.execution_options(
220 isolation_level="SERIALIZABLE",
221 postgresql_readonly=True,
222 postgresql_deferrable=True,
223 )
224 with conn.begin():
225 ... # work with transaction
226
227Note that some DBAPIs such as asyncpg only support "readonly" with
228SERIALIZABLE isolation.
229
230.. versionadded:: 1.4 added support for the ``postgresql_readonly``
231 and ``postgresql_deferrable`` execution options.
232
233.. _postgresql_reset_on_return:
234
235Temporary Table / Resource Reset for Connection Pooling
236-------------------------------------------------------
237
238The :class:`.QueuePool` connection pool implementation used
239by the SQLAlchemy :class:`.Engine` object includes
240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
241the DBAPI ``.rollback()`` method when connections are returned to the pool.
242While this rollback will clear out the immediate state used by the previous
243transaction, it does not cover a wider range of session-level state, including
244temporary tables as well as other server state such as prepared statement
245handles and statement caches. The PostgreSQL database includes a variety
246of commands which may be used to reset this state, including
247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.
248
249
250To install
251one or more of these commands as the means of performing reset-on-return,
252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated
253in the example below. The implementation
254will end transactions in progress as well as discard temporary tables
255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL
documentation for background on what each of these statements does.
257
258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
259is set to ``None`` so that the custom scheme can replace the default behavior
260completely. The custom hook implementation calls ``.rollback()`` in any case,
261as it's usually important that the DBAPI's own tracking of commit/rollback
262will remain consistent with the state of the transaction::
263
264
265 from sqlalchemy import create_engine
266 from sqlalchemy import event
267
268 postgresql_engine = create_engine(
269 "postgresql+psycopg2://scott:tiger@hostname/dbname",
270 # disable default reset-on-return scheme
271 pool_reset_on_return=None,
272 )
273
274
275 @event.listens_for(postgresql_engine, "reset")
276 def _reset_postgresql(dbapi_connection, connection_record, reset_state):
277 if not reset_state.terminate_only:
278 dbapi_connection.execute("CLOSE ALL")
279 dbapi_connection.execute("RESET ALL")
280 dbapi_connection.execute("DISCARD TEMP")
281
282 # so that the DBAPI itself knows that the connection has been
283 # reset
284 dbapi_connection.rollback()
285
286.. versionchanged:: 2.0.0b3 Added additional state arguments to
287 the :meth:`.PoolEvents.reset` event and additionally ensured the event
288 is invoked for all "reset" occurrences, so that it's appropriate
289 as a place for custom "reset" handlers. Previous schemes which
290 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
291
292.. seealso::
293
294 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
295
296.. _postgresql_alternate_search_path:
297
298Setting Alternate Search Paths on Connect
299------------------------------------------
300
The PostgreSQL ``search_path`` variable refers to the list of schema names
that will be implicitly referenced when a particular table or other
object is referenced in a SQL statement. As detailed in the next section
:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
the concept of keeping this variable at its default value of ``public``.
However, in order to have it set to any arbitrary name or names when
connections are used automatically, the "SET SESSION search_path" command
may be invoked for all connections in a pool using the following event
handler, as discussed at :ref:`schema_set_default_connections`::
310
311 from sqlalchemy import event
312 from sqlalchemy import create_engine
313
    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")

    # the schema name to establish as the default; substitute as appropriate
    schema_name = "test_schema"
315
316
317 @event.listens_for(engine, "connect", insert=True)
318 def set_search_path(dbapi_connection, connection_record):
319 existing_autocommit = dbapi_connection.autocommit
320 dbapi_connection.autocommit = True
321 cursor = dbapi_connection.cursor()
322 cursor.execute("SET SESSION search_path='%s'" % schema_name)
323 cursor.close()
324 dbapi_connection.autocommit = existing_autocommit
325
326The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
327attribute is so that when the ``SET SESSION search_path`` directive is invoked,
328it is invoked outside of the scope of any transaction and therefore will not
be reverted when the DBAPI connection is rolled back.
330
331.. seealso::
332
333 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation
334
335.. _postgresql_schema_reflection:
336
337Remote-Schema Table Introspection and PostgreSQL search_path
338------------------------------------------------------------
339
340.. admonition:: Section Best Practices Summarized
341
    Keep the ``search_path`` variable set to its default of ``public``, without
343 any other schema names. Ensure the username used to connect **does not**
344 match remote schemas, or ensure the ``"$user"`` token is **removed** from
345 ``search_path``. For other schema names, name these explicitly
346 within :class:`_schema.Table` definitions. Alternatively, the
347 ``postgresql_ignore_search_path`` option will cause all reflected
348 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
349 attribute set up.
350
351The PostgreSQL dialect can reflect tables from any schema, as outlined in
352:ref:`metadata_reflection_schemas`.
353
354In all cases, the first thing SQLAlchemy does when reflecting tables is
355to **determine the default schema for the current database connection**.
356It does this using the PostgreSQL ``current_schema()``
function, illustrated below using a PostgreSQL client session (i.e. using
358the ``psql`` tool):
359
360.. sourcecode:: sql
361
362 test=> select current_schema();
363 current_schema
364 ----------------
365 public
366 (1 row)
367
368Above we see that on a plain install of PostgreSQL, the default schema name
369is the name ``public``.
370
371However, if your database username **matches the name of a schema**, PostgreSQL's
372default is to then **use that name as the default schema**. Below, we log in
373using the username ``scott``. When we create a schema named ``scott``, **it
374implicitly changes the default schema**:
375
376.. sourcecode:: sql
377
378 test=> select current_schema();
379 current_schema
380 ----------------
381 public
382 (1 row)
383
384 test=> create schema scott;
385 CREATE SCHEMA
386 test=> select current_schema();
387 current_schema
388 ----------------
389 scott
390 (1 row)
391
392The behavior of ``current_schema()`` is derived from the
393`PostgreSQL search path
394<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
395variable ``search_path``, which in modern PostgreSQL versions defaults to this:
396
397.. sourcecode:: sql
398
399 test=> show search_path;
400 search_path
401 -----------------
402 "$user", public
403 (1 row)
404
405Where above, the ``"$user"`` variable will inject the current username as the
406default schema, if one exists. Otherwise, ``public`` is used.
407
408When a :class:`_schema.Table` object is reflected, if it is present in the
409schema indicated by the ``current_schema()`` function, **the schema name assigned
410to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the
411".schema" attribute will be assigned the string name of that schema.
412
With regard to tables which these :class:`_schema.Table`
414objects refer to via foreign key constraint, a decision must be made as to how
415the ``.schema`` is represented in those remote tables, in the case where that
416remote schema name is also a member of the current ``search_path``.
417
418By default, the PostgreSQL dialect mimics the behavior encouraged by
419PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
420returns a sample definition for a particular foreign key constraint,
421omitting the referenced schema name from that definition when the name is
422also in the PostgreSQL schema search path. The interaction below
423illustrates this behavior:
424
425.. sourcecode:: sql
426
427 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
428 CREATE TABLE
429 test=> CREATE TABLE referring(
430 test(> id INTEGER PRIMARY KEY,
431 test(> referred_id INTEGER REFERENCES test_schema.referred(id));
432 CREATE TABLE
433 test=> SET search_path TO public, test_schema;
434 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
435 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
436 test-> ON n.oid = c.relnamespace
437 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
438 test-> WHERE c.relname='referring' AND r.contype = 'f'
439 test-> ;
440 pg_get_constraintdef
441 ---------------------------------------------------
442 FOREIGN KEY (referred_id) REFERENCES referred(id)
443 (1 row)
444
445Above, we created a table ``referred`` as a member of the remote schema
446``test_schema``, however when we added ``test_schema`` to the
447PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
448``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
449the function.
450
451On the other hand, if we set the search path back to the typical default
452of ``public``:
453
454.. sourcecode:: sql
455
456 test=> SET search_path TO public;
457 SET
458
459The same query against ``pg_get_constraintdef()`` now returns the fully
460schema-qualified name for us:
461
462.. sourcecode:: sql
463
464 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
465 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
466 test-> ON n.oid = c.relnamespace
467 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
468 test-> WHERE c.relname='referring' AND r.contype = 'f';
469 pg_get_constraintdef
470 ---------------------------------------------------------------
471 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
472 (1 row)
473
474SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
475in order to determine the remote schema name. That is, if our ``search_path``
476were set to include ``test_schema``, and we invoked a table
477reflection process as follows::
478
479 >>> from sqlalchemy import Table, MetaData, create_engine, text
480 >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
481 >>> with engine.connect() as conn:
482 ... conn.execute(text("SET search_path TO test_schema, public"))
483 ... metadata_obj = MetaData()
484 ... referring = Table("referring", metadata_obj, autoload_with=conn)
485 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>
486
The above process would deliver the ``referred`` table, named **without**
the schema, to the :attr:`_schema.MetaData.tables` collection::
490
491 >>> metadata_obj.tables["referred"].schema is None
492 True
493
494To alter the behavior of reflection such that the referred schema is
495maintained regardless of the ``search_path`` setting, use the
496``postgresql_ignore_search_path`` option, which can be specified as a
497dialect-specific argument to both :class:`_schema.Table` as well as
498:meth:`_schema.MetaData.reflect`::
499
500 >>> with engine.connect() as conn:
501 ... conn.execute(text("SET search_path TO test_schema, public"))
502 ... metadata_obj = MetaData()
503 ... referring = Table(
504 ... "referring",
505 ... metadata_obj,
506 ... autoload_with=conn,
507 ... postgresql_ignore_search_path=True,
508 ... )
509 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>
510
511We will now have ``test_schema.referred`` stored as schema-qualified::
512
513 >>> metadata_obj.tables["test_schema.referred"].schema
514 'test_schema'
515
516.. sidebar:: Best Practices for PostgreSQL Schema reflection
517
518 The description of PostgreSQL schema reflection behavior is complex, and
519 is the product of many years of dealing with widely varied use cases and
520 user preferences. But in fact, there's no need to understand any of it if
521 you just stick to the simplest use pattern: leave the ``search_path`` set
522 to its default of ``public`` only, never refer to the name ``public`` as
523 an explicit schema name otherwise, and refer to all other schema names
524 explicitly when building up a :class:`_schema.Table` object. The options
525 described here are only for those users who can't, or prefer not to, stay
526 within these guidelines.
527
528.. seealso::
529
530 :ref:`reflection_schema_qualified_interaction` - discussion of the issue
531 from a backend-agnostic perspective
532
533 `The Schema Search Path
534 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
535 - on the PostgreSQL website.
536
537INSERT/UPDATE...RETURNING
538-------------------------
539
540The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
541``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
542for single-row INSERT statements in order to fetch newly generated
543primary key identifiers. To specify an explicit ``RETURNING`` clause,
use the :meth:`.UpdateBase.returning` method on a per-statement basis, then
execute the statement on a :class:`_engine.Connection` (named ``connection``
below) to fetch the rows::

    # INSERT..RETURNING
    result = connection.execute(
        table.insert().returning(table.c.col1, table.c.col2).values(name="foo")
    )
    print(result.fetchall())

    # UPDATE..RETURNING
    result = connection.execute(
        table.update()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
        .values(name="bar")
    )
    print(result.fetchall())

    # DELETE..RETURNING
    result = connection.execute(
        table.delete()
        .returning(table.c.col1, table.c.col2)
        .where(table.c.name == "foo")
    )
    print(result.fetchall())
568
569.. _postgresql_insert_on_conflict:
570
571INSERT...ON CONFLICT (Upsert)
572------------------------------
573
574Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
575rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
576candidate row will only be inserted if that row does not violate any unique
577constraints. In the case of a unique constraint violation, a secondary action
578can occur which can be either "DO UPDATE", indicating that the data in the
579target row should be updated, or "DO NOTHING", which indicates to silently skip
580this row.
581
582Conflicts are determined using existing unique constraints and indexes. These
583constraints may be identified either using their name as stated in DDL,
584or they may be inferred by stating the columns and conditions that comprise
585the indexes.
586
587SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
588:func:`_postgresql.insert()` function, which provides
589the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
590and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:
591
592.. sourcecode:: pycon+sql
593
594 >>> from sqlalchemy.dialects.postgresql import insert
595 >>> insert_stmt = insert(my_table).values(
596 ... id="some_existing_id", data="inserted value"
597 ... )
598 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
599 >>> print(do_nothing_stmt)
600 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
601 ON CONFLICT (id) DO NOTHING
602 {stop}
603
604 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
605 ... constraint="pk_my_table", set_=dict(data="updated value")
606 ... )
607 >>> print(do_update_stmt)
608 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
609 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
610
611.. seealso::
612
613 `INSERT .. ON CONFLICT
614 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
615 - in the PostgreSQL documentation.
616
617Specifying the Target
618^^^^^^^^^^^^^^^^^^^^^
619
Both methods supply the "target" of the conflict using either a
named constraint or column inference:
622
623* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
624 specifies a sequence containing string column names, :class:`_schema.Column`
625 objects, and/or SQL expression elements, which would identify a unique
626 index:
627
628 .. sourcecode:: pycon+sql
629
630 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
631 ... index_elements=["id"], set_=dict(data="updated value")
632 ... )
633 >>> print(do_update_stmt)
634 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
635 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
636 {stop}
637
638 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
639 ... index_elements=[my_table.c.id], set_=dict(data="updated value")
640 ... )
641 >>> print(do_update_stmt)
642 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
643 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
644
645* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
  infer an index, a partial index can be inferred by also specifying the
  :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:
648
649 .. sourcecode:: pycon+sql
650
651 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
652 >>> stmt = stmt.on_conflict_do_update(
653 ... index_elements=[my_table.c.user_email],
654 ... index_where=my_table.c.user_email.like("%@gmail.com"),
655 ... set_=dict(data=stmt.excluded.data),
656 ... )
657 >>> print(stmt)
658 {printsql}INSERT INTO my_table (data, user_email)
659 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
660 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data
661
662* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
663 used to specify an index directly rather than inferring it. This can be
664 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:
665
666 .. sourcecode:: pycon+sql
667
668 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
669 ... constraint="my_table_idx_1", set_=dict(data="updated value")
670 ... )
671 >>> print(do_update_stmt)
672 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
673 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
674 {stop}
675
676 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
677 ... constraint="my_table_pk", set_=dict(data="updated value")
678 ... )
679 >>> print(do_update_stmt)
680 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
681 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
682 {stop}
683
684* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
685 also refer to a SQLAlchemy construct representing a constraint,
686 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
687 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
688 if the constraint has a name, it is used directly. Otherwise, if the
689 constraint is unnamed, then inference will be used, where the expressions
690 and optional WHERE clause of the constraint will be spelled out in the
691 construct. This use is especially convenient
692 to refer to the named or unnamed primary key of a :class:`_schema.Table`
693 using the
694 :attr:`_schema.Table.primary_key` attribute:
695
696 .. sourcecode:: pycon+sql
697
698 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
699 ... constraint=my_table.primary_key, set_=dict(data="updated value")
700 ... )
701 >>> print(do_update_stmt)
702 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
703 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
704
705The SET Clause
706^^^^^^^^^^^^^^^
707
708``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
709existing row, using any combination of new values as well as values
710from the proposed insertion. These values are specified using the
711:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
712parameter accepts a dictionary which consists of direct values
713for UPDATE:
714
715.. sourcecode:: pycon+sql
716
717 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
718 >>> do_update_stmt = stmt.on_conflict_do_update(
719 ... index_elements=["id"], set_=dict(data="updated value")
720 ... )
721 >>> print(do_update_stmt)
722 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
723 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
724
725.. warning::
726
727 The :meth:`_expression.Insert.on_conflict_do_update`
728 method does **not** take into
729 account Python-side default UPDATE values or generation functions, e.g.
730 those specified using :paramref:`_schema.Column.onupdate`.
731 These values will not be exercised for an ON CONFLICT style of UPDATE,
732 unless they are manually specified in the
733 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
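
To include such a value, restate it manually within ``set_``. A minimal
sketch, reusing ``stmt`` from the example above and assuming a hypothetical
``updated_at`` column on ``my_table`` that normally relies on a Python-side
``onupdate`` default::

    from sqlalchemy import func

    do_update_stmt = stmt.on_conflict_do_update(
        index_elements=["id"],
        set_=dict(
            data="updated value",
            # the Python-side onupdate default is not applied automatically
            # for ON CONFLICT DO UPDATE; restate the value explicitly
            # ("updated_at" is a hypothetical column for illustration)
            updated_at=func.now(),
        ),
    )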
734
735Updating using the Excluded INSERT Values
736^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
737
In order to refer to the proposed insertion row, the special alias
:attr:`~.postgresql.Insert.excluded` is available as an attribute on
the :class:`_postgresql.Insert` object; this object is a
:class:`_expression.ColumnCollection` which contains all columns of the
target table:
744
745.. sourcecode:: pycon+sql
746
747 >>> stmt = insert(my_table).values(
748 ... id="some_id", data="inserted value", author="jlh"
749 ... )
750 >>> do_update_stmt = stmt.on_conflict_do_update(
751 ... index_elements=["id"],
752 ... set_=dict(data="updated value", author=stmt.excluded.author),
753 ... )
754 >>> print(do_update_stmt)
755 {printsql}INSERT INTO my_table (id, data, author)
756 VALUES (%(id)s, %(data)s, %(author)s)
757 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
758
759Additional WHERE Criteria
760^^^^^^^^^^^^^^^^^^^^^^^^^
761
762The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
763a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
764parameter, which will limit those rows which receive an UPDATE:
765
766.. sourcecode:: pycon+sql
767
768 >>> stmt = insert(my_table).values(
769 ... id="some_id", data="inserted value", author="jlh"
770 ... )
771 >>> on_update_stmt = stmt.on_conflict_do_update(
772 ... index_elements=["id"],
773 ... set_=dict(data="updated value", author=stmt.excluded.author),
774 ... where=(my_table.c.status == 2),
775 ... )
776 >>> print(on_update_stmt)
777 {printsql}INSERT INTO my_table (id, data, author)
778 VALUES (%(id)s, %(data)s, %(author)s)
779 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
780 WHERE my_table.status = %(status_1)s
781
782Skipping Rows with DO NOTHING
783^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
784
785``ON CONFLICT`` may be used to skip inserting a row entirely
786if any conflict with a unique or exclusion constraint occurs; below
787this is illustrated using the
788:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:
789
790.. sourcecode:: pycon+sql
791
792 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
793 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
794 >>> print(stmt)
795 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
796 ON CONFLICT (id) DO NOTHING
797
798If ``DO NOTHING`` is used without specifying any columns or constraint,
799it has the effect of skipping the INSERT for any unique or exclusion
800constraint violation which occurs:
801
802.. sourcecode:: pycon+sql
803
804 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
805 >>> stmt = stmt.on_conflict_do_nothing()
806 >>> print(stmt)
807 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
808 ON CONFLICT DO NOTHING
809
810.. _postgresql_match:
811
812Full Text Search
813----------------
814
815PostgreSQL's full text search system is available through the use of the
816:data:`.func` namespace, combined with the use of custom operators
817via the :meth:`.Operators.bool_op` method. For simple cases with some
818degree of cross-backend compatibility, the :meth:`.Operators.match` operator
819may also be used.
820
821.. _postgresql_simple_match:
822
823Simple plain text matching with ``match()``
824^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
825
826The :meth:`.Operators.match` operator provides for cross-compatible simple
827text matching. For the PostgreSQL backend, it's hardcoded to generate
828an expression using the ``@@`` operator in conjunction with the
829``plainto_tsquery()`` PostgreSQL function.
830
831On the PostgreSQL dialect, an expression like the following::
832
833 select(sometable.c.text.match("search string"))
834
835would emit to the database:
836
837.. sourcecode:: sql
838
839 SELECT text @@ plainto_tsquery('search string') FROM table
840
841Above, passing a plain string to :meth:`.Operators.match` will automatically
842make use of ``plainto_tsquery()`` to specify the type of tsquery. This
843establishes basic database cross-compatibility for :meth:`.Operators.match`
844with other backends.
845
846.. versionchanged:: 2.0 The default tsquery generation function used by the
847 PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``.
848
849 To render exactly what was rendered in 1.4, use the following form::
850
851 from sqlalchemy import func
852
853 select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string")))
854
855 Which would emit:
856
857 .. sourcecode:: sql
858
859 SELECT text @@ to_tsquery('search string') FROM table
860
861Using PostgreSQL full text functions and operators directly
862^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
863
864Text search operations beyond the simple use of :meth:`.Operators.match`
865may make use of the :data:`.func` namespace to generate PostgreSQL full-text
866functions, in combination with :meth:`.Operators.bool_op` to generate
867any boolean operator.
868
869For example, the query::
870
871 select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat")))
872
873would generate:
874
875.. sourcecode:: sql
876
877 SELECT to_tsquery('cat') @> to_tsquery('cat & rat')
878
879
The :class:`_postgresql.TSVECTOR` type can be used to render an explicit
CAST::
881
882 from sqlalchemy.dialects.postgresql import TSVECTOR
883 from sqlalchemy import select, cast
884
885 select(cast("some text", TSVECTOR))
886
887produces a statement equivalent to:
888
889.. sourcecode:: sql
890
891 SELECT CAST('some text' AS TSVECTOR) AS anon_1
892
893The ``func`` namespace is augmented by the PostgreSQL dialect to set up
894correct argument and return types for most full text search functions.
895These functions are used automatically by the :attr:`_sql.func` namespace
896assuming the ``sqlalchemy.dialects.postgresql`` package has been imported,
897or :func:`_sa.create_engine` has been invoked using a ``postgresql``
898dialect. These functions are documented at:
899
900* :class:`_postgresql.to_tsvector`
901* :class:`_postgresql.to_tsquery`
902* :class:`_postgresql.plainto_tsquery`
903* :class:`_postgresql.phraseto_tsquery`
904* :class:`_postgresql.websearch_to_tsquery`
905* :class:`_postgresql.ts_headline`
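
As a brief illustration of this typed behavior, a minimal sketch: once the
dialect package has been imported, ``func.to_tsvector()`` produces an
expression whose type is :class:`_postgresql.TSVECTOR`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy import func
    >>> from sqlalchemy.dialects.postgresql import TSVECTOR
    >>> expr = func.to_tsvector("english", "some text")
    >>> isinstance(expr.type, TSVECTOR)
    True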
906
907Specifying the "regconfig" with ``match()`` or custom operators
908^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
909
PostgreSQL's ``plainto_tsquery()`` function accepts an optional
"regconfig" argument that selects the text search configuration to be used;
matching the configuration of a pre-computed GIN or GiST index allows
PostgreSQL to make use of that index when performing the search.
913When using :meth:`.Operators.match`, this additional parameter may be
914specified using the ``postgresql_regconfig`` parameter, such as::
915
916 select(mytable.c.id).where(
917 mytable.c.title.match("somestring", postgresql_regconfig="english")
918 )
919
920Which would emit:
921
922.. sourcecode:: sql
923
924 SELECT mytable.id FROM mytable
925 WHERE mytable.title @@ plainto_tsquery('english', 'somestring')
926
927When using other PostgreSQL search functions with :data:`.func`, the
928"regconfig" parameter may be passed directly as the initial argument::
929
930 select(mytable.c.id).where(
931 func.to_tsvector("english", mytable.c.title).bool_op("@@")(
932 func.to_tsquery("english", "somestring")
933 )
934 )
935
936produces a statement equivalent to:
937
938.. sourcecode:: sql
939
940 SELECT mytable.id FROM mytable
941 WHERE to_tsvector('english', mytable.title) @@
942 to_tsquery('english', 'somestring')
943
944It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
945PostgreSQL to ensure that you are generating queries with SQLAlchemy that
946take full advantage of any indexes you may have created for full text search.
947
948.. seealso::
949
950 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation
951
952
953FROM ONLY ...
954-------------
955
956The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
957table in an inheritance hierarchy. This can be used to produce the
958``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
syntaxes. It uses SQLAlchemy's hints mechanism; the SELECT example below is
executed on an established :class:`_engine.Connection` named ``connection``::

    # SELECT ... FROM ONLY ...
    result = connection.execute(
        table.select().with_hint(table, "ONLY", "postgresql")
    )
    print(result.fetchall())

    # UPDATE ONLY ...
    table.update().values(foo="bar").with_hint(
        "ONLY", dialect_name="postgresql"
    )

    # DELETE FROM ONLY ...
    table.delete().with_hint("ONLY", dialect_name="postgresql")
972
973.. _postgresql_indexes:
974
975PostgreSQL-Specific Index Options
976---------------------------------
977
978Several extensions to the :class:`.Index` construct are available, specific
979to the PostgreSQL dialect.
980
981.. _postgresql_covering_indexes:
982
983Covering Indexes
984^^^^^^^^^^^^^^^^
985
986A covering index includes additional columns that are not part of the index key
987but are stored in the index, allowing PostgreSQL to satisfy queries using only
988the index without accessing the table (an "index-only scan"). This is
989indicated on the index using the ``INCLUDE`` clause. The
990``postgresql_include`` option for :class:`.Index` (as well as
991:class:`.UniqueConstraint`) renders ``INCLUDE(colname)`` for the given string
992names::
993
994 Index("my_index", table.c.x, postgresql_include=["y"])
995
would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``.
997
998Note that this feature requires PostgreSQL 11 or later.
999
1000.. seealso::
1001
1002 :ref:`postgresql_constraint_options_include` - the same feature implemented
1003 for :class:`.UniqueConstraint`
1004
.. versionadded:: 1.4 - support for covering indexes with :class:`.Index`;
    support for :class:`.UniqueConstraint` was added in 2.0.41.
1007
1008.. _postgresql_partial_indexes:
1009
1010Partial Indexes
1011^^^^^^^^^^^^^^^
1012
Partial indexes add criteria to the index definition so that the index is
1014applied to a subset of rows. These can be specified on :class:`.Index`
1015using the ``postgresql_where`` keyword argument::
1016
1017 Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10)
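
The above index would render approximately as:

.. sourcecode:: sql

    CREATE INDEX my_index ON my_table (id) WHERE value > 10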
1018
1019.. _postgresql_operator_classes:
1020
1021Operator Classes
1022^^^^^^^^^^^^^^^^
1023
1024PostgreSQL allows the specification of an *operator class* for each column of
1025an index (see
1026https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
1027The :class:`.Index` construct allows these to be specified via the
1028``postgresql_ops`` keyword argument::
1029
1030 Index(
1031 "my_index",
1032 my_table.c.id,
1033 my_table.c.data,
1034 postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"},
1035 )
1036
1037Note that the keys in the ``postgresql_ops`` dictionaries are the
1038"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
1039the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
1040different than the actual name of the column as expressed in the database.
1041
If ``postgresql_ops`` is to be used against a complex SQL expression such
as a function call, then in order for the operator class to apply, the
expression must be given a label that is identified in the dictionary by
name, e.g.::
1045
1046 Index(
1047 "my_index",
1048 my_table.c.id,
1049 func.lower(my_table.c.data).label("data_lower"),
1050 postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"},
1051 )
1052
1053Operator classes are also supported by the
1054:class:`_postgresql.ExcludeConstraint` construct using the
1055:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
1056details.
1057
1058.. versionadded:: 1.3.21 added support for operator classes with
1059 :class:`_postgresql.ExcludeConstraint`.
1060
1061
1062Index Types
1063^^^^^^^^^^^
1064
1065PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
1066as the ability for users to create their own (see
1067https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
1068specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
1069
1070 Index("my_index", my_table.c.data, postgresql_using="gin")
1071
The value passed to the keyword argument will simply be passed through to the
1073underlying CREATE INDEX command, so it *must* be a valid index type for your
1074version of PostgreSQL.
1075
1076.. _postgresql_index_storage:
1077
1078Index Storage Parameters
1079^^^^^^^^^^^^^^^^^^^^^^^^
1080
1081PostgreSQL allows storage parameters to be set on indexes. The storage
1082parameters available depend on the index method used by the index. Storage
1083parameters can be specified on :class:`.Index` using the ``postgresql_with``
1084keyword argument::
1085
1086 Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50})
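
which renders approximately as:

.. sourcecode:: sql

    CREATE INDEX my_index ON my_table (data) WITH (fillfactor = 50)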
1087
PostgreSQL allows the tablespace in which the index is created to be
defined. The tablespace can be specified on :class:`.Index` using the
``postgresql_tablespace`` keyword argument::
1091
1092 Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace")
1093
1094Note that the same option is available on :class:`_schema.Table` as well.
1095
1096.. _postgresql_index_concurrently:
1097
1098Indexes with CONCURRENTLY
1099^^^^^^^^^^^^^^^^^^^^^^^^^
1100
1101The PostgreSQL index option CONCURRENTLY is supported by passing the
1102flag ``postgresql_concurrently`` to the :class:`.Index` construct::
1103
1104 tbl = Table("testtbl", m, Column("data", Integer))
1105
1106 idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
1107
The above index construct will render DDL for CREATE INDEX as follows,
assuming PostgreSQL 8.2 or higher is detected, or for a connection-less
dialect:
1110
1111.. sourcecode:: sql
1112
1113 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
1114
1115For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
1116a connection-less dialect, it will emit:
1117
1118.. sourcecode:: sql
1119
1120 DROP INDEX CONCURRENTLY test_idx1
1121
1122When using CONCURRENTLY, the PostgreSQL database requires that the statement
1123be invoked outside of a transaction block. The Python DBAPI enforces that
1124even for a single statement, a transaction is present, so to use this
1125construct, the DBAPI's "autocommit" mode must be used::
1126
1127 metadata = MetaData()
1128 table = Table("foo", metadata, Column("id", String))
1129 index = Index("foo_idx", table.c.id, postgresql_concurrently=True)
1130
1131 with engine.connect() as conn:
1132 with conn.execution_options(isolation_level="AUTOCOMMIT"):
1133 table.create(conn)
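
Dropping the index with CONCURRENTLY follows the same pattern; a minimal
sketch, reusing the ``index`` object from above::

    with engine.connect() as conn:
        with conn.execution_options(isolation_level="AUTOCOMMIT"):
            index.drop(conn)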
1134
1135.. seealso::
1136
1137 :ref:`postgresql_isolation_level`
1138
1139.. _postgresql_index_reflection:
1140
1141PostgreSQL Index Reflection
1142---------------------------
1143
1144The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
1145UNIQUE CONSTRAINT construct is used. When inspecting a table using
:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
and :meth:`_reflection.Inspector.get_unique_constraints` methods will report
on these two constructs distinctly; in the case of the index, the key
``duplicates_constraint`` will be present in the index entry if it is
detected as mirroring a constraint. When performing reflection using
``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
in :attr:`_schema.Table.indexes` when it is detected as mirroring a
:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints`
collection.
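
For example, a minimal sketch of inspecting this behavior, assuming a table
named ``mytable`` that includes a UNIQUE constraint::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
    insp = inspect(engine)

    for idx in insp.get_indexes("mytable"):
        # an index which mirrors a UNIQUE constraint carries this key
        if "duplicates_constraint" in idx:
            print(idx["name"], "duplicates", idx["duplicates_constraint"])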
1156
1157Special Reflection Options
1158--------------------------
1159
1160The :class:`_reflection.Inspector`
1161used for the PostgreSQL backend is an instance
1162of :class:`.PGInspector`, which offers additional methods::
1163
1164 from sqlalchemy import create_engine, inspect
1165
1166 engine = create_engine("postgresql+psycopg2://localhost/test")
1167 insp = inspect(engine) # will be a PGInspector
1168
1169 print(insp.get_enums())
1170
1171.. autoclass:: PGInspector
1172 :members:
1173
1174.. _postgresql_table_options:
1175
1176PostgreSQL Table Options
1177------------------------
1178
1179Several options for CREATE TABLE are supported directly by the PostgreSQL
1180dialect in conjunction with the :class:`_schema.Table` construct, listed in
1181the following sections.
1182
1183.. seealso::
1184
1185 `PostgreSQL CREATE TABLE options
1186 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
1187 in the PostgreSQL documentation.
1188
1189``INHERITS``
1190^^^^^^^^^^^^
1191
1192Specifies one or more parent tables from which this table inherits columns and
1193constraints, enabling table inheritance hierarchies in PostgreSQL.
1194
1195::
1196
1197 Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
1198
1199 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
1200
1201``ON COMMIT``
1202^^^^^^^^^^^^^
1203
1204Controls the behavior of temporary tables at transaction commit, with options
1205to preserve rows, delete rows, or drop the table.
1206
1207::
1208
1209 Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS")
1210
1211``PARTITION BY``
1212^^^^^^^^^^^^^^^^
1213
1214Declares the table as a partitioned table using the specified partitioning
1215strategy (RANGE, LIST, or HASH) on the given column(s).
1216
1217::
1218
1219 Table(
1220 "some_table",
1221 metadata,
1222 ...,
1223 postgresql_partition_by="LIST (part_column)",
1224 )
1225
1226``TABLESPACE``
1227^^^^^^^^^^^^^^
1228
1229Specifies the tablespace where the table will be stored, allowing control over
1230the physical location of table data on disk.
1231
1232::
1233
1234 Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace")
1235
1236The above option is also available on the :class:`.Index` construct.
1237
1238``USING``
1239^^^^^^^^^
1240
1241Specifies the table access method to use for storing table data, such as
1242``heap`` (the default) or other custom access methods.
1243
1244::
1245
1246 Table("some_table", metadata, ..., postgresql_using="heap")
1247
1248.. versionadded:: 2.0.26
1249
1250``WITH OIDS``
1251^^^^^^^^^^^^^
1252
1253Enables the legacy OID (object identifier) system column for the table, which
1254assigns a unique identifier to each row.
1255
1256::
1257
1258 Table("some_table", metadata, ..., postgresql_with_oids=True)
1259
1260``WITHOUT OIDS``
1261^^^^^^^^^^^^^^^^
1262
1263Explicitly disables the OID system column for the table (the default behavior
1264in modern PostgreSQL versions).
1265
1266::
1267
1268 Table("some_table", metadata, ..., postgresql_with_oids=False)
1269
1270.. _postgresql_constraint_options:
1271
1272PostgreSQL Constraint Options
1273-----------------------------
1274
1275The following sections indicate options which are supported by the PostgreSQL
1276dialect in conjunction with selected constraint constructs.
1277
1278
1279``NOT VALID``
1280^^^^^^^^^^^^^
1281
1282Allows a constraint to be added without validating existing rows, improving
1283performance when adding constraints to large tables. This option applies
to CHECK and FOREIGN KEY constraints when the constraint is being added
1285to an existing table via ALTER TABLE, and has the effect that existing rows
1286are not scanned during the ALTER operation against the constraint being added.
1287
1288When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
1289that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
1290may be specified as an additional keyword argument within the operation
1291that creates the constraint, as in the following Alembic example::
1292
    def upgrade():
1294 op.create_foreign_key(
1295 "fk_user_address",
1296 "address",
1297 "user",
1298 ["user_id"],
1299 ["id"],
1300 postgresql_not_valid=True,
1301 )
1302
1303The keyword is ultimately accepted directly by the
1304:class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
1305and :class:`_schema.ForeignKey` constructs; when using a tool like
1306Alembic, dialect-specific keyword arguments are passed through to
1307these constructs from the migration operation directives::
1308
1309 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)
1310
1311 ForeignKeyConstraint(
1312 ["some_id"], ["some_table.some_id"], postgresql_not_valid=True
1313 )
1314
1315.. versionadded:: 1.4.32
1316
1317.. seealso::
1318
1319 `PostgreSQL ALTER TABLE options
1320 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
1321 in the PostgreSQL documentation.
1322
1323.. _postgresql_constraint_options_include:
1324
1325``INCLUDE``
1326^^^^^^^^^^^
1327
1328This keyword is applicable to both a ``UNIQUE`` constraint as well as an
1329``INDEX``. The ``postgresql_include`` option available for
1330:class:`.UniqueConstraint` as well as :class:`.Index` creates a covering index
1331by including additional columns in the underlying index without making them
1332part of the key constraint. This option adds one or more columns as a "payload"
1333to the index created automatically by PostgreSQL for the constraint. For
1334example, the following table definition::
1335
1336 Table(
1337 "mytable",
1338 metadata,
1339 Column("id", Integer, nullable=False),
1340 Column("value", Integer, nullable=False),
1341 UniqueConstraint("id", postgresql_include=["value"]),
1342 )
1343
would produce the DDL statement:
1345
1346.. sourcecode:: sql
1347
1348 CREATE TABLE mytable (
1349 id INTEGER NOT NULL,
1350 value INTEGER NOT NULL,
1351 UNIQUE (id) INCLUDE (value)
1352 )
1353
1354Note that this feature requires PostgreSQL 11 or later.
1355
1356.. versionadded:: 2.0.41 - added support for ``postgresql_include`` to
1357 :class:`.UniqueConstraint`, to complement the existing feature in
1358 :class:`.Index`.
1359
1360.. seealso::
1361
1362 :ref:`postgresql_covering_indexes` - background on ``postgresql_include``
1363 for the :class:`.Index` construct.
1364
1365
1366Column list with foreign key ``ON DELETE SET`` actions
1367^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1368
Allows selective column updates when a foreign key action is triggered,
limiting which columns are set to NULL or DEFAULT upon deletion of a
referenced row. This applies to :class:`.ForeignKey` and
:class:`.ForeignKeyConstraint`; on the PostgreSQL backend only, the
:paramref:`.ForeignKey.ondelete` parameter will accept a string list of
column names inside parentheses following the ``SET NULL`` or
``SET DEFAULT`` phrase, which will limit the set of columns that are
subject to the action::
1376
1377 fktable = Table(
1378 "fktable",
1379 metadata,
1380 Column("tid", Integer),
1381 Column("id", Integer),
1382 Column("fk_id_del_set_null", Integer),
1383 ForeignKeyConstraint(
1384 columns=["tid", "fk_id_del_set_null"],
1385 refcolumns=[pktable.c.tid, pktable.c.id],
1386 ondelete="SET NULL (fk_id_del_set_null)",
1387 ),
1388 )
1389
1390.. versionadded:: 2.0.40
1391
1392
1393.. _postgresql_table_valued_overview:
1394
1395Table values, Table and Column valued functions, Row and Tuple objects
1396-----------------------------------------------------------------------
1397
1398PostgreSQL makes great use of modern SQL forms such as table-valued functions,
1399tables and rows as values. These constructs are commonly used as part
1400of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
1401datatypes. SQLAlchemy's SQL expression language has native support for
1402most table-valued and row-valued forms.
1403
1404.. _postgresql_table_valued:
1405
1406Table-Valued Functions
1407^^^^^^^^^^^^^^^^^^^^^^^
1408
1409Many PostgreSQL built-in functions are intended to be used in the FROM clause
1410of a SELECT statement, and are capable of returning table rows or sets of table
rows. A large portion of PostgreSQL's JSON functions, such as
``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
``json_each()``, ``json_to_record()`` and ``json_populate_recordset()``, use
such forms. These classes of SQL function calling forms in SQLAlchemy are
available
1415using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
1416with :class:`_functions.Function` objects generated from the :data:`_sql.func`
1417namespace.
1418
1419Examples from PostgreSQL's reference documentation follow below:
1420
1421* ``json_each()``:
1422
1423 .. sourcecode:: pycon+sql
1424
1425 >>> from sqlalchemy import select, func
1426 >>> stmt = select(
1427 ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")
1428 ... )
1429 >>> print(stmt)
1430 {printsql}SELECT anon_1.key, anon_1.value
1431 FROM json_each(:json_each_1) AS anon_1
1432
1433* ``json_populate_record()``:
1434
1435 .. sourcecode:: pycon+sql
1436
1437 >>> from sqlalchemy import select, func, literal_column
1438 >>> stmt = select(
1439 ... func.json_populate_record(
1440 ... literal_column("null::myrowtype"), '{"a":1,"b":2}'
1441 ... ).table_valued("a", "b", name="x")
1442 ... )
1443 >>> print(stmt)
1444 {printsql}SELECT x.a, x.b
1445 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x
1446
* ``json_to_record()`` - this form uses a PostgreSQL-specific form of derived
  columns in the alias, where we may make use of :func:`_sql.column` elements
  with types to produce them. The
  :meth:`_functions.FunctionElement.table_valued` method produces a
  :class:`_sql.TableValuedAlias` construct, and the
  :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
  columns specification:
1453
1454 .. sourcecode:: pycon+sql
1455
1456 >>> from sqlalchemy import select, func, column, Integer, Text
1457 >>> stmt = select(
1458 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}')
1459 ... .table_valued(
1460 ... column("a", Integer),
1461 ... column("b", Text),
1462 ... column("d", Text),
1463 ... )
1464 ... .render_derived(name="x", with_types=True)
1465 ... )
1466 >>> print(stmt)
1467 {printsql}SELECT x.a, x.b, x.d
1468 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)
1469
1470* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
1471 ordinal counter to the output of a function and is accepted by a limited set
1472 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
1473 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
1474 parameter ``with_ordinality`` for this purpose, which accepts the string name
1475 that will be applied to the "ordinality" column:
1476
1477 .. sourcecode:: pycon+sql
1478
1479 >>> from sqlalchemy import select, func
1480 >>> stmt = select(
1481 ... func.generate_series(4, 1, -1)
1482 ... .table_valued("value", with_ordinality="ordinality")
1483 ... .render_derived()
1484 ... )
1485 >>> print(stmt)
1486 {printsql}SELECT anon_1.value, anon_1.ordinality
1487 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
1488 WITH ORDINALITY AS anon_1(value, ordinality)
1489
1490.. versionadded:: 1.4.0b2
1491
1492.. seealso::
1493
1494 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`
1495
1496.. _postgresql_column_valued:
1497
1498Column Valued Functions
1499^^^^^^^^^^^^^^^^^^^^^^^
1500
1501Similar to the table valued function, a column valued function is present
1502in the FROM clause, but delivers itself to the columns clause as a single
scalar value. PostgreSQL functions such as ``json_array_elements()``,
``unnest()`` and ``generate_series()`` may use this form. Column valued
functions are available using the
:meth:`_functions.FunctionElement.column_valued` method of
:class:`_functions.FunctionElement`:
1506
1507* ``json_array_elements()``:
1508
1509 .. sourcecode:: pycon+sql
1510
1511 >>> from sqlalchemy import select, func
1512 >>> stmt = select(
1513 ... func.json_array_elements('["one", "two"]').column_valued("x")
1514 ... )
1515 >>> print(stmt)
1516 {printsql}SELECT x
1517 FROM json_array_elements(:json_array_elements_1) AS x
1518
1519* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
1520 :func:`_postgresql.array` construct may be used:
1521
1522 .. sourcecode:: pycon+sql
1523
1524 >>> from sqlalchemy.dialects.postgresql import array
1525 >>> from sqlalchemy import select, func
1526 >>> stmt = select(func.unnest(array([1, 2])).column_valued())
1527 >>> print(stmt)
1528 {printsql}SELECT anon_1
1529 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1
1530
1531 The function can of course be used against an existing table-bound column
1532 that's of type :class:`_types.ARRAY`:
1533
1534 .. sourcecode:: pycon+sql
1535
1536 >>> from sqlalchemy import table, column, ARRAY, Integer
1537 >>> from sqlalchemy import select, func
1538 >>> t = table("t", column("value", ARRAY(Integer)))
1539 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
1540 >>> print(stmt)
1541 {printsql}SELECT unnested_value
1542 FROM unnest(t.value) AS unnested_value
1543
1544.. seealso::
1545
1546 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`
1547
1548
1549Row Types
1550^^^^^^^^^
1551
Built-in support for rendering a ``ROW`` is not available. Approximate it
using ``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
:func:`_sql.tuple_` construct:
1555
1556.. sourcecode:: pycon+sql
1557
1558 >>> from sqlalchemy import table, column, func, tuple_
1559 >>> t = table("t", column("id"), column("fk"))
1560 >>> stmt = (
1561 ... t.select()
1562 ... .where(tuple_(t.c.id, t.c.fk) > (1, 2))
1563 ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7))
1564 ... )
1565 >>> print(stmt)
1566 {printsql}SELECT t.id, t.fk
1567 FROM t
1568 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)
1569
1570.. seealso::
1571
1572 `PostgreSQL Row Constructors
1573 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_
1574
1575 `PostgreSQL Row Constructor Comparison
1576 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
1577
1578Table Types passed to Functions
1579^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1580
1581PostgreSQL supports passing a table as an argument to a function, which is
1582known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
1583such as :class:`_schema.Table` support this special form using the
1584:meth:`_sql.FromClause.table_valued` method, which is comparable to the
1585:meth:`_functions.FunctionElement.table_valued` method except that the collection
1586of columns is already established by that of the :class:`_sql.FromClause`
1587itself:
1588
1589.. sourcecode:: pycon+sql
1590
1591 >>> from sqlalchemy import table, column, func, select
1592 >>> a = table("a", column("id"), column("x"), column("y"))
1593 >>> stmt = select(func.row_to_json(a.table_valued()))
1594 >>> print(stmt)
1595 {printsql}SELECT row_to_json(a) AS row_to_json_1
1596 FROM a
1597
1598.. versionadded:: 1.4.0b2
1599
1600
1601
1602""" # noqa: E501
1603
1604from __future__ import annotations
1605
1606from collections import defaultdict
1607from functools import lru_cache
1608import re
1609from typing import Any
1610from typing import cast
1611from typing import Dict
1612from typing import List
1613from typing import Optional
1614from typing import Tuple
1615from typing import TYPE_CHECKING
1616from typing import Union
1617
1618from . import arraylib as _array
1619from . import json as _json
1620from . import pg_catalog
1621from . import ranges as _ranges
1622from .ext import _regconfig_fn
1623from .ext import aggregate_order_by
1624from .hstore import HSTORE
1625from .named_types import CreateDomainType as CreateDomainType # noqa: F401
1626from .named_types import CreateEnumType as CreateEnumType # noqa: F401
1627from .named_types import DOMAIN as DOMAIN # noqa: F401
1628from .named_types import DropDomainType as DropDomainType # noqa: F401
1629from .named_types import DropEnumType as DropEnumType # noqa: F401
1630from .named_types import ENUM as ENUM # noqa: F401
1631from .named_types import NamedType as NamedType # noqa: F401
1632from .types import _DECIMAL_TYPES # noqa: F401
1633from .types import _FLOAT_TYPES # noqa: F401
1634from .types import _INT_TYPES # noqa: F401
1635from .types import BIT as BIT
1636from .types import BYTEA as BYTEA
1637from .types import CIDR as CIDR
1638from .types import CITEXT as CITEXT
1639from .types import INET as INET
1640from .types import INTERVAL as INTERVAL
1641from .types import MACADDR as MACADDR
1642from .types import MACADDR8 as MACADDR8
1643from .types import MONEY as MONEY
1644from .types import OID as OID
1645from .types import PGBit as PGBit # noqa: F401
1646from .types import PGCidr as PGCidr # noqa: F401
1647from .types import PGInet as PGInet # noqa: F401
1648from .types import PGInterval as PGInterval # noqa: F401
1649from .types import PGMacAddr as PGMacAddr # noqa: F401
1650from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401
1651from .types import PGUuid as PGUuid
1652from .types import REGCLASS as REGCLASS
1653from .types import REGCONFIG as REGCONFIG # noqa: F401
1654from .types import TIME as TIME
1655from .types import TIMESTAMP as TIMESTAMP
1656from .types import TSVECTOR as TSVECTOR
1657from ... import exc
1658from ... import schema
1659from ... import select
1660from ... import sql
1661from ... import util
1662from ...engine import characteristics
1663from ...engine import default
1664from ...engine import interfaces
1665from ...engine import ObjectKind
1666from ...engine import ObjectScope
1667from ...engine import reflection
1668from ...engine import URL
1669from ...engine.reflection import ReflectionDefaults
1670from ...sql import bindparam
1671from ...sql import coercions
1672from ...sql import compiler
1673from ...sql import elements
1674from ...sql import expression
1675from ...sql import functions
1676from ...sql import roles
1677from ...sql import sqltypes
1678from ...sql import util as sql_util
1679from ...sql.compiler import InsertmanyvaluesSentinelOpts
1680from ...sql.visitors import InternalTraversal
1681from ...types import BIGINT
1682from ...types import BOOLEAN
1683from ...types import CHAR
1684from ...types import DATE
1685from ...types import DOUBLE_PRECISION
1686from ...types import FLOAT
1687from ...types import INTEGER
1688from ...types import NUMERIC
1689from ...types import REAL
1690from ...types import SMALLINT
1691from ...types import TEXT
1692from ...types import UUID as UUID
1693from ...types import VARCHAR
1694from ...util.typing import TypedDict
1695
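# validates the "postgresql_using" phrase given for indexes and EXCLUDE
# constraints; any single word is accepted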
1696IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
1697
1698RESERVED_WORDS = {
1699 "all",
1700 "analyse",
1701 "analyze",
1702 "and",
1703 "any",
1704 "array",
1705 "as",
1706 "asc",
1707 "asymmetric",
1708 "both",
1709 "case",
1710 "cast",
1711 "check",
1712 "collate",
1713 "column",
1714 "constraint",
1715 "create",
1716 "current_catalog",
1717 "current_date",
1718 "current_role",
1719 "current_time",
1720 "current_timestamp",
1721 "current_user",
1722 "default",
1723 "deferrable",
1724 "desc",
1725 "distinct",
1726 "do",
1727 "else",
1728 "end",
1729 "except",
1730 "false",
1731 "fetch",
1732 "for",
1733 "foreign",
1734 "from",
1735 "grant",
1736 "group",
1737 "having",
1738 "in",
1739 "initially",
1740 "intersect",
1741 "into",
1742 "leading",
1743 "limit",
1744 "localtime",
1745 "localtimestamp",
1746 "new",
1747 "not",
1748 "null",
1749 "of",
1750 "off",
1751 "offset",
1752 "old",
1753 "on",
1754 "only",
1755 "or",
1756 "order",
1757 "placing",
1758 "primary",
1759 "references",
1760 "returning",
1761 "select",
1762 "session_user",
1763 "some",
1764 "symmetric",
1765 "table",
1766 "then",
1767 "to",
1768 "trailing",
1769 "true",
1770 "union",
1771 "unique",
1772 "user",
1773 "using",
1774 "variadic",
1775 "when",
1776 "where",
1777 "window",
1778 "with",
1779 "authorization",
1780 "between",
1781 "binary",
1782 "cross",
1783 "current_schema",
1784 "freeze",
1785 "full",
1786 "ilike",
1787 "inner",
1788 "is",
1789 "isnull",
1790 "join",
1791 "left",
1792 "like",
1793 "natural",
1794 "notnull",
1795 "outer",
1796 "over",
1797 "overlaps",
1798 "right",
1799 "similar",
1800 "verbose",
1801}
1802
1803
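# mapping of generic SQLAlchemy types to their PostgreSQL-specific
# implementations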
1804colspecs = {
1805 sqltypes.ARRAY: _array.ARRAY,
1806 sqltypes.Interval: INTERVAL,
1807 sqltypes.Enum: ENUM,
1808 sqltypes.JSON.JSONPathType: _json.JSONPATH,
1809 sqltypes.JSON: _json.JSON,
1810 sqltypes.Uuid: PGUuid,
1811}
1812
1813
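# mapping of type names as reported by PostgreSQL to SQLAlchemy types,
# used during reflection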
1814ischema_names = {
1815 "_array": _array.ARRAY,
1816 "hstore": HSTORE,
1817 "json": _json.JSON,
1818 "jsonb": _json.JSONB,
1819 "int4range": _ranges.INT4RANGE,
1820 "int8range": _ranges.INT8RANGE,
1821 "numrange": _ranges.NUMRANGE,
1822 "daterange": _ranges.DATERANGE,
1823 "tsrange": _ranges.TSRANGE,
1824 "tstzrange": _ranges.TSTZRANGE,
1825 "int4multirange": _ranges.INT4MULTIRANGE,
1826 "int8multirange": _ranges.INT8MULTIRANGE,
1827 "nummultirange": _ranges.NUMMULTIRANGE,
1828 "datemultirange": _ranges.DATEMULTIRANGE,
1829 "tsmultirange": _ranges.TSMULTIRANGE,
1830 "tstzmultirange": _ranges.TSTZMULTIRANGE,
1831 "integer": INTEGER,
1832 "bigint": BIGINT,
1833 "smallint": SMALLINT,
1834 "character varying": VARCHAR,
1835 "character": CHAR,
1836 '"char"': sqltypes.String,
1837 "name": sqltypes.String,
1838 "text": TEXT,
1839 "numeric": NUMERIC,
1840 "float": FLOAT,
1841 "real": REAL,
1842 "inet": INET,
1843 "cidr": CIDR,
1844 "citext": CITEXT,
1845 "uuid": UUID,
1846 "bit": BIT,
1847 "bit varying": BIT,
1848 "macaddr": MACADDR,
1849 "macaddr8": MACADDR8,
1850 "money": MONEY,
1851 "oid": OID,
1852 "regclass": REGCLASS,
1853 "double precision": DOUBLE_PRECISION,
1854 "timestamp": TIMESTAMP,
1855 "timestamp with time zone": TIMESTAMP,
1856 "timestamp without time zone": TIMESTAMP,
1857 "time with time zone": TIME,
1858 "time without time zone": TIME,
1859 "date": DATE,
1860 "time": TIME,
1861 "bytea": BYTEA,
1862 "boolean": BOOLEAN,
1863 "interval": INTERVAL,
1864 "tsvector": TSVECTOR,
1865}
1866
1867
1868class PGCompiler(compiler.SQLCompiler):
1869 def visit_to_tsvector_func(self, element, **kw):
1870 return self._assert_pg_ts_ext(element, **kw)
1871
1872 def visit_to_tsquery_func(self, element, **kw):
1873 return self._assert_pg_ts_ext(element, **kw)
1874
1875 def visit_plainto_tsquery_func(self, element, **kw):
1876 return self._assert_pg_ts_ext(element, **kw)
1877
1878 def visit_phraseto_tsquery_func(self, element, **kw):
1879 return self._assert_pg_ts_ext(element, **kw)
1880
1881 def visit_websearch_to_tsquery_func(self, element, **kw):
1882 return self._assert_pg_ts_ext(element, **kw)
1883
1884 def visit_ts_headline_func(self, element, **kw):
1885 return self._assert_pg_ts_ext(element, **kw)
1886
1887 def _assert_pg_ts_ext(self, element, **kw):
1888 if not isinstance(element, _regconfig_fn):
1889 # other options here include trying to rewrite the function
1890 # with the correct types. however, that means we have to
1891 # "un-SQL-ize" the first argument, which can't work in a
1892 # generalized way. Also, parent compiler class has already added
1893 # the incorrect return type to the result map. So let's just
1894 # make sure the function we want is used up front.
1895
1896 raise exc.CompileError(
1897 f'Can\'t compile "{element.name}()" full text search '
1898 f"function construct that does not originate from the "
1899 f'"sqlalchemy.dialects.postgresql" package. '
1900 f'Please ensure "import sqlalchemy.dialects.postgresql" is '
1901 f"called before constructing "
1902 f'"sqlalchemy.func.{element.name}()" to ensure registration '
1903 f"of the correct argument and return types."
1904 )
1905
1906 return f"{element.name}{self.function_argspec(element, **kw)}"
1907
1908 def render_bind_cast(self, type_, dbapi_type, sqltext):
1909 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length:
1910 # use VARCHAR with no length for VARCHAR cast.
1911 # see #9511
1912 dbapi_type = sqltypes.STRINGTYPE
1913 return f"""{sqltext}::{
1914 self.dialect.type_compiler_instance.process(
1915 dbapi_type, identifier_preparer=self.preparer
1916 )
1917 }"""
1918
1919 def visit_array(self, element, **kw):
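        # an empty ARRAY literal cannot imply its element type, so render
        # an explicit cast when the item type is known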
1920 if not element.clauses and not element.type.item_type._isnull:
1921 return "ARRAY[]::%s" % element.type.compile(self.dialect)
1922 return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
1923
1924 def visit_slice(self, element, **kw):
1925 return "%s:%s" % (
1926 self.process(element.start, **kw),
1927 self.process(element.stop, **kw),
1928 )
1929
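    # PostgreSQL spells bitwise XOR as "#" rather than "^"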
1930 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1931 return self._generate_generic_binary(binary, " # ", **kw)
1932
1933 def visit_json_getitem_op_binary(
1934 self, binary, operator, _cast_applied=False, **kw
1935 ):
1936 if (
1937 not _cast_applied
1938 and binary.type._type_affinity is not sqltypes.JSON
1939 ):
1940 kw["_cast_applied"] = True
1941 return self.process(sql.cast(binary, binary.type), **kw)
1942
1943 kw["eager_grouping"] = True
1944
1945 if (
1946 not _cast_applied
1947 and isinstance(binary.left.type, _json.JSONB)
1948 and self.dialect._supports_jsonb_subscripting
1949 ):
1950 left = binary.left
1951 if isinstance(left, (functions.FunctionElement, elements.Cast)):
1952 left = elements.Grouping(left)
1953
1954 # for pg14+JSONB use subscript notation: col['key'] instead
1955 # of col -> 'key'
1956 return "%s[%s]" % (
1957 self.process(left, **kw),
1958 self.process(binary.right, **kw),
1959 )
1960 else:
1961 # Fall back to arrow notation for older versions or when cast
1962 # is applied
1963 return self._generate_generic_binary(
1964 binary, " -> " if not _cast_applied else " ->> ", **kw
1965 )
1966
1967 def visit_json_path_getitem_op_binary(
1968 self, binary, operator, _cast_applied=False, **kw
1969 ):
1970 if (
1971 not _cast_applied
1972 and binary.type._type_affinity is not sqltypes.JSON
1973 ):
1974 kw["_cast_applied"] = True
1975 return self.process(sql.cast(binary, binary.type), **kw)
1976
1977 kw["eager_grouping"] = True
1978 return self._generate_generic_binary(
1979 binary, " #> " if not _cast_applied else " #>> ", **kw
1980 )
1981
1982 def visit_getitem_binary(self, binary, operator, **kw):
1983 return "%s[%s]" % (
1984 self.process(binary.left, **kw),
1985 self.process(binary.right, **kw),
1986 )
1987
1988 def visit_aggregate_order_by(self, element, **kw):
1989 return "%s ORDER BY %s" % (
1990 self.process(element.target, **kw),
1991 self.process(element.order_by, **kw),
1992 )
1993
1994 def visit_match_op_binary(self, binary, operator, **kw):
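        # the generic "match" operator renders as "@@ plainto_tsquery()";
        # the "postgresql_regconfig" modifier, if present, supplies the
        # text search configuration as the first argument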
1995 if "postgresql_regconfig" in binary.modifiers:
1996 regconfig = self.render_literal_value(
1997 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
1998 )
1999 if regconfig:
2000 return "%s @@ plainto_tsquery(%s, %s)" % (
2001 self.process(binary.left, **kw),
2002 regconfig,
2003 self.process(binary.right, **kw),
2004 )
2005 return "%s @@ plainto_tsquery(%s)" % (
2006 self.process(binary.left, **kw),
2007 self.process(binary.right, **kw),
2008 )
2009
2010 def visit_ilike_case_insensitive_operand(self, element, **kw):
2011 return element.element._compiler_dispatch(self, **kw)
2012
2013 def visit_ilike_op_binary(self, binary, operator, **kw):
2014 escape = binary.modifiers.get("escape", None)
2015
2016 return "%s ILIKE %s" % (
2017 self.process(binary.left, **kw),
2018 self.process(binary.right, **kw),
2019 ) + (
2020 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2021 if escape is not None
2022 else ""
2023 )
2024
2025 def visit_not_ilike_op_binary(self, binary, operator, **kw):
2026 escape = binary.modifiers.get("escape", None)
2027 return "%s NOT ILIKE %s" % (
2028 self.process(binary.left, **kw),
2029 self.process(binary.right, **kw),
2030 ) + (
2031 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2032 if escape is not None
2033 else ""
2034 )
2035
2036 def _regexp_match(self, base_op, binary, operator, kw):
2037 flags = binary.modifiers["flags"]
2038 if flags is None:
2039 return self._generate_generic_binary(
2040 binary, " %s " % base_op, **kw
2041 )
2042 if flags == "i":
2043 return self._generate_generic_binary(
2044 binary, " %s* " % base_op, **kw
2045 )
2046 return "%s %s CONCAT('(?', %s, ')', %s)" % (
2047 self.process(binary.left, **kw),
2048 base_op,
2049 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2050 self.process(binary.right, **kw),
2051 )
2052
2053 def visit_regexp_match_op_binary(self, binary, operator, **kw):
2054 return self._regexp_match("~", binary, operator, kw)
2055
2056 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
2057 return self._regexp_match("!~", binary, operator, kw)
2058
2059 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
2060 string = self.process(binary.left, **kw)
2061 pattern_replace = self.process(binary.right, **kw)
2062 flags = binary.modifiers["flags"]
2063 if flags is None:
2064 return "REGEXP_REPLACE(%s, %s)" % (
2065 string,
2066 pattern_replace,
2067 )
2068 else:
2069 return "REGEXP_REPLACE(%s, %s, %s)" % (
2070 string,
2071 pattern_replace,
2072 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2073 )
2074
2075 def visit_empty_set_expr(self, element_types, **kw):
2076 # cast the empty set to the type we are comparing against. if
2077 # we are comparing against the null type, pick an arbitrary
2078 # datatype for the empty set
2079 return "SELECT %s WHERE 1!=1" % (
2080 ", ".join(
2081 "CAST(NULL AS %s)"
2082 % self.dialect.type_compiler_instance.process(
2083 INTEGER() if type_._isnull else type_
2084 )
2085 for type_ in element_types or [INTEGER()]
2086 ),
2087 )
2088
2089 def render_literal_value(self, value, type_):
2090 value = super().render_literal_value(value, type_)
2091
2092 if self.dialect._backslash_escapes:
2093 value = value.replace("\\", "\\\\")
2094 return value
2095
2096 def visit_aggregate_strings_func(self, fn, **kw):
2097 return "string_agg%s" % self.function_argspec(fn)
2098
2099 def visit_sequence(self, seq, **kw):
2100 return "nextval('%s')" % self.preparer.format_sequence(seq)
2101
2102 def limit_clause(self, select, **kw):
2103 text = ""
2104 if select._limit_clause is not None:
2105 text += " \n LIMIT " + self.process(select._limit_clause, **kw)
2106 if select._offset_clause is not None:
2107 if select._limit_clause is None:
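                # LIMIT ALL is equivalent to omitting LIMIT; render it so
                # that an OFFSET-only statement still carries an explicit
                # LIMIT clause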
2108 text += "\n LIMIT ALL"
2109 text += " OFFSET " + self.process(select._offset_clause, **kw)
2110 return text
2111
2112 def format_from_hint_text(self, sqltext, table, hint, iscrud):
2113 if hint.upper() != "ONLY":
2114 raise exc.CompileError("Unrecognized hint: %r" % hint)
2115 return "ONLY " + sqltext
2116
2117 def get_select_precolumns(self, select, **kw):
2118 # Do not call super().get_select_precolumns because
2119 # it will warn/raise when distinct on is present
2120 if select._distinct or select._distinct_on:
2121 if select._distinct_on:
2122 return (
2123 "DISTINCT ON ("
2124 + ", ".join(
2125 [
2126 self.process(col, **kw)
2127 for col in select._distinct_on
2128 ]
2129 )
2130 + ") "
2131 )
2132 else:
2133 return "DISTINCT "
2134 else:
2135 return ""
2136
2137 def for_update_clause(self, select, **kw):
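        # map the (read, key_share) flag combinations to PostgreSQL's
        # four row-level lock strengths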
2138 if select._for_update_arg.read:
2139 if select._for_update_arg.key_share:
2140 tmp = " FOR KEY SHARE"
2141 else:
2142 tmp = " FOR SHARE"
2143 elif select._for_update_arg.key_share:
2144 tmp = " FOR NO KEY UPDATE"
2145 else:
2146 tmp = " FOR UPDATE"
2147
2148 if select._for_update_arg.of:
2149 tables = util.OrderedSet()
2150 for c in select._for_update_arg.of:
2151 tables.update(sql_util.surface_selectables_only(c))
2152
2153 of_kw = dict(kw)
2154 of_kw.update(ashint=True, use_schema=False)
2155 tmp += " OF " + ", ".join(
2156 self.process(table, **of_kw) for table in tables
2157 )
2158
2159 if select._for_update_arg.nowait:
2160 tmp += " NOWAIT"
2161 if select._for_update_arg.skip_locked:
2162 tmp += " SKIP LOCKED"
2163
2164 return tmp
2165
2166 def visit_substring_func(self, func, **kw):
2167 s = self.process(func.clauses.clauses[0], **kw)
2168 start = self.process(func.clauses.clauses[1], **kw)
2169 if len(func.clauses.clauses) > 2:
2170 length = self.process(func.clauses.clauses[2], **kw)
2171 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
2172 else:
2173 return "SUBSTRING(%s FROM %s)" % (s, start)
2174
2175 def _on_conflict_target(self, clause, **kw):
2176 if clause.constraint_target is not None:
2177 # target may be a name of an Index, UniqueConstraint or
2178 # ExcludeConstraint. While there is a separate
2179 # "max_identifier_length" for indexes, PostgreSQL uses the same
2180 # length for all objects so we can use
2181 # truncate_and_render_constraint_name
2182 target_text = (
2183 "ON CONSTRAINT %s"
2184 % self.preparer.truncate_and_render_constraint_name(
2185 clause.constraint_target
2186 )
2187 )
2188 elif clause.inferred_target_elements is not None:
2189 target_text = "(%s)" % ", ".join(
2190 (
2191 self.preparer.quote(c)
2192 if isinstance(c, str)
2193 else self.process(c, include_table=False, use_schema=False)
2194 )
2195 for c in clause.inferred_target_elements
2196 )
2197 if clause.inferred_target_whereclause is not None:
2198 target_text += " WHERE %s" % self.process(
2199 clause.inferred_target_whereclause,
2200 include_table=False,
2201 use_schema=False,
2202 )
2203 else:
2204 target_text = ""
2205
2206 return target_text
2207
2208 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
2209 target_text = self._on_conflict_target(on_conflict, **kw)
2210
2211 if target_text:
2212 return "ON CONFLICT %s DO NOTHING" % target_text
2213 else:
2214 return "ON CONFLICT DO NOTHING"
2215
2216 def visit_on_conflict_do_update(self, on_conflict, **kw):
2217 clause = on_conflict
2218
2219 target_text = self._on_conflict_target(on_conflict, **kw)
2220
2221 action_set_ops = []
2222
2223 set_parameters = dict(clause.update_values_to_set)
2224 # create a list of column assignment clauses as tuples
2225
2226 insert_statement = self.stack[-1]["selectable"]
2227 cols = insert_statement.table.c
2228 for c in cols:
2229 col_key = c.key
2230
2231 if col_key in set_parameters:
2232 value = set_parameters.pop(col_key)
2233 elif c in set_parameters:
2234 value = set_parameters.pop(c)
2235 else:
2236 continue
2237
2238 # TODO: this coercion should be up front. we can't cache
2239 # SQL constructs with non-bound literals buried in them
2240 if coercions._is_literal(value):
2241 value = elements.BindParameter(None, value, type_=c.type)
2242
2243 else:
2244 if (
2245 isinstance(value, elements.BindParameter)
2246 and value.type._isnull
2247 ):
2248 value = value._clone()
2249 value.type = c.type
2250 value_text = self.process(value.self_group(), use_schema=False)
2251
2252 key_text = self.preparer.quote(c.name)
2253 action_set_ops.append("%s = %s" % (key_text, value_text))
2254
2255 # check for names that don't match columns
2256 if set_parameters:
2257 util.warn(
2258 "Additional column names not matching "
2259 "any column keys in table '%s': %s"
2260 % (
2261 self.current_executable.table.name,
2262 (", ".join("'%s'" % c for c in set_parameters)),
2263 )
2264 )
2265 for k, v in set_parameters.items():
2266 key_text = (
2267 self.preparer.quote(k)
2268 if isinstance(k, str)
2269 else self.process(k, use_schema=False)
2270 )
2271 value_text = self.process(
2272 coercions.expect(roles.ExpressionElementRole, v),
2273 use_schema=False,
2274 )
2275 action_set_ops.append("%s = %s" % (key_text, value_text))
2276
2277 action_text = ", ".join(action_set_ops)
2278 if clause.update_whereclause is not None:
2279 action_text += " WHERE %s" % self.process(
2280 clause.update_whereclause, include_table=True, use_schema=False
2281 )
2282
2283 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
2284
2285 def update_from_clause(
2286 self, update_stmt, from_table, extra_froms, from_hints, **kw
2287 ):
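        """Render the UPDATE .. FROM clause specific to PostgreSQL."""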
2288 kw["asfrom"] = True
2289 return "FROM " + ", ".join(
2290 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2291 for t in extra_froms
2292 )
2293
2294 def delete_extra_from_clause(
2295 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2296 ):
2297 """Render the DELETE .. USING clause specific to PostgreSQL."""
2298 kw["asfrom"] = True
2299 return "USING " + ", ".join(
2300 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2301 for t in extra_froms
2302 )
2303
2304 def fetch_clause(self, select, **kw):
        # pg requires parens for non-literal clauses. It's also required
        # for bind parameters if a ::type cast is used by the driver
        # (asyncpg), so it's easiest to just always add it
2308 text = ""
2309 if select._offset_clause is not None:
2310 text += "\n OFFSET (%s) ROWS" % self.process(
2311 select._offset_clause, **kw
2312 )
2313 if select._fetch_clause is not None:
2314 text += "\n FETCH FIRST (%s)%s ROWS %s" % (
2315 self.process(select._fetch_clause, **kw),
2316 " PERCENT" if select._fetch_clause_options["percent"] else "",
2317 (
2318 "WITH TIES"
2319 if select._fetch_clause_options["with_ties"]
2320 else "ONLY"
2321 ),
2322 )
2323 return text
2324
2325
2326class PGDDLCompiler(compiler.DDLCompiler):
2327 def get_column_specification(self, column, **kwargs):
2328 colspec = self.preparer.format_column(column)
2329 impl_type = column.type.dialect_impl(self.dialect)
2330 if isinstance(impl_type, sqltypes.TypeDecorator):
2331 impl_type = impl_type.impl
2332
2333 has_identity = (
2334 column.identity is not None
2335 and self.dialect.supports_identity_columns
2336 )
2337
2338 if (
2339 column.primary_key
2340 and column is column.table._autoincrement_column
2341 and (
2342 self.dialect.supports_smallserial
2343 or not isinstance(impl_type, sqltypes.SmallInteger)
2344 )
2345 and not has_identity
2346 and (
2347 column.default is None
2348 or (
2349 isinstance(column.default, schema.Sequence)
2350 and column.default.optional
2351 )
2352 )
2353 ):
2354 if isinstance(impl_type, sqltypes.BigInteger):
2355 colspec += " BIGSERIAL"
2356 elif isinstance(impl_type, sqltypes.SmallInteger):
2357 colspec += " SMALLSERIAL"
2358 else:
2359 colspec += " SERIAL"
2360 else:
2361 colspec += " " + self.dialect.type_compiler_instance.process(
2362 column.type,
2363 type_expression=column,
2364 identifier_preparer=self.preparer,
2365 )
2366 default = self.get_column_default_string(column)
2367 if default is not None:
2368 colspec += " DEFAULT " + default
2369
2370 if column.computed is not None:
2371 colspec += " " + self.process(column.computed)
2372 if has_identity:
2373 colspec += " " + self.process(column.identity)
2374
2375 if not column.nullable and not has_identity:
2376 colspec += " NOT NULL"
2377 elif column.nullable and has_identity:
2378 colspec += " NULL"
2379 return colspec
2380
2381 def _define_constraint_validity(self, constraint):
2382 not_valid = constraint.dialect_options["postgresql"]["not_valid"]
2383 return " NOT VALID" if not_valid else ""
2384
2385 def _define_include(self, obj):
2386 includeclause = obj.dialect_options["postgresql"]["include"]
2387 if not includeclause:
2388 return ""
2389 inclusions = [
2390 obj.table.c[col] if isinstance(col, str) else col
2391 for col in includeclause
2392 ]
2393 return " INCLUDE (%s)" % ", ".join(
2394 [self.preparer.quote(c.name) for c in inclusions]
2395 )
2396
2397 def visit_check_constraint(self, constraint, **kw):
2398 if constraint._type_bound:
2399 typ = list(constraint.columns)[0].type
2400 if (
2401 isinstance(typ, sqltypes.ARRAY)
2402 and isinstance(typ.item_type, sqltypes.Enum)
2403 and not typ.item_type.native_enum
2404 ):
2405 raise exc.CompileError(
2406 "PostgreSQL dialect cannot produce the CHECK constraint "
2407 "for ARRAY of non-native ENUM; please specify "
2408 "create_constraint=False on this Enum datatype."
2409 )
2410
2411 text = super().visit_check_constraint(constraint)
2412 text += self._define_constraint_validity(constraint)
2413 return text
2414
2415 def visit_foreign_key_constraint(self, constraint, **kw):
2416 text = super().visit_foreign_key_constraint(constraint)
2417 text += self._define_constraint_validity(constraint)
2418 return text
2419
2420 def visit_primary_key_constraint(self, constraint, **kw):
2421 text = self.define_constraint_preamble(constraint, **kw)
2422 text += self.define_primary_key_body(constraint, **kw)
2423 text += self._define_include(constraint)
2424 text += self.define_constraint_deferrability(constraint)
2425 return text
2426
2427 def visit_unique_constraint(self, constraint, **kw):
2428 if len(constraint) == 0:
2429 return ""
2430 text = self.define_constraint_preamble(constraint, **kw)
2431 text += self.define_unique_body(constraint, **kw)
2432 text += self._define_include(constraint)
2433 text += self.define_constraint_deferrability(constraint)
2434 return text
2435
2436 @util.memoized_property
2437 def _fk_ondelete_pattern(self):
2438 return re.compile(
2439 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?"
2440 r"|NO ACTION)$",
2441 re.I,
2442 )
2443
2444 def define_constraint_ondelete_cascade(self, constraint):
2445 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
2446 constraint.ondelete, self._fk_ondelete_pattern
2447 )
2448
2449 def visit_create_enum_type(self, create, **kw):
2450 type_ = create.element
2451
2452 return "CREATE TYPE %s AS ENUM (%s)" % (
2453 self.preparer.format_type(type_),
2454 ", ".join(
2455 self.sql_compiler.process(sql.literal(e), literal_binds=True)
2456 for e in type_.enums
2457 ),
2458 )
2459
2460 def visit_drop_enum_type(self, drop, **kw):
2461 type_ = drop.element
2462
2463 return "DROP TYPE %s" % (self.preparer.format_type(type_))
2464
2465 def visit_create_domain_type(self, create, **kw):
2466 domain: DOMAIN = create.element
2467
2468 options = []
2469 if domain.collation is not None:
2470 options.append(f"COLLATE {self.preparer.quote(domain.collation)}")
2471 if domain.default is not None:
2472 default = self.render_default_string(domain.default)
2473 options.append(f"DEFAULT {default}")
2474 if domain.constraint_name is not None:
2475 name = self.preparer.truncate_and_render_constraint_name(
2476 domain.constraint_name
2477 )
2478 options.append(f"CONSTRAINT {name}")
2479 if domain.not_null:
2480 options.append("NOT NULL")
2481 if domain.check is not None:
2482 check = self.sql_compiler.process(
2483 domain.check, include_table=False, literal_binds=True
2484 )
2485 options.append(f"CHECK ({check})")
2486
2487 return (
2488 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS "
2489 f"{self.type_compiler.process(domain.data_type)} "
2490 f"{' '.join(options)}"
2491 )
2492
2493 def visit_drop_domain_type(self, drop, **kw):
2494 domain = drop.element
2495 return f"DROP DOMAIN {self.preparer.format_type(domain)}"
2496
2497 def visit_create_index(self, create, **kw):
2498 preparer = self.preparer
2499 index = create.element
2500 self._verify_index_table(index)
2501 text = "CREATE "
2502 if index.unique:
2503 text += "UNIQUE "
2504
2505 text += "INDEX "
2506
2507 if self.dialect._supports_create_index_concurrently:
2508 concurrently = index.dialect_options["postgresql"]["concurrently"]
2509 if concurrently:
2510 text += "CONCURRENTLY "
2511
2512 if create.if_not_exists:
2513 text += "IF NOT EXISTS "
2514
2515 text += "%s ON %s " % (
2516 self._prepared_index_name(index, include_schema=False),
2517 preparer.format_table(index.table),
2518 )
2519
2520 using = index.dialect_options["postgresql"]["using"]
2521 if using:
2522 text += (
2523 "USING %s "
2524 % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
2525 )
2526
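        # render the index expressions, applying any operator classes
        # given via the "postgresql_ops" option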
2527 ops = index.dialect_options["postgresql"]["ops"]
2528 text += "(%s)" % (
2529 ", ".join(
2530 [
2531 self.sql_compiler.process(
2532 (
2533 expr.self_group()
2534 if not isinstance(expr, expression.ColumnClause)
2535 else expr
2536 ),
2537 include_table=False,
2538 literal_binds=True,
2539 )
2540 + (
2541 (" " + ops[expr.key])
2542 if hasattr(expr, "key") and expr.key in ops
2543 else ""
2544 )
2545 for expr in index.expressions
2546 ]
2547 )
2548 )
2549
2550 text += self._define_include(index)
2551
2552 nulls_not_distinct = index.dialect_options["postgresql"][
2553 "nulls_not_distinct"
2554 ]
2555 if nulls_not_distinct is True:
2556 text += " NULLS NOT DISTINCT"
2557 elif nulls_not_distinct is False:
2558 text += " NULLS DISTINCT"
2559
2560 withclause = index.dialect_options["postgresql"]["with"]
2561 if withclause:
2562 text += " WITH (%s)" % (
2563 ", ".join(
2564 [
2565 "%s = %s" % storage_parameter
2566 for storage_parameter in withclause.items()
2567 ]
2568 )
2569 )
2570
2571 tablespace_name = index.dialect_options["postgresql"]["tablespace"]
2572 if tablespace_name:
2573 text += " TABLESPACE %s" % preparer.quote(tablespace_name)
2574
2575 whereclause = index.dialect_options["postgresql"]["where"]
2576 if whereclause is not None:
2577 whereclause = coercions.expect(
2578 roles.DDLExpressionRole, whereclause
2579 )
2580
2581 where_compiled = self.sql_compiler.process(
2582 whereclause, include_table=False, literal_binds=True
2583 )
2584 text += " WHERE " + where_compiled
2585
2586 return text
2587
2588 def define_unique_constraint_distinct(self, constraint, **kw):
2589 nulls_not_distinct = constraint.dialect_options["postgresql"][
2590 "nulls_not_distinct"
2591 ]
2592 if nulls_not_distinct is True:
2593 nulls_not_distinct_param = "NULLS NOT DISTINCT "
2594 elif nulls_not_distinct is False:
2595 nulls_not_distinct_param = "NULLS DISTINCT "
2596 else:
2597 nulls_not_distinct_param = ""
2598 return nulls_not_distinct_param
2599
2600 def visit_drop_index(self, drop, **kw):
2601 index = drop.element
2602
2603 text = "\nDROP INDEX "
2604
2605 if self.dialect._supports_drop_index_concurrently:
2606 concurrently = index.dialect_options["postgresql"]["concurrently"]
2607 if concurrently:
2608 text += "CONCURRENTLY "
2609
2610 if drop.if_exists:
2611 text += "IF EXISTS "
2612
2613 text += self._prepared_index_name(index, include_schema=True)
2614 return text
2615
2616 def visit_exclude_constraint(self, constraint, **kw):
2617 text = ""
2618 if constraint.name is not None:
2619 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2620 constraint
2621 )
2622 elements = []
2623 kw["include_table"] = False
2624 kw["literal_binds"] = True
2625 for expr, name, op in constraint._render_exprs:
2626 exclude_element = self.sql_compiler.process(expr, **kw) + (
2627 (" " + constraint.ops[expr.key])
2628 if hasattr(expr, "key") and expr.key in constraint.ops
2629 else ""
2630 )
2631
2632 elements.append("%s WITH %s" % (exclude_element, op))
2633 text += "EXCLUDE USING %s (%s)" % (
2634 self.preparer.validate_sql_phrase(
2635 constraint.using, IDX_USING
2636 ).lower(),
2637 ", ".join(elements),
2638 )
2639 if constraint.where is not None:
2640 text += " WHERE (%s)" % self.sql_compiler.process(
2641 constraint.where, literal_binds=True
2642 )
2643 text += self.define_constraint_deferrability(constraint)
2644 return text
2645
2646 def post_create_table(self, table):
2647 table_opts = []
2648 pg_opts = table.dialect_options["postgresql"]
2649
2650 inherits = pg_opts.get("inherits")
2651 if inherits is not None:
2652 if not isinstance(inherits, (list, tuple)):
2653 inherits = (inherits,)
2654 table_opts.append(
2655 "\n INHERITS ( "
2656 + ", ".join(self.preparer.quote(name) for name in inherits)
2657 + " )"
2658 )
2659
2660 if pg_opts["partition_by"]:
2661 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
2662
2663 if pg_opts["using"]:
2664 table_opts.append("\n USING %s" % pg_opts["using"])
2665
2666 if pg_opts["with_oids"] is True:
2667 table_opts.append("\n WITH OIDS")
2668 elif pg_opts["with_oids"] is False:
2669 table_opts.append("\n WITHOUT OIDS")
2670
2671 if pg_opts["on_commit"]:
2672 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
2673 table_opts.append("\n ON COMMIT %s" % on_commit_options)
2674
2675 if pg_opts["tablespace"]:
2676 tablespace_name = pg_opts["tablespace"]
2677 table_opts.append(
2678 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
2679 )
2680
2681 return "".join(table_opts)
2682
2683 def visit_computed_column(self, generated, **kw):
2684 if generated.persisted is False:
2685 raise exc.CompileError(
                "PostgreSQL computed columns do not support 'virtual' "
2687 "persistence; set the 'persisted' flag to None or True for "
2688 "PostgreSQL support."
2689 )
2690
2691 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
2692 generated.sqltext, include_table=False, literal_binds=True
2693 )
2694
2695 def visit_create_sequence(self, create, **kw):
2696 prefix = None
2697 if create.element.data_type is not None:
2698 prefix = " AS %s" % self.type_compiler.process(
2699 create.element.data_type
2700 )
2701
2702 return super().visit_create_sequence(create, prefix=prefix, **kw)
2703
2704 def _can_comment_on_constraint(self, ddl_instance):
2705 constraint = ddl_instance.element
2706 if constraint.name is None:
2707 raise exc.CompileError(
2708 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2709 "it has no name"
2710 )
2711 if constraint.table is None:
2712 raise exc.CompileError(
2713 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2714 "it has no associated table"
2715 )
2716
2717 def visit_set_constraint_comment(self, create, **kw):
2718 self._can_comment_on_constraint(create)
2719 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
2720 self.preparer.format_constraint(create.element),
2721 self.preparer.format_table(create.element.table),
2722 self.sql_compiler.render_literal_value(
2723 create.element.comment, sqltypes.String()
2724 ),
2725 )
2726
2727 def visit_drop_constraint_comment(self, drop, **kw):
2728 self._can_comment_on_constraint(drop)
2729 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % (
2730 self.preparer.format_constraint(drop.element),
2731 self.preparer.format_table(drop.element.table),
2732 )
2733
2734
2735class PGTypeCompiler(compiler.GenericTypeCompiler):
2736 def visit_TSVECTOR(self, type_, **kw):
2737 return "TSVECTOR"
2738
2739 def visit_TSQUERY(self, type_, **kw):
2740 return "TSQUERY"
2741
2742 def visit_INET(self, type_, **kw):
2743 return "INET"
2744
2745 def visit_CIDR(self, type_, **kw):
2746 return "CIDR"
2747
2748 def visit_CITEXT(self, type_, **kw):
2749 return "CITEXT"
2750
2751 def visit_MACADDR(self, type_, **kw):
2752 return "MACADDR"
2753
2754 def visit_MACADDR8(self, type_, **kw):
2755 return "MACADDR8"
2756
2757 def visit_MONEY(self, type_, **kw):
2758 return "MONEY"
2759
2760 def visit_OID(self, type_, **kw):
2761 return "OID"
2762
2763 def visit_REGCONFIG(self, type_, **kw):
2764 return "REGCONFIG"
2765
2766 def visit_REGCLASS(self, type_, **kw):
2767 return "REGCLASS"
2768
2769 def visit_FLOAT(self, type_, **kw):
2770 if not type_.precision:
2771 return "FLOAT"
2772 else:
2773 return "FLOAT(%(precision)s)" % {"precision": type_.precision}
2774
2775 def visit_double(self, type_, **kw):
        return self.visit_DOUBLE_PRECISION(type_, **kw)
2777
2778 def visit_BIGINT(self, type_, **kw):
2779 return "BIGINT"
2780
2781 def visit_HSTORE(self, type_, **kw):
2782 return "HSTORE"
2783
2784 def visit_JSON(self, type_, **kw):
2785 return "JSON"
2786
2787 def visit_JSONB(self, type_, **kw):
2788 return "JSONB"
2789
2790 def visit_INT4MULTIRANGE(self, type_, **kw):
2791 return "INT4MULTIRANGE"
2792
2793 def visit_INT8MULTIRANGE(self, type_, **kw):
2794 return "INT8MULTIRANGE"
2795
2796 def visit_NUMMULTIRANGE(self, type_, **kw):
2797 return "NUMMULTIRANGE"
2798
2799 def visit_DATEMULTIRANGE(self, type_, **kw):
2800 return "DATEMULTIRANGE"
2801
2802 def visit_TSMULTIRANGE(self, type_, **kw):
2803 return "TSMULTIRANGE"
2804
2805 def visit_TSTZMULTIRANGE(self, type_, **kw):
2806 return "TSTZMULTIRANGE"
2807
2808 def visit_INT4RANGE(self, type_, **kw):
2809 return "INT4RANGE"
2810
2811 def visit_INT8RANGE(self, type_, **kw):
2812 return "INT8RANGE"
2813
2814 def visit_NUMRANGE(self, type_, **kw):
2815 return "NUMRANGE"
2816
2817 def visit_DATERANGE(self, type_, **kw):
2818 return "DATERANGE"
2819
2820 def visit_TSRANGE(self, type_, **kw):
2821 return "TSRANGE"
2822
2823 def visit_TSTZRANGE(self, type_, **kw):
2824 return "TSTZRANGE"
2825
2826 def visit_json_int_index(self, type_, **kw):
2827 return "INT"
2828
2829 def visit_json_str_index(self, type_, **kw):
2830 return "TEXT"
2831
2832 def visit_datetime(self, type_, **kw):
2833 return self.visit_TIMESTAMP(type_, **kw)
2834
2835 def visit_enum(self, type_, **kw):
2836 if not type_.native_enum or not self.dialect.supports_native_enum:
2837 return super().visit_enum(type_, **kw)
2838 else:
2839 return self.visit_ENUM(type_, **kw)
2840
2841 def visit_ENUM(self, type_, identifier_preparer=None, **kw):
2842 if identifier_preparer is None:
2843 identifier_preparer = self.dialect.identifier_preparer
2844 return identifier_preparer.format_type(type_)
2845
2846 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw):
2847 if identifier_preparer is None:
2848 identifier_preparer = self.dialect.identifier_preparer
2849 return identifier_preparer.format_type(type_)
2850
2851 def visit_TIMESTAMP(self, type_, **kw):
2852 return "TIMESTAMP%s %s" % (
2853 (
2854 "(%d)" % type_.precision
2855 if getattr(type_, "precision", None) is not None
2856 else ""
2857 ),
2858 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2859 )
2860
2861 def visit_TIME(self, type_, **kw):
2862 return "TIME%s %s" % (
2863 (
2864 "(%d)" % type_.precision
2865 if getattr(type_, "precision", None) is not None
2866 else ""
2867 ),
2868 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2869 )
2870
2871 def visit_INTERVAL(self, type_, **kw):
2872 text = "INTERVAL"
2873 if type_.fields is not None:
2874 text += " " + type_.fields
2875 if type_.precision is not None:
2876 text += " (%d)" % type_.precision
2877 return text
2878
2879 def visit_BIT(self, type_, **kw):
2880 if type_.varying:
2881 compiled = "BIT VARYING"
2882 if type_.length is not None:
2883 compiled += "(%d)" % type_.length
2884 else:
2885 compiled = "BIT(%d)" % type_.length
2886 return compiled
2887
2888 def visit_uuid(self, type_, **kw):
2889 if type_.native_uuid:
2890 return self.visit_UUID(type_, **kw)
2891 else:
2892 return super().visit_uuid(type_, **kw)
2893
2894 def visit_UUID(self, type_, **kw):
2895 return "UUID"
2896
2897 def visit_large_binary(self, type_, **kw):
2898 return self.visit_BYTEA(type_, **kw)
2899
2900 def visit_BYTEA(self, type_, **kw):
2901 return "BYTEA"
2902
2903 def visit_ARRAY(self, type_, **kw):
2904 inner = self.process(type_.item_type, **kw)
2905 return re.sub(
2906 r"((?: COLLATE.*)?)$",
2907 (
2908 r"%s\1"
2909 % (
2910 "[]"
2911 * (type_.dimensions if type_.dimensions is not None else 1)
2912 )
2913 ),
2914 inner,
2915 count=1,
2916 )
2917
2918 def visit_json_path(self, type_, **kw):
2919 return self.visit_JSONPATH(type_, **kw)
2920
2921 def visit_JSONPATH(self, type_, **kw):
2922 return "JSONPATH"
2923
2924
2925class PGIdentifierPreparer(compiler.IdentifierPreparer):
2926 reserved_words = RESERVED_WORDS
2927
2928 def _unquote_identifier(self, value):
2929 if value[0] == self.initial_quote:
2930 value = value[1:-1].replace(
2931 self.escape_to_quote, self.escape_quote
2932 )
2933 return value
2934
2935 def format_type(self, type_, use_schema=True):
2936 if not type_.name:
2937 raise exc.CompileError(
2938 f"PostgreSQL {type_.__class__.__name__} type requires a name."
2939 )
2940
2941 name = self.quote(type_.name)
2942 effective_schema = self.schema_for_object(type_)
2943
2944 if (
2945 not self.omit_schema
2946 and use_schema
2947 and effective_schema is not None
2948 ):
2949 name = f"{self.quote_schema(effective_schema)}.{name}"
2950 return name
2951
2952
2953class ReflectedNamedType(TypedDict):
2954 """Represents a reflected named type."""
2955
2956 name: str
2957 """Name of the type."""
2958 schema: str
2959 """The schema of the type."""
2960 visible: bool
2961 """Indicates if this type is in the current search path."""
2962
2963
2964class ReflectedDomainConstraint(TypedDict):
    """Represents a reflected check constraint of a domain."""
2966
2967 name: str
2968 """Name of the constraint."""
2969 check: str
2970 """The check constraint text."""
2971
2972
2973class ReflectedDomain(ReflectedNamedType):
    """Represents a reflected domain."""
2975
2976 type: str
2977 """The string name of the underlying data type of the domain."""
2978 nullable: bool
2979 """Indicates if the domain allows null or not."""
2980 default: Optional[str]
2981 """The string representation of the default value of this domain
2982 or ``None`` if none present.
2983 """
2984 constraints: List[ReflectedDomainConstraint]
2985 """The constraints defined in the domain, if any.
    The constraints are listed in order of evaluation by PostgreSQL.
2987 """
2988 collation: Optional[str]
2989 """The collation for the domain."""
2990
2991
2992class ReflectedEnum(ReflectedNamedType):
2993 """Represents a reflected enum."""
2994
2995 labels: List[str]
2996 """The labels that compose the enum."""
2997
2998
2999class PGInspector(reflection.Inspector):
3000 dialect: PGDialect
3001
3002 def get_table_oid(
3003 self, table_name: str, schema: Optional[str] = None
3004 ) -> int:
3005 """Return the OID for the given table name.
3006
3007 :param table_name: string name of the table. For special quoting,
3008 use :class:`.quoted_name`.
3009
3010 :param schema: string schema name; if omitted, uses the default schema
3011 of the database connection. For special quoting,
3012 use :class:`.quoted_name`.
3013
3014 """
3015
3016 with self._operation_context() as conn:
3017 return self.dialect.get_table_oid(
3018 conn, table_name, schema, info_cache=self.info_cache
3019 )
3020
3021 def get_domains(
3022 self, schema: Optional[str] = None
3023 ) -> List[ReflectedDomain]:
3024 """Return a list of DOMAIN objects.
3025
3026 Each member is a dictionary containing these fields:
3027
3028 * name - name of the domain
3029 * schema - the schema name for the domain.
3030 * visible - boolean, whether or not this domain is visible
3031 in the default search path.
3032 * type - the type defined by this domain.
3033 * nullable - Indicates if this domain can be ``NULL``.
3034 * default - The default value of the domain or ``None`` if the
3035 domain has no default.
        * constraints - A list of dicts describing the constraints defined
          by this domain. Each element contains two keys: ``name`` of the
          constraint and ``check`` with the constraint text.
3039
3040 :param schema: schema name. If None, the default schema
3041 (typically 'public') is used. May also be set to ``'*'`` to
         load domains for all schemas.
3043
3044 .. versionadded:: 2.0
3045
3046 """
3047 with self._operation_context() as conn:
3048 return self.dialect._load_domains(
3049 conn, schema, info_cache=self.info_cache
3050 )
3051
3052 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]:
3053 """Return a list of ENUM objects.
3054
3055 Each member is a dictionary containing these fields:
3056
3057 * name - name of the enum
3058 * schema - the schema name for the enum.
3059 * visible - boolean, whether or not this enum is visible
3060 in the default search path.
3061 * labels - a list of string labels that apply to the enum.
3062
3063 :param schema: schema name. If None, the default schema
3064 (typically 'public') is used. May also be set to ``'*'`` to
         load enums for all schemas.
3066
3067 """
3068 with self._operation_context() as conn:
3069 return self.dialect._load_enums(
3070 conn, schema, info_cache=self.info_cache
3071 )
3072
3073 def get_foreign_table_names(
3074 self, schema: Optional[str] = None
3075 ) -> List[str]:
3076 """Return a list of FOREIGN TABLE names.
3077
3078 Behavior is similar to that of
3079 :meth:`_reflection.Inspector.get_table_names`,
3080 except that the list is limited to those tables that report a
3081 ``relkind`` value of ``f``.
3082
3083 """
3084 with self._operation_context() as conn:
3085 return self.dialect._get_foreign_table_names(
3086 conn, schema, info_cache=self.info_cache
3087 )
3088
3089 def has_type(
3090 self, type_name: str, schema: Optional[str] = None, **kw: Any
3091 ) -> bool:
        """Return whether the database has the specified type in the
        provided schema.
3094
3095 :param type_name: the type to check.
3096 :param schema: schema name. If None, the default schema
3097 (typically 'public') is used. May also be set to ``'*'`` to
3098 check in all schemas.
3099
3100 .. versionadded:: 2.0
3101
3102 """
3103 with self._operation_context() as conn:
3104 return self.dialect.has_type(
3105 conn, type_name, schema, info_cache=self.info_cache
3106 )
3107
3108
3109class PGExecutionContext(default.DefaultExecutionContext):
3110 def fire_sequence(self, seq, type_):
3111 return self._execute_scalar(
3112 (
3113 "select nextval('%s')"
3114 % self.identifier_preparer.format_sequence(seq)
3115 ),
3116 type_,
3117 )
3118
3119 def get_insert_default(self, column):
3120 if column.primary_key and column is column.table._autoincrement_column:
3121 if column.server_default and column.server_default.has_argument:
3122 # pre-execute passive defaults on primary key columns
3123 return self._execute_scalar(
3124 "select %s" % column.server_default.arg, column.type
3125 )
3126
3127 elif column.default is None or (
3128 column.default.is_sequence and column.default.optional
3129 ):
3130 # execute the sequence associated with a SERIAL primary
3131 # key column. for non-primary-key SERIAL, the ID just
3132 # generates server side.
3133
3134 try:
3135 seq_name = column._postgresql_seq_name
3136 except AttributeError:
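                    # derive the name of the implicit sequence created
                    # for a SERIAL column ("<table>_<column>_seq"),
                    # truncating long names roughly the way PostgreSQL
                    # truncates identifiers past its 63 character limit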
3137 tab = column.table.name
3138 col = column.name
3139 tab = tab[0 : 29 + max(0, (29 - len(col)))]
3140 col = col[0 : 29 + max(0, (29 - len(tab)))]
3141 name = "%s_%s_seq" % (tab, col)
3142 column._postgresql_seq_name = seq_name = name
3143
3144 if column.table is not None:
3145 effective_schema = self.connection.schema_for_object(
3146 column.table
3147 )
3148 else:
3149 effective_schema = None
3150
3151 if effective_schema is not None:
3152 exc = 'select nextval(\'"%s"."%s"\')' % (
3153 effective_schema,
3154 seq_name,
3155 )
3156 else:
3157 exc = "select nextval('\"%s\"')" % (seq_name,)
3158
3159 return self._execute_scalar(exc, column.type)
3160
3161 return super().get_insert_default(column)
3162
3163
3164class PGReadOnlyConnectionCharacteristic(
3165 characteristics.ConnectionCharacteristic
3166):
3167 transactional = True
3168
3169 def reset_characteristic(self, dialect, dbapi_conn):
3170 dialect.set_readonly(dbapi_conn, False)
3171
3172 def set_characteristic(self, dialect, dbapi_conn, value):
3173 dialect.set_readonly(dbapi_conn, value)
3174
3175 def get_characteristic(self, dialect, dbapi_conn):
3176 return dialect.get_readonly(dbapi_conn)
3177
3178
3179class PGDeferrableConnectionCharacteristic(
3180 characteristics.ConnectionCharacteristic
3181):
3182 transactional = True
3183
3184 def reset_characteristic(self, dialect, dbapi_conn):
3185 dialect.set_deferrable(dbapi_conn, False)
3186
3187 def set_characteristic(self, dialect, dbapi_conn, value):
3188 dialect.set_deferrable(dbapi_conn, value)
3189
3190 def get_characteristic(self, dialect, dbapi_conn):
3191 return dialect.get_deferrable(dbapi_conn)
3192
3193
3194class PGDialect(default.DefaultDialect):
3195 name = "postgresql"
3196 supports_statement_cache = True
3197 supports_alter = True
3198 max_identifier_length = 63
3199 supports_sane_rowcount = True
3200
3201 bind_typing = interfaces.BindTyping.RENDER_CASTS
3202
3203 supports_native_enum = True
3204 supports_native_boolean = True
3205 supports_native_uuid = True
3206 supports_smallserial = True
3207
3208 supports_sequences = True
3209 sequences_optional = True
3210 preexecute_autoincrement_sequences = True
3211 postfetch_lastrowid = False
3212 use_insertmanyvalues = True
3213
3214 returns_native_bytes = True
3215
3216 insertmanyvalues_implicit_sentinel = (
3217 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
3218 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
3219 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
3220 )
3221
3222 supports_comments = True
3223 supports_constraint_comments = True
3224 supports_default_values = True
3225
3226 supports_default_metavalue = True
3227
3228 supports_empty_insert = False
3229 supports_multivalues_insert = True
3230
3231 supports_identity_columns = True
3232
3233 default_paramstyle = "pyformat"
3234 ischema_names = ischema_names
3235 colspecs = colspecs
3236
3237 statement_compiler = PGCompiler
3238 ddl_compiler = PGDDLCompiler
3239 type_compiler_cls = PGTypeCompiler
3240 preparer = PGIdentifierPreparer
3241 execution_ctx_cls = PGExecutionContext
3242 inspector = PGInspector
3243
3244 update_returning = True
3245 delete_returning = True
3246 insert_returning = True
3247 update_returning_multifrom = True
3248 delete_returning_multifrom = True
3249
3250 connection_characteristics = (
3251 default.DefaultDialect.connection_characteristics
3252 )
3253 connection_characteristics = connection_characteristics.union(
3254 {
3255 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
3256 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
3257 }
3258 )
3259
3260 construct_arguments = [
3261 (
3262 schema.Index,
3263 {
3264 "using": False,
3265 "include": None,
3266 "where": None,
3267 "ops": {},
3268 "concurrently": False,
3269 "with": {},
3270 "tablespace": None,
3271 "nulls_not_distinct": None,
3272 },
3273 ),
3274 (
3275 schema.Table,
3276 {
3277 "ignore_search_path": False,
3278 "tablespace": None,
3279 "partition_by": None,
3280 "with_oids": None,
3281 "on_commit": None,
3282 "inherits": None,
3283 "using": None,
3284 },
3285 ),
3286 (
3287 schema.CheckConstraint,
3288 {
3289 "not_valid": False,
3290 },
3291 ),
3292 (
3293 schema.ForeignKeyConstraint,
3294 {
3295 "not_valid": False,
3296 },
3297 ),
3298 (
3299 schema.PrimaryKeyConstraint,
3300 {"include": None},
3301 ),
3302 (
3303 schema.UniqueConstraint,
3304 {
3305 "include": None,
3306 "nulls_not_distinct": None,
3307 },
3308 ),
3309 ]
3310
3311 reflection_options = ("postgresql_ignore_search_path",)
3312
3313 _backslash_escapes = True
3314 _supports_create_index_concurrently = True
3315 _supports_drop_index_concurrently = True
3316 _supports_jsonb_subscripting = True
3317
3318 def __init__(
3319 self,
3320 native_inet_types=None,
3321 json_serializer=None,
3322 json_deserializer=None,
3323 **kwargs,
3324 ):
3325 default.DefaultDialect.__init__(self, **kwargs)
3326
3327 self._native_inet_types = native_inet_types
3328 self._json_deserializer = json_deserializer
3329 self._json_serializer = json_serializer
3330
3331 def initialize(self, connection):
3332 super().initialize(connection)
3333
3334 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
3335 self.supports_smallserial = self.server_version_info >= (9, 2)
3336
3337 self._set_backslash_escapes(connection)
3338
3339 self._supports_drop_index_concurrently = self.server_version_info >= (
3340 9,
3341 2,
3342 )
3343 self.supports_identity_columns = self.server_version_info >= (10,)
3344
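        # JSONB subscripting syntax (col['key']) requires PostgreSQL 14
        # or greater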
3345 self._supports_jsonb_subscripting = self.server_version_info >= (14,)
3346
3347 def get_isolation_level_values(self, dbapi_conn):
3348 # note the generic dialect doesn't have AUTOCOMMIT, however
3349 # all postgresql dialects should include AUTOCOMMIT.
3350 return (
3351 "SERIALIZABLE",
3352 "READ UNCOMMITTED",
3353 "READ COMMITTED",
3354 "REPEATABLE READ",
3355 )
3356
3357 def set_isolation_level(self, dbapi_connection, level):
3358 cursor = dbapi_connection.cursor()
3359 cursor.execute(
3360 "SET SESSION CHARACTERISTICS AS TRANSACTION "
3361 f"ISOLATION LEVEL {level}"
3362 )
3363 cursor.execute("COMMIT")
3364 cursor.close()
3365
3366 def get_isolation_level(self, dbapi_connection):
3367 cursor = dbapi_connection.cursor()
3368 cursor.execute("show transaction isolation level")
3369 val = cursor.fetchone()[0]
3370 cursor.close()
3371 return val.upper()
3372
3373 def set_readonly(self, connection, value):
3374 raise NotImplementedError()
3375
3376 def get_readonly(self, connection):
3377 raise NotImplementedError()
3378
3379 def set_deferrable(self, connection, value):
3380 raise NotImplementedError()
3381
3382 def get_deferrable(self, connection):
3383 raise NotImplementedError()
3384
3385 def _split_multihost_from_url(self, url: URL) -> Union[
3386 Tuple[None, None],
3387 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]],
3388 ]:
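        """Parse the "host" and "port" elements of the URL's query string
        into (hosts, ports) tuples, accommodating both the
        "host=h1,h2,h3&port=p1,p2,p3" and "host=h1:p1&host=h2:p2"
        multihost formats.

        """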
3389 hosts: Optional[Tuple[Optional[str], ...]] = None
3390 ports_str: Union[str, Tuple[Optional[str], ...], None] = None
3391
3392 integrated_multihost = False
3393
3394 if "host" in url.query:
3395 if isinstance(url.query["host"], (list, tuple)):
3396 integrated_multihost = True
3397 hosts, ports_str = zip(
3398 *[
3399 token.split(":") if ":" in token else (token, None)
3400 for token in url.query["host"]
3401 ]
3402 )
3403
3404 elif isinstance(url.query["host"], str):
3405 hosts = tuple(url.query["host"].split(","))
3406
3407 if (
3408 "port" not in url.query
3409 and len(hosts) == 1
3410 and ":" in hosts[0]
3411 ):
3412 # internet host is alphanumeric plus dots or hyphens.
3413 # this is essentially rfc1123, which refers to rfc952.
3414 # https://stackoverflow.com/questions/3523028/
3415 # valid-characters-of-a-hostname
3416 host_port_match = re.match(
3417 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0]
3418 )
3419 if host_port_match:
3420 integrated_multihost = True
3421 h, p = host_port_match.group(1, 2)
3422 if TYPE_CHECKING:
3423 assert isinstance(h, str)
3424 assert isinstance(p, str)
3425 hosts = (h,)
3426 ports_str = cast(
3427 "Tuple[Optional[str], ...]", (p,) if p else (None,)
3428 )
3429
3430 if "port" in url.query:
3431 if integrated_multihost:
3432 raise exc.ArgumentError(
3433 "Can't mix 'multihost' formats together; use "
3434 '"host=h1,h2,h3&port=p1,p2,p3" or '
3435 '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
3436 )
3437 if isinstance(url.query["port"], (list, tuple)):
3438 ports_str = url.query["port"]
3439 elif isinstance(url.query["port"], str):
3440 ports_str = tuple(url.query["port"].split(","))
3441
3442 ports: Optional[Tuple[Optional[int], ...]] = None
3443
3444 if ports_str:
3445 try:
3446 ports = tuple(int(x) if x else None for x in ports_str)
3447 except ValueError:
3448 raise exc.ArgumentError(
3449 f"Received non-integer port arguments: {ports_str}"
3450 ) from None
3451
3452 if ports and (
3453 (not hosts and len(ports) > 1)
3454 or (
3455 hosts
3456 and ports
3457 and len(hosts) != len(ports)
3458 and (len(hosts) > 1 or len(ports) > 1)
3459 )
3460 ):
3461 raise exc.ArgumentError("number of hosts and ports don't match")
3462
3463 if hosts is not None:
3464 if ports is None:
3465 ports = tuple(None for _ in hosts)
3466
3467 return hosts, ports # type: ignore
3468
3469 def do_begin_twophase(self, connection, xid):
3470 self.do_begin(connection.connection)
3471
3472 def do_prepare_twophase(self, connection, xid):
3473 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
3474
3475 def do_rollback_twophase(
3476 self, connection, xid, is_prepared=True, recover=False
3477 ):
3478 if is_prepared:
3479 if recover:
3480 # FIXME: ugly hack to get out of transaction
3481 # context when committing recoverable transactions
3482 # Must find out a way how to make the dbapi not
3483 # open a transaction.
3484 connection.exec_driver_sql("ROLLBACK")
3485 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
3486 connection.exec_driver_sql("BEGIN")
3487 self.do_rollback(connection.connection)
3488 else:
3489 self.do_rollback(connection.connection)
3490
3491 def do_commit_twophase(
3492 self, connection, xid, is_prepared=True, recover=False
3493 ):
3494 if is_prepared:
3495 if recover:
3496 connection.exec_driver_sql("ROLLBACK")
3497 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
3498 connection.exec_driver_sql("BEGIN")
3499 self.do_rollback(connection.connection)
3500 else:
3501 self.do_commit(connection.connection)
3502
3503 def do_recover_twophase(self, connection):
3504 return connection.scalars(
3505 sql.text("SELECT gid FROM pg_prepared_xacts")
3506 ).all()
3507
3508 def _get_default_schema_name(self, connection):
3509 return connection.exec_driver_sql("select current_schema()").scalar()
3510
3511 @reflection.cache
3512 def has_schema(self, connection, schema, **kw):
3513 query = select(pg_catalog.pg_namespace.c.nspname).where(
3514 pg_catalog.pg_namespace.c.nspname == schema
3515 )
3516 return bool(connection.scalar(query))
3517
3518 def _pg_class_filter_scope_schema(
3519 self, query, schema, scope, pg_class_table=None
3520 ):
3521 if pg_class_table is None:
3522 pg_class_table = pg_catalog.pg_class
3523 query = query.join(
3524 pg_catalog.pg_namespace,
3525 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace,
3526 )
3527
3528 if scope is ObjectScope.DEFAULT:
3529 query = query.where(pg_class_table.c.relpersistence != "t")
3530 elif scope is ObjectScope.TEMPORARY:
3531 query = query.where(pg_class_table.c.relpersistence == "t")
3532
3533 if schema is None:
3534 query = query.where(
3535 pg_catalog.pg_table_is_visible(pg_class_table.c.oid),
3536 # ignore pg_catalog schema
3537 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
3538 )
3539 else:
3540 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
3541 return query
3542
3543 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None):
3544 if pg_class_table is None:
3545 pg_class_table = pg_catalog.pg_class
        # uses the "= ANY ()" form instead of IN, as otherwise PostgreSQL
        # complains that 'IN could not convert type character to "char"'
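        # e.g. for RELKINDS_ALL_TABLE_LIKE this renders along the lines of
        # relkind = ANY (ARRAY['r', 'p', ...]) (illustrative rendering; the
        # actual relkind tuples are defined in pg_catalog)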
3548 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds))
3549
3550 @lru_cache()
3551 def _has_table_query(self, schema):
3552 query = select(pg_catalog.pg_class.c.relname).where(
3553 pg_catalog.pg_class.c.relname == bindparam("table_name"),
3554 self._pg_class_relkind_condition(
3555 pg_catalog.RELKINDS_ALL_TABLE_LIKE
3556 ),
3557 )
3558 return self._pg_class_filter_scope_schema(
3559 query, schema, scope=ObjectScope.ANY
3560 )
3561
3562 @reflection.cache
3563 def has_table(self, connection, table_name, schema=None, **kw):
3564 self._ensure_has_table_connection(connection)
3565 query = self._has_table_query(schema)
3566 return bool(connection.scalar(query, {"table_name": table_name}))
3567
3568 @reflection.cache
3569 def has_sequence(self, connection, sequence_name, schema=None, **kw):
3570 query = select(pg_catalog.pg_class.c.relname).where(
3571 pg_catalog.pg_class.c.relkind == "S",
3572 pg_catalog.pg_class.c.relname == sequence_name,
3573 )
3574 query = self._pg_class_filter_scope_schema(
3575 query, schema, scope=ObjectScope.ANY
3576 )
3577 return bool(connection.scalar(query))
3578
3579 @reflection.cache
3580 def has_type(self, connection, type_name, schema=None, **kw):
3581 query = (
3582 select(pg_catalog.pg_type.c.typname)
3583 .join(
3584 pg_catalog.pg_namespace,
3585 pg_catalog.pg_namespace.c.oid
3586 == pg_catalog.pg_type.c.typnamespace,
3587 )
3588 .where(pg_catalog.pg_type.c.typname == type_name)
3589 )
3590 if schema is None:
3591 query = query.where(
3592 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
3593 # ignore pg_catalog schema
3594 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
3595 )
3596 elif schema != "*":
3597 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
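        # NOTE: a schema value of "*" is the internal convention, used
        # e.g. by _load_enums / _load_domains, for "all schemas"; in that
        # case no namespace filter is applied at all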
3598
3599 return bool(connection.scalar(query))
3600
3601 def _get_server_version_info(self, connection):
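        # e.g. a version() string such as (hypothetical)
        # "PostgreSQL 14.2 on x86_64-pc-linux-gnu, compiled by gcc ..."
        # is parsed by the regex below into (14, 2)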
3602 v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
3603 m = re.match(
3604 r".*(?:PostgreSQL|EnterpriseDB) "
3605 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
3606 v,
3607 )
3608 if not m:
3609 raise AssertionError(
3610 "Could not determine version from string '%s'" % v
3611 )
3612 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
3613
3614 @reflection.cache
3615 def get_table_oid(self, connection, table_name, schema=None, **kw):
3616 """Fetch the oid for schema.table_name."""
3617 query = select(pg_catalog.pg_class.c.oid).where(
3618 pg_catalog.pg_class.c.relname == table_name,
3619 self._pg_class_relkind_condition(
3620 pg_catalog.RELKINDS_ALL_TABLE_LIKE
3621 ),
3622 )
3623 query = self._pg_class_filter_scope_schema(
3624 query, schema, scope=ObjectScope.ANY
3625 )
3626 table_oid = connection.scalar(query)
3627 if table_oid is None:
3628 raise exc.NoSuchTableError(
3629 f"{schema}.{table_name}" if schema else table_name
3630 )
3631 return table_oid
3632
3633 @reflection.cache
3634 def get_schema_names(self, connection, **kw):
3635 query = (
3636 select(pg_catalog.pg_namespace.c.nspname)
3637 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%"))
3638 .order_by(pg_catalog.pg_namespace.c.nspname)
3639 )
3640 return connection.scalars(query).all()
3641
3642 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope):
3643 query = select(pg_catalog.pg_class.c.relname).where(
3644 self._pg_class_relkind_condition(relkinds)
3645 )
3646 query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
3647 return connection.scalars(query).all()
3648
3649 @reflection.cache
3650 def get_table_names(self, connection, schema=None, **kw):
3651 return self._get_relnames_for_relkinds(
3652 connection,
3653 schema,
3654 pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
3655 scope=ObjectScope.DEFAULT,
3656 )
3657
3658 @reflection.cache
3659 def get_temp_table_names(self, connection, **kw):
3660 return self._get_relnames_for_relkinds(
3661 connection,
3662 schema=None,
3663 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
3664 scope=ObjectScope.TEMPORARY,
3665 )
3666
3667 @reflection.cache
3668 def _get_foreign_table_names(self, connection, schema=None, **kw):
3669 return self._get_relnames_for_relkinds(
3670 connection, schema, relkinds=("f",), scope=ObjectScope.ANY
3671 )
3672
3673 @reflection.cache
3674 def get_view_names(self, connection, schema=None, **kw):
3675 return self._get_relnames_for_relkinds(
3676 connection,
3677 schema,
3678 pg_catalog.RELKINDS_VIEW,
3679 scope=ObjectScope.DEFAULT,
3680 )
3681
3682 @reflection.cache
3683 def get_materialized_view_names(self, connection, schema=None, **kw):
3684 return self._get_relnames_for_relkinds(
3685 connection,
3686 schema,
3687 pg_catalog.RELKINDS_MAT_VIEW,
3688 scope=ObjectScope.DEFAULT,
3689 )
3690
3691 @reflection.cache
3692 def get_temp_view_names(self, connection, schema=None, **kw):
3693 return self._get_relnames_for_relkinds(
3694 connection,
3695 schema,
            # NOTE: do not include temp materialized views (these do
            # not seem to be a thing, at least up to version 14)
3698 pg_catalog.RELKINDS_VIEW,
3699 scope=ObjectScope.TEMPORARY,
3700 )
3701
3702 @reflection.cache
3703 def get_sequence_names(self, connection, schema=None, **kw):
3704 return self._get_relnames_for_relkinds(
3705 connection, schema, relkinds=("S",), scope=ObjectScope.ANY
3706 )
3707
3708 @reflection.cache
3709 def get_view_definition(self, connection, view_name, schema=None, **kw):
3710 query = (
3711 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid))
3712 .select_from(pg_catalog.pg_class)
3713 .where(
3714 pg_catalog.pg_class.c.relname == view_name,
3715 self._pg_class_relkind_condition(
3716 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW
3717 ),
3718 )
3719 )
3720 query = self._pg_class_filter_scope_schema(
3721 query, schema, scope=ObjectScope.ANY
3722 )
3723 res = connection.scalar(query)
3724 if res is None:
3725 raise exc.NoSuchTableError(
3726 f"{schema}.{view_name}" if schema else view_name
3727 )
3728 else:
3729 return res
3730
3731 def _value_or_raise(self, data, table, schema):
3732 try:
3733 return dict(data)[(schema, table)]
3734 except KeyError:
3735 raise exc.NoSuchTableError(
3736 f"{schema}.{table}" if schema else table
3737 ) from None
3738
3739 def _prepare_filter_names(self, filter_names):
3740 if filter_names:
3741 return True, {"filter_names": filter_names}
3742 else:
3743 return False, {}
3744
3745 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]:
3746 if kind is ObjectKind.ANY:
3747 return pg_catalog.RELKINDS_ALL_TABLE_LIKE
3748 relkinds = ()
3749 if ObjectKind.TABLE in kind:
3750 relkinds += pg_catalog.RELKINDS_TABLE
3751 if ObjectKind.VIEW in kind:
3752 relkinds += pg_catalog.RELKINDS_VIEW
3753 if ObjectKind.MATERIALIZED_VIEW in kind:
3754 relkinds += pg_catalog.RELKINDS_MAT_VIEW
3755 return relkinds
3756
3757 @reflection.cache
3758 def get_columns(self, connection, table_name, schema=None, **kw):
3759 data = self.get_multi_columns(
3760 connection,
3761 schema=schema,
3762 filter_names=[table_name],
3763 scope=ObjectScope.ANY,
3764 kind=ObjectKind.ANY,
3765 **kw,
3766 )
3767 return self._value_or_raise(data, table_name, schema)
3768
3769 @lru_cache()
3770 def _columns_query(self, schema, has_filter_names, scope, kind):
        # NOTE: using scalar subqueries for the default and identity
        # options is faster than trying to use outer joins for them
3773 generated = (
3774 pg_catalog.pg_attribute.c.attgenerated.label("generated")
3775 if self.server_version_info >= (12,)
3776 else sql.null().label("generated")
3777 )
3778 if self.server_version_info >= (10,):
3779 # join lateral performs worse (~2x slower) than a scalar_subquery
3780 identity = (
3781 select(
3782 sql.func.json_build_object(
3783 "always",
3784 pg_catalog.pg_attribute.c.attidentity == "a",
3785 "start",
3786 pg_catalog.pg_sequence.c.seqstart,
3787 "increment",
3788 pg_catalog.pg_sequence.c.seqincrement,
3789 "minvalue",
3790 pg_catalog.pg_sequence.c.seqmin,
3791 "maxvalue",
3792 pg_catalog.pg_sequence.c.seqmax,
3793 "cache",
3794 pg_catalog.pg_sequence.c.seqcache,
3795 "cycle",
3796 pg_catalog.pg_sequence.c.seqcycle,
3797 type_=sqltypes.JSON(),
3798 )
3799 )
3800 .select_from(pg_catalog.pg_sequence)
3801 .where(
                    # attidentity != '' is required, since otherwise
                    # serial columns would also be reflected as identity.
3804 pg_catalog.pg_attribute.c.attidentity != "",
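                    # the nested casts turn the sequence name returned by
                    # pg_get_serial_sequence() (as text) through regclass
                    # into the oid stored in pg_sequence.seqrelid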
3805 pg_catalog.pg_sequence.c.seqrelid
3806 == sql.cast(
3807 sql.cast(
3808 pg_catalog.pg_get_serial_sequence(
3809 sql.cast(
3810 sql.cast(
3811 pg_catalog.pg_attribute.c.attrelid,
3812 REGCLASS,
3813 ),
3814 TEXT,
3815 ),
3816 pg_catalog.pg_attribute.c.attname,
3817 ),
3818 REGCLASS,
3819 ),
3820 OID,
3821 ),
3822 )
3823 .correlate(pg_catalog.pg_attribute)
3824 .scalar_subquery()
3825 .label("identity_options")
3826 )
3827 else:
3828 identity = sql.null().label("identity_options")
3829
3830 # join lateral performs the same as scalar_subquery here
3831 default = (
3832 select(
3833 pg_catalog.pg_get_expr(
3834 pg_catalog.pg_attrdef.c.adbin,
3835 pg_catalog.pg_attrdef.c.adrelid,
3836 )
3837 )
3838 .select_from(pg_catalog.pg_attrdef)
3839 .where(
3840 pg_catalog.pg_attrdef.c.adrelid
3841 == pg_catalog.pg_attribute.c.attrelid,
3842 pg_catalog.pg_attrdef.c.adnum
3843 == pg_catalog.pg_attribute.c.attnum,
3844 pg_catalog.pg_attribute.c.atthasdef,
3845 )
3846 .correlate(pg_catalog.pg_attribute)
3847 .scalar_subquery()
3848 .label("default")
3849 )
3850
        # get the name of the collation when it differs from the default one
3852 collate = sql.case(
3853 (
3854 sql.and_(
3855 pg_catalog.pg_attribute.c.attcollation != 0,
3856 select(pg_catalog.pg_type.c.typcollation)
3857 .where(
3858 pg_catalog.pg_type.c.oid
3859 == pg_catalog.pg_attribute.c.atttypid,
3860 )
3861 .correlate(pg_catalog.pg_attribute)
3862 .scalar_subquery()
3863 != pg_catalog.pg_attribute.c.attcollation,
3864 ),
3865 select(pg_catalog.pg_collation.c.collname)
3866 .where(
3867 pg_catalog.pg_collation.c.oid
3868 == pg_catalog.pg_attribute.c.attcollation
3869 )
3870 .correlate(pg_catalog.pg_attribute)
3871 .scalar_subquery(),
3872 ),
3873 else_=sql.null(),
3874 ).label("collation")
3875
3876 relkinds = self._kind_to_relkinds(kind)
3877 query = (
3878 select(
3879 pg_catalog.pg_attribute.c.attname.label("name"),
3880 pg_catalog.format_type(
3881 pg_catalog.pg_attribute.c.atttypid,
3882 pg_catalog.pg_attribute.c.atttypmod,
3883 ).label("format_type"),
3884 default,
3885 pg_catalog.pg_attribute.c.attnotnull.label("not_null"),
3886 pg_catalog.pg_class.c.relname.label("table_name"),
3887 pg_catalog.pg_description.c.description.label("comment"),
3888 generated,
3889 identity,
3890 collate,
3891 )
3892 .select_from(pg_catalog.pg_class)
            # NOTE: postgresql supports tables with no user columns,
            # meaning there is no row with pg_attribute.attnum > 0. use a
            # left outer join to avoid filtering out these tables.
3896 .outerjoin(
3897 pg_catalog.pg_attribute,
3898 sql.and_(
3899 pg_catalog.pg_class.c.oid
3900 == pg_catalog.pg_attribute.c.attrelid,
3901 pg_catalog.pg_attribute.c.attnum > 0,
3902 ~pg_catalog.pg_attribute.c.attisdropped,
3903 ),
3904 )
3905 .outerjoin(
3906 pg_catalog.pg_description,
3907 sql.and_(
3908 pg_catalog.pg_description.c.objoid
3909 == pg_catalog.pg_attribute.c.attrelid,
3910 pg_catalog.pg_description.c.objsubid
3911 == pg_catalog.pg_attribute.c.attnum,
3912 ),
3913 )
3914 .where(self._pg_class_relkind_condition(relkinds))
3915 .order_by(
3916 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum
3917 )
3918 )
3919 query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
3920 if has_filter_names:
3921 query = query.where(
3922 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
3923 )
3924 return query
3925
3926 def get_multi_columns(
3927 self, connection, schema, filter_names, scope, kind, **kw
3928 ):
3929 has_filter_names, params = self._prepare_filter_names(filter_names)
3930 query = self._columns_query(schema, has_filter_names, scope, kind)
3931 rows = connection.execute(query, params).mappings()
3932
        # dictionary keyed by (name,) for types visible on the default
        # search path, or by (schema, name) otherwise
3935 domains = {
3936 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d
3937 for d in self._load_domains(
3938 connection, schema="*", info_cache=kw.get("info_cache")
3939 )
3940 }
3941
        # dictionary keyed by (name,) for types visible on the default
        # search path, or by (schema, name) otherwise
        enums = {
            (
                (rec["name"],)
                if rec["visible"]
                else (rec["schema"], rec["name"])
            ): rec
            for rec in self._load_enums(
                connection, schema="*", info_cache=kw.get("info_cache")
            )
        }
3954
3955 columns = self._get_columns_info(rows, domains, enums, schema)
3956
3957 return columns.items()
3958
3959 _format_type_args_pattern = re.compile(r"\((.*)\)")
3960 _format_type_args_delim = re.compile(r"\s*,\s*")
3961 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$")
3962
3963 def _reflect_type(
3964 self,
3965 format_type: Optional[str],
3966 domains: Dict[str, ReflectedDomain],
3967 enums: Dict[str, ReflectedEnum],
3968 type_description: str,
3969 collation: Optional[str],
3970 ) -> sqltypes.TypeEngine[Any]:
3971 """
3972 Attempts to reconstruct a column type defined in ischema_names based
3973 on the information available in the format_type.
3974
        If the `format_type` cannot be associated with a known
        `ischema_names` entry, it is treated as a reference to a known
        PostgreSQL named `ENUM` or `DOMAIN` type.
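
        For example (assuming the stock ``ischema_names`` mapping), a
        ``format_type`` of ``character varying(30)[]`` comes back as
        ``ARRAY(VARCHAR(30))``, and ``numeric(10,2)`` as ``NUMERIC(10, 2)``.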
3978 """
3979 type_description = type_description or "unknown type"
3980 if format_type is None:
3981 util.warn(
3982 "PostgreSQL format_type() returned NULL for %s"
3983 % type_description
3984 )
3985 return sqltypes.NULLTYPE
3986
3987 attype_args_match = self._format_type_args_pattern.search(format_type)
3988 if attype_args_match and attype_args_match.group(1):
3989 attype_args = self._format_type_args_delim.split(
3990 attype_args_match.group(1)
3991 )
3992 else:
3993 attype_args = ()
3994
3995 match_array_dim = self._format_array_spec_pattern.search(format_type)
3996 # Each "[]" in array specs corresponds to an array dimension
3997 array_dim = len(match_array_dim.group(1) or "") // 2
3998
3999 # Remove all parameters and array specs from format_type to obtain an
4000 # ischema_name candidate
4001 attype = self._format_type_args_pattern.sub("", format_type)
4002 attype = self._format_array_spec_pattern.sub("", attype)
4003
4004 schema_type = self.ischema_names.get(attype.lower(), None)
4005 args, kwargs = (), {}
4006
4007 if attype == "numeric":
4008 if len(attype_args) == 2:
4009 precision, scale = map(int, attype_args)
4010 args = (precision, scale)
4011
4012 elif attype == "double precision":
4013 args = (53,)
4014
4015 elif attype == "integer":
4016 args = ()
4017
4018 elif attype in ("timestamp with time zone", "time with time zone"):
4019 kwargs["timezone"] = True
4020 if len(attype_args) == 1:
4021 kwargs["precision"] = int(attype_args[0])
4022
4023 elif attype in (
4024 "timestamp without time zone",
4025 "time without time zone",
4026 "time",
4027 ):
4028 kwargs["timezone"] = False
4029 if len(attype_args) == 1:
4030 kwargs["precision"] = int(attype_args[0])
4031
4032 elif attype == "bit varying":
4033 kwargs["varying"] = True
4034 if len(attype_args) == 1:
4035 charlen = int(attype_args[0])
4036 args = (charlen,)
4037
4038 # a domain or enum can start with interval, so be mindful of that.
4039 elif attype == "interval" or attype.startswith("interval "):
4040 schema_type = INTERVAL
4041
4042 field_match = re.match(r"interval (.+)", attype)
4043 if field_match:
4044 kwargs["fields"] = field_match.group(1)
4045
4046 if len(attype_args) == 1:
4047 kwargs["precision"] = int(attype_args[0])
4048
4049 else:
4050 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
4051
4052 if enum_or_domain_key in enums:
4053 schema_type = ENUM
4054 enum = enums[enum_or_domain_key]
4055
4056 kwargs["name"] = enum["name"]
4057
4058 if not enum["visible"]:
4059 kwargs["schema"] = enum["schema"]
4060 args = tuple(enum["labels"])
4061 elif enum_or_domain_key in domains:
4062 schema_type = DOMAIN
4063 domain = domains[enum_or_domain_key]
4064
4065 data_type = self._reflect_type(
4066 domain["type"],
4067 domains,
4068 enums,
4069 type_description="DOMAIN '%s'" % domain["name"],
4070 collation=domain["collation"],
4071 )
4072 args = (domain["name"], data_type)
4073
4074 kwargs["collation"] = domain["collation"]
4075 kwargs["default"] = domain["default"]
4076 kwargs["not_null"] = not domain["nullable"]
4077 kwargs["create_type"] = False
4078
4079 if domain["constraints"]:
4080 # We only support a single constraint
4081 check_constraint = domain["constraints"][0]
4082
4083 kwargs["constraint_name"] = check_constraint["name"]
4084 kwargs["check"] = check_constraint["check"]
4085
4086 if not domain["visible"]:
4087 kwargs["schema"] = domain["schema"]
4088
4089 else:
4090 try:
4091 charlen = int(attype_args[0])
4092 args = (charlen, *attype_args[1:])
4093 except (ValueError, IndexError):
4094 args = attype_args
4095
4096 if not schema_type:
4097 util.warn(
4098 "Did not recognize type '%s' of %s"
4099 % (attype, type_description)
4100 )
4101 return sqltypes.NULLTYPE
4102
4103 if collation is not None:
4104 kwargs["collation"] = collation
4105
4106 data_type = schema_type(*args, **kwargs)
4107 if array_dim >= 1:
            # PostgreSQL does not preserve the dimensionality or size of
            # array types.
4109 data_type = _array.ARRAY(data_type)
4110
4111 return data_type
4112
4113 def _get_columns_info(self, rows, domains, enums, schema):
4114 columns = defaultdict(list)
4115 for row_dict in rows:
4116 # ensure that each table has an entry, even if it has no columns
4117 if row_dict["name"] is None:
4118 columns[(schema, row_dict["table_name"])] = (
4119 ReflectionDefaults.columns()
4120 )
4121 continue
4122 table_cols = columns[(schema, row_dict["table_name"])]
4123
4124 collation = row_dict["collation"]
4125
4126 coltype = self._reflect_type(
4127 row_dict["format_type"],
4128 domains,
4129 enums,
4130 type_description="column '%s'" % row_dict["name"],
4131 collation=collation,
4132 )
4133
4134 default = row_dict["default"]
4135 name = row_dict["name"]
4136 generated = row_dict["generated"]
4137 nullable = not row_dict["not_null"]
4138
4139 if isinstance(coltype, DOMAIN):
4140 if not default:
4141 # domain can override the default value but
4142 # can't set it to None
4143 if coltype.default is not None:
4144 default = coltype.default
4145
4146 nullable = nullable and not coltype.not_null
4147
4148 identity = row_dict["identity_options"]
4149
            # a zero byte or a blank string (which of the two depends on
            # the driver; the value is also absent for older PG versions)
            # means this is not a generated column.  Otherwise "s" means
            # stored.  (Other values might be added in the future.)
4154 if generated not in (None, "", b"\x00"):
4155 computed = dict(
4156 sqltext=default, persisted=generated in ("s", b"s")
4157 )
4158 default = None
4159 else:
4160 computed = None
4161
4162 # adjust the default value
4163 autoincrement = False
4164 if default is not None:
4165 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
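                # e.g. a default of "nextval('some_seq'::regclass)" yields
                # match.group(2) == "some_seq" (hypothetical sequence name);
                # an unqualified name gets the reflected schema prepended
                # below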
4166 if match is not None:
4167 if issubclass(coltype._type_affinity, sqltypes.Integer):
4168 autoincrement = True
4169 # the default is related to a Sequence
4170 if "." not in match.group(2) and schema is not None:
4171 # unconditionally quote the schema name. this could
4172 # later be enhanced to obey quoting rules /
4173 # "quote schema"
4174 default = (
4175 match.group(1)
4176 + ('"%s"' % schema)
4177 + "."
4178 + match.group(2)
4179 + match.group(3)
4180 )
4181
4182 column_info = {
4183 "name": name,
4184 "type": coltype,
4185 "nullable": nullable,
4186 "default": default,
4187 "autoincrement": autoincrement or identity is not None,
4188 "comment": row_dict["comment"],
4189 }
4190 if computed is not None:
4191 column_info["computed"] = computed
4192 if identity is not None:
4193 column_info["identity"] = identity
4194
4195 table_cols.append(column_info)
4196
4197 return columns
4198
4199 @lru_cache()
4200 def _table_oids_query(self, schema, has_filter_names, scope, kind):
4201 relkinds = self._kind_to_relkinds(kind)
4202 oid_q = select(
4203 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname
4204 ).where(self._pg_class_relkind_condition(relkinds))
4205 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope)
4206
4207 if has_filter_names:
4208 oid_q = oid_q.where(
4209 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4210 )
4211 return oid_q
4212
4213 @reflection.flexi_cache(
4214 ("schema", InternalTraversal.dp_string),
4215 ("filter_names", InternalTraversal.dp_string_list),
4216 ("kind", InternalTraversal.dp_plain_obj),
4217 ("scope", InternalTraversal.dp_plain_obj),
4218 )
4219 def _get_table_oids(
4220 self, connection, schema, filter_names, scope, kind, **kw
4221 ):
4222 has_filter_names, params = self._prepare_filter_names(filter_names)
4223 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind)
4224 result = connection.execute(oid_q, params)
4225 return result.all()
4226
4227 @util.memoized_property
4228 def _constraint_query(self):
4229 if self.server_version_info >= (11, 0):
4230 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
4231 else:
4232 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")
4233
4234 if self.server_version_info >= (15,):
4235 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct
4236 else:
4237 indnullsnotdistinct = sql.false().label("indnullsnotdistinct")
4238
4239 con_sq = (
4240 select(
4241 pg_catalog.pg_constraint.c.conrelid,
4242 pg_catalog.pg_constraint.c.conname,
4243 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
4244 sql.func.generate_subscripts(
4245 pg_catalog.pg_index.c.indkey, 1
4246 ).label("ord"),
4247 indnkeyatts,
4248 indnullsnotdistinct,
4249 pg_catalog.pg_description.c.description,
4250 )
4251 .join(
4252 pg_catalog.pg_index,
4253 pg_catalog.pg_constraint.c.conindid
4254 == pg_catalog.pg_index.c.indexrelid,
4255 )
4256 .outerjoin(
4257 pg_catalog.pg_description,
4258 pg_catalog.pg_description.c.objoid
4259 == pg_catalog.pg_constraint.c.oid,
4260 )
4261 .where(
4262 pg_catalog.pg_constraint.c.contype == bindparam("contype"),
4263 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")),
4264 # NOTE: filtering also on pg_index.indrelid for oids does
4265 # not seem to have a performance effect, but it may be an
4266 # option if perf problems are reported
4267 )
4268 .subquery("con")
4269 )
4270
4271 attr_sq = (
4272 select(
4273 con_sq.c.conrelid,
4274 con_sq.c.conname,
4275 con_sq.c.description,
4276 con_sq.c.ord,
4277 con_sq.c.indnkeyatts,
4278 con_sq.c.indnullsnotdistinct,
4279 pg_catalog.pg_attribute.c.attname,
4280 )
4281 .select_from(pg_catalog.pg_attribute)
4282 .join(
4283 con_sq,
4284 sql.and_(
4285 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum,
4286 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid,
4287 ),
4288 )
4289 .where(
            # NOTE: restate the condition here, since pg15 otherwise
            # sometimes seems to get confused on psycopg2, doing a
            # sequential scan of pg_attribute.
            # The condition in the con_sq subquery is not actually needed
            # in pg15, but it may be needed in older versions. Keeping it
            # does not seem to have any impact in any case.
4296 con_sq.c.conrelid.in_(bindparam("oids"))
4297 )
4298 .subquery("attr")
4299 )
4300
4301 return (
4302 select(
4303 attr_sq.c.conrelid,
4304 sql.func.array_agg(
4305 # NOTE: cast since some postgresql derivatives may
4306 # not support array_agg on the name type
4307 aggregate_order_by(
4308 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord
4309 )
4310 ).label("cols"),
4311 attr_sq.c.conname,
4312 sql.func.min(attr_sq.c.description).label("description"),
4313 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"),
4314 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label(
4315 "indnullsnotdistinct"
4316 ),
4317 )
4318 .group_by(attr_sq.c.conrelid, attr_sq.c.conname)
4319 .order_by(attr_sq.c.conrelid, attr_sq.c.conname)
4320 )
4321
4322 def _reflect_constraint(
4323 self, connection, contype, schema, filter_names, scope, kind, **kw
4324 ):
4325 # used to reflect primary and unique constraint
4326 table_oids = self._get_table_oids(
4327 connection, schema, filter_names, scope, kind, **kw
4328 )
4329 batches = list(table_oids)
4330 is_unique = contype == "u"
4331
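        # process the oids in batches of 3000, presumably to keep the
        # expanded "oids" parameter list to a bounded size; the same
        # batching scheme is used in get_multi_indexes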
4332 while batches:
4333 batch = batches[0:3000]
4334 batches[0:3000] = []
4335
4336 result = connection.execute(
4337 self._constraint_query,
4338 {"oids": [r[0] for r in batch], "contype": contype},
4339 ).mappings()
4340
4341 result_by_oid = defaultdict(list)
4342 for row_dict in result:
4343 result_by_oid[row_dict["conrelid"]].append(row_dict)
4344
4345 for oid, tablename in batch:
4346 for_oid = result_by_oid.get(oid, ())
4347 if for_oid:
4348 for row in for_oid:
4349 # See note in get_multi_indexes
4350 all_cols = row["cols"]
4351 indnkeyatts = row["indnkeyatts"]
4352 if len(all_cols) > indnkeyatts:
4353 inc_cols = all_cols[indnkeyatts:]
4354 cst_cols = all_cols[:indnkeyatts]
4355 else:
4356 inc_cols = []
4357 cst_cols = all_cols
4358
4359 opts = {}
4360 if self.server_version_info >= (11,):
4361 opts["postgresql_include"] = inc_cols
4362 if is_unique:
4363 opts["postgresql_nulls_not_distinct"] = row[
4364 "indnullsnotdistinct"
4365 ]
4366 yield (
4367 tablename,
4368 cst_cols,
4369 row["conname"],
4370 row["description"],
4371 opts,
4372 )
4373 else:
4374 yield tablename, None, None, None, None
4375
4376 @reflection.cache
4377 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
4378 data = self.get_multi_pk_constraint(
4379 connection,
4380 schema=schema,
4381 filter_names=[table_name],
4382 scope=ObjectScope.ANY,
4383 kind=ObjectKind.ANY,
4384 **kw,
4385 )
4386 return self._value_or_raise(data, table_name, schema)
4387
4388 def get_multi_pk_constraint(
4389 self, connection, schema, filter_names, scope, kind, **kw
4390 ):
4391 result = self._reflect_constraint(
4392 connection, "p", schema, filter_names, scope, kind, **kw
4393 )
4394
4395 # only a single pk can be present for each table. Return an entry
4396 # even if a table has no primary key
4397 default = ReflectionDefaults.pk_constraint
4398
4399 def pk_constraint(pk_name, cols, comment, opts):
4400 info = {
4401 "constrained_columns": cols,
4402 "name": pk_name,
4403 "comment": comment,
4404 }
4405 if opts:
4406 info["dialect_options"] = opts
4407 return info
4408
4409 return (
4410 (
4411 (schema, table_name),
4412 (
4413 pk_constraint(pk_name, cols, comment, opts)
4414 if pk_name is not None
4415 else default()
4416 ),
4417 )
4418 for table_name, cols, pk_name, comment, opts in result
4419 )
4420
4421 @reflection.cache
4422 def get_foreign_keys(
4423 self,
4424 connection,
4425 table_name,
4426 schema=None,
4427 postgresql_ignore_search_path=False,
4428 **kw,
4429 ):
4430 data = self.get_multi_foreign_keys(
4431 connection,
4432 schema=schema,
4433 filter_names=[table_name],
4434 postgresql_ignore_search_path=postgresql_ignore_search_path,
4435 scope=ObjectScope.ANY,
4436 kind=ObjectKind.ANY,
4437 **kw,
4438 )
4439 return self._value_or_raise(data, table_name, schema)
4440
4441 @lru_cache()
    def _foreign_key_query(self, schema, has_filter_names, scope, kind):
4443 pg_class_ref = pg_catalog.pg_class.alias("cls_ref")
4444 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref")
4445 relkinds = self._kind_to_relkinds(kind)
4446 query = (
4447 select(
4448 pg_catalog.pg_class.c.relname,
4449 pg_catalog.pg_constraint.c.conname,
4450 # NOTE: avoid calling pg_get_constraintdef when not needed
4451 # to speed up the query
4452 sql.case(
4453 (
4454 pg_catalog.pg_constraint.c.oid.is_not(None),
4455 pg_catalog.pg_get_constraintdef(
4456 pg_catalog.pg_constraint.c.oid, True
4457 ),
4458 ),
4459 else_=None,
4460 ),
4461 pg_namespace_ref.c.nspname,
4462 pg_catalog.pg_description.c.description,
4463 )
4464 .select_from(pg_catalog.pg_class)
4465 .outerjoin(
4466 pg_catalog.pg_constraint,
4467 sql.and_(
4468 pg_catalog.pg_class.c.oid
4469 == pg_catalog.pg_constraint.c.conrelid,
4470 pg_catalog.pg_constraint.c.contype == "f",
4471 ),
4472 )
4473 .outerjoin(
4474 pg_class_ref,
4475 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid,
4476 )
4477 .outerjoin(
4478 pg_namespace_ref,
4479 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid,
4480 )
4481 .outerjoin(
4482 pg_catalog.pg_description,
4483 pg_catalog.pg_description.c.objoid
4484 == pg_catalog.pg_constraint.c.oid,
4485 )
4486 .order_by(
4487 pg_catalog.pg_class.c.relname,
4488 pg_catalog.pg_constraint.c.conname,
4489 )
4490 .where(self._pg_class_relkind_condition(relkinds))
4491 )
4492 query = self._pg_class_filter_scope_schema(query, schema, scope)
4493 if has_filter_names:
4494 query = query.where(
4495 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4496 )
4497 return query
4498
4499 @util.memoized_property
4500 def _fk_regex_pattern(self):
4501 # optionally quoted token
4502 qtoken = r'(?:"[^"]+"|[\w]+?)'
4503
4504 # https://www.postgresql.org/docs/current/static/sql-createtable.html
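        # a pg_get_constraintdef() string this pattern is meant to match
        # looks roughly like (hypothetical example):
        #   FOREIGN KEY (a, b) REFERENCES other_schema."Other"(x, y)
        #   MATCH FULL ON UPDATE CASCADE ON DELETE SET NULL
        #   DEFERRABLE INITIALLY DEFERRED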
4505 return re.compile(
4506 r"FOREIGN KEY \((.*?)\) "
4507 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501
4508 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
4509 r"[\s]?(ON UPDATE "
4510 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
4511 r"[\s]?(ON DELETE "
4512 r"(CASCADE|RESTRICT|NO ACTION|"
4513 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
4514 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
4515 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
4516 )
4517
4518 def get_multi_foreign_keys(
4519 self,
4520 connection,
4521 schema,
4522 filter_names,
4523 scope,
4524 kind,
4525 postgresql_ignore_search_path=False,
4526 **kw,
4527 ):
4528 preparer = self.identifier_preparer
4529
4530 has_filter_names, params = self._prepare_filter_names(filter_names)
        query = self._foreign_key_query(
            schema, has_filter_names, scope, kind
        )
4532 result = connection.execute(query, params)
4533
4534 FK_REGEX = self._fk_regex_pattern
4535
4536 fkeys = defaultdict(list)
4537 default = ReflectionDefaults.foreign_keys
4538 for table_name, conname, condef, conschema, comment in result:
4539 # ensure that each table has an entry, even if it has
4540 # no foreign keys
4541 if conname is None:
4542 fkeys[(schema, table_name)] = default()
4543 continue
4544 table_fks = fkeys[(schema, table_name)]
4545 m = re.search(FK_REGEX, condef).groups()
4546
4547 (
4548 constrained_columns,
4549 referred_schema,
4550 referred_table,
4551 referred_columns,
4552 _,
4553 match,
4554 _,
4555 onupdate,
4556 _,
4557 ondelete,
4558 deferrable,
4559 _,
4560 initially,
4561 ) = m
4562
4563 if deferrable is not None:
                deferrable = deferrable == "DEFERRABLE"
4565 constrained_columns = [
4566 preparer._unquote_identifier(x)
4567 for x in re.split(r"\s*,\s*", constrained_columns)
4568 ]
4569
4570 if postgresql_ignore_search_path:
4571 # when ignoring search path, we use the actual schema
4572 # provided it isn't the "default" schema
4573 if conschema != self.default_schema_name:
4574 referred_schema = conschema
4575 else:
4576 referred_schema = schema
4577 elif referred_schema:
4578 # referred_schema is the schema that we regexp'ed from
4579 # pg_get_constraintdef(). If the schema is in the search
4580 # path, pg_get_constraintdef() will give us None.
4581 referred_schema = preparer._unquote_identifier(referred_schema)
4582 elif schema is not None and schema == conschema:
4583 # If the actual schema matches the schema of the table
4584 # we're reflecting, then we will use that.
4585 referred_schema = schema
4586
4587 referred_table = preparer._unquote_identifier(referred_table)
4588 referred_columns = [
4589 preparer._unquote_identifier(x)
            for x in re.split(r"\s*,\s*", referred_columns)
4591 ]
4592 options = {
4593 k: v
4594 for k, v in [
4595 ("onupdate", onupdate),
4596 ("ondelete", ondelete),
4597 ("initially", initially),
4598 ("deferrable", deferrable),
4599 ("match", match),
4600 ]
4601 if v is not None and v != "NO ACTION"
4602 }
4603 fkey_d = {
4604 "name": conname,
4605 "constrained_columns": constrained_columns,
4606 "referred_schema": referred_schema,
4607 "referred_table": referred_table,
4608 "referred_columns": referred_columns,
4609 "options": options,
4610 "comment": comment,
4611 }
4612 table_fks.append(fkey_d)
4613 return fkeys.items()
4614
4615 @reflection.cache
4616 def get_indexes(self, connection, table_name, schema=None, **kw):
4617 data = self.get_multi_indexes(
4618 connection,
4619 schema=schema,
4620 filter_names=[table_name],
4621 scope=ObjectScope.ANY,
4622 kind=ObjectKind.ANY,
4623 **kw,
4624 )
4625 return self._value_or_raise(data, table_name, schema)
4626
4627 @util.memoized_property
4628 def _index_query(self):
        # NOTE: pg_index is used in the FROM clause twice to improve
        # performance, since extracting all the index information from
        # `idx_sq`, to avoid the second use of pg_index, leads to a worse
        # performing query, in particular when querying for a single
        # table (as of pg 17)
        # NOTE: repeating the oids clause improves query performance
4634
4635 # subquery to get the columns
4636 idx_sq = (
4637 select(
4638 pg_catalog.pg_index.c.indexrelid,
4639 pg_catalog.pg_index.c.indrelid,
4640 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
4641 sql.func.unnest(pg_catalog.pg_index.c.indclass).label(
4642 "att_opclass"
4643 ),
4644 sql.func.generate_subscripts(
4645 pg_catalog.pg_index.c.indkey, 1
4646 ).label("ord"),
4647 )
4648 .where(
4649 ~pg_catalog.pg_index.c.indisprimary,
4650 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
4651 )
4652 .subquery("idx")
4653 )
4654
4655 attr_sq = (
4656 select(
4657 idx_sq.c.indexrelid,
4658 idx_sq.c.indrelid,
4659 idx_sq.c.ord,
                # NOTE: always using pg_get_indexdef is too slow, so only
                # invoke it when the element is an expression
4662 sql.case(
4663 (
4664 idx_sq.c.attnum == 0,
4665 pg_catalog.pg_get_indexdef(
4666 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True
4667 ),
4668 ),
                    # NOTE: cast needed since attname is of type "name",
                    # which is limited to 63 bytes, while pg_get_indexdef
                    # returns "text", so the output could otherwise be
                    # truncated
4672 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT),
4673 ).label("element"),
4674 (idx_sq.c.attnum == 0).label("is_expr"),
4675 pg_catalog.pg_opclass.c.opcname,
4676 pg_catalog.pg_opclass.c.opcdefault,
4677 )
4678 .select_from(idx_sq)
4679 .outerjoin(
4680 # do not remove rows where idx_sq.c.attnum is 0
4681 pg_catalog.pg_attribute,
4682 sql.and_(
4683 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum,
4684 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid,
4685 ),
4686 )
4687 .outerjoin(
4688 pg_catalog.pg_opclass,
4689 pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass,
4690 )
4691 .where(idx_sq.c.indrelid.in_(bindparam("oids")))
4692 .subquery("idx_attr")
4693 )
4694
4695 cols_sq = (
4696 select(
4697 attr_sq.c.indexrelid,
4698 sql.func.min(attr_sq.c.indrelid),
4699 sql.func.array_agg(
4700 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord)
4701 ).label("elements"),
4702 sql.func.array_agg(
4703 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord)
4704 ).label("elements_is_expr"),
4705 sql.func.array_agg(
4706 aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord)
4707 ).label("elements_opclass"),
4708 sql.func.array_agg(
4709 aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord)
4710 ).label("elements_opdefault"),
4711 )
4712 .group_by(attr_sq.c.indexrelid)
4713 .subquery("idx_cols")
4714 )
4715
4716 if self.server_version_info >= (11, 0):
4717 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
4718 else:
4719 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")
4720
4721 if self.server_version_info >= (15,):
4722 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct
4723 else:
4724 nulls_not_distinct = sql.false().label("indnullsnotdistinct")
4725
4726 return (
4727 select(
4728 pg_catalog.pg_index.c.indrelid,
4729 pg_catalog.pg_class.c.relname,
4730 pg_catalog.pg_index.c.indisunique,
4731 pg_catalog.pg_constraint.c.conrelid.is_not(None).label(
4732 "has_constraint"
4733 ),
4734 pg_catalog.pg_index.c.indoption,
4735 pg_catalog.pg_class.c.reloptions,
4736 pg_catalog.pg_am.c.amname,
4737 # NOTE: pg_get_expr is very fast so this case has almost no
4738 # performance impact
4739 sql.case(
4740 (
4741 pg_catalog.pg_index.c.indpred.is_not(None),
4742 pg_catalog.pg_get_expr(
4743 pg_catalog.pg_index.c.indpred,
4744 pg_catalog.pg_index.c.indrelid,
4745 ),
4746 ),
4747 else_=None,
4748 ).label("filter_definition"),
4749 indnkeyatts,
4750 nulls_not_distinct,
4751 cols_sq.c.elements,
4752 cols_sq.c.elements_is_expr,
4753 cols_sq.c.elements_opclass,
4754 cols_sq.c.elements_opdefault,
4755 )
4756 .select_from(pg_catalog.pg_index)
4757 .where(
4758 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
4759 ~pg_catalog.pg_index.c.indisprimary,
4760 )
4761 .join(
4762 pg_catalog.pg_class,
4763 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid,
4764 )
4765 .join(
4766 pg_catalog.pg_am,
4767 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid,
4768 )
4769 .outerjoin(
4770 cols_sq,
4771 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid,
4772 )
4773 .outerjoin(
4774 pg_catalog.pg_constraint,
4775 sql.and_(
4776 pg_catalog.pg_index.c.indrelid
4777 == pg_catalog.pg_constraint.c.conrelid,
4778 pg_catalog.pg_index.c.indexrelid
4779 == pg_catalog.pg_constraint.c.conindid,
4780 pg_catalog.pg_constraint.c.contype
4781 == sql.any_(_array.array(("p", "u", "x"))),
4782 ),
4783 )
4784 .order_by(
4785 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname
4786 )
4787 )
4788
4789 def get_multi_indexes(
4790 self, connection, schema, filter_names, scope, kind, **kw
4791 ):
4792 table_oids = self._get_table_oids(
4793 connection, schema, filter_names, scope, kind, **kw
4794 )
4795
4796 indexes = defaultdict(list)
4797 default = ReflectionDefaults.indexes
4798
4799 batches = list(table_oids)
4800
4801 while batches:
4802 batch = batches[0:3000]
4803 batches[0:3000] = []
4804
4805 result = connection.execute(
4806 self._index_query, {"oids": [r[0] for r in batch]}
4807 ).mappings()
4808
4809 result_by_oid = defaultdict(list)
4810 for row_dict in result:
4811 result_by_oid[row_dict["indrelid"]].append(row_dict)
4812
4813 for oid, table_name in batch:
4814 if oid not in result_by_oid:
                    # ensure that each table has an entry, even if it has
                    # no (non-primary-key) indexes
4817 indexes[(schema, table_name)] = default()
4818 continue
4819
4820 for row in result_by_oid[oid]:
4821 index_name = row["relname"]
4822
4823 table_indexes = indexes[(schema, table_name)]
4824
4825 all_elements = row["elements"]
4826 all_elements_is_expr = row["elements_is_expr"]
4827 all_elements_opclass = row["elements_opclass"]
4828 all_elements_opdefault = row["elements_opdefault"]
4829 indnkeyatts = row["indnkeyatts"]
4830 # "The number of key columns in the index, not counting any
4831 # included columns, which are merely stored and do not
4832 # participate in the index semantics"
4833 if len(all_elements) > indnkeyatts:
4834 # this is a "covering index" which has INCLUDE columns
4835 # as well as regular index columns
4836 inc_cols = all_elements[indnkeyatts:]
4837 idx_elements = all_elements[:indnkeyatts]
4838 idx_elements_is_expr = all_elements_is_expr[
4839 :indnkeyatts
4840 ]
                        # postgresql does not support expressions on
                        # included columns as of v14: "ERROR: expressions
                        # are not supported in included columns".
4844 assert all(
4845 not is_expr
4846 for is_expr in all_elements_is_expr[indnkeyatts:]
4847 )
4848 idx_elements_opclass = all_elements_opclass[
4849 :indnkeyatts
4850 ]
4851 idx_elements_opdefault = all_elements_opdefault[
4852 :indnkeyatts
4853 ]
4854 else:
4855 idx_elements = all_elements
4856 idx_elements_is_expr = all_elements_is_expr
4857 inc_cols = []
4858 idx_elements_opclass = all_elements_opclass
4859 idx_elements_opdefault = all_elements_opdefault
4860
4861 index = {"name": index_name, "unique": row["indisunique"]}
4862 if any(idx_elements_is_expr):
4863 index["column_names"] = [
4864 None if is_expr else expr
4865 for expr, is_expr in zip(
4866 idx_elements, idx_elements_is_expr
4867 )
4868 ]
4869 index["expressions"] = idx_elements
4870 else:
4871 index["column_names"] = idx_elements
4872
4873 dialect_options = {}
4874
4875 if not all(idx_elements_opdefault):
4876 dialect_options["postgresql_ops"] = {
4877 name: opclass
4878 for name, opclass, is_default in zip(
4879 idx_elements,
4880 idx_elements_opclass,
4881 idx_elements_opdefault,
4882 )
4883 if not is_default
4884 }
4885
4886 sorting = {}
4887 for col_index, col_flags in enumerate(row["indoption"]):
4888 col_sorting = ()
4889 # try to set flags only if they differ from PG
4890 # defaults...
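                        # per pg_index.indoption: bit 0x01 set means DESC,
                        # bit 0x02 set means NULLS FIRST; plain ASC with
                        # NULLS LAST (the ASC default) sets neither bit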
4891 if col_flags & 0x01:
4892 col_sorting += ("desc",)
4893 if not (col_flags & 0x02):
4894 col_sorting += ("nulls_last",)
4895 else:
4896 if col_flags & 0x02:
4897 col_sorting += ("nulls_first",)
4898 if col_sorting:
4899 sorting[idx_elements[col_index]] = col_sorting
4900 if sorting:
4901 index["column_sorting"] = sorting
4902 if row["has_constraint"]:
4903 index["duplicates_constraint"] = index_name
4904
4905 if row["reloptions"]:
4906 dialect_options["postgresql_with"] = dict(
4907 [
4908 option.split("=", 1)
4909 for option in row["reloptions"]
4910 ]
4911 )
4912 # it *might* be nice to include that this is 'btree' in the
4913 # reflection info. But we don't want an Index object
4914 # to have a ``postgresql_using`` in it that is just the
4915 # default, so for the moment leaving this out.
4916 amname = row["amname"]
4917 if amname != "btree":
4918 dialect_options["postgresql_using"] = row["amname"]
4919 if row["filter_definition"]:
4920 dialect_options["postgresql_where"] = row[
4921 "filter_definition"
4922 ]
4923 if self.server_version_info >= (11,):
4924 # NOTE: this is legacy, this is part of
4925 # dialect_options now as of #7382
4926 index["include_columns"] = inc_cols
4927 dialect_options["postgresql_include"] = inc_cols
4928 if row["indnullsnotdistinct"]:
4929 # the default is False, so ignore it.
4930 dialect_options["postgresql_nulls_not_distinct"] = row[
4931 "indnullsnotdistinct"
4932 ]
4933
4934 if dialect_options:
4935 index["dialect_options"] = dialect_options
4936
4937 table_indexes.append(index)
4938 return indexes.items()
4939
4940 @reflection.cache
4941 def get_unique_constraints(
4942 self, connection, table_name, schema=None, **kw
4943 ):
4944 data = self.get_multi_unique_constraints(
4945 connection,
4946 schema=schema,
4947 filter_names=[table_name],
4948 scope=ObjectScope.ANY,
4949 kind=ObjectKind.ANY,
4950 **kw,
4951 )
4952 return self._value_or_raise(data, table_name, schema)
4953
4954 def get_multi_unique_constraints(
4955 self,
4956 connection,
4957 schema,
4958 filter_names,
4959 scope,
4960 kind,
4961 **kw,
4962 ):
4963 result = self._reflect_constraint(
4964 connection, "u", schema, filter_names, scope, kind, **kw
4965 )
4966
4967 # each table can have multiple unique constraints
4968 uniques = defaultdict(list)
4969 default = ReflectionDefaults.unique_constraints
4970 for table_name, cols, con_name, comment, options in result:
4971 # ensure a list is created for each table. leave it empty if
4972 # the table has no unique constraint
4973 if con_name is None:
4974 uniques[(schema, table_name)] = default()
4975 continue
4976
4977 uc_dict = {
4978 "column_names": cols,
4979 "name": con_name,
4980 "comment": comment,
4981 }
4982 if options:
4983 uc_dict["dialect_options"] = options
4984
4985 uniques[(schema, table_name)].append(uc_dict)
4986 return uniques.items()
4987
4988 @reflection.cache
4989 def get_table_comment(self, connection, table_name, schema=None, **kw):
4990 data = self.get_multi_table_comment(
4991 connection,
4992 schema,
4993 [table_name],
4994 scope=ObjectScope.ANY,
4995 kind=ObjectKind.ANY,
4996 **kw,
4997 )
4998 return self._value_or_raise(data, table_name, schema)
4999
5000 @lru_cache()
5001 def _comment_query(self, schema, has_filter_names, scope, kind):
5002 relkinds = self._kind_to_relkinds(kind)
5003 query = (
5004 select(
5005 pg_catalog.pg_class.c.relname,
5006 pg_catalog.pg_description.c.description,
5007 )
5008 .select_from(pg_catalog.pg_class)
5009 .outerjoin(
5010 pg_catalog.pg_description,
5011 sql.and_(
5012 pg_catalog.pg_class.c.oid
5013 == pg_catalog.pg_description.c.objoid,
5014 pg_catalog.pg_description.c.objsubid == 0,
5015 pg_catalog.pg_description.c.classoid
                    == sql.cast("pg_catalog.pg_class", REGCLASS),
5017 ),
5018 )
5019 .where(self._pg_class_relkind_condition(relkinds))
5020 )
5021 query = self._pg_class_filter_scope_schema(query, schema, scope)
5022 if has_filter_names:
5023 query = query.where(
5024 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
5025 )
5026 return query
5027
5028 def get_multi_table_comment(
5029 self, connection, schema, filter_names, scope, kind, **kw
5030 ):
5031 has_filter_names, params = self._prepare_filter_names(filter_names)
5032 query = self._comment_query(schema, has_filter_names, scope, kind)
5033 result = connection.execute(query, params)
5034
5035 default = ReflectionDefaults.table_comment
5036 return (
5037 (
5038 (schema, table),
5039 {"text": comment} if comment is not None else default(),
5040 )
5041 for table, comment in result
5042 )
5043
5044 @reflection.cache
5045 def get_check_constraints(self, connection, table_name, schema=None, **kw):
5046 data = self.get_multi_check_constraints(
5047 connection,
5048 schema,
5049 [table_name],
5050 scope=ObjectScope.ANY,
5051 kind=ObjectKind.ANY,
5052 **kw,
5053 )
5054 return self._value_or_raise(data, table_name, schema)
5055
5056 @lru_cache()
5057 def _check_constraint_query(self, schema, has_filter_names, scope, kind):
5058 relkinds = self._kind_to_relkinds(kind)
5059 query = (
5060 select(
5061 pg_catalog.pg_class.c.relname,
5062 pg_catalog.pg_constraint.c.conname,
5063 # NOTE: avoid calling pg_get_constraintdef when not needed
5064 # to speed up the query
5065 sql.case(
5066 (
5067 pg_catalog.pg_constraint.c.oid.is_not(None),
5068 pg_catalog.pg_get_constraintdef(
5069 pg_catalog.pg_constraint.c.oid, True
5070 ),
5071 ),
5072 else_=None,
5073 ),
5074 pg_catalog.pg_description.c.description,
5075 )
5076 .select_from(pg_catalog.pg_class)
5077 .outerjoin(
5078 pg_catalog.pg_constraint,
5079 sql.and_(
5080 pg_catalog.pg_class.c.oid
5081 == pg_catalog.pg_constraint.c.conrelid,
5082 pg_catalog.pg_constraint.c.contype == "c",
5083 ),
5084 )
5085 .outerjoin(
5086 pg_catalog.pg_description,
5087 pg_catalog.pg_description.c.objoid
5088 == pg_catalog.pg_constraint.c.oid,
5089 )
5090 .order_by(
5091 pg_catalog.pg_class.c.relname,
5092 pg_catalog.pg_constraint.c.conname,
5093 )
5094 .where(self._pg_class_relkind_condition(relkinds))
5095 )
5096 query = self._pg_class_filter_scope_schema(query, schema, scope)
5097 if has_filter_names:
5098 query = query.where(
5099 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
5100 )
5101 return query
5102
5103 def get_multi_check_constraints(
5104 self, connection, schema, filter_names, scope, kind, **kw
5105 ):
5106 has_filter_names, params = self._prepare_filter_names(filter_names)
5107 query = self._check_constraint_query(
5108 schema, has_filter_names, scope, kind
5109 )
5110 result = connection.execute(query, params)
5111
5112 check_constraints = defaultdict(list)
5113 default = ReflectionDefaults.check_constraints
5114 for table_name, check_name, src, comment in result:
5115 # only two cases for check_name and src: both null or both defined
5116 if check_name is None and src is None:
5117 check_constraints[(schema, table_name)] = default()
5118 continue
5119 # samples:
5120 # "CHECK (((a > 1) AND (a < 5)))"
5121 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
5122 # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
5123 # "CHECK (some_boolean_function(a))"
5124 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
5125 # "CHECK (a NOT NULL) NO INHERIT"
5126 # "CHECK (a NOT NULL) NO INHERIT NOT VALID"
5127
5128 m = re.match(
5129 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$",
5130 src,
5131 flags=re.DOTALL,
5132 )
5133 if not m:
5134 util.warn("Could not parse CHECK constraint text: %r" % src)
5135 sqltext = ""
5136 else:
5137 sqltext = re.compile(
5138 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
5139 ).sub(r"\1", m.group(1))
5140 entry = {
5141 "name": check_name,
5142 "sqltext": sqltext,
5143 "comment": comment,
5144 }
5145 if m:
5146 do = {}
5147 if " NOT VALID" in m.groups():
5148 do["not_valid"] = True
5149 if " NO INHERIT" in m.groups():
5150 do["no_inherit"] = True
5151 if do:
5152 entry["dialect_options"] = do
5153
5154 check_constraints[(schema, table_name)].append(entry)
5155 return check_constraints.items()
5156
5157 def _pg_type_filter_schema(self, query, schema):
5158 if schema is None:
5159 query = query.where(
5160 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
5161 # ignore pg_catalog schema
5162 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
5163 )
5164 elif schema != "*":
5165 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
5166 return query
5167
5168 @lru_cache()
5169 def _enum_query(self, schema):
5170 lbl_agg_sq = (
5171 select(
5172 pg_catalog.pg_enum.c.enumtypid,
5173 sql.func.array_agg(
5174 aggregate_order_by(
5175 # NOTE: cast since some postgresql derivatives may
5176 # not support array_agg on the name type
5177 pg_catalog.pg_enum.c.enumlabel.cast(TEXT),
5178 pg_catalog.pg_enum.c.enumsortorder,
5179 )
5180 ).label("labels"),
5181 )
5182 .group_by(pg_catalog.pg_enum.c.enumtypid)
5183 .subquery("lbl_agg")
5184 )
5185
5186 query = (
5187 select(
5188 pg_catalog.pg_type.c.typname.label("name"),
5189 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
5190 "visible"
5191 ),
5192 pg_catalog.pg_namespace.c.nspname.label("schema"),
5193 lbl_agg_sq.c.labels.label("labels"),
5194 )
5195 .join(
5196 pg_catalog.pg_namespace,
5197 pg_catalog.pg_namespace.c.oid
5198 == pg_catalog.pg_type.c.typnamespace,
5199 )
5200 .outerjoin(
5201 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid
5202 )
5203 .where(pg_catalog.pg_type.c.typtype == "e")
5204 .order_by(
5205 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
5206 )
5207 )
5208
5209 return self._pg_type_filter_schema(query, schema)
5210
5211 @reflection.cache
5212 def _load_enums(self, connection, schema=None, **kw):
5213 if not self.supports_native_enum:
5214 return []
5215
5216 result = connection.execute(self._enum_query(schema))
5217
5218 enums = []
5219 for name, visible, schema, labels in result:
5220 enums.append(
5221 {
5222 "name": name,
5223 "schema": schema,
5224 "visible": visible,
5225 "labels": [] if labels is None else labels,
5226 }
5227 )
5228 return enums
5229
5230 @lru_cache()
5231 def _domain_query(self, schema):
5232 con_sq = (
5233 select(
5234 pg_catalog.pg_constraint.c.contypid,
5235 sql.func.array_agg(
5236 pg_catalog.pg_get_constraintdef(
5237 pg_catalog.pg_constraint.c.oid, True
5238 )
5239 ).label("condefs"),
5240 sql.func.array_agg(
5241 # NOTE: cast since some postgresql derivatives may
5242 # not support array_agg on the name type
5243 pg_catalog.pg_constraint.c.conname.cast(TEXT)
5244 ).label("connames"),
5245 )
5246 # The domain this constraint is on; zero if not a domain constraint
5247 .where(pg_catalog.pg_constraint.c.contypid != 0)
5248 .group_by(pg_catalog.pg_constraint.c.contypid)
5249 .subquery("domain_constraints")
5250 )
5251
5252 query = (
5253 select(
5254 pg_catalog.pg_type.c.typname.label("name"),
5255 pg_catalog.format_type(
5256 pg_catalog.pg_type.c.typbasetype,
5257 pg_catalog.pg_type.c.typtypmod,
5258 ).label("attype"),
5259 (~pg_catalog.pg_type.c.typnotnull).label("nullable"),
5260 pg_catalog.pg_type.c.typdefault.label("default"),
5261 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
5262 "visible"
5263 ),
5264 pg_catalog.pg_namespace.c.nspname.label("schema"),
5265 con_sq.c.condefs,
5266 con_sq.c.connames,
5267 pg_catalog.pg_collation.c.collname,
5268 )
5269 .join(
5270 pg_catalog.pg_namespace,
5271 pg_catalog.pg_namespace.c.oid
5272 == pg_catalog.pg_type.c.typnamespace,
5273 )
5274 .outerjoin(
5275 pg_catalog.pg_collation,
5276 pg_catalog.pg_type.c.typcollation
5277 == pg_catalog.pg_collation.c.oid,
5278 )
5279 .outerjoin(
5280 con_sq,
5281 pg_catalog.pg_type.c.oid == con_sq.c.contypid,
5282 )
5283 .where(pg_catalog.pg_type.c.typtype == "d")
5284 .order_by(
5285 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
5286 )
5287 )
5288 return self._pg_type_filter_schema(query, schema)
5289
5290 @reflection.cache
5291 def _load_domains(self, connection, schema=None, **kw):
5292 result = connection.execute(self._domain_query(schema))
5293
5294 domains: List[ReflectedDomain] = []
5295 for domain in result.mappings():
5296 # strip (30) from character varying(30)
5297 attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
5298 constraints: List[ReflectedDomainConstraint] = []
5299 if domain["connames"]:
5300 # When a domain has multiple CHECK constraints, they will
5301 # be tested in alphabetical order by name.
5302 sorted_constraints = sorted(
5303 zip(domain["connames"], domain["condefs"]),
5304 key=lambda t: t[0],
5305 )
5306 for name, def_ in sorted_constraints:
                    # each constraint is in the form "CHECK (expression)"
                    # or "NOT NULL".  Ignore the "NOT NULL" ones and strip
                    # off the leading "CHECK (" and the trailing ")".
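                    # e.g. def_ == "CHECK (VALUE > 0)" gives
                    # check == "VALUE > 0"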
5310 if def_.casefold().startswith("check"):
5311 check = def_[7:-1]
5312 constraints.append({"name": name, "check": check})
5313 domain_rec: ReflectedDomain = {
5314 "name": domain["name"],
5315 "schema": domain["schema"],
5316 "visible": domain["visible"],
5317 "type": attype,
5318 "nullable": domain["nullable"],
5319 "default": domain["default"],
5320 "constraints": constraints,
5321 "collation": domain["collname"],
5322 }
5323 domains.append(domain_rec)
5324
5325 return domains
5326
5327 def _set_backslash_escapes(self, connection):
5328 # this method is provided as an override hook for descendant
5329 # dialects (e.g. Redshift), so removing it may break them
5330 std_string = connection.exec_driver_sql(
5331 "show standard_conforming_strings"
5332 ).scalar()
5333 self._backslash_escapes = std_string == "off"