1# dialects/postgresql/base.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: postgresql
11 :name: PostgreSQL
12 :normal_support: 9.6+
13 :best_effort: 9+
14
15.. _postgresql_sequences:
16
17Sequences/SERIAL/IDENTITY
18-------------------------
19
20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
21of creating new primary key values for integer-based primary key columns. When
22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
23integer-based primary key columns, which generates a sequence and server side
24default corresponding to the column.
25
26To specify a specific named sequence to be used for primary key generation,
27use the :func:`~sqlalchemy.schema.Sequence` construct::
28
29 Table(
30 "sometable",
31 metadata,
32 Column(
33 "id", Integer, Sequence("some_id_seq", start=1), primary_key=True
34 ),
35 )
36
37When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
38having the "last insert identifier" available, a RETURNING clause is added to
39the INSERT statement which specifies the primary key columns should be
40returned after the statement completes. The RETURNING functionality only takes
41place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
42sequence, whether specified explicitly or implicitly via ``SERIAL``, is
43executed independently beforehand, the returned value to be used in the
44subsequent insert. Note that when an
45:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
46"executemany" semantics, the "last inserted identifier" functionality does not
47apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
48case.
49
50
51PostgreSQL 10 and above IDENTITY columns
52^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
53
54PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
55of SERIAL. The :class:`_schema.Identity` construct in a
56:class:`_schema.Column` can be used to control its behavior::
57
58 from sqlalchemy import Table, Column, MetaData, Integer, Computed
59
60 metadata = MetaData()
61
62 data = Table(
63 "data",
64 metadata,
65 Column(
66 "id", Integer, Identity(start=42, cycle=True), primary_key=True
67 ),
68 Column("data", String),
69 )
70
71The CREATE TABLE for the above :class:`_schema.Table` object would be:
72
73.. sourcecode:: sql
74
75 CREATE TABLE data (
76 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
77 data VARCHAR,
78 PRIMARY KEY (id)
79 )
80
81.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
82 in a :class:`_schema.Column` to specify the option of an autoincrementing
83 column.
84
85.. note::
86
87 Previous versions of SQLAlchemy did not have built-in support for rendering
88 of IDENTITY, and could use the following compilation hook to replace
89 occurrences of SERIAL with IDENTITY::
90
91 from sqlalchemy.schema import CreateColumn
92 from sqlalchemy.ext.compiler import compiles
93
94
95 @compiles(CreateColumn, "postgresql")
96 def use_identity(element, compiler, **kw):
97 text = compiler.visit_create_column(element, **kw)
98 text = text.replace("SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY")
99 return text
100
101 Using the above, a table such as::
102
103 t = Table(
104 "t", m, Column("id", Integer, primary_key=True), Column("data", String)
105 )
106
107 Will generate on the backing database as:
108
109 .. sourcecode:: sql
110
111 CREATE TABLE t (
112 id INT GENERATED BY DEFAULT AS IDENTITY,
113 data VARCHAR,
114 PRIMARY KEY (id)
115 )
116
117.. _postgresql_ss_cursors:
118
119Server Side Cursors
120-------------------
121
122Server-side cursor support is available for the psycopg2, asyncpg
123dialects and may also be available in others.
124
125Server side cursors are enabled on a per-statement basis by using the
126:paramref:`.Connection.execution_options.stream_results` connection execution
127option::
128
129 with engine.connect() as conn:
130 result = conn.execution_options(stream_results=True).execute(
131 text("select * from table")
132 )
133
134Note that some kinds of SQL statements may not be supported with
135server side cursors; generally, only SQL statements that return rows should be
136used with this option.
137
138.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
139 and will be removed in a future release. Please use the
140 :paramref:`_engine.Connection.stream_results` execution option for
141 unbuffered cursor support.
142
143.. seealso::
144
145 :ref:`engine_stream_results`
146
147.. _postgresql_isolation_level:
148
149Transaction Isolation Level
150---------------------------
151
152Most SQLAlchemy dialects support setting of transaction isolation level
153using the :paramref:`_sa.create_engine.isolation_level` parameter
154at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
155level via the :paramref:`.Connection.execution_options.isolation_level`
156parameter.
157
158For PostgreSQL dialects, this feature works either by making use of the
159DBAPI-specific features, such as psycopg2's isolation level flags which will
160embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
161DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
162TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
163emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
164DBAPI-specific techniques are used which is typically an ``.autocommit``
165flag on the DBAPI connection object.
166
167To set isolation level using :func:`_sa.create_engine`::
168
169 engine = create_engine(
170 "postgresql+pg8000://scott:tiger@localhost/test",
171 isolation_level="REPEATABLE READ",
172 )
173
174To set using per-connection execution options::
175
176 with engine.connect() as conn:
177 conn = conn.execution_options(isolation_level="REPEATABLE READ")
178 with conn.begin():
179 ... # work with transaction
180
181There are also more options for isolation level configurations, such as
182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
183different isolation level settings. See the discussion at
184:ref:`dbapi_autocommit` for background.
185
186Valid values for ``isolation_level`` on most PostgreSQL dialects include:
187
188* ``READ COMMITTED``
189* ``READ UNCOMMITTED``
190* ``REPEATABLE READ``
191* ``SERIALIZABLE``
192* ``AUTOCOMMIT``
193
194.. seealso::
195
196 :ref:`dbapi_autocommit`
197
198 :ref:`postgresql_readonly_deferrable`
199
200 :ref:`psycopg2_isolation_level`
201
202 :ref:`pg8000_isolation_level`
203
204.. _postgresql_readonly_deferrable:
205
206Setting READ ONLY / DEFERRABLE
207------------------------------
208
209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
210characteristics of the transaction, which is in addition to the isolation level
211setting. These two attributes can be established either in conjunction with or
212independently of the isolation level by passing the ``postgresql_readonly`` and
213``postgresql_deferrable`` flags with
214:meth:`_engine.Connection.execution_options`. The example below illustrates
215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
216"READ ONLY" and "DEFERRABLE"::
217
218 with engine.connect() as conn:
219 conn = conn.execution_options(
220 isolation_level="SERIALIZABLE",
221 postgresql_readonly=True,
222 postgresql_deferrable=True,
223 )
224 with conn.begin():
225 ... # work with transaction
226
227Note that some DBAPIs such as asyncpg only support "readonly" with
228SERIALIZABLE isolation.
229
230.. versionadded:: 1.4 added support for the ``postgresql_readonly``
231 and ``postgresql_deferrable`` execution options.
232
233.. _postgresql_reset_on_return:
234
235Temporary Table / Resource Reset for Connection Pooling
236-------------------------------------------------------
237
238The :class:`.QueuePool` connection pool implementation used
239by the SQLAlchemy :class:`.Engine` object includes
240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
241the DBAPI ``.rollback()`` method when connections are returned to the pool.
242While this rollback will clear out the immediate state used by the previous
243transaction, it does not cover a wider range of session-level state, including
244temporary tables as well as other server state such as prepared statement
245handles and statement caches. The PostgreSQL database includes a variety
246of commands which may be used to reset this state, including
247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.
248
249
250To install
251one or more of these commands as the means of performing reset-on-return,
252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated
253in the example below. The implementation
254will end transactions in progress as well as discard temporary tables
255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL
256documentation for background on what each of these statements do.
257
258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
259is set to ``None`` so that the custom scheme can replace the default behavior
260completely. The custom hook implementation calls ``.rollback()`` in any case,
261as it's usually important that the DBAPI's own tracking of commit/rollback
262will remain consistent with the state of the transaction::
263
264
265 from sqlalchemy import create_engine
266 from sqlalchemy import event
267
268 postgresql_engine = create_engine(
269 "postgresql+psycopg2://scott:tiger@hostname/dbname",
270 # disable default reset-on-return scheme
271 pool_reset_on_return=None,
272 )
273
274
275 @event.listens_for(postgresql_engine, "reset")
276 def _reset_postgresql(dbapi_connection, connection_record, reset_state):
277 if not reset_state.terminate_only:
278 dbapi_connection.execute("CLOSE ALL")
279 dbapi_connection.execute("RESET ALL")
280 dbapi_connection.execute("DISCARD TEMP")
281
282 # so that the DBAPI itself knows that the connection has been
283 # reset
284 dbapi_connection.rollback()
285
286.. versionchanged:: 2.0.0b3 Added additional state arguments to
287 the :meth:`.PoolEvents.reset` event and additionally ensured the event
288 is invoked for all "reset" occurrences, so that it's appropriate
289 as a place for custom "reset" handlers. Previous schemes which
290 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
291
292.. seealso::
293
294 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
295
296.. _postgresql_alternate_search_path:
297
298Setting Alternate Search Paths on Connect
299------------------------------------------
300
301The PostgreSQL ``search_path`` variable refers to the list of schema names
302that will be implicitly referenced when a particular table or other
303object is referenced in a SQL statement. As detailed in the next section
304:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
305the concept of keeping this variable at its default value of ``public``,
306however, in order to have it set to any arbitrary name or names when connections
307are used automatically, the "SET SESSION search_path" command may be invoked
308for all connections in a pool using the following event handler, as discussed
309at :ref:`schema_set_default_connections`::
310
311 from sqlalchemy import event
312 from sqlalchemy import create_engine
313
314 engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
315
316
317 @event.listens_for(engine, "connect", insert=True)
318 def set_search_path(dbapi_connection, connection_record):
319 existing_autocommit = dbapi_connection.autocommit
320 dbapi_connection.autocommit = True
321 cursor = dbapi_connection.cursor()
322 cursor.execute("SET SESSION search_path='%s'" % schema_name)
323 cursor.close()
324 dbapi_connection.autocommit = existing_autocommit
325
326The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
327attribute is so that when the ``SET SESSION search_path`` directive is invoked,
328it is invoked outside of the scope of any transaction and therefore will not
329be reverted when the DBAPI connection has a rollback.
330
331.. seealso::
332
333 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation
334
335.. _postgresql_schema_reflection:
336
337Remote-Schema Table Introspection and PostgreSQL search_path
338------------------------------------------------------------
339
340.. admonition:: Section Best Practices Summarized
341
342 keep the ``search_path`` variable set to its default of ``public``, without
343 any other schema names. Ensure the username used to connect **does not**
344 match remote schemas, or ensure the ``"$user"`` token is **removed** from
345 ``search_path``. For other schema names, name these explicitly
346 within :class:`_schema.Table` definitions. Alternatively, the
347 ``postgresql_ignore_search_path`` option will cause all reflected
348 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
349 attribute set up.
350
351The PostgreSQL dialect can reflect tables from any schema, as outlined in
352:ref:`metadata_reflection_schemas`.
353
354In all cases, the first thing SQLAlchemy does when reflecting tables is
355to **determine the default schema for the current database connection**.
356It does this using the PostgreSQL ``current_schema()``
357function, illustated below using a PostgreSQL client session (i.e. using
358the ``psql`` tool):
359
360.. sourcecode:: sql
361
362 test=> select current_schema();
363 current_schema
364 ----------------
365 public
366 (1 row)
367
368Above we see that on a plain install of PostgreSQL, the default schema name
369is the name ``public``.
370
371However, if your database username **matches the name of a schema**, PostgreSQL's
372default is to then **use that name as the default schema**. Below, we log in
373using the username ``scott``. When we create a schema named ``scott``, **it
374implicitly changes the default schema**:
375
376.. sourcecode:: sql
377
378 test=> select current_schema();
379 current_schema
380 ----------------
381 public
382 (1 row)
383
384 test=> create schema scott;
385 CREATE SCHEMA
386 test=> select current_schema();
387 current_schema
388 ----------------
389 scott
390 (1 row)
391
392The behavior of ``current_schema()`` is derived from the
393`PostgreSQL search path
394<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
395variable ``search_path``, which in modern PostgreSQL versions defaults to this:
396
397.. sourcecode:: sql
398
399 test=> show search_path;
400 search_path
401 -----------------
402 "$user", public
403 (1 row)
404
405Where above, the ``"$user"`` variable will inject the current username as the
406default schema, if one exists. Otherwise, ``public`` is used.
407
408When a :class:`_schema.Table` object is reflected, if it is present in the
409schema indicated by the ``current_schema()`` function, **the schema name assigned
410to the ".schema" attribute of the Table is the Python "None" value**. Otherwise, the
411".schema" attribute will be assigned the string name of that schema.
412
413With regards to tables which these :class:`_schema.Table`
414objects refer to via foreign key constraint, a decision must be made as to how
415the ``.schema`` is represented in those remote tables, in the case where that
416remote schema name is also a member of the current ``search_path``.
417
418By default, the PostgreSQL dialect mimics the behavior encouraged by
419PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
420returns a sample definition for a particular foreign key constraint,
421omitting the referenced schema name from that definition when the name is
422also in the PostgreSQL schema search path. The interaction below
423illustrates this behavior:
424
425.. sourcecode:: sql
426
427 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
428 CREATE TABLE
429 test=> CREATE TABLE referring(
430 test(> id INTEGER PRIMARY KEY,
431 test(> referred_id INTEGER REFERENCES test_schema.referred(id));
432 CREATE TABLE
433 test=> SET search_path TO public, test_schema;
434 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
435 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
436 test-> ON n.oid = c.relnamespace
437 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
438 test-> WHERE c.relname='referring' AND r.contype = 'f'
439 test-> ;
440 pg_get_constraintdef
441 ---------------------------------------------------
442 FOREIGN KEY (referred_id) REFERENCES referred(id)
443 (1 row)
444
445Above, we created a table ``referred`` as a member of the remote schema
446``test_schema``, however when we added ``test_schema`` to the
447PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
448``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
449the function.
450
451On the other hand, if we set the search path back to the typical default
452of ``public``:
453
454.. sourcecode:: sql
455
456 test=> SET search_path TO public;
457 SET
458
459The same query against ``pg_get_constraintdef()`` now returns the fully
460schema-qualified name for us:
461
462.. sourcecode:: sql
463
464 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
465 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
466 test-> ON n.oid = c.relnamespace
467 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
468 test-> WHERE c.relname='referring' AND r.contype = 'f';
469 pg_get_constraintdef
470 ---------------------------------------------------------------
471 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
472 (1 row)
473
474SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
475in order to determine the remote schema name. That is, if our ``search_path``
476were set to include ``test_schema``, and we invoked a table
477reflection process as follows::
478
479 >>> from sqlalchemy import Table, MetaData, create_engine, text
480 >>> engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test")
481 >>> with engine.connect() as conn:
482 ... conn.execute(text("SET search_path TO test_schema, public"))
483 ... metadata_obj = MetaData()
484 ... referring = Table("referring", metadata_obj, autoload_with=conn)
485 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>
486
487The above process would deliver to the :attr:`_schema.MetaData.tables`
488collection
489``referred`` table named **without** the schema::
490
491 >>> metadata_obj.tables["referred"].schema is None
492 True
493
494To alter the behavior of reflection such that the referred schema is
495maintained regardless of the ``search_path`` setting, use the
496``postgresql_ignore_search_path`` option, which can be specified as a
497dialect-specific argument to both :class:`_schema.Table` as well as
498:meth:`_schema.MetaData.reflect`::
499
500 >>> with engine.connect() as conn:
501 ... conn.execute(text("SET search_path TO test_schema, public"))
502 ... metadata_obj = MetaData()
503 ... referring = Table(
504 ... "referring",
505 ... metadata_obj,
506 ... autoload_with=conn,
507 ... postgresql_ignore_search_path=True,
508 ... )
509 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>
510
511We will now have ``test_schema.referred`` stored as schema-qualified::
512
513 >>> metadata_obj.tables["test_schema.referred"].schema
514 'test_schema'
515
516.. sidebar:: Best Practices for PostgreSQL Schema reflection
517
518 The description of PostgreSQL schema reflection behavior is complex, and
519 is the product of many years of dealing with widely varied use cases and
520 user preferences. But in fact, there's no need to understand any of it if
521 you just stick to the simplest use pattern: leave the ``search_path`` set
522 to its default of ``public`` only, never refer to the name ``public`` as
523 an explicit schema name otherwise, and refer to all other schema names
524 explicitly when building up a :class:`_schema.Table` object. The options
525 described here are only for those users who can't, or prefer not to, stay
526 within these guidelines.
527
528.. seealso::
529
530 :ref:`reflection_schema_qualified_interaction` - discussion of the issue
531 from a backend-agnostic perspective
532
533 `The Schema Search Path
534 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
535 - on the PostgreSQL website.
536
537INSERT/UPDATE...RETURNING
538-------------------------
539
540The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
541``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
542for single-row INSERT statements in order to fetch newly generated
543primary key identifiers. To specify an explicit ``RETURNING`` clause,
544use the :meth:`._UpdateBase.returning` method on a per-statement basis::
545
546 # INSERT..RETURNING
547 result = (
548 table.insert().returning(table.c.col1, table.c.col2).values(name="foo")
549 )
550 print(result.fetchall())
551
552 # UPDATE..RETURNING
553 result = (
554 table.update()
555 .returning(table.c.col1, table.c.col2)
556 .where(table.c.name == "foo")
557 .values(name="bar")
558 )
559 print(result.fetchall())
560
561 # DELETE..RETURNING
562 result = (
563 table.delete()
564 .returning(table.c.col1, table.c.col2)
565 .where(table.c.name == "foo")
566 )
567 print(result.fetchall())
568
569.. _postgresql_insert_on_conflict:
570
571INSERT...ON CONFLICT (Upsert)
572------------------------------
573
574Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
575rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
576candidate row will only be inserted if that row does not violate any unique
577constraints. In the case of a unique constraint violation, a secondary action
578can occur which can be either "DO UPDATE", indicating that the data in the
579target row should be updated, or "DO NOTHING", which indicates to silently skip
580this row.
581
582Conflicts are determined using existing unique constraints and indexes. These
583constraints may be identified either using their name as stated in DDL,
584or they may be inferred by stating the columns and conditions that comprise
585the indexes.
586
587SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
588:func:`_postgresql.insert()` function, which provides
589the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
590and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:
591
592.. sourcecode:: pycon+sql
593
594 >>> from sqlalchemy.dialects.postgresql import insert
595 >>> insert_stmt = insert(my_table).values(
596 ... id="some_existing_id", data="inserted value"
597 ... )
598 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(index_elements=["id"])
599 >>> print(do_nothing_stmt)
600 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
601 ON CONFLICT (id) DO NOTHING
602 {stop}
603
604 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
605 ... constraint="pk_my_table", set_=dict(data="updated value")
606 ... )
607 >>> print(do_update_stmt)
608 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
609 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
610
611.. seealso::
612
613 `INSERT .. ON CONFLICT
614 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
615 - in the PostgreSQL documentation.
616
617Specifying the Target
618^^^^^^^^^^^^^^^^^^^^^
619
620Both methods supply the "target" of the conflict using either the
621named constraint or by column inference:
622
623* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
624 specifies a sequence containing string column names, :class:`_schema.Column`
625 objects, and/or SQL expression elements, which would identify a unique
626 index:
627
628 .. sourcecode:: pycon+sql
629
630 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
631 ... index_elements=["id"], set_=dict(data="updated value")
632 ... )
633 >>> print(do_update_stmt)
634 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
635 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
636 {stop}
637
638 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
639 ... index_elements=[my_table.c.id], set_=dict(data="updated value")
640 ... )
641 >>> print(do_update_stmt)
642 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
643 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
644
645* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
646 infer an index, a partial index can be inferred by also specifying the
647 use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:
648
649 .. sourcecode:: pycon+sql
650
651 >>> stmt = insert(my_table).values(user_email="a@b.com", data="inserted data")
652 >>> stmt = stmt.on_conflict_do_update(
653 ... index_elements=[my_table.c.user_email],
654 ... index_where=my_table.c.user_email.like("%@gmail.com"),
655 ... set_=dict(data=stmt.excluded.data),
656 ... )
657 >>> print(stmt)
658 {printsql}INSERT INTO my_table (data, user_email)
659 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
660 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data
661
662* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
663 used to specify an index directly rather than inferring it. This can be
664 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:
665
666 .. sourcecode:: pycon+sql
667
668 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
669 ... constraint="my_table_idx_1", set_=dict(data="updated value")
670 ... )
671 >>> print(do_update_stmt)
672 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
673 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
674 {stop}
675
676 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
677 ... constraint="my_table_pk", set_=dict(data="updated value")
678 ... )
679 >>> print(do_update_stmt)
680 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
681 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
682 {stop}
683
684* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
685 also refer to a SQLAlchemy construct representing a constraint,
686 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
687 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
688 if the constraint has a name, it is used directly. Otherwise, if the
689 constraint is unnamed, then inference will be used, where the expressions
690 and optional WHERE clause of the constraint will be spelled out in the
691 construct. This use is especially convenient
692 to refer to the named or unnamed primary key of a :class:`_schema.Table`
693 using the
694 :attr:`_schema.Table.primary_key` attribute:
695
696 .. sourcecode:: pycon+sql
697
698 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
699 ... constraint=my_table.primary_key, set_=dict(data="updated value")
700 ... )
701 >>> print(do_update_stmt)
702 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
703 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
704
705The SET Clause
706^^^^^^^^^^^^^^^
707
708``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
709existing row, using any combination of new values as well as values
710from the proposed insertion. These values are specified using the
711:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
712parameter accepts a dictionary which consists of direct values
713for UPDATE:
714
715.. sourcecode:: pycon+sql
716
717 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
718 >>> do_update_stmt = stmt.on_conflict_do_update(
719 ... index_elements=["id"], set_=dict(data="updated value")
720 ... )
721 >>> print(do_update_stmt)
722 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
723 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
724
725.. warning::
726
727 The :meth:`_expression.Insert.on_conflict_do_update`
728 method does **not** take into
729 account Python-side default UPDATE values or generation functions, e.g.
730 those specified using :paramref:`_schema.Column.onupdate`.
731 These values will not be exercised for an ON CONFLICT style of UPDATE,
732 unless they are manually specified in the
733 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
734
735Updating using the Excluded INSERT Values
736^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
737
738In order to refer to the proposed insertion row, the special alias
739:attr:`~.postgresql.Insert.excluded` is available as an attribute on
740the :class:`_postgresql.Insert` object; this object is a
741:class:`_expression.ColumnCollection`
742which alias contains all columns of the target
743table:
744
745.. sourcecode:: pycon+sql
746
747 >>> stmt = insert(my_table).values(
748 ... id="some_id", data="inserted value", author="jlh"
749 ... )
750 >>> do_update_stmt = stmt.on_conflict_do_update(
751 ... index_elements=["id"],
752 ... set_=dict(data="updated value", author=stmt.excluded.author),
753 ... )
754 >>> print(do_update_stmt)
755 {printsql}INSERT INTO my_table (id, data, author)
756 VALUES (%(id)s, %(data)s, %(author)s)
757 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
758
759Additional WHERE Criteria
760^^^^^^^^^^^^^^^^^^^^^^^^^
761
762The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
763a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
764parameter, which will limit those rows which receive an UPDATE:
765
766.. sourcecode:: pycon+sql
767
768 >>> stmt = insert(my_table).values(
769 ... id="some_id", data="inserted value", author="jlh"
770 ... )
771 >>> on_update_stmt = stmt.on_conflict_do_update(
772 ... index_elements=["id"],
773 ... set_=dict(data="updated value", author=stmt.excluded.author),
774 ... where=(my_table.c.status == 2),
775 ... )
776 >>> print(on_update_stmt)
777 {printsql}INSERT INTO my_table (id, data, author)
778 VALUES (%(id)s, %(data)s, %(author)s)
779 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
780 WHERE my_table.status = %(status_1)s
781
782Skipping Rows with DO NOTHING
783^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
784
785``ON CONFLICT`` may be used to skip inserting a row entirely
786if any conflict with a unique or exclusion constraint occurs; below
787this is illustrated using the
788:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:
789
790.. sourcecode:: pycon+sql
791
792 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
793 >>> stmt = stmt.on_conflict_do_nothing(index_elements=["id"])
794 >>> print(stmt)
795 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
796 ON CONFLICT (id) DO NOTHING
797
798If ``DO NOTHING`` is used without specifying any columns or constraint,
799it has the effect of skipping the INSERT for any unique or exclusion
800constraint violation which occurs:
801
802.. sourcecode:: pycon+sql
803
804 >>> stmt = insert(my_table).values(id="some_id", data="inserted value")
805 >>> stmt = stmt.on_conflict_do_nothing()
806 >>> print(stmt)
807 {printsql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
808 ON CONFLICT DO NOTHING
809
810.. _postgresql_match:
811
812Full Text Search
813----------------
814
815PostgreSQL's full text search system is available through the use of the
816:data:`.func` namespace, combined with the use of custom operators
817via the :meth:`.Operators.bool_op` method. For simple cases with some
818degree of cross-backend compatibility, the :meth:`.Operators.match` operator
819may also be used.
820
821.. _postgresql_simple_match:
822
823Simple plain text matching with ``match()``
824^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
825
826The :meth:`.Operators.match` operator provides for cross-compatible simple
827text matching. For the PostgreSQL backend, it's hardcoded to generate
828an expression using the ``@@`` operator in conjunction with the
829``plainto_tsquery()`` PostgreSQL function.
830
831On the PostgreSQL dialect, an expression like the following::
832
833 select(sometable.c.text.match("search string"))
834
835would emit to the database:
836
837.. sourcecode:: sql
838
839 SELECT text @@ plainto_tsquery('search string') FROM table
840
841Above, passing a plain string to :meth:`.Operators.match` will automatically
842make use of ``plainto_tsquery()`` to specify the type of tsquery. This
843establishes basic database cross-compatibility for :meth:`.Operators.match`
844with other backends.
845
846.. versionchanged:: 2.0 The default tsquery generation function used by the
847 PostgreSQL dialect with :meth:`.Operators.match` is ``plainto_tsquery()``.
848
849 To render exactly what was rendered in 1.4, use the following form::
850
851 from sqlalchemy import func
852
853 select(sometable.c.text.bool_op("@@")(func.to_tsquery("search string")))
854
855 Which would emit:
856
857 .. sourcecode:: sql
858
859 SELECT text @@ to_tsquery('search string') FROM table
860
861Using PostgreSQL full text functions and operators directly
862^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
863
864Text search operations beyond the simple use of :meth:`.Operators.match`
865may make use of the :data:`.func` namespace to generate PostgreSQL full-text
866functions, in combination with :meth:`.Operators.bool_op` to generate
867any boolean operator.
868
869For example, the query::
870
871 select(func.to_tsquery("cat").bool_op("@>")(func.to_tsquery("cat & rat")))
872
873would generate:
874
875.. sourcecode:: sql
876
877 SELECT to_tsquery('cat') @> to_tsquery('cat & rat')
878
879
880The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::
881
882 from sqlalchemy.dialects.postgresql import TSVECTOR
883 from sqlalchemy import select, cast
884
885 select(cast("some text", TSVECTOR))
886
887produces a statement equivalent to:
888
889.. sourcecode:: sql
890
891 SELECT CAST('some text' AS TSVECTOR) AS anon_1
892
893The ``func`` namespace is augmented by the PostgreSQL dialect to set up
894correct argument and return types for most full text search functions.
895These functions are used automatically by the :attr:`_sql.func` namespace
896assuming the ``sqlalchemy.dialects.postgresql`` package has been imported,
897or :func:`_sa.create_engine` has been invoked using a ``postgresql``
898dialect. These functions are documented at:
899
900* :class:`_postgresql.to_tsvector`
901* :class:`_postgresql.to_tsquery`
902* :class:`_postgresql.plainto_tsquery`
903* :class:`_postgresql.phraseto_tsquery`
904* :class:`_postgresql.websearch_to_tsquery`
905* :class:`_postgresql.ts_headline`
906
907Specifying the "regconfig" with ``match()`` or custom operators
908^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
909
910PostgreSQL's ``plainto_tsquery()`` function accepts an optional
911"regconfig" argument that is used to instruct PostgreSQL to use a
912particular pre-computed GIN or GiST index in order to perform the search.
913When using :meth:`.Operators.match`, this additional parameter may be
914specified using the ``postgresql_regconfig`` parameter, such as::
915
916 select(mytable.c.id).where(
917 mytable.c.title.match("somestring", postgresql_regconfig="english")
918 )
919
920Which would emit:
921
922.. sourcecode:: sql
923
924 SELECT mytable.id FROM mytable
925 WHERE mytable.title @@ plainto_tsquery('english', 'somestring')
926
927When using other PostgreSQL search functions with :data:`.func`, the
928"regconfig" parameter may be passed directly as the initial argument::
929
930 select(mytable.c.id).where(
931 func.to_tsvector("english", mytable.c.title).bool_op("@@")(
932 func.to_tsquery("english", "somestring")
933 )
934 )
935
936produces a statement equivalent to:
937
938.. sourcecode:: sql
939
940 SELECT mytable.id FROM mytable
941 WHERE to_tsvector('english', mytable.title) @@
942 to_tsquery('english', 'somestring')
943
944It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
945PostgreSQL to ensure that you are generating queries with SQLAlchemy that
946take full advantage of any indexes you may have created for full text search.
947
948.. seealso::
949
950 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation
951
952
953FROM ONLY ...
954-------------
955
956The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
957table in an inheritance hierarchy. This can be used to produce the
958``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
959syntaxes. It uses SQLAlchemy's hints mechanism::
960
961 # SELECT ... FROM ONLY ...
962 result = table.select().with_hint(table, "ONLY", "postgresql")
963 print(result.fetchall())
964
965 # UPDATE ONLY ...
966 table.update(values=dict(foo="bar")).with_hint(
967 "ONLY", dialect_name="postgresql"
968 )
969
970 # DELETE FROM ONLY ...
971 table.delete().with_hint("ONLY", dialect_name="postgresql")
972
973.. _postgresql_indexes:
974
975PostgreSQL-Specific Index Options
976---------------------------------
977
978Several extensions to the :class:`.Index` construct are available, specific
979to the PostgreSQL dialect.
980
981.. _postgresql_covering_indexes:
982
983Covering Indexes
984^^^^^^^^^^^^^^^^
985
986A covering index includes additional columns that are not part of the index key
987but are stored in the index, allowing PostgreSQL to satisfy queries using only
988the index without accessing the table (an "index-only scan"). This is
989indicated on the index using the ``INCLUDE`` clause. The
990``postgresql_include`` option for :class:`.Index` (as well as
991:class:`.UniqueConstraint`) renders ``INCLUDE(colname)`` for the given string
992names::
993
994 Index("my_index", table.c.x, postgresql_include=["y"])
995
996would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
997
998Note that this feature requires PostgreSQL 11 or later.
999
1000.. seealso::
1001
1002 :ref:`postgresql_constraint_options_include` - the same feature implemented
1003 for :class:`.UniqueConstraint`
1004
1005.. versionadded:: 1.4 - support for covering indexes with :class:`.Index`.
1006 support for :class:`.UniqueConstraint` was in 2.0.41
1007
1008.. _postgresql_partial_indexes:
1009
1010Partial Indexes
1011^^^^^^^^^^^^^^^
1012
1013Partial indexes add criterion to the index definition so that the index is
1014applied to a subset of rows. These can be specified on :class:`.Index`
1015using the ``postgresql_where`` keyword argument::
1016
1017 Index("my_index", my_table.c.id, postgresql_where=my_table.c.value > 10)
1018
1019.. _postgresql_operator_classes:
1020
1021Operator Classes
1022^^^^^^^^^^^^^^^^
1023
1024PostgreSQL allows the specification of an *operator class* for each column of
1025an index (see
1026https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
1027The :class:`.Index` construct allows these to be specified via the
1028``postgresql_ops`` keyword argument::
1029
1030 Index(
1031 "my_index",
1032 my_table.c.id,
1033 my_table.c.data,
1034 postgresql_ops={"data": "text_pattern_ops", "id": "int4_ops"},
1035 )
1036
1037Note that the keys in the ``postgresql_ops`` dictionaries are the
1038"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
1039the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
1040different than the actual name of the column as expressed in the database.
1041
1042If ``postgresql_ops`` is to be used against a complex SQL expression such
1043as a function call, then to apply to the column it must be given a label
1044that is identified in the dictionary by name, e.g.::
1045
1046 Index(
1047 "my_index",
1048 my_table.c.id,
1049 func.lower(my_table.c.data).label("data_lower"),
1050 postgresql_ops={"data_lower": "text_pattern_ops", "id": "int4_ops"},
1051 )
1052
1053Operator classes are also supported by the
1054:class:`_postgresql.ExcludeConstraint` construct using the
1055:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
1056details.
1057
1058.. versionadded:: 1.3.21 added support for operator classes with
1059 :class:`_postgresql.ExcludeConstraint`.
1060
1061
1062Index Types
1063^^^^^^^^^^^
1064
1065PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
1066as the ability for users to create their own (see
1067https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
1068specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
1069
1070 Index("my_index", my_table.c.data, postgresql_using="gin")
1071
1072The value passed to the keyword argument will be simply passed through to the
1073underlying CREATE INDEX command, so it *must* be a valid index type for your
1074version of PostgreSQL.
1075
1076.. _postgresql_index_storage:
1077
1078Index Storage Parameters
1079^^^^^^^^^^^^^^^^^^^^^^^^
1080
1081PostgreSQL allows storage parameters to be set on indexes. The storage
1082parameters available depend on the index method used by the index. Storage
1083parameters can be specified on :class:`.Index` using the ``postgresql_with``
1084keyword argument::
1085
1086 Index("my_index", my_table.c.data, postgresql_with={"fillfactor": 50})
1087
1088PostgreSQL allows to define the tablespace in which to create the index.
1089The tablespace can be specified on :class:`.Index` using the
1090``postgresql_tablespace`` keyword argument::
1091
1092 Index("my_index", my_table.c.data, postgresql_tablespace="my_tablespace")
1093
1094Note that the same option is available on :class:`_schema.Table` as well.
1095
1096.. _postgresql_index_concurrently:
1097
1098Indexes with CONCURRENTLY
1099^^^^^^^^^^^^^^^^^^^^^^^^^
1100
1101The PostgreSQL index option CONCURRENTLY is supported by passing the
1102flag ``postgresql_concurrently`` to the :class:`.Index` construct::
1103
1104 tbl = Table("testtbl", m, Column("data", Integer))
1105
1106 idx1 = Index("test_idx1", tbl.c.data, postgresql_concurrently=True)
1107
1108The above index construct will render DDL for CREATE INDEX, assuming
1109PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as:
1110
1111.. sourcecode:: sql
1112
1113 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
1114
1115For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
1116a connection-less dialect, it will emit:
1117
1118.. sourcecode:: sql
1119
1120 DROP INDEX CONCURRENTLY test_idx1
1121
1122When using CONCURRENTLY, the PostgreSQL database requires that the statement
1123be invoked outside of a transaction block. The Python DBAPI enforces that
1124even for a single statement, a transaction is present, so to use this
1125construct, the DBAPI's "autocommit" mode must be used::
1126
1127 metadata = MetaData()
1128 table = Table("foo", metadata, Column("id", String))
1129 index = Index("foo_idx", table.c.id, postgresql_concurrently=True)
1130
1131 with engine.connect() as conn:
1132 with conn.execution_options(isolation_level="AUTOCOMMIT"):
1133 table.create(conn)
1134
1135.. seealso::
1136
1137 :ref:`postgresql_isolation_level`
1138
1139.. _postgresql_index_reflection:
1140
1141PostgreSQL Index Reflection
1142---------------------------
1143
1144The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
1145UNIQUE CONSTRAINT construct is used. When inspecting a table using
1146:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
1147and the :meth:`_reflection.Inspector.get_unique_constraints`
1148will report on these
1149two constructs distinctly; in the case of the index, the key
1150``duplicates_constraint`` will be present in the index entry if it is
1151detected as mirroring a constraint. When performing reflection using
1152``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
1153in :attr:`_schema.Table.indexes` when it is detected as mirroring a
1154:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection
1155.
1156
1157Special Reflection Options
1158--------------------------
1159
1160The :class:`_reflection.Inspector`
1161used for the PostgreSQL backend is an instance
1162of :class:`.PGInspector`, which offers additional methods::
1163
1164 from sqlalchemy import create_engine, inspect
1165
1166 engine = create_engine("postgresql+psycopg2://localhost/test")
1167 insp = inspect(engine) # will be a PGInspector
1168
1169 print(insp.get_enums())
1170
1171.. autoclass:: PGInspector
1172 :members:
1173
1174.. _postgresql_table_options:
1175
1176PostgreSQL Table Options
1177------------------------
1178
1179Several options for CREATE TABLE are supported directly by the PostgreSQL
1180dialect in conjunction with the :class:`_schema.Table` construct, listed in
1181the following sections.
1182
1183.. seealso::
1184
1185 `PostgreSQL CREATE TABLE options
1186 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
1187 in the PostgreSQL documentation.
1188
1189``INHERITS``
1190^^^^^^^^^^^^
1191
1192Specifies one or more parent tables from which this table inherits columns and
1193constraints, enabling table inheritance hierarchies in PostgreSQL.
1194
1195::
1196
1197 Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
1198
1199 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
1200
1201For schema-qualified parent table names, use :class:`.quoted_name` with
1202``quote=False`` to prevent the dotted name from being quoted as a single
1203identifier::
1204
1205 from sqlalchemy.sql import quoted_name
1206
1207 Table(
1208 "some_table",
1209 metadata,
1210 ...,
1211 postgresql_inherits=quoted_name(
1212 "my_schema.some_supertable", quote=False
1213 ),
1214 )
1215
1216``ON COMMIT``
1217^^^^^^^^^^^^^
1218
1219Controls the behavior of temporary tables at transaction commit, with options
1220to preserve rows, delete rows, or drop the table.
1221
1222::
1223
1224 Table("some_table", metadata, ..., postgresql_on_commit="PRESERVE ROWS")
1225
1226``PARTITION BY``
1227^^^^^^^^^^^^^^^^
1228
1229Declares the table as a partitioned table using the specified partitioning
1230strategy (RANGE, LIST, or HASH) on the given column(s).
1231
1232::
1233
1234 Table(
1235 "some_table",
1236 metadata,
1237 ...,
1238 postgresql_partition_by="LIST (part_column)",
1239 )
1240
1241``TABLESPACE``
1242^^^^^^^^^^^^^^
1243
1244Specifies the tablespace where the table will be stored, allowing control over
1245the physical location of table data on disk.
1246
1247::
1248
1249 Table("some_table", metadata, ..., postgresql_tablespace="some_tablespace")
1250
1251The above option is also available on the :class:`.Index` construct.
1252
1253``USING``
1254^^^^^^^^^
1255
1256Specifies the table access method to use for storing table data, such as
1257``heap`` (the default) or other custom access methods.
1258
1259::
1260
1261 Table("some_table", metadata, ..., postgresql_using="heap")
1262
1263.. versionadded:: 2.0.26
1264
1265``WITH OIDS``
1266^^^^^^^^^^^^^
1267
1268Enables the legacy OID (object identifier) system column for the table, which
1269assigns a unique identifier to each row.
1270
1271::
1272
1273 Table("some_table", metadata, ..., postgresql_with_oids=True)
1274
1275``WITHOUT OIDS``
1276^^^^^^^^^^^^^^^^
1277
1278Explicitly disables the OID system column for the table (the default behavior
1279in modern PostgreSQL versions).
1280
1281::
1282
1283 Table("some_table", metadata, ..., postgresql_with_oids=False)
1284
1285.. _postgresql_constraint_options:
1286
1287PostgreSQL Constraint Options
1288-----------------------------
1289
1290The following sections indicate options which are supported by the PostgreSQL
1291dialect in conjunction with selected constraint constructs.
1292
1293
1294``NOT VALID``
1295^^^^^^^^^^^^^
1296
1297Allows a constraint to be added without validating existing rows, improving
1298performance when adding constraints to large tables. This option applies
1299towards CHECK and FOREIGN KEY constraints when the constraint is being added
1300to an existing table via ALTER TABLE, and has the effect that existing rows
1301are not scanned during the ALTER operation against the constraint being added.
1302
1303When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
1304that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
1305may be specified as an additional keyword argument within the operation
1306that creates the constraint, as in the following Alembic example::
1307
1308 def update():
1309 op.create_foreign_key(
1310 "fk_user_address",
1311 "address",
1312 "user",
1313 ["user_id"],
1314 ["id"],
1315 postgresql_not_valid=True,
1316 )
1317
1318The keyword is ultimately accepted directly by the
1319:class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
1320and :class:`_schema.ForeignKey` constructs; when using a tool like
1321Alembic, dialect-specific keyword arguments are passed through to
1322these constructs from the migration operation directives::
1323
1324 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)
1325
1326 ForeignKeyConstraint(
1327 ["some_id"], ["some_table.some_id"], postgresql_not_valid=True
1328 )
1329
1330.. versionadded:: 1.4.32
1331
1332.. seealso::
1333
1334 `PostgreSQL ALTER TABLE options
1335 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
1336 in the PostgreSQL documentation.
1337
1338.. _postgresql_constraint_options_include:
1339
1340``INCLUDE``
1341^^^^^^^^^^^
1342
1343This keyword is applicable to both a ``UNIQUE`` constraint as well as an
1344``INDEX``. The ``postgresql_include`` option available for
1345:class:`.UniqueConstraint` as well as :class:`.Index` creates a covering index
1346by including additional columns in the underlying index without making them
1347part of the key constraint. This option adds one or more columns as a "payload"
1348to the index created automatically by PostgreSQL for the constraint. For
1349example, the following table definition::
1350
1351 Table(
1352 "mytable",
1353 metadata,
1354 Column("id", Integer, nullable=False),
1355 Column("value", Integer, nullable=False),
1356 UniqueConstraint("id", postgresql_include=["value"]),
1357 )
1358
1359would produce the DDL statement
1360
1361.. sourcecode:: sql
1362
1363 CREATE TABLE mytable (
1364 id INTEGER NOT NULL,
1365 value INTEGER NOT NULL,
1366 UNIQUE (id) INCLUDE (value)
1367 )
1368
1369Note that this feature requires PostgreSQL 11 or later.
1370
1371.. versionadded:: 2.0.41 - added support for ``postgresql_include`` to
1372 :class:`.UniqueConstraint`, to complement the existing feature in
1373 :class:`.Index`.
1374
1375.. seealso::
1376
1377 :ref:`postgresql_covering_indexes` - background on ``postgresql_include``
1378 for the :class:`.Index` construct.
1379
1380
1381Column list with foreign key ``ON DELETE SET`` actions
1382^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1383
1384Allows selective column updates when a foreign key action is triggered, limiting
1385which columns are set to NULL or DEFAULT upon deletion of a referenced row.
1386This applies to :class:`.ForeignKey` and :class:`.ForeignKeyConstraint`, the
1387:paramref:`.ForeignKey.ondelete` parameter will accept on the PostgreSQL
1388backend only a string list of column names inside parenthesis, following the
1389``SET NULL`` or ``SET DEFAULT`` phrases, which will limit the set of columns
1390that are subject to the action::
1391
1392 fktable = Table(
1393 "fktable",
1394 metadata,
1395 Column("tid", Integer),
1396 Column("id", Integer),
1397 Column("fk_id_del_set_null", Integer),
1398 ForeignKeyConstraint(
1399 columns=["tid", "fk_id_del_set_null"],
1400 refcolumns=[pktable.c.tid, pktable.c.id],
1401 ondelete="SET NULL (fk_id_del_set_null)",
1402 ),
1403 )
1404
1405.. versionadded:: 2.0.40
1406
1407
1408``NULLS NOT DISTINCT``
1409^^^^^^^^^^^^^^^^^^^^^^
1410
1411By default, two ``null`` values are not considered equal for unique constraints
1412and indexes. Therefore, seemingly duplicate rows may be stored if one of the
1413values in the constraint is ``null``. This default behavior is implementation
1414defined, so other SQL dialects may behave differently than PostgreSQL.
1415
1416The ``NULLS NOT DISTINCT`` clause can be used to change this behavior, treating
1417null values as equal and preventing unintended duplicate rows. The opposite
1418``NULLS DISTINCT`` clause can also be used to make PostgreSQL's default behavior
1419explict.
1420
1421The ``postgresql_nulls_not_distinct`` parameter can be set to ``True`` to
1422add the ``NULLS NOT DISTINCT`` clause, or ``False`` to add ``NULLS DISTINCT``.
1423Not setting it, or passing ``None``, will not add a clause and keep the default
1424behavior.
1425
1426This feature requires PostgreSQL 15 or later.
1427
1428.. versionadded:: 2.0.16
1429
1430
1431.. _postgresql_table_valued_overview:
1432
1433Table values, Table and Column valued functions, Row and Tuple objects
1434-----------------------------------------------------------------------
1435
1436PostgreSQL makes great use of modern SQL forms such as table-valued functions,
1437tables and rows as values. These constructs are commonly used as part
1438of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
1439datatypes. SQLAlchemy's SQL expression language has native support for
1440most table-valued and row-valued forms.
1441
1442.. _postgresql_table_valued:
1443
1444Table-Valued Functions
1445^^^^^^^^^^^^^^^^^^^^^^^
1446
1447Many PostgreSQL built-in functions are intended to be used in the FROM clause
1448of a SELECT statement, and are capable of returning table rows or sets of table
1449rows. A large portion of PostgreSQL's JSON functions for example such as
1450``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
1451``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such
1452forms. These classes of SQL function calling forms in SQLAlchemy are available
1453using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
1454with :class:`_functions.Function` objects generated from the :data:`_sql.func`
1455namespace.
1456
1457Examples from PostgreSQL's reference documentation follow below:
1458
1459* ``json_each()``:
1460
1461 .. sourcecode:: pycon+sql
1462
1463 >>> from sqlalchemy import select, func
1464 >>> stmt = select(
1465 ... func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value")
1466 ... )
1467 >>> print(stmt)
1468 {printsql}SELECT anon_1.key, anon_1.value
1469 FROM json_each(:json_each_1) AS anon_1
1470
1471* ``json_populate_record()``:
1472
1473 .. sourcecode:: pycon+sql
1474
1475 >>> from sqlalchemy import select, func, literal_column
1476 >>> stmt = select(
1477 ... func.json_populate_record(
1478 ... literal_column("null::myrowtype"), '{"a":1,"b":2}'
1479 ... ).table_valued("a", "b", name="x")
1480 ... )
1481 >>> print(stmt)
1482 {printsql}SELECT x.a, x.b
1483 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x
1484
1485* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
1486 columns in the alias, where we may make use of :func:`_sql.column` elements with
1487 types to produce them. The :meth:`_functions.FunctionElement.table_valued`
1488 method produces a :class:`_sql.TableValuedAlias` construct, and the method
1489 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
1490 columns specification:
1491
1492 .. sourcecode:: pycon+sql
1493
1494 >>> from sqlalchemy import select, func, column, Integer, Text
1495 >>> stmt = select(
1496 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}')
1497 ... .table_valued(
1498 ... column("a", Integer),
1499 ... column("b", Text),
1500 ... column("d", Text),
1501 ... )
1502 ... .render_derived(name="x", with_types=True)
1503 ... )
1504 >>> print(stmt)
1505 {printsql}SELECT x.a, x.b, x.d
1506 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)
1507
1508* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
1509 ordinal counter to the output of a function and is accepted by a limited set
1510 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
1511 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
1512 parameter ``with_ordinality`` for this purpose, which accepts the string name
1513 that will be applied to the "ordinality" column:
1514
1515 .. sourcecode:: pycon+sql
1516
1517 >>> from sqlalchemy import select, func
1518 >>> stmt = select(
1519 ... func.generate_series(4, 1, -1)
1520 ... .table_valued("value", with_ordinality="ordinality")
1521 ... .render_derived()
1522 ... )
1523 >>> print(stmt)
1524 {printsql}SELECT anon_1.value, anon_1.ordinality
1525 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
1526 WITH ORDINALITY AS anon_1(value, ordinality)
1527
1528.. versionadded:: 1.4.0b2
1529
1530.. seealso::
1531
1532 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`
1533
1534.. _postgresql_column_valued:
1535
1536Column Valued Functions
1537^^^^^^^^^^^^^^^^^^^^^^^
1538
1539Similar to the table valued function, a column valued function is present
1540in the FROM clause, but delivers itself to the columns clause as a single
1541scalar value. PostgreSQL functions such as ``json_array_elements()``,
1542``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the
1543:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`:
1544
1545* ``json_array_elements()``:
1546
1547 .. sourcecode:: pycon+sql
1548
1549 >>> from sqlalchemy import select, func
1550 >>> stmt = select(
1551 ... func.json_array_elements('["one", "two"]').column_valued("x")
1552 ... )
1553 >>> print(stmt)
1554 {printsql}SELECT x
1555 FROM json_array_elements(:json_array_elements_1) AS x
1556
1557* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
1558 :func:`_postgresql.array` construct may be used:
1559
1560 .. sourcecode:: pycon+sql
1561
1562 >>> from sqlalchemy.dialects.postgresql import array
1563 >>> from sqlalchemy import select, func
1564 >>> stmt = select(func.unnest(array([1, 2])).column_valued())
1565 >>> print(stmt)
1566 {printsql}SELECT anon_1
1567 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1
1568
1569 The function can of course be used against an existing table-bound column
1570 that's of type :class:`_types.ARRAY`:
1571
1572 .. sourcecode:: pycon+sql
1573
1574 >>> from sqlalchemy import table, column, ARRAY, Integer
1575 >>> from sqlalchemy import select, func
1576 >>> t = table("t", column("value", ARRAY(Integer)))
1577 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
1578 >>> print(stmt)
1579 {printsql}SELECT unnested_value
1580 FROM unnest(t.value) AS unnested_value
1581
1582.. seealso::
1583
1584 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`
1585
1586
1587Row Types
1588^^^^^^^^^
1589
1590Built-in support for rendering a ``ROW`` may be approximated using
1591``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
1592:func:`_sql.tuple_` construct:
1593
1594.. sourcecode:: pycon+sql
1595
1596 >>> from sqlalchemy import table, column, func, tuple_
1597 >>> t = table("t", column("id"), column("fk"))
1598 >>> stmt = (
1599 ... t.select()
1600 ... .where(tuple_(t.c.id, t.c.fk) > (1, 2))
1601 ... .where(func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7))
1602 ... )
1603 >>> print(stmt)
1604 {printsql}SELECT t.id, t.fk
1605 FROM t
1606 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)
1607
1608.. seealso::
1609
1610 `PostgreSQL Row Constructors
1611 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_
1612
1613 `PostgreSQL Row Constructor Comparison
1614 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
1615
1616Table Types passed to Functions
1617^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1618
1619PostgreSQL supports passing a table as an argument to a function, which is
1620known as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
1621such as :class:`_schema.Table` support this special form using the
1622:meth:`_sql.FromClause.table_valued` method, which is comparable to the
1623:meth:`_functions.FunctionElement.table_valued` method except that the collection
1624of columns is already established by that of the :class:`_sql.FromClause`
1625itself:
1626
1627.. sourcecode:: pycon+sql
1628
1629 >>> from sqlalchemy import table, column, func, select
1630 >>> a = table("a", column("id"), column("x"), column("y"))
1631 >>> stmt = select(func.row_to_json(a.table_valued()))
1632 >>> print(stmt)
1633 {printsql}SELECT row_to_json(a) AS row_to_json_1
1634 FROM a
1635
1636.. versionadded:: 1.4.0b2
1637
1638
1639
1640""" # noqa: E501
1641
1642from __future__ import annotations
1643
1644from collections import defaultdict
1645from functools import lru_cache
1646import re
1647from typing import Any
1648from typing import cast
1649from typing import Dict
1650from typing import List
1651from typing import Optional
1652from typing import Tuple
1653from typing import TYPE_CHECKING
1654from typing import Union
1655
1656from . import arraylib as _array
1657from . import json as _json
1658from . import pg_catalog
1659from . import ranges as _ranges
1660from .ext import _regconfig_fn
1661from .ext import aggregate_order_by
1662from .hstore import HSTORE
1663from .named_types import CreateDomainType as CreateDomainType # noqa: F401
1664from .named_types import CreateEnumType as CreateEnumType # noqa: F401
1665from .named_types import DOMAIN as DOMAIN # noqa: F401
1666from .named_types import DropDomainType as DropDomainType # noqa: F401
1667from .named_types import DropEnumType as DropEnumType # noqa: F401
1668from .named_types import ENUM as ENUM # noqa: F401
1669from .named_types import NamedType as NamedType # noqa: F401
1670from .types import _DECIMAL_TYPES # noqa: F401
1671from .types import _FLOAT_TYPES # noqa: F401
1672from .types import _INT_TYPES # noqa: F401
1673from .types import BIT as BIT
1674from .types import BYTEA as BYTEA
1675from .types import CIDR as CIDR
1676from .types import CITEXT as CITEXT
1677from .types import INET as INET
1678from .types import INTERVAL as INTERVAL
1679from .types import MACADDR as MACADDR
1680from .types import MACADDR8 as MACADDR8
1681from .types import MONEY as MONEY
1682from .types import OID as OID
1683from .types import PGBit as PGBit # noqa: F401
1684from .types import PGCidr as PGCidr # noqa: F401
1685from .types import PGInet as PGInet # noqa: F401
1686from .types import PGInterval as PGInterval # noqa: F401
1687from .types import PGMacAddr as PGMacAddr # noqa: F401
1688from .types import PGMacAddr8 as PGMacAddr8 # noqa: F401
1689from .types import PGUuid as PGUuid
1690from .types import REGCLASS as REGCLASS
1691from .types import REGCONFIG as REGCONFIG # noqa: F401
1692from .types import TIME as TIME
1693from .types import TIMESTAMP as TIMESTAMP
1694from .types import TSVECTOR as TSVECTOR
1695from ... import exc
1696from ... import schema
1697from ... import select
1698from ... import sql
1699from ... import util
1700from ...engine import characteristics
1701from ...engine import default
1702from ...engine import interfaces
1703from ...engine import ObjectKind
1704from ...engine import ObjectScope
1705from ...engine import reflection
1706from ...engine import URL
1707from ...engine.reflection import ReflectionDefaults
1708from ...sql import bindparam
1709from ...sql import coercions
1710from ...sql import compiler
1711from ...sql import elements
1712from ...sql import expression
1713from ...sql import functions
1714from ...sql import roles
1715from ...sql import sqltypes
1716from ...sql import util as sql_util
1717from ...sql.compiler import InsertmanyvaluesSentinelOpts
1718from ...sql.visitors import InternalTraversal
1719from ...types import BIGINT
1720from ...types import BOOLEAN
1721from ...types import CHAR
1722from ...types import DATE
1723from ...types import DOUBLE_PRECISION
1724from ...types import FLOAT
1725from ...types import INTEGER
1726from ...types import NUMERIC
1727from ...types import REAL
1728from ...types import SMALLINT
1729from ...types import TEXT
1730from ...types import UUID as UUID
1731from ...types import VARCHAR
1732from ...util.typing import TypedDict
1733
1734IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
1735
1736RESERVED_WORDS = {
1737 "all",
1738 "analyse",
1739 "analyze",
1740 "and",
1741 "any",
1742 "array",
1743 "as",
1744 "asc",
1745 "asymmetric",
1746 "both",
1747 "case",
1748 "cast",
1749 "check",
1750 "collate",
1751 "column",
1752 "constraint",
1753 "create",
1754 "current_catalog",
1755 "current_date",
1756 "current_role",
1757 "current_time",
1758 "current_timestamp",
1759 "current_user",
1760 "default",
1761 "deferrable",
1762 "desc",
1763 "distinct",
1764 "do",
1765 "else",
1766 "end",
1767 "except",
1768 "false",
1769 "fetch",
1770 "for",
1771 "foreign",
1772 "from",
1773 "grant",
1774 "group",
1775 "having",
1776 "in",
1777 "initially",
1778 "intersect",
1779 "into",
1780 "leading",
1781 "limit",
1782 "localtime",
1783 "localtimestamp",
1784 "new",
1785 "not",
1786 "null",
1787 "of",
1788 "off",
1789 "offset",
1790 "old",
1791 "on",
1792 "only",
1793 "or",
1794 "order",
1795 "placing",
1796 "primary",
1797 "references",
1798 "returning",
1799 "select",
1800 "session_user",
1801 "some",
1802 "symmetric",
1803 "table",
1804 "then",
1805 "to",
1806 "trailing",
1807 "true",
1808 "union",
1809 "unique",
1810 "user",
1811 "using",
1812 "variadic",
1813 "when",
1814 "where",
1815 "window",
1816 "with",
1817 "authorization",
1818 "between",
1819 "binary",
1820 "cross",
1821 "current_schema",
1822 "freeze",
1823 "full",
1824 "ilike",
1825 "inner",
1826 "is",
1827 "isnull",
1828 "join",
1829 "left",
1830 "like",
1831 "natural",
1832 "notnull",
1833 "outer",
1834 "over",
1835 "overlaps",
1836 "right",
1837 "similar",
1838 "verbose",
1839}
1840
1841
1842colspecs = {
1843 sqltypes.ARRAY: _array.ARRAY,
1844 sqltypes.Interval: INTERVAL,
1845 sqltypes.Enum: ENUM,
1846 sqltypes.JSON.JSONPathType: _json.JSONPATH,
1847 sqltypes.JSON: _json.JSON,
1848 sqltypes.Uuid: PGUuid,
1849}
1850
1851
1852ischema_names = {
1853 "_array": _array.ARRAY,
1854 "hstore": HSTORE,
1855 "json": _json.JSON,
1856 "jsonb": _json.JSONB,
1857 "int4range": _ranges.INT4RANGE,
1858 "int8range": _ranges.INT8RANGE,
1859 "numrange": _ranges.NUMRANGE,
1860 "daterange": _ranges.DATERANGE,
1861 "tsrange": _ranges.TSRANGE,
1862 "tstzrange": _ranges.TSTZRANGE,
1863 "int4multirange": _ranges.INT4MULTIRANGE,
1864 "int8multirange": _ranges.INT8MULTIRANGE,
1865 "nummultirange": _ranges.NUMMULTIRANGE,
1866 "datemultirange": _ranges.DATEMULTIRANGE,
1867 "tsmultirange": _ranges.TSMULTIRANGE,
1868 "tstzmultirange": _ranges.TSTZMULTIRANGE,
1869 "integer": INTEGER,
1870 "bigint": BIGINT,
1871 "smallint": SMALLINT,
1872 "character varying": VARCHAR,
1873 "character": CHAR,
1874 '"char"': sqltypes.String,
1875 "name": sqltypes.String,
1876 "text": TEXT,
1877 "numeric": NUMERIC,
1878 "float": FLOAT,
1879 "real": REAL,
1880 "inet": INET,
1881 "cidr": CIDR,
1882 "citext": CITEXT,
1883 "uuid": UUID,
1884 "bit": BIT,
1885 "bit varying": BIT,
1886 "macaddr": MACADDR,
1887 "macaddr8": MACADDR8,
1888 "money": MONEY,
1889 "oid": OID,
1890 "regclass": REGCLASS,
1891 "double precision": DOUBLE_PRECISION,
1892 "timestamp": TIMESTAMP,
1893 "timestamp with time zone": TIMESTAMP,
1894 "timestamp without time zone": TIMESTAMP,
1895 "time with time zone": TIME,
1896 "time without time zone": TIME,
1897 "date": DATE,
1898 "time": TIME,
1899 "bytea": BYTEA,
1900 "boolean": BOOLEAN,
1901 "interval": INTERVAL,
1902 "tsvector": TSVECTOR,
1903}
1904
1905
1906class PGCompiler(compiler.SQLCompiler):
1907 def visit_to_tsvector_func(self, element, **kw):
1908 return self._assert_pg_ts_ext(element, **kw)
1909
1910 def visit_to_tsquery_func(self, element, **kw):
1911 return self._assert_pg_ts_ext(element, **kw)
1912
1913 def visit_plainto_tsquery_func(self, element, **kw):
1914 return self._assert_pg_ts_ext(element, **kw)
1915
1916 def visit_phraseto_tsquery_func(self, element, **kw):
1917 return self._assert_pg_ts_ext(element, **kw)
1918
1919 def visit_websearch_to_tsquery_func(self, element, **kw):
1920 return self._assert_pg_ts_ext(element, **kw)
1921
1922 def visit_ts_headline_func(self, element, **kw):
1923 return self._assert_pg_ts_ext(element, **kw)
1924
1925 def _assert_pg_ts_ext(self, element, **kw):
1926 if not isinstance(element, _regconfig_fn):
1927 # other options here include trying to rewrite the function
1928 # with the correct types. however, that means we have to
1929 # "un-SQL-ize" the first argument, which can't work in a
1930 # generalized way. Also, parent compiler class has already added
1931 # the incorrect return type to the result map. So let's just
1932 # make sure the function we want is used up front.
1933
1934 raise exc.CompileError(
1935 f'Can\'t compile "{element.name}()" full text search '
1936 f"function construct that does not originate from the "
1937 f'"sqlalchemy.dialects.postgresql" package. '
1938 f'Please ensure "import sqlalchemy.dialects.postgresql" is '
1939 f"called before constructing "
1940 f'"sqlalchemy.func.{element.name}()" to ensure registration '
1941 f"of the correct argument and return types."
1942 )
1943
1944 return f"{element.name}{self.function_argspec(element, **kw)}"
1945
1946 def render_bind_cast(self, type_, dbapi_type, sqltext):
1947 if dbapi_type._type_affinity is sqltypes.String and dbapi_type.length:
1948 # use VARCHAR with no length for VARCHAR cast.
1949 # see #9511
1950 dbapi_type = sqltypes.STRINGTYPE
1951 return f"""{sqltext}::{
1952 self.dialect.type_compiler_instance.process(
1953 dbapi_type, identifier_preparer=self.preparer
1954 )
1955 }"""
1956
1957 def visit_array(self, element, **kw):
1958 if not element.clauses and not element.type.item_type._isnull:
1959 return "ARRAY[]::%s" % element.type.compile(self.dialect)
1960 return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
1961
1962 def visit_slice(self, element, **kw):
1963 return "%s:%s" % (
1964 self.process(element.start, **kw),
1965 self.process(element.stop, **kw),
1966 )
1967
1968 def visit_bitwise_xor_op_binary(self, binary, operator, **kw):
1969 return self._generate_generic_binary(binary, " # ", **kw)
1970
1971 def visit_json_getitem_op_binary(
1972 self, binary, operator, _cast_applied=False, **kw
1973 ):
1974 if (
1975 not _cast_applied
1976 and binary.type._type_affinity is not sqltypes.JSON
1977 ):
1978 kw["_cast_applied"] = True
1979 return self.process(sql.cast(binary, binary.type), **kw)
1980
1981 kw["eager_grouping"] = True
1982
1983 if (
1984 not _cast_applied
1985 and isinstance(binary.left.type, _json.JSONB)
1986 and self.dialect._supports_jsonb_subscripting
1987 ):
1988 left = binary.left
1989 if isinstance(left, (functions.FunctionElement, elements.Cast)):
1990 left = elements.Grouping(left)
1991
1992 # for pg14+JSONB use subscript notation: col['key'] instead
1993 # of col -> 'key'
1994 return "%s[%s]" % (
1995 self.process(left, **kw),
1996 self.process(binary.right, **kw),
1997 )
1998 else:
1999 # Fall back to arrow notation for older versions or when cast
2000 # is applied
2001 return self._generate_generic_binary(
2002 binary, " -> " if not _cast_applied else " ->> ", **kw
2003 )
2004
2005 def visit_json_path_getitem_op_binary(
2006 self, binary, operator, _cast_applied=False, **kw
2007 ):
2008 if (
2009 not _cast_applied
2010 and binary.type._type_affinity is not sqltypes.JSON
2011 ):
2012 kw["_cast_applied"] = True
2013 return self.process(sql.cast(binary, binary.type), **kw)
2014
2015 kw["eager_grouping"] = True
2016 return self._generate_generic_binary(
2017 binary, " #> " if not _cast_applied else " #>> ", **kw
2018 )
2019
2020 def visit_getitem_binary(self, binary, operator, **kw):
2021 return "%s[%s]" % (
2022 self.process(binary.left, **kw),
2023 self.process(binary.right, **kw),
2024 )
2025
2026 def visit_aggregate_order_by(self, element, **kw):
2027 return "%s ORDER BY %s" % (
2028 self.process(element.target, **kw),
2029 self.process(element.order_by, **kw),
2030 )
2031
2032 def visit_match_op_binary(self, binary, operator, **kw):
2033 if "postgresql_regconfig" in binary.modifiers:
2034 regconfig = self.render_literal_value(
2035 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
2036 )
2037 if regconfig:
2038 return "%s @@ plainto_tsquery(%s, %s)" % (
2039 self.process(binary.left, **kw),
2040 regconfig,
2041 self.process(binary.right, **kw),
2042 )
2043 return "%s @@ plainto_tsquery(%s)" % (
2044 self.process(binary.left, **kw),
2045 self.process(binary.right, **kw),
2046 )
2047
2048 def visit_ilike_case_insensitive_operand(self, element, **kw):
2049 return element.element._compiler_dispatch(self, **kw)
2050
2051 def visit_ilike_op_binary(self, binary, operator, **kw):
2052 escape = binary.modifiers.get("escape", None)
2053
2054 return "%s ILIKE %s" % (
2055 self.process(binary.left, **kw),
2056 self.process(binary.right, **kw),
2057 ) + (
2058 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2059 if escape is not None
2060 else ""
2061 )
2062
2063 def visit_not_ilike_op_binary(self, binary, operator, **kw):
2064 escape = binary.modifiers.get("escape", None)
2065 return "%s NOT ILIKE %s" % (
2066 self.process(binary.left, **kw),
2067 self.process(binary.right, **kw),
2068 ) + (
2069 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2070 if escape is not None
2071 else ""
2072 )
2073
2074 def _regexp_match(self, base_op, binary, operator, kw):
2075 flags = binary.modifiers["flags"]
2076 if flags is None:
2077 return self._generate_generic_binary(
2078 binary, " %s " % base_op, **kw
2079 )
2080 if flags == "i":
2081 return self._generate_generic_binary(
2082 binary, " %s* " % base_op, **kw
2083 )
2084 return "%s %s CONCAT('(?', %s, ')', %s)" % (
2085 self.process(binary.left, **kw),
2086 base_op,
2087 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2088 self.process(binary.right, **kw),
2089 )
2090
2091 def visit_regexp_match_op_binary(self, binary, operator, **kw):
2092 return self._regexp_match("~", binary, operator, kw)
2093
2094 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
2095 return self._regexp_match("!~", binary, operator, kw)
2096
2097 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
2098 string = self.process(binary.left, **kw)
2099 pattern_replace = self.process(binary.right, **kw)
2100 flags = binary.modifiers["flags"]
2101 if flags is None:
2102 return "REGEXP_REPLACE(%s, %s)" % (
2103 string,
2104 pattern_replace,
2105 )
2106 else:
2107 return "REGEXP_REPLACE(%s, %s, %s)" % (
2108 string,
2109 pattern_replace,
2110 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2111 )
2112
2113 def visit_empty_set_expr(self, element_types, **kw):
2114 # cast the empty set to the type we are comparing against. if
2115 # we are comparing against the null type, pick an arbitrary
2116 # datatype for the empty set
2117 return "SELECT %s WHERE 1!=1" % (
2118 ", ".join(
2119 "CAST(NULL AS %s)"
2120 % self.dialect.type_compiler_instance.process(
2121 INTEGER() if type_._isnull else type_
2122 )
2123 for type_ in element_types or [INTEGER()]
2124 ),
2125 )
2126
2127 def render_literal_value(self, value, type_):
2128 value = super().render_literal_value(value, type_)
2129
2130 if self.dialect._backslash_escapes:
2131 value = value.replace("\\", "\\\\")
2132 return value
2133
2134 def visit_aggregate_strings_func(self, fn, **kw):
2135 return "string_agg%s" % self.function_argspec(fn)
2136
2137 def visit_sequence(self, seq, **kw):
2138 return "nextval('%s')" % self.preparer.format_sequence(seq)
2139
2140 def limit_clause(self, select, **kw):
2141 text = ""
2142 if select._limit_clause is not None:
2143 text += " \n LIMIT " + self.process(select._limit_clause, **kw)
2144 if select._offset_clause is not None:
2145 if select._limit_clause is None:
2146 text += "\n LIMIT ALL"
2147 text += " OFFSET " + self.process(select._offset_clause, **kw)
2148 return text
2149
2150 def format_from_hint_text(self, sqltext, table, hint, iscrud):
2151 if hint.upper() != "ONLY":
2152 raise exc.CompileError("Unrecognized hint: %r" % hint)
2153 return "ONLY " + sqltext
2154
2155 def get_select_precolumns(self, select, **kw):
2156 # Do not call super().get_select_precolumns because
2157 # it will warn/raise when distinct on is present
2158 if select._distinct or select._distinct_on:
2159 if select._distinct_on:
2160 return (
2161 "DISTINCT ON ("
2162 + ", ".join(
2163 [
2164 self.process(col, **kw)
2165 for col in select._distinct_on
2166 ]
2167 )
2168 + ") "
2169 )
2170 else:
2171 return "DISTINCT "
2172 else:
2173 return ""
2174
2175 def for_update_clause(self, select, **kw):
2176 if select._for_update_arg.read:
2177 if select._for_update_arg.key_share:
2178 tmp = " FOR KEY SHARE"
2179 else:
2180 tmp = " FOR SHARE"
2181 elif select._for_update_arg.key_share:
2182 tmp = " FOR NO KEY UPDATE"
2183 else:
2184 tmp = " FOR UPDATE"
2185
2186 if select._for_update_arg.of:
2187 tables = util.OrderedSet()
2188 for c in select._for_update_arg.of:
2189 tables.update(sql_util.surface_selectables_only(c))
2190
2191 of_kw = dict(kw)
2192 of_kw.update(ashint=True, use_schema=False)
2193 tmp += " OF " + ", ".join(
2194 self.process(table, **of_kw) for table in tables
2195 )
2196
2197 if select._for_update_arg.nowait:
2198 tmp += " NOWAIT"
2199 if select._for_update_arg.skip_locked:
2200 tmp += " SKIP LOCKED"
2201
2202 return tmp
2203
2204 def visit_substring_func(self, func, **kw):
2205 s = self.process(func.clauses.clauses[0], **kw)
2206 start = self.process(func.clauses.clauses[1], **kw)
2207 if len(func.clauses.clauses) > 2:
2208 length = self.process(func.clauses.clauses[2], **kw)
2209 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
2210 else:
2211 return "SUBSTRING(%s FROM %s)" % (s, start)
2212
2213 def _on_conflict_target(self, clause, **kw):
2214 if clause.constraint_target is not None:
2215 # target may be a name of an Index, UniqueConstraint or
2216 # ExcludeConstraint. While there is a separate
2217 # "max_identifier_length" for indexes, PostgreSQL uses the same
2218 # length for all objects so we can use
2219 # truncate_and_render_constraint_name
2220 target_text = (
2221 "ON CONSTRAINT %s"
2222 % self.preparer.truncate_and_render_constraint_name(
2223 clause.constraint_target
2224 )
2225 )
2226 elif clause.inferred_target_elements is not None:
2227 target_text = "(%s)" % ", ".join(
2228 (
2229 self.preparer.quote(c)
2230 if isinstance(c, str)
2231 else self.process(c, include_table=False, use_schema=False)
2232 )
2233 for c in clause.inferred_target_elements
2234 )
2235 if clause.inferred_target_whereclause is not None:
2236 whereclause_kw = dict(kw)
2237 whereclause_kw.update(include_table=False, use_schema=False)
2238 target_text += " WHERE %s" % self.process(
2239 clause.inferred_target_whereclause,
2240 **whereclause_kw,
2241 )
2242 else:
2243 target_text = ""
2244
2245 return target_text
2246
2247 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
2248 target_text = self._on_conflict_target(on_conflict, **kw)
2249
2250 if target_text:
2251 return "ON CONFLICT %s DO NOTHING" % target_text
2252 else:
2253 return "ON CONFLICT DO NOTHING"
2254
2255 def visit_on_conflict_do_update(self, on_conflict, **kw):
2256 clause = on_conflict
2257
2258 target_text = self._on_conflict_target(on_conflict, **kw)
2259
2260 action_set_ops = []
2261
2262 set_parameters = dict(clause.update_values_to_set)
2263 # create a list of column assignment clauses as tuples
2264
2265 insert_statement = self.stack[-1]["selectable"]
2266 cols = insert_statement.table.c
2267 set_kw = dict(kw)
2268 set_kw.update(use_schema=False)
2269 for c in cols:
2270 col_key = c.key
2271
2272 if col_key in set_parameters:
2273 value = set_parameters.pop(col_key)
2274 elif c in set_parameters:
2275 value = set_parameters.pop(c)
2276 else:
2277 continue
2278
2279 # TODO: this coercion should be up front. we can't cache
2280 # SQL constructs with non-bound literals buried in them
2281 if coercions._is_literal(value):
2282 value = elements.BindParameter(None, value, type_=c.type)
2283
2284 else:
2285 if (
2286 isinstance(value, elements.BindParameter)
2287 and value.type._isnull
2288 ):
2289 value = value._clone()
2290 value.type = c.type
2291 value_text = self.process(
2292 value.self_group(), is_upsert_set=True, **set_kw
2293 )
2294
2295 key_text = self.preparer.quote(c.name)
2296 action_set_ops.append("%s = %s" % (key_text, value_text))
2297
2298 # check for names that don't match columns
2299 if set_parameters:
2300 util.warn(
2301 "Additional column names not matching "
2302 "any column keys in table '%s': %s"
2303 % (
2304 self.current_executable.table.name,
2305 (", ".join("'%s'" % c for c in set_parameters)),
2306 )
2307 )
2308 for k, v in set_parameters.items():
2309 key_text = (
2310 self.preparer.quote(k)
2311 if isinstance(k, str)
2312 else self.process(k, use_schema=False)
2313 )
2314 value_text = self.process(
2315 coercions.expect(roles.ExpressionElementRole, v),
2316 is_upsert_set=True,
2317 **set_kw,
2318 )
2319 action_set_ops.append("%s = %s" % (key_text, value_text))
2320
2321 action_text = ", ".join(action_set_ops)
2322 if clause.update_whereclause is not None:
2323 where_kw = dict(kw)
2324 where_kw.update(include_table=True, use_schema=False)
2325 action_text += " WHERE %s" % self.process(
2326 clause.update_whereclause, **where_kw
2327 )
2328
2329 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
2330
2331 def update_from_clause(
2332 self, update_stmt, from_table, extra_froms, from_hints, **kw
2333 ):
2334 kw["asfrom"] = True
2335 return "FROM " + ", ".join(
2336 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2337 for t in extra_froms
2338 )
2339
2340 def delete_extra_from_clause(
2341 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2342 ):
2343 """Render the DELETE .. USING clause specific to PostgreSQL."""
2344 kw["asfrom"] = True
2345 return "USING " + ", ".join(
2346 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2347 for t in extra_froms
2348 )
2349
2350 def fetch_clause(self, select, **kw):
2351 # pg requires parens for non literal clauses. It's also required for
2352 # bind parameters if a ::type casts is used by the driver (asyncpg),
2353 # so it's easiest to just always add it
2354 text = ""
2355 if select._offset_clause is not None:
2356 text += "\n OFFSET (%s) ROWS" % self.process(
2357 select._offset_clause, **kw
2358 )
2359 if select._fetch_clause is not None:
2360 text += "\n FETCH FIRST (%s)%s ROWS %s" % (
2361 self.process(select._fetch_clause, **kw),
2362 " PERCENT" if select._fetch_clause_options["percent"] else "",
2363 (
2364 "WITH TIES"
2365 if select._fetch_clause_options["with_ties"]
2366 else "ONLY"
2367 ),
2368 )
2369 return text
2370
2371
2372class PGDDLCompiler(compiler.DDLCompiler):
2373 def get_column_specification(self, column, **kwargs):
2374 colspec = self.preparer.format_column(column)
2375 impl_type = column.type.dialect_impl(self.dialect)
2376 if isinstance(impl_type, sqltypes.TypeDecorator):
2377 impl_type = impl_type.impl
2378
2379 has_identity = (
2380 column.identity is not None
2381 and self.dialect.supports_identity_columns
2382 )
2383
2384 if (
2385 column.primary_key
2386 and column is column.table._autoincrement_column
2387 and (
2388 self.dialect.supports_smallserial
2389 or not isinstance(impl_type, sqltypes.SmallInteger)
2390 )
2391 and not has_identity
2392 and (
2393 column.default is None
2394 or (
2395 isinstance(column.default, schema.Sequence)
2396 and column.default.optional
2397 )
2398 )
2399 ):
2400 if isinstance(impl_type, sqltypes.BigInteger):
2401 colspec += " BIGSERIAL"
2402 elif isinstance(impl_type, sqltypes.SmallInteger):
2403 colspec += " SMALLSERIAL"
2404 else:
2405 colspec += " SERIAL"
2406 else:
2407 colspec += " " + self.dialect.type_compiler_instance.process(
2408 column.type,
2409 type_expression=column,
2410 identifier_preparer=self.preparer,
2411 )
2412 default = self.get_column_default_string(column)
2413 if default is not None:
2414 colspec += " DEFAULT " + default
2415
2416 if column.computed is not None:
2417 colspec += " " + self.process(column.computed)
2418 if has_identity:
2419 colspec += " " + self.process(column.identity)
2420
2421 if not column.nullable and not has_identity:
2422 colspec += " NOT NULL"
2423 elif column.nullable and has_identity:
2424 colspec += " NULL"
2425 return colspec
2426
2427 def _define_constraint_validity(self, constraint):
2428 not_valid = constraint.dialect_options["postgresql"]["not_valid"]
2429 return " NOT VALID" if not_valid else ""
2430
2431 def _define_include(self, obj):
2432 includeclause = obj.dialect_options["postgresql"]["include"]
2433 if not includeclause:
2434 return ""
2435 inclusions = [
2436 obj.table.c[col] if isinstance(col, str) else col
2437 for col in includeclause
2438 ]
2439 return " INCLUDE (%s)" % ", ".join(
2440 [self.preparer.quote(c.name) for c in inclusions]
2441 )
2442
2443 def visit_check_constraint(self, constraint, **kw):
2444 if constraint._type_bound:
2445 typ = list(constraint.columns)[0].type
2446 if (
2447 isinstance(typ, sqltypes.ARRAY)
2448 and isinstance(typ.item_type, sqltypes.Enum)
2449 and not typ.item_type.native_enum
2450 ):
2451 raise exc.CompileError(
2452 "PostgreSQL dialect cannot produce the CHECK constraint "
2453 "for ARRAY of non-native ENUM; please specify "
2454 "create_constraint=False on this Enum datatype."
2455 )
2456
2457 text = super().visit_check_constraint(constraint)
2458 text += self._define_constraint_validity(constraint)
2459 return text
2460
2461 def visit_foreign_key_constraint(self, constraint, **kw):
2462 text = super().visit_foreign_key_constraint(constraint)
2463 text += self._define_constraint_validity(constraint)
2464 return text
2465
2466 def visit_primary_key_constraint(self, constraint, **kw):
2467 text = self.define_constraint_preamble(constraint, **kw)
2468 text += self.define_primary_key_body(constraint, **kw)
2469 text += self._define_include(constraint)
2470 text += self.define_constraint_deferrability(constraint)
2471 return text
2472
2473 def visit_unique_constraint(self, constraint, **kw):
2474 if len(constraint) == 0:
2475 return ""
2476 text = self.define_constraint_preamble(constraint, **kw)
2477 text += self.define_unique_body(constraint, **kw)
2478 text += self._define_include(constraint)
2479 text += self.define_constraint_deferrability(constraint)
2480 return text
2481
2482 @util.memoized_property
2483 def _fk_ondelete_pattern(self):
2484 return re.compile(
2485 r"^(?:RESTRICT|CASCADE|SET (?:NULL|DEFAULT)(?:\s*\(.+\))?"
2486 r"|NO ACTION)$",
2487 re.I,
2488 )
2489
2490 def define_constraint_ondelete_cascade(self, constraint):
2491 return " ON DELETE %s" % self.preparer.validate_sql_phrase(
2492 constraint.ondelete, self._fk_ondelete_pattern
2493 )
2494
2495 def visit_create_enum_type(self, create, **kw):
2496 type_ = create.element
2497
2498 return "CREATE TYPE %s AS ENUM (%s)" % (
2499 self.preparer.format_type(type_),
2500 ", ".join(
2501 self.sql_compiler.process(sql.literal(e), literal_binds=True)
2502 for e in type_.enums
2503 ),
2504 )
2505
2506 def visit_drop_enum_type(self, drop, **kw):
2507 type_ = drop.element
2508
2509 return "DROP TYPE %s" % (self.preparer.format_type(type_))
2510
2511 def visit_create_domain_type(self, create, **kw):
2512 domain: DOMAIN = create.element
2513
2514 options = []
2515 if domain.collation is not None:
2516 options.append(f"COLLATE {self.preparer.quote(domain.collation)}")
2517 if domain.default is not None:
2518 default = self.render_default_string(domain.default)
2519 options.append(f"DEFAULT {default}")
2520 if domain.constraint_name is not None:
2521 name = self.preparer.truncate_and_render_constraint_name(
2522 domain.constraint_name
2523 )
2524 options.append(f"CONSTRAINT {name}")
2525 if domain.not_null:
2526 options.append("NOT NULL")
2527 if domain.check is not None:
2528 check = self.sql_compiler.process(
2529 domain.check, include_table=False, literal_binds=True
2530 )
2531 options.append(f"CHECK ({check})")
2532
2533 return (
2534 f"CREATE DOMAIN {self.preparer.format_type(domain)} AS "
2535 f"{self.type_compiler.process(domain.data_type)} "
2536 f"{' '.join(options)}"
2537 )
2538
2539 def visit_drop_domain_type(self, drop, **kw):
2540 domain = drop.element
2541 return f"DROP DOMAIN {self.preparer.format_type(domain)}"
2542
2543 def visit_create_index(self, create, **kw):
2544 preparer = self.preparer
2545 index = create.element
2546 self._verify_index_table(index)
2547 text = "CREATE "
2548 if index.unique:
2549 text += "UNIQUE "
2550
2551 text += "INDEX "
2552
2553 if self.dialect._supports_create_index_concurrently:
2554 concurrently = index.dialect_options["postgresql"]["concurrently"]
2555 if concurrently:
2556 text += "CONCURRENTLY "
2557
2558 if create.if_not_exists:
2559 text += "IF NOT EXISTS "
2560
2561 text += "%s ON %s " % (
2562 self._prepared_index_name(index, include_schema=False),
2563 preparer.format_table(index.table),
2564 )
2565
2566 using = index.dialect_options["postgresql"]["using"]
2567 if using:
2568 text += (
2569 "USING %s "
2570 % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
2571 )
2572
2573 ops = index.dialect_options["postgresql"]["ops"]
2574 text += "(%s)" % (
2575 ", ".join(
2576 [
2577 self.sql_compiler.process(
2578 (
2579 expr.self_group()
2580 if not isinstance(expr, expression.ColumnClause)
2581 else expr
2582 ),
2583 include_table=False,
2584 literal_binds=True,
2585 )
2586 + (
2587 (" " + ops[expr.key])
2588 if hasattr(expr, "key") and expr.key in ops
2589 else ""
2590 )
2591 for expr in index.expressions
2592 ]
2593 )
2594 )
2595
2596 text += self._define_include(index)
2597
2598 nulls_not_distinct = index.dialect_options["postgresql"][
2599 "nulls_not_distinct"
2600 ]
2601 if nulls_not_distinct is True:
2602 text += " NULLS NOT DISTINCT"
2603 elif nulls_not_distinct is False:
2604 text += " NULLS DISTINCT"
2605
2606 withclause = index.dialect_options["postgresql"]["with"]
2607 if withclause:
2608 text += " WITH (%s)" % (
2609 ", ".join(
2610 [
2611 "%s = %s" % storage_parameter
2612 for storage_parameter in withclause.items()
2613 ]
2614 )
2615 )
2616
2617 tablespace_name = index.dialect_options["postgresql"]["tablespace"]
2618 if tablespace_name:
2619 text += " TABLESPACE %s" % preparer.quote(tablespace_name)
2620
2621 whereclause = index.dialect_options["postgresql"]["where"]
2622 if whereclause is not None:
2623 whereclause = coercions.expect(
2624 roles.DDLExpressionRole, whereclause
2625 )
2626
2627 where_compiled = self.sql_compiler.process(
2628 whereclause, include_table=False, literal_binds=True
2629 )
2630 text += " WHERE " + where_compiled
2631
2632 return text
2633
2634 def define_unique_constraint_distinct(self, constraint, **kw):
2635 nulls_not_distinct = constraint.dialect_options["postgresql"][
2636 "nulls_not_distinct"
2637 ]
2638 if nulls_not_distinct is True:
2639 nulls_not_distinct_param = "NULLS NOT DISTINCT "
2640 elif nulls_not_distinct is False:
2641 nulls_not_distinct_param = "NULLS DISTINCT "
2642 else:
2643 nulls_not_distinct_param = ""
2644 return nulls_not_distinct_param
2645
2646 def visit_drop_index(self, drop, **kw):
2647 index = drop.element
2648
2649 text = "\nDROP INDEX "
2650
2651 if self.dialect._supports_drop_index_concurrently:
2652 concurrently = index.dialect_options["postgresql"]["concurrently"]
2653 if concurrently:
2654 text += "CONCURRENTLY "
2655
2656 if drop.if_exists:
2657 text += "IF EXISTS "
2658
2659 text += self._prepared_index_name(index, include_schema=True)
2660 return text
2661
2662 def visit_exclude_constraint(self, constraint, **kw):
2663 text = ""
2664 if constraint.name is not None:
2665 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2666 constraint
2667 )
2668 elements = []
2669 kw["include_table"] = False
2670 kw["literal_binds"] = True
2671 for expr, name, op in constraint._render_exprs:
2672 exclude_element = self.sql_compiler.process(expr, **kw) + (
2673 (" " + constraint.ops[expr.key])
2674 if hasattr(expr, "key") and expr.key in constraint.ops
2675 else ""
2676 )
2677
2678 elements.append("%s WITH %s" % (exclude_element, op))
2679 text += "EXCLUDE USING %s (%s)" % (
2680 self.preparer.validate_sql_phrase(
2681 constraint.using, IDX_USING
2682 ).lower(),
2683 ", ".join(elements),
2684 )
2685 if constraint.where is not None:
2686 text += " WHERE (%s)" % self.sql_compiler.process(
2687 constraint.where, literal_binds=True
2688 )
2689 text += self.define_constraint_deferrability(constraint)
2690 return text
2691
2692 def post_create_table(self, table):
2693 table_opts = []
2694 pg_opts = table.dialect_options["postgresql"]
2695
2696 inherits = pg_opts.get("inherits")
2697 if inherits is not None:
2698 if not isinstance(inherits, (list, tuple)):
2699 inherits = (inherits,)
2700 table_opts.append(
2701 "\n INHERITS ( "
2702 + ", ".join(self.preparer.quote(name) for name in inherits)
2703 + " )"
2704 )
2705
2706 if pg_opts["partition_by"]:
2707 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
2708
2709 if pg_opts["using"]:
2710 table_opts.append("\n USING %s" % pg_opts["using"])
2711
2712 if pg_opts["with_oids"] is True:
2713 table_opts.append("\n WITH OIDS")
2714 elif pg_opts["with_oids"] is False:
2715 table_opts.append("\n WITHOUT OIDS")
2716
2717 if pg_opts["on_commit"]:
2718 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
2719 table_opts.append("\n ON COMMIT %s" % on_commit_options)
2720
2721 if pg_opts["tablespace"]:
2722 tablespace_name = pg_opts["tablespace"]
2723 table_opts.append(
2724 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
2725 )
2726
2727 return "".join(table_opts)
2728
2729 def visit_computed_column(self, generated, **kw):
2730 if generated.persisted is False:
2731 raise exc.CompileError(
2732 "PostrgreSQL computed columns do not support 'virtual' "
2733 "persistence; set the 'persisted' flag to None or True for "
2734 "PostgreSQL support."
2735 )
2736
2737 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
2738 generated.sqltext, include_table=False, literal_binds=True
2739 )
2740
2741 def visit_create_sequence(self, create, **kw):
2742 prefix = None
2743 if create.element.data_type is not None:
2744 prefix = " AS %s" % self.type_compiler.process(
2745 create.element.data_type
2746 )
2747
2748 return super().visit_create_sequence(create, prefix=prefix, **kw)
2749
2750 def _can_comment_on_constraint(self, ddl_instance):
2751 constraint = ddl_instance.element
2752 if constraint.name is None:
2753 raise exc.CompileError(
2754 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2755 "it has no name"
2756 )
2757 if constraint.table is None:
2758 raise exc.CompileError(
2759 f"Can't emit COMMENT ON for constraint {constraint!r}: "
2760 "it has no associated table"
2761 )
2762
2763 def visit_set_constraint_comment(self, create, **kw):
2764 self._can_comment_on_constraint(create)
2765 return "COMMENT ON CONSTRAINT %s ON %s IS %s" % (
2766 self.preparer.format_constraint(create.element),
2767 self.preparer.format_table(create.element.table),
2768 self.sql_compiler.render_literal_value(
2769 create.element.comment, sqltypes.String()
2770 ),
2771 )
2772
2773 def visit_drop_constraint_comment(self, drop, **kw):
2774 self._can_comment_on_constraint(drop)
2775 return "COMMENT ON CONSTRAINT %s ON %s IS NULL" % (
2776 self.preparer.format_constraint(drop.element),
2777 self.preparer.format_table(drop.element.table),
2778 )
2779
2780
2781class PGTypeCompiler(compiler.GenericTypeCompiler):
2782 def visit_TSVECTOR(self, type_, **kw):
2783 return "TSVECTOR"
2784
2785 def visit_TSQUERY(self, type_, **kw):
2786 return "TSQUERY"
2787
2788 def visit_INET(self, type_, **kw):
2789 return "INET"
2790
2791 def visit_CIDR(self, type_, **kw):
2792 return "CIDR"
2793
2794 def visit_CITEXT(self, type_, **kw):
2795 return "CITEXT"
2796
2797 def visit_MACADDR(self, type_, **kw):
2798 return "MACADDR"
2799
2800 def visit_MACADDR8(self, type_, **kw):
2801 return "MACADDR8"
2802
2803 def visit_MONEY(self, type_, **kw):
2804 return "MONEY"
2805
2806 def visit_OID(self, type_, **kw):
2807 return "OID"
2808
2809 def visit_REGCONFIG(self, type_, **kw):
2810 return "REGCONFIG"
2811
2812 def visit_REGCLASS(self, type_, **kw):
2813 return "REGCLASS"
2814
2815 def visit_FLOAT(self, type_, **kw):
2816 if not type_.precision:
2817 return "FLOAT"
2818 else:
2819 return "FLOAT(%(precision)s)" % {"precision": type_.precision}
2820
2821 def visit_double(self, type_, **kw):
2822 return self.visit_DOUBLE_PRECISION(type, **kw)
2823
2824 def visit_BIGINT(self, type_, **kw):
2825 return "BIGINT"
2826
2827 def visit_HSTORE(self, type_, **kw):
2828 return "HSTORE"
2829
2830 def visit_JSON(self, type_, **kw):
2831 return "JSON"
2832
2833 def visit_JSONB(self, type_, **kw):
2834 return "JSONB"
2835
2836 def visit_INT4MULTIRANGE(self, type_, **kw):
2837 return "INT4MULTIRANGE"
2838
2839 def visit_INT8MULTIRANGE(self, type_, **kw):
2840 return "INT8MULTIRANGE"
2841
2842 def visit_NUMMULTIRANGE(self, type_, **kw):
2843 return "NUMMULTIRANGE"
2844
2845 def visit_DATEMULTIRANGE(self, type_, **kw):
2846 return "DATEMULTIRANGE"
2847
2848 def visit_TSMULTIRANGE(self, type_, **kw):
2849 return "TSMULTIRANGE"
2850
2851 def visit_TSTZMULTIRANGE(self, type_, **kw):
2852 return "TSTZMULTIRANGE"
2853
2854 def visit_INT4RANGE(self, type_, **kw):
2855 return "INT4RANGE"
2856
2857 def visit_INT8RANGE(self, type_, **kw):
2858 return "INT8RANGE"
2859
2860 def visit_NUMRANGE(self, type_, **kw):
2861 return "NUMRANGE"
2862
2863 def visit_DATERANGE(self, type_, **kw):
2864 return "DATERANGE"
2865
2866 def visit_TSRANGE(self, type_, **kw):
2867 return "TSRANGE"
2868
2869 def visit_TSTZRANGE(self, type_, **kw):
2870 return "TSTZRANGE"
2871
2872 def visit_json_int_index(self, type_, **kw):
2873 return "INT"
2874
2875 def visit_json_str_index(self, type_, **kw):
2876 return "TEXT"
2877
2878 def visit_datetime(self, type_, **kw):
2879 return self.visit_TIMESTAMP(type_, **kw)
2880
2881 def visit_enum(self, type_, **kw):
2882 if not type_.native_enum or not self.dialect.supports_native_enum:
2883 return super().visit_enum(type_, **kw)
2884 else:
2885 return self.visit_ENUM(type_, **kw)
2886
2887 def visit_ENUM(self, type_, identifier_preparer=None, **kw):
2888 if identifier_preparer is None:
2889 identifier_preparer = self.dialect.identifier_preparer
2890 return identifier_preparer.format_type(type_)
2891
2892 def visit_DOMAIN(self, type_, identifier_preparer=None, **kw):
2893 if identifier_preparer is None:
2894 identifier_preparer = self.dialect.identifier_preparer
2895 return identifier_preparer.format_type(type_)
2896
2897 def visit_TIMESTAMP(self, type_, **kw):
2898 return "TIMESTAMP%s %s" % (
2899 (
2900 "(%d)" % type_.precision
2901 if getattr(type_, "precision", None) is not None
2902 else ""
2903 ),
2904 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2905 )
2906
2907 def visit_TIME(self, type_, **kw):
2908 return "TIME%s %s" % (
2909 (
2910 "(%d)" % type_.precision
2911 if getattr(type_, "precision", None) is not None
2912 else ""
2913 ),
2914 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
2915 )
2916
2917 def visit_INTERVAL(self, type_, **kw):
2918 text = "INTERVAL"
2919 if type_.fields is not None:
2920 text += " " + type_.fields
2921 if type_.precision is not None:
2922 text += " (%d)" % type_.precision
2923 return text
2924
2925 def visit_BIT(self, type_, **kw):
2926 if type_.varying:
2927 compiled = "BIT VARYING"
2928 if type_.length is not None:
2929 compiled += "(%d)" % type_.length
2930 else:
2931 compiled = "BIT(%d)" % type_.length
2932 return compiled
2933
2934 def visit_uuid(self, type_, **kw):
2935 if type_.native_uuid:
2936 return self.visit_UUID(type_, **kw)
2937 else:
2938 return super().visit_uuid(type_, **kw)
2939
2940 def visit_UUID(self, type_, **kw):
2941 return "UUID"
2942
2943 def visit_large_binary(self, type_, **kw):
2944 return self.visit_BYTEA(type_, **kw)
2945
2946 def visit_BYTEA(self, type_, **kw):
2947 return "BYTEA"
2948
2949 def visit_ARRAY(self, type_, **kw):
2950 inner = self.process(type_.item_type, **kw)
2951 return re.sub(
2952 r"((?: COLLATE.*)?)$",
2953 (
2954 r"%s\1"
2955 % (
2956 "[]"
2957 * (type_.dimensions if type_.dimensions is not None else 1)
2958 )
2959 ),
2960 inner,
2961 count=1,
2962 )
2963
2964 def visit_json_path(self, type_, **kw):
2965 return self.visit_JSONPATH(type_, **kw)
2966
2967 def visit_JSONPATH(self, type_, **kw):
2968 return "JSONPATH"
2969
2970
2971class PGIdentifierPreparer(compiler.IdentifierPreparer):
2972 reserved_words = RESERVED_WORDS
2973
2974 def _unquote_identifier(self, value):
2975 if value[0] == self.initial_quote:
2976 value = value[1:-1].replace(
2977 self.escape_to_quote, self.escape_quote
2978 )
2979 return value
2980
2981 def format_type(self, type_, use_schema=True):
2982 if not type_.name:
2983 raise exc.CompileError(
2984 f"PostgreSQL {type_.__class__.__name__} type requires a name."
2985 )
2986
2987 name = self.quote(type_.name)
2988 effective_schema = self.schema_for_object(type_)
2989
2990 if (
2991 not self.omit_schema
2992 and use_schema
2993 and effective_schema is not None
2994 ):
2995 name = f"{self.quote_schema(effective_schema)}.{name}"
2996 return name
2997
2998
2999class ReflectedNamedType(TypedDict):
3000 """Represents a reflected named type."""
3001
3002 name: str
3003 """Name of the type."""
3004 schema: str
3005 """The schema of the type."""
3006 visible: bool
3007 """Indicates if this type is in the current search path."""
3008
3009
3010class ReflectedDomainConstraint(TypedDict):
3011 """Represents a reflect check constraint of a domain."""
3012
3013 name: str
3014 """Name of the constraint."""
3015 check: str
3016 """The check constraint text."""
3017
3018
3019class ReflectedDomain(ReflectedNamedType):
3020 """Represents a reflected enum."""
3021
3022 type: str
3023 """The string name of the underlying data type of the domain."""
3024 nullable: bool
3025 """Indicates if the domain allows null or not."""
3026 default: Optional[str]
3027 """The string representation of the default value of this domain
3028 or ``None`` if none present.
3029 """
3030 constraints: List[ReflectedDomainConstraint]
3031 """The constraints defined in the domain, if any.
3032 The constraint are in order of evaluation by postgresql.
3033 """
3034 collation: Optional[str]
3035 """The collation for the domain."""
3036
3037
3038class ReflectedEnum(ReflectedNamedType):
3039 """Represents a reflected enum."""
3040
3041 labels: List[str]
3042 """The labels that compose the enum."""
3043
3044
3045class PGInspector(reflection.Inspector):
3046 dialect: PGDialect
3047
3048 def get_table_oid(
3049 self, table_name: str, schema: Optional[str] = None
3050 ) -> int:
3051 """Return the OID for the given table name.
3052
3053 :param table_name: string name of the table. For special quoting,
3054 use :class:`.quoted_name`.
3055
3056 :param schema: string schema name; if omitted, uses the default schema
3057 of the database connection. For special quoting,
3058 use :class:`.quoted_name`.
3059
3060 """
3061
3062 with self._operation_context() as conn:
3063 return self.dialect.get_table_oid(
3064 conn, table_name, schema, info_cache=self.info_cache
3065 )
3066
3067 def get_domains(
3068 self, schema: Optional[str] = None
3069 ) -> List[ReflectedDomain]:
3070 """Return a list of DOMAIN objects.
3071
3072 Each member is a dictionary containing these fields:
3073
3074 * name - name of the domain
3075 * schema - the schema name for the domain.
3076 * visible - boolean, whether or not this domain is visible
3077 in the default search path.
3078 * type - the type defined by this domain.
3079 * nullable - Indicates if this domain can be ``NULL``.
3080 * default - The default value of the domain or ``None`` if the
3081 domain has no default.
3082 * constraints - A list of dict with the constraint defined by this
3083 domain. Each element contains two keys: ``name`` of the
3084 constraint and ``check`` with the constraint text.
3085
3086 :param schema: schema name. If None, the default schema
3087 (typically 'public') is used. May also be set to ``'*'`` to
3088 indicate load domains for all schemas.
3089
3090 .. versionadded:: 2.0
3091
3092 """
3093 with self._operation_context() as conn:
3094 return self.dialect._load_domains(
3095 conn, schema, info_cache=self.info_cache
3096 )
3097
3098 def get_enums(self, schema: Optional[str] = None) -> List[ReflectedEnum]:
3099 """Return a list of ENUM objects.
3100
3101 Each member is a dictionary containing these fields:
3102
3103 * name - name of the enum
3104 * schema - the schema name for the enum.
3105 * visible - boolean, whether or not this enum is visible
3106 in the default search path.
3107 * labels - a list of string labels that apply to the enum.
3108
3109 :param schema: schema name. If None, the default schema
3110 (typically 'public') is used. May also be set to ``'*'`` to
3111 indicate load enums for all schemas.
3112
3113 """
3114 with self._operation_context() as conn:
3115 return self.dialect._load_enums(
3116 conn, schema, info_cache=self.info_cache
3117 )
3118
3119 def get_foreign_table_names(
3120 self, schema: Optional[str] = None
3121 ) -> List[str]:
3122 """Return a list of FOREIGN TABLE names.
3123
3124 Behavior is similar to that of
3125 :meth:`_reflection.Inspector.get_table_names`,
3126 except that the list is limited to those tables that report a
3127 ``relkind`` value of ``f``.
3128
3129 """
3130 with self._operation_context() as conn:
3131 return self.dialect._get_foreign_table_names(
3132 conn, schema, info_cache=self.info_cache
3133 )
3134
3135 def has_type(
3136 self, type_name: str, schema: Optional[str] = None, **kw: Any
3137 ) -> bool:
3138 """Return if the database has the specified type in the provided
3139 schema.
3140
3141 :param type_name: the type to check.
3142 :param schema: schema name. If None, the default schema
3143 (typically 'public') is used. May also be set to ``'*'`` to
3144 check in all schemas.
3145
3146 .. versionadded:: 2.0
3147
3148 """
3149 with self._operation_context() as conn:
3150 return self.dialect.has_type(
3151 conn, type_name, schema, info_cache=self.info_cache
3152 )
3153
3154
3155class PGExecutionContext(default.DefaultExecutionContext):
3156 def fire_sequence(self, seq, type_):
3157 return self._execute_scalar(
3158 (
3159 "select nextval('%s')"
3160 % self.identifier_preparer.format_sequence(seq)
3161 ),
3162 type_,
3163 )
3164
3165 def get_insert_default(self, column):
3166 if column.primary_key and column is column.table._autoincrement_column:
3167 if column.server_default and column.server_default.has_argument:
3168 # pre-execute passive defaults on primary key columns
3169 return self._execute_scalar(
3170 "select %s" % column.server_default.arg, column.type
3171 )
3172
3173 elif column.default is None or (
3174 column.default.is_sequence and column.default.optional
3175 ):
3176 # execute the sequence associated with a SERIAL primary
3177 # key column. for non-primary-key SERIAL, the ID just
3178 # generates server side.
3179
3180 try:
3181 seq_name = column._postgresql_seq_name
3182 except AttributeError:
3183 tab = column.table.name
3184 col = column.name
3185 tab = tab[0 : 29 + max(0, (29 - len(col)))]
3186 col = col[0 : 29 + max(0, (29 - len(tab)))]
3187 name = "%s_%s_seq" % (tab, col)
3188 column._postgresql_seq_name = seq_name = name
3189
3190 if column.table is not None:
3191 effective_schema = self.connection.schema_for_object(
3192 column.table
3193 )
3194 else:
3195 effective_schema = None
3196
3197 if effective_schema is not None:
3198 exc = 'select nextval(\'"%s"."%s"\')' % (
3199 effective_schema,
3200 seq_name,
3201 )
3202 else:
3203 exc = "select nextval('\"%s\"')" % (seq_name,)
3204
3205 return self._execute_scalar(exc, column.type)
3206
3207 return super().get_insert_default(column)
3208
3209
3210class PGReadOnlyConnectionCharacteristic(
3211 characteristics.ConnectionCharacteristic
3212):
3213 transactional = True
3214
3215 def reset_characteristic(self, dialect, dbapi_conn):
3216 dialect.set_readonly(dbapi_conn, False)
3217
3218 def set_characteristic(self, dialect, dbapi_conn, value):
3219 dialect.set_readonly(dbapi_conn, value)
3220
3221 def get_characteristic(self, dialect, dbapi_conn):
3222 return dialect.get_readonly(dbapi_conn)
3223
3224
3225class PGDeferrableConnectionCharacteristic(
3226 characteristics.ConnectionCharacteristic
3227):
3228 transactional = True
3229
3230 def reset_characteristic(self, dialect, dbapi_conn):
3231 dialect.set_deferrable(dbapi_conn, False)
3232
3233 def set_characteristic(self, dialect, dbapi_conn, value):
3234 dialect.set_deferrable(dbapi_conn, value)
3235
3236 def get_characteristic(self, dialect, dbapi_conn):
3237 return dialect.get_deferrable(dbapi_conn)
3238
3239
3240class PGDialect(default.DefaultDialect):
3241 name = "postgresql"
3242 supports_statement_cache = True
3243 supports_alter = True
3244 max_identifier_length = 63
3245 supports_sane_rowcount = True
3246
3247 bind_typing = interfaces.BindTyping.RENDER_CASTS
3248
3249 supports_native_enum = True
3250 supports_native_boolean = True
3251 supports_native_uuid = True
3252 supports_smallserial = True
3253
3254 supports_sequences = True
3255 sequences_optional = True
3256 preexecute_autoincrement_sequences = True
3257 postfetch_lastrowid = False
3258 use_insertmanyvalues = True
3259
3260 returns_native_bytes = True
3261
3262 insertmanyvalues_implicit_sentinel = (
3263 InsertmanyvaluesSentinelOpts.ANY_AUTOINCREMENT
3264 | InsertmanyvaluesSentinelOpts.USE_INSERT_FROM_SELECT
3265 | InsertmanyvaluesSentinelOpts.RENDER_SELECT_COL_CASTS
3266 )
3267
3268 supports_comments = True
3269 supports_constraint_comments = True
3270 supports_default_values = True
3271
3272 supports_default_metavalue = True
3273
3274 supports_empty_insert = False
3275 supports_multivalues_insert = True
3276
3277 supports_identity_columns = True
3278
3279 default_paramstyle = "pyformat"
3280 ischema_names = ischema_names
3281 colspecs = colspecs
3282
3283 statement_compiler = PGCompiler
3284 ddl_compiler = PGDDLCompiler
3285 type_compiler_cls = PGTypeCompiler
3286 preparer = PGIdentifierPreparer
3287 execution_ctx_cls = PGExecutionContext
3288 inspector = PGInspector
3289
3290 update_returning = True
3291 delete_returning = True
3292 insert_returning = True
3293 update_returning_multifrom = True
3294 delete_returning_multifrom = True
3295
3296 connection_characteristics = (
3297 default.DefaultDialect.connection_characteristics
3298 )
3299 connection_characteristics = connection_characteristics.union(
3300 {
3301 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
3302 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
3303 }
3304 )
3305
3306 construct_arguments = [
3307 (
3308 schema.Index,
3309 {
3310 "using": False,
3311 "include": None,
3312 "where": None,
3313 "ops": {},
3314 "concurrently": False,
3315 "with": {},
3316 "tablespace": None,
3317 "nulls_not_distinct": None,
3318 },
3319 ),
3320 (
3321 schema.Table,
3322 {
3323 "ignore_search_path": False,
3324 "tablespace": None,
3325 "partition_by": None,
3326 "with_oids": None,
3327 "on_commit": None,
3328 "inherits": None,
3329 "using": None,
3330 },
3331 ),
3332 (
3333 schema.CheckConstraint,
3334 {
3335 "not_valid": False,
3336 },
3337 ),
3338 (
3339 schema.ForeignKeyConstraint,
3340 {
3341 "not_valid": False,
3342 },
3343 ),
3344 (
3345 schema.PrimaryKeyConstraint,
3346 {"include": None},
3347 ),
3348 (
3349 schema.UniqueConstraint,
3350 {
3351 "include": None,
3352 "nulls_not_distinct": None,
3353 },
3354 ),
3355 ]
3356
3357 reflection_options = ("postgresql_ignore_search_path",)
3358
3359 _backslash_escapes = True
3360 _supports_create_index_concurrently = True
3361 _supports_drop_index_concurrently = True
3362 _supports_jsonb_subscripting = True
3363
3364 def __init__(
3365 self,
3366 native_inet_types=None,
3367 json_serializer=None,
3368 json_deserializer=None,
3369 **kwargs,
3370 ):
3371 default.DefaultDialect.__init__(self, **kwargs)
3372
3373 self._native_inet_types = native_inet_types
3374 self._json_deserializer = json_deserializer
3375 self._json_serializer = json_serializer
3376
3377 def initialize(self, connection):
3378 super().initialize(connection)
3379
3380 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
3381 self.supports_smallserial = self.server_version_info >= (9, 2)
3382
3383 self._set_backslash_escapes(connection)
3384
3385 self._supports_drop_index_concurrently = self.server_version_info >= (
3386 9,
3387 2,
3388 )
3389 self.supports_identity_columns = self.server_version_info >= (10,)
3390
3391 self._supports_jsonb_subscripting = self.server_version_info >= (14,)
3392
3393 def get_isolation_level_values(self, dbapi_conn):
3394 # note the generic dialect doesn't have AUTOCOMMIT, however
3395 # all postgresql dialects should include AUTOCOMMIT.
3396 return (
3397 "SERIALIZABLE",
3398 "READ UNCOMMITTED",
3399 "READ COMMITTED",
3400 "REPEATABLE READ",
3401 )
3402
3403 def set_isolation_level(self, dbapi_connection, level):
3404 cursor = dbapi_connection.cursor()
3405 cursor.execute(
3406 "SET SESSION CHARACTERISTICS AS TRANSACTION "
3407 f"ISOLATION LEVEL {level}"
3408 )
3409 cursor.execute("COMMIT")
3410 cursor.close()
3411
3412 def get_isolation_level(self, dbapi_connection):
3413 cursor = dbapi_connection.cursor()
3414 cursor.execute("show transaction isolation level")
3415 val = cursor.fetchone()[0]
3416 cursor.close()
3417 return val.upper()
3418
3419 def set_readonly(self, connection, value):
3420 raise NotImplementedError()
3421
3422 def get_readonly(self, connection):
3423 raise NotImplementedError()
3424
3425 def set_deferrable(self, connection, value):
3426 raise NotImplementedError()
3427
3428 def get_deferrable(self, connection):
3429 raise NotImplementedError()
3430
3431 def _split_multihost_from_url(self, url: URL) -> Union[
3432 Tuple[None, None],
3433 Tuple[Tuple[Optional[str], ...], Tuple[Optional[int], ...]],
3434 ]:
3435 hosts: Optional[Tuple[Optional[str], ...]] = None
3436 ports_str: Union[str, Tuple[Optional[str], ...], None] = None
3437
3438 integrated_multihost = False
3439
3440 if "host" in url.query:
3441 if isinstance(url.query["host"], (list, tuple)):
3442 integrated_multihost = True
3443 hosts, ports_str = zip(
3444 *[
3445 token.split(":") if ":" in token else (token, None)
3446 for token in url.query["host"]
3447 ]
3448 )
3449
3450 elif isinstance(url.query["host"], str):
3451 hosts = tuple(url.query["host"].split(","))
3452
3453 if (
3454 "port" not in url.query
3455 and len(hosts) == 1
3456 and ":" in hosts[0]
3457 ):
3458 # internet host is alphanumeric plus dots or hyphens.
3459 # this is essentially rfc1123, which refers to rfc952.
3460 # https://stackoverflow.com/questions/3523028/
3461 # valid-characters-of-a-hostname
3462 host_port_match = re.match(
3463 r"^([a-zA-Z0-9\-\.]*)(?:\:(\d*))?$", hosts[0]
3464 )
3465 if host_port_match:
3466 integrated_multihost = True
3467 h, p = host_port_match.group(1, 2)
3468 if TYPE_CHECKING:
3469 assert isinstance(h, str)
3470 assert isinstance(p, str)
3471 hosts = (h,)
3472 ports_str = cast(
3473 "Tuple[Optional[str], ...]", (p,) if p else (None,)
3474 )
3475
3476 if "port" in url.query:
3477 if integrated_multihost:
3478 raise exc.ArgumentError(
3479 "Can't mix 'multihost' formats together; use "
3480 '"host=h1,h2,h3&port=p1,p2,p3" or '
3481 '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
3482 )
3483 if isinstance(url.query["port"], (list, tuple)):
3484 ports_str = url.query["port"]
3485 elif isinstance(url.query["port"], str):
3486 ports_str = tuple(url.query["port"].split(","))
3487
3488 ports: Optional[Tuple[Optional[int], ...]] = None
3489
3490 if ports_str:
3491 try:
3492 ports = tuple(int(x) if x else None for x in ports_str)
3493 except ValueError:
3494 raise exc.ArgumentError(
3495 f"Received non-integer port arguments: {ports_str}"
3496 ) from None
3497
3498 if ports and (
3499 (not hosts and len(ports) > 1)
3500 or (
3501 hosts
3502 and ports
3503 and len(hosts) != len(ports)
3504 and (len(hosts) > 1 or len(ports) > 1)
3505 )
3506 ):
3507 raise exc.ArgumentError("number of hosts and ports don't match")
3508
3509 if hosts is not None:
3510 if ports is None:
3511 ports = tuple(None for _ in hosts)
3512
3513 return hosts, ports # type: ignore
3514
3515 def do_begin_twophase(self, connection, xid):
3516 self.do_begin(connection.connection)
3517
3518 def do_prepare_twophase(self, connection, xid):
3519 connection.execute(
3520 sql.text("PREPARE TRANSACTION :xid").bindparams(
3521 sql.bindparam("xid", xid, literal_execute=True)
3522 )
3523 )
3524
3525 def do_rollback_twophase(
3526 self, connection, xid, is_prepared=True, recover=False
3527 ):
3528 if is_prepared:
3529 if recover:
3530 # FIXME: ugly hack to get out of transaction
3531 # context when committing recoverable transactions
3532 # Must find out a way how to make the dbapi not
3533 # open a transaction.
3534 connection.exec_driver_sql("ROLLBACK")
3535 connection.execute(
3536 sql.text("ROLLBACK PREPARED :xid").bindparams(
3537 sql.bindparam("xid", xid, literal_execute=True)
3538 )
3539 )
3540 connection.exec_driver_sql("BEGIN")
3541 self.do_rollback(connection.connection)
3542 else:
3543 self.do_rollback(connection.connection)
3544
3545 def do_commit_twophase(
3546 self, connection, xid, is_prepared=True, recover=False
3547 ):
3548 if is_prepared:
3549 if recover:
3550 connection.exec_driver_sql("ROLLBACK")
3551 connection.execute(
3552 sql.text("COMMIT PREPARED :xid").bindparams(
3553 sql.bindparam("xid", xid, literal_execute=True)
3554 )
3555 )
3556 connection.exec_driver_sql("BEGIN")
3557 self.do_rollback(connection.connection)
3558 else:
3559 self.do_commit(connection.connection)
3560
3561 def do_recover_twophase(self, connection):
3562 return connection.scalars(
3563 sql.text("SELECT gid FROM pg_prepared_xacts")
3564 ).all()
3565
3566 def _get_default_schema_name(self, connection):
3567 return connection.exec_driver_sql("select current_schema()").scalar()
3568
3569 @reflection.cache
3570 def has_schema(self, connection, schema, **kw):
3571 query = select(pg_catalog.pg_namespace.c.nspname).where(
3572 pg_catalog.pg_namespace.c.nspname == schema
3573 )
3574 return bool(connection.scalar(query))
3575
3576 def _pg_class_filter_scope_schema(
3577 self, query, schema, scope, pg_class_table=None
3578 ):
3579 if pg_class_table is None:
3580 pg_class_table = pg_catalog.pg_class
3581 query = query.join(
3582 pg_catalog.pg_namespace,
3583 pg_catalog.pg_namespace.c.oid == pg_class_table.c.relnamespace,
3584 )
3585
3586 if scope is ObjectScope.DEFAULT:
3587 query = query.where(pg_class_table.c.relpersistence != "t")
3588 elif scope is ObjectScope.TEMPORARY:
3589 query = query.where(pg_class_table.c.relpersistence == "t")
3590
3591 if schema is None:
3592 query = query.where(
3593 pg_catalog.pg_table_is_visible(pg_class_table.c.oid),
3594 # ignore pg_catalog schema
3595 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
3596 )
3597 else:
3598 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
3599 return query
3600
3601 def _pg_class_relkind_condition(self, relkinds, pg_class_table=None):
3602 if pg_class_table is None:
3603 pg_class_table = pg_catalog.pg_class
3604 # uses the any form instead of in otherwise postgresql complaings
3605 # that 'IN could not convert type character to "char"'
3606 return pg_class_table.c.relkind == sql.any_(_array.array(relkinds))
3607
3608 @lru_cache()
3609 def _has_table_query(self, schema):
3610 query = select(pg_catalog.pg_class.c.relname).where(
3611 pg_catalog.pg_class.c.relname == bindparam("table_name"),
3612 self._pg_class_relkind_condition(
3613 pg_catalog.RELKINDS_ALL_TABLE_LIKE
3614 ),
3615 )
3616 return self._pg_class_filter_scope_schema(
3617 query, schema, scope=ObjectScope.ANY
3618 )
3619
3620 @reflection.cache
3621 def has_table(self, connection, table_name, schema=None, **kw):
3622 self._ensure_has_table_connection(connection)
3623 query = self._has_table_query(schema)
3624 return bool(connection.scalar(query, {"table_name": table_name}))
3625
3626 @reflection.cache
3627 def has_sequence(self, connection, sequence_name, schema=None, **kw):
3628 query = select(pg_catalog.pg_class.c.relname).where(
3629 pg_catalog.pg_class.c.relkind == "S",
3630 pg_catalog.pg_class.c.relname == sequence_name,
3631 )
3632 query = self._pg_class_filter_scope_schema(
3633 query, schema, scope=ObjectScope.ANY
3634 )
3635 return bool(connection.scalar(query))
3636
3637 @reflection.cache
3638 def has_type(self, connection, type_name, schema=None, **kw):
3639 query = (
3640 select(pg_catalog.pg_type.c.typname)
3641 .join(
3642 pg_catalog.pg_namespace,
3643 pg_catalog.pg_namespace.c.oid
3644 == pg_catalog.pg_type.c.typnamespace,
3645 )
3646 .where(pg_catalog.pg_type.c.typname == type_name)
3647 )
3648 if schema is None:
3649 query = query.where(
3650 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
3651 # ignore pg_catalog schema
3652 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
3653 )
3654 elif schema != "*":
3655 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
3656
3657 return bool(connection.scalar(query))
3658
3659 def _get_server_version_info(self, connection):
3660 v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
3661 m = re.match(
3662 r".*(?:PostgreSQL|EnterpriseDB) "
3663 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
3664 v,
3665 )
3666 if not m:
3667 raise AssertionError(
3668 "Could not determine version from string '%s'" % v
3669 )
3670 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
3671
3672 @reflection.cache
3673 def get_table_oid(self, connection, table_name, schema=None, **kw):
3674 """Fetch the oid for schema.table_name."""
3675 query = select(pg_catalog.pg_class.c.oid).where(
3676 pg_catalog.pg_class.c.relname == table_name,
3677 self._pg_class_relkind_condition(
3678 pg_catalog.RELKINDS_ALL_TABLE_LIKE
3679 ),
3680 )
3681 query = self._pg_class_filter_scope_schema(
3682 query, schema, scope=ObjectScope.ANY
3683 )
3684 table_oid = connection.scalar(query)
3685 if table_oid is None:
3686 raise exc.NoSuchTableError(
3687 f"{schema}.{table_name}" if schema else table_name
3688 )
3689 return table_oid
3690
3691 @reflection.cache
3692 def get_schema_names(self, connection, **kw):
3693 query = (
3694 select(pg_catalog.pg_namespace.c.nspname)
3695 .where(pg_catalog.pg_namespace.c.nspname.not_like("pg_%"))
3696 .order_by(pg_catalog.pg_namespace.c.nspname)
3697 )
3698 return connection.scalars(query).all()
3699
3700 def _get_relnames_for_relkinds(self, connection, schema, relkinds, scope):
3701 query = select(pg_catalog.pg_class.c.relname).where(
3702 self._pg_class_relkind_condition(relkinds)
3703 )
3704 query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
3705 return connection.scalars(query).all()
3706
3707 @reflection.cache
3708 def get_table_names(self, connection, schema=None, **kw):
3709 return self._get_relnames_for_relkinds(
3710 connection,
3711 schema,
3712 pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
3713 scope=ObjectScope.DEFAULT,
3714 )
3715
3716 @reflection.cache
3717 def get_temp_table_names(self, connection, **kw):
3718 return self._get_relnames_for_relkinds(
3719 connection,
3720 schema=None,
3721 relkinds=pg_catalog.RELKINDS_TABLE_NO_FOREIGN,
3722 scope=ObjectScope.TEMPORARY,
3723 )
3724
3725 @reflection.cache
3726 def _get_foreign_table_names(self, connection, schema=None, **kw):
3727 return self._get_relnames_for_relkinds(
3728 connection, schema, relkinds=("f",), scope=ObjectScope.ANY
3729 )
3730
3731 @reflection.cache
3732 def get_view_names(self, connection, schema=None, **kw):
3733 return self._get_relnames_for_relkinds(
3734 connection,
3735 schema,
3736 pg_catalog.RELKINDS_VIEW,
3737 scope=ObjectScope.DEFAULT,
3738 )
3739
3740 @reflection.cache
3741 def get_materialized_view_names(self, connection, schema=None, **kw):
3742 return self._get_relnames_for_relkinds(
3743 connection,
3744 schema,
3745 pg_catalog.RELKINDS_MAT_VIEW,
3746 scope=ObjectScope.DEFAULT,
3747 )
3748
3749 @reflection.cache
3750 def get_temp_view_names(self, connection, schema=None, **kw):
3751 return self._get_relnames_for_relkinds(
3752 connection,
3753 schema,
3754 # NOTE: do not include temp materialzied views (that do not
3755 # seem to be a thing at least up to version 14)
3756 pg_catalog.RELKINDS_VIEW,
3757 scope=ObjectScope.TEMPORARY,
3758 )
3759
3760 @reflection.cache
3761 def get_sequence_names(self, connection, schema=None, **kw):
3762 return self._get_relnames_for_relkinds(
3763 connection, schema, relkinds=("S",), scope=ObjectScope.ANY
3764 )
3765
3766 @reflection.cache
3767 def get_view_definition(self, connection, view_name, schema=None, **kw):
3768 query = (
3769 select(pg_catalog.pg_get_viewdef(pg_catalog.pg_class.c.oid))
3770 .select_from(pg_catalog.pg_class)
3771 .where(
3772 pg_catalog.pg_class.c.relname == view_name,
3773 self._pg_class_relkind_condition(
3774 pg_catalog.RELKINDS_VIEW + pg_catalog.RELKINDS_MAT_VIEW
3775 ),
3776 )
3777 )
3778 query = self._pg_class_filter_scope_schema(
3779 query, schema, scope=ObjectScope.ANY
3780 )
3781 res = connection.scalar(query)
3782 if res is None:
3783 raise exc.NoSuchTableError(
3784 f"{schema}.{view_name}" if schema else view_name
3785 )
3786 else:
3787 return res
3788
3789 def _value_or_raise(self, data, table, schema):
3790 try:
3791 return dict(data)[(schema, table)]
3792 except KeyError:
3793 raise exc.NoSuchTableError(
3794 f"{schema}.{table}" if schema else table
3795 ) from None
3796
3797 def _prepare_filter_names(self, filter_names):
3798 if filter_names:
3799 return True, {"filter_names": filter_names}
3800 else:
3801 return False, {}
3802
3803 def _kind_to_relkinds(self, kind: ObjectKind) -> Tuple[str, ...]:
3804 if kind is ObjectKind.ANY:
3805 return pg_catalog.RELKINDS_ALL_TABLE_LIKE
3806 relkinds = ()
3807 if ObjectKind.TABLE in kind:
3808 relkinds += pg_catalog.RELKINDS_TABLE
3809 if ObjectKind.VIEW in kind:
3810 relkinds += pg_catalog.RELKINDS_VIEW
3811 if ObjectKind.MATERIALIZED_VIEW in kind:
3812 relkinds += pg_catalog.RELKINDS_MAT_VIEW
3813 return relkinds
3814
3815 @reflection.cache
3816 def get_columns(self, connection, table_name, schema=None, **kw):
3817 data = self.get_multi_columns(
3818 connection,
3819 schema=schema,
3820 filter_names=[table_name],
3821 scope=ObjectScope.ANY,
3822 kind=ObjectKind.ANY,
3823 **kw,
3824 )
3825 return self._value_or_raise(data, table_name, schema)
3826
3827 @lru_cache()
3828 def _columns_query(self, schema, has_filter_names, scope, kind):
3829 # NOTE: the query with the default and identity options scalar
3830 # subquery is faster than trying to use outer joins for them
3831 generated = (
3832 pg_catalog.pg_attribute.c.attgenerated.label("generated")
3833 if self.server_version_info >= (12,)
3834 else sql.null().label("generated")
3835 )
3836 if self.server_version_info >= (10,):
3837 # join lateral performs worse (~2x slower) than a scalar_subquery
3838 identity = (
3839 select(
3840 sql.func.json_build_object(
3841 "always",
3842 pg_catalog.pg_attribute.c.attidentity == "a",
3843 "start",
3844 pg_catalog.pg_sequence.c.seqstart,
3845 "increment",
3846 pg_catalog.pg_sequence.c.seqincrement,
3847 "minvalue",
3848 pg_catalog.pg_sequence.c.seqmin,
3849 "maxvalue",
3850 pg_catalog.pg_sequence.c.seqmax,
3851 "cache",
3852 pg_catalog.pg_sequence.c.seqcache,
3853 "cycle",
3854 pg_catalog.pg_sequence.c.seqcycle,
3855 type_=sqltypes.JSON(),
3856 )
3857 )
3858 .select_from(pg_catalog.pg_sequence)
3859 .where(
3860 # attidentity != '' is required or it will reflect also
3861 # serial columns as identity.
3862 pg_catalog.pg_attribute.c.attidentity != "",
3863 pg_catalog.pg_sequence.c.seqrelid
3864 == sql.cast(
3865 sql.cast(
3866 pg_catalog.pg_get_serial_sequence(
3867 sql.cast(
3868 sql.cast(
3869 pg_catalog.pg_attribute.c.attrelid,
3870 REGCLASS,
3871 ),
3872 TEXT,
3873 ),
3874 pg_catalog.pg_attribute.c.attname,
3875 ),
3876 REGCLASS,
3877 ),
3878 OID,
3879 ),
3880 )
3881 .correlate(pg_catalog.pg_attribute)
3882 .scalar_subquery()
3883 .label("identity_options")
3884 )
3885 else:
3886 identity = sql.null().label("identity_options")
3887
3888 # join lateral performs the same as scalar_subquery here
3889 default = (
3890 select(
3891 pg_catalog.pg_get_expr(
3892 pg_catalog.pg_attrdef.c.adbin,
3893 pg_catalog.pg_attrdef.c.adrelid,
3894 )
3895 )
3896 .select_from(pg_catalog.pg_attrdef)
3897 .where(
3898 pg_catalog.pg_attrdef.c.adrelid
3899 == pg_catalog.pg_attribute.c.attrelid,
3900 pg_catalog.pg_attrdef.c.adnum
3901 == pg_catalog.pg_attribute.c.attnum,
3902 pg_catalog.pg_attribute.c.atthasdef,
3903 )
3904 .correlate(pg_catalog.pg_attribute)
3905 .scalar_subquery()
3906 .label("default")
3907 )
3908
3909 # get the name of the collate when it's different from the default one
3910 collate = sql.case(
3911 (
3912 sql.and_(
3913 pg_catalog.pg_attribute.c.attcollation != 0,
3914 select(pg_catalog.pg_type.c.typcollation)
3915 .where(
3916 pg_catalog.pg_type.c.oid
3917 == pg_catalog.pg_attribute.c.atttypid,
3918 )
3919 .correlate(pg_catalog.pg_attribute)
3920 .scalar_subquery()
3921 != pg_catalog.pg_attribute.c.attcollation,
3922 ),
3923 select(pg_catalog.pg_collation.c.collname)
3924 .where(
3925 pg_catalog.pg_collation.c.oid
3926 == pg_catalog.pg_attribute.c.attcollation
3927 )
3928 .correlate(pg_catalog.pg_attribute)
3929 .scalar_subquery(),
3930 ),
3931 else_=sql.null(),
3932 ).label("collation")
3933
3934 relkinds = self._kind_to_relkinds(kind)
3935 query = (
3936 select(
3937 pg_catalog.pg_attribute.c.attname.label("name"),
3938 pg_catalog.format_type(
3939 pg_catalog.pg_attribute.c.atttypid,
3940 pg_catalog.pg_attribute.c.atttypmod,
3941 ).label("format_type"),
3942 default,
3943 pg_catalog.pg_attribute.c.attnotnull.label("not_null"),
3944 pg_catalog.pg_class.c.relname.label("table_name"),
3945 pg_catalog.pg_description.c.description.label("comment"),
3946 generated,
3947 identity,
3948 collate,
3949 )
3950 .select_from(pg_catalog.pg_class)
3951 # NOTE: postgresql support table with no user column, meaning
3952 # there is no row with pg_attribute.attnum > 0. use a left outer
3953 # join to avoid filtering these tables.
3954 .outerjoin(
3955 pg_catalog.pg_attribute,
3956 sql.and_(
3957 pg_catalog.pg_class.c.oid
3958 == pg_catalog.pg_attribute.c.attrelid,
3959 pg_catalog.pg_attribute.c.attnum > 0,
3960 ~pg_catalog.pg_attribute.c.attisdropped,
3961 ),
3962 )
3963 .outerjoin(
3964 pg_catalog.pg_description,
3965 sql.and_(
3966 pg_catalog.pg_description.c.objoid
3967 == pg_catalog.pg_attribute.c.attrelid,
3968 pg_catalog.pg_description.c.objsubid
3969 == pg_catalog.pg_attribute.c.attnum,
3970 ),
3971 )
3972 .where(self._pg_class_relkind_condition(relkinds))
3973 .order_by(
3974 pg_catalog.pg_class.c.relname, pg_catalog.pg_attribute.c.attnum
3975 )
3976 )
3977 query = self._pg_class_filter_scope_schema(query, schema, scope=scope)
3978 if has_filter_names:
3979 query = query.where(
3980 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
3981 )
3982 return query
3983
3984 def get_multi_columns(
3985 self, connection, schema, filter_names, scope, kind, **kw
3986 ):
3987 has_filter_names, params = self._prepare_filter_names(filter_names)
3988 query = self._columns_query(schema, has_filter_names, scope, kind)
3989 rows = connection.execute(query, params).mappings()
3990
3991 # dictionary with (name, ) if default search path or (schema, name)
3992 # as keys
3993 domains = {
3994 ((d["schema"], d["name"]) if not d["visible"] else (d["name"],)): d
3995 for d in self._load_domains(
3996 connection, schema="*", info_cache=kw.get("info_cache")
3997 )
3998 }
3999
4000 # dictionary with (name, ) if default search path or (schema, name)
4001 # as keys
4002 enums = dict(
4003 (
4004 ((rec["name"],), rec)
4005 if rec["visible"]
4006 else ((rec["schema"], rec["name"]), rec)
4007 )
4008 for rec in self._load_enums(
4009 connection, schema="*", info_cache=kw.get("info_cache")
4010 )
4011 )
4012
4013 columns = self._get_columns_info(rows, domains, enums, schema)
4014
4015 return columns.items()
4016
4017 _format_type_args_pattern = re.compile(r"\((.*)\)")
4018 _format_type_args_delim = re.compile(r"\s*,\s*")
4019 _format_array_spec_pattern = re.compile(r"((?:\[\])*)$")
4020
4021 def _reflect_type(
4022 self,
4023 format_type: Optional[str],
4024 domains: Dict[str, ReflectedDomain],
4025 enums: Dict[str, ReflectedEnum],
4026 type_description: str,
4027 collation: Optional[str],
4028 ) -> sqltypes.TypeEngine[Any]:
4029 """
4030 Attempts to reconstruct a column type defined in ischema_names based
4031 on the information available in the format_type.
4032
4033 If the `format_type` cannot be associated with a known `ischema_names`,
4034 it is treated as a reference to a known PostgreSQL named `ENUM` or
4035 `DOMAIN` type.
4036 """
4037 type_description = type_description or "unknown type"
4038 if format_type is None:
4039 util.warn(
4040 "PostgreSQL format_type() returned NULL for %s"
4041 % type_description
4042 )
4043 return sqltypes.NULLTYPE
4044
4045 attype_args_match = self._format_type_args_pattern.search(format_type)
4046 if attype_args_match and attype_args_match.group(1):
4047 attype_args = self._format_type_args_delim.split(
4048 attype_args_match.group(1)
4049 )
4050 else:
4051 attype_args = ()
4052
4053 match_array_dim = self._format_array_spec_pattern.search(format_type)
4054 # Each "[]" in array specs corresponds to an array dimension
4055 array_dim = len(match_array_dim.group(1) or "") // 2
4056
4057 # Remove all parameters and array specs from format_type to obtain an
4058 # ischema_name candidate
4059 attype = self._format_type_args_pattern.sub("", format_type)
4060 attype = self._format_array_spec_pattern.sub("", attype)
4061
4062 schema_type = self.ischema_names.get(attype.lower(), None)
4063 args, kwargs = (), {}
4064
4065 if attype == "numeric":
4066 if len(attype_args) == 2:
4067 precision, scale = map(int, attype_args)
4068 args = (precision, scale)
4069
4070 elif attype == "double precision":
4071 args = (53,)
4072
4073 elif attype == "integer":
4074 args = ()
4075
4076 elif attype in ("timestamp with time zone", "time with time zone"):
4077 kwargs["timezone"] = True
4078 if len(attype_args) == 1:
4079 kwargs["precision"] = int(attype_args[0])
4080
4081 elif attype in (
4082 "timestamp without time zone",
4083 "time without time zone",
4084 "time",
4085 ):
4086 kwargs["timezone"] = False
4087 if len(attype_args) == 1:
4088 kwargs["precision"] = int(attype_args[0])
4089
4090 elif attype == "bit varying":
4091 kwargs["varying"] = True
4092 if len(attype_args) == 1:
4093 charlen = int(attype_args[0])
4094 args = (charlen,)
4095
4096 # a domain or enum can start with interval, so be mindful of that.
4097 elif attype == "interval" or attype.startswith("interval "):
4098 schema_type = INTERVAL
4099
4100 field_match = re.match(r"interval (.+)", attype)
4101 if field_match:
4102 kwargs["fields"] = field_match.group(1)
4103
4104 if len(attype_args) == 1:
4105 kwargs["precision"] = int(attype_args[0])
4106
4107 else:
4108 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
4109
4110 if enum_or_domain_key in enums:
4111 schema_type = ENUM
4112 enum = enums[enum_or_domain_key]
4113
4114 kwargs["name"] = enum["name"]
4115
4116 if not enum["visible"]:
4117 kwargs["schema"] = enum["schema"]
4118 args = tuple(enum["labels"])
4119 elif enum_or_domain_key in domains:
4120 schema_type = DOMAIN
4121 domain = domains[enum_or_domain_key]
4122
4123 data_type = self._reflect_type(
4124 domain["type"],
4125 domains,
4126 enums,
4127 type_description="DOMAIN '%s'" % domain["name"],
4128 collation=domain["collation"],
4129 )
4130 args = (domain["name"], data_type)
4131
4132 kwargs["collation"] = domain["collation"]
4133 kwargs["default"] = domain["default"]
4134 kwargs["not_null"] = not domain["nullable"]
4135 kwargs["create_type"] = False
4136
4137 if domain["constraints"]:
4138 # We only support a single constraint
4139 check_constraint = domain["constraints"][0]
4140
4141 kwargs["constraint_name"] = check_constraint["name"]
4142 kwargs["check"] = check_constraint["check"]
4143
4144 if not domain["visible"]:
4145 kwargs["schema"] = domain["schema"]
4146
4147 else:
4148 try:
4149 charlen = int(attype_args[0])
4150 args = (charlen, *attype_args[1:])
4151 except (ValueError, IndexError):
4152 args = attype_args
4153
4154 if not schema_type:
4155 util.warn(
4156 "Did not recognize type '%s' of %s"
4157 % (attype, type_description)
4158 )
4159 return sqltypes.NULLTYPE
4160
4161 if collation is not None:
4162 kwargs["collation"] = collation
4163
4164 data_type = schema_type(*args, **kwargs)
4165 if array_dim >= 1:
4166 # postgres does not preserve dimensionality or size of array types.
4167 data_type = _array.ARRAY(data_type)
4168
4169 return data_type
4170
4171 def _get_columns_info(self, rows, domains, enums, schema):
4172 columns = defaultdict(list)
4173 for row_dict in rows:
4174 # ensure that each table has an entry, even if it has no columns
4175 if row_dict["name"] is None:
4176 columns[(schema, row_dict["table_name"])] = (
4177 ReflectionDefaults.columns()
4178 )
4179 continue
4180 table_cols = columns[(schema, row_dict["table_name"])]
4181
4182 collation = row_dict["collation"]
4183
4184 coltype = self._reflect_type(
4185 row_dict["format_type"],
4186 domains,
4187 enums,
4188 type_description="column '%s'" % row_dict["name"],
4189 collation=collation,
4190 )
4191
4192 default = row_dict["default"]
4193 name = row_dict["name"]
4194 generated = row_dict["generated"]
4195 nullable = not row_dict["not_null"]
4196
4197 if isinstance(coltype, DOMAIN):
4198 if not default:
4199 # domain can override the default value but
4200 # can't set it to None
4201 if coltype.default is not None:
4202 default = coltype.default
4203
4204 nullable = nullable and not coltype.not_null
4205
4206 identity = row_dict["identity_options"]
4207
4208 # If a zero byte or blank string depending on driver (is also
4209 # absent for older PG versions), then not a generated column.
4210 # Otherwise, s = stored. (Other values might be added in the
4211 # future.)
4212 if generated not in (None, "", b"\x00"):
4213 computed = dict(
4214 sqltext=default, persisted=generated in ("s", b"s")
4215 )
4216 default = None
4217 else:
4218 computed = None
4219
4220 # adjust the default value
4221 autoincrement = False
4222 if default is not None:
4223 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
4224 if match is not None:
4225 if issubclass(coltype._type_affinity, sqltypes.Integer):
4226 autoincrement = True
4227 # the default is related to a Sequence
4228 if "." not in match.group(2) and schema is not None:
4229 # unconditionally quote the schema name. this could
4230 # later be enhanced to obey quoting rules /
4231 # "quote schema"
4232 default = (
4233 match.group(1)
4234 + ('"%s"' % schema)
4235 + "."
4236 + match.group(2)
4237 + match.group(3)
4238 )
4239
4240 column_info = {
4241 "name": name,
4242 "type": coltype,
4243 "nullable": nullable,
4244 "default": default,
4245 "autoincrement": autoincrement or identity is not None,
4246 "comment": row_dict["comment"],
4247 }
4248 if computed is not None:
4249 column_info["computed"] = computed
4250 if identity is not None:
4251 column_info["identity"] = identity
4252
4253 table_cols.append(column_info)
4254
4255 return columns
4256
4257 @lru_cache()
4258 def _table_oids_query(self, schema, has_filter_names, scope, kind):
4259 relkinds = self._kind_to_relkinds(kind)
4260 oid_q = select(
4261 pg_catalog.pg_class.c.oid, pg_catalog.pg_class.c.relname
4262 ).where(self._pg_class_relkind_condition(relkinds))
4263 oid_q = self._pg_class_filter_scope_schema(oid_q, schema, scope=scope)
4264
4265 if has_filter_names:
4266 oid_q = oid_q.where(
4267 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4268 )
4269 return oid_q
4270
4271 @reflection.flexi_cache(
4272 ("schema", InternalTraversal.dp_string),
4273 ("filter_names", InternalTraversal.dp_string_list),
4274 ("kind", InternalTraversal.dp_plain_obj),
4275 ("scope", InternalTraversal.dp_plain_obj),
4276 )
4277 def _get_table_oids(
4278 self, connection, schema, filter_names, scope, kind, **kw
4279 ):
4280 has_filter_names, params = self._prepare_filter_names(filter_names)
4281 oid_q = self._table_oids_query(schema, has_filter_names, scope, kind)
4282 result = connection.execute(oid_q, params)
4283 return result.all()
4284
4285 @util.memoized_property
4286 def _constraint_query(self):
4287 if self.server_version_info >= (11, 0):
4288 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
4289 else:
4290 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")
4291
4292 if self.server_version_info >= (15,):
4293 indnullsnotdistinct = pg_catalog.pg_index.c.indnullsnotdistinct
4294 else:
4295 indnullsnotdistinct = sql.false().label("indnullsnotdistinct")
4296
4297 con_sq = (
4298 select(
4299 pg_catalog.pg_constraint.c.conrelid,
4300 pg_catalog.pg_constraint.c.conname,
4301 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
4302 sql.func.generate_subscripts(
4303 pg_catalog.pg_index.c.indkey, 1
4304 ).label("ord"),
4305 indnkeyatts,
4306 indnullsnotdistinct,
4307 pg_catalog.pg_description.c.description,
4308 )
4309 .join(
4310 pg_catalog.pg_index,
4311 pg_catalog.pg_constraint.c.conindid
4312 == pg_catalog.pg_index.c.indexrelid,
4313 )
4314 .outerjoin(
4315 pg_catalog.pg_description,
4316 pg_catalog.pg_description.c.objoid
4317 == pg_catalog.pg_constraint.c.oid,
4318 )
4319 .where(
4320 pg_catalog.pg_constraint.c.contype == bindparam("contype"),
4321 pg_catalog.pg_constraint.c.conrelid.in_(bindparam("oids")),
4322 # NOTE: filtering also on pg_index.indrelid for oids does
4323 # not seem to have a performance effect, but it may be an
4324 # option if perf problems are reported
4325 )
4326 .subquery("con")
4327 )
4328
4329 attr_sq = (
4330 select(
4331 con_sq.c.conrelid,
4332 con_sq.c.conname,
4333 con_sq.c.description,
4334 con_sq.c.ord,
4335 con_sq.c.indnkeyatts,
4336 con_sq.c.indnullsnotdistinct,
4337 pg_catalog.pg_attribute.c.attname,
4338 )
4339 .select_from(pg_catalog.pg_attribute)
4340 .join(
4341 con_sq,
4342 sql.and_(
4343 pg_catalog.pg_attribute.c.attnum == con_sq.c.attnum,
4344 pg_catalog.pg_attribute.c.attrelid == con_sq.c.conrelid,
4345 ),
4346 )
4347 .where(
4348 # NOTE: restate the condition here, since pg15 otherwise
4349 # seems to get confused on pscopg2 sometimes, doing
4350 # a sequential scan of pg_attribute.
4351 # The condition in the con_sq subquery is not actually needed
4352 # in pg15, but it may be needed in older versions. Keeping it
4353 # does not seems to have any impact in any case.
4354 con_sq.c.conrelid.in_(bindparam("oids"))
4355 )
4356 .subquery("attr")
4357 )
4358
4359 return (
4360 select(
4361 attr_sq.c.conrelid,
4362 sql.func.array_agg(
4363 # NOTE: cast since some postgresql derivatives may
4364 # not support array_agg on the name type
4365 aggregate_order_by(
4366 attr_sq.c.attname.cast(TEXT), attr_sq.c.ord
4367 )
4368 ).label("cols"),
4369 attr_sq.c.conname,
4370 sql.func.min(attr_sq.c.description).label("description"),
4371 sql.func.min(attr_sq.c.indnkeyatts).label("indnkeyatts"),
4372 sql.func.bool_and(attr_sq.c.indnullsnotdistinct).label(
4373 "indnullsnotdistinct"
4374 ),
4375 )
4376 .group_by(attr_sq.c.conrelid, attr_sq.c.conname)
4377 .order_by(attr_sq.c.conrelid, attr_sq.c.conname)
4378 )
4379
4380 def _reflect_constraint(
4381 self, connection, contype, schema, filter_names, scope, kind, **kw
4382 ):
4383 # used to reflect primary and unique constraint
4384 table_oids = self._get_table_oids(
4385 connection, schema, filter_names, scope, kind, **kw
4386 )
4387 batches = list(table_oids)
4388 is_unique = contype == "u"
4389
4390 while batches:
4391 batch = batches[0:3000]
4392 batches[0:3000] = []
4393
4394 result = connection.execute(
4395 self._constraint_query,
4396 {"oids": [r[0] for r in batch], "contype": contype},
4397 ).mappings()
4398
4399 result_by_oid = defaultdict(list)
4400 for row_dict in result:
4401 result_by_oid[row_dict["conrelid"]].append(row_dict)
4402
4403 for oid, tablename in batch:
4404 for_oid = result_by_oid.get(oid, ())
4405 if for_oid:
4406 for row in for_oid:
4407 # See note in get_multi_indexes
4408 all_cols = row["cols"]
4409 indnkeyatts = row["indnkeyatts"]
4410 if len(all_cols) > indnkeyatts:
4411 inc_cols = all_cols[indnkeyatts:]
4412 cst_cols = all_cols[:indnkeyatts]
4413 else:
4414 inc_cols = []
4415 cst_cols = all_cols
4416
4417 opts = {}
4418 if self.server_version_info >= (11,):
4419 opts["postgresql_include"] = inc_cols
4420 if is_unique:
4421 opts["postgresql_nulls_not_distinct"] = row[
4422 "indnullsnotdistinct"
4423 ]
4424 yield (
4425 tablename,
4426 cst_cols,
4427 row["conname"],
4428 row["description"],
4429 opts,
4430 )
4431 else:
4432 yield tablename, None, None, None, None
4433
4434 @reflection.cache
4435 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
4436 data = self.get_multi_pk_constraint(
4437 connection,
4438 schema=schema,
4439 filter_names=[table_name],
4440 scope=ObjectScope.ANY,
4441 kind=ObjectKind.ANY,
4442 **kw,
4443 )
4444 return self._value_or_raise(data, table_name, schema)
4445
4446 def get_multi_pk_constraint(
4447 self, connection, schema, filter_names, scope, kind, **kw
4448 ):
4449 result = self._reflect_constraint(
4450 connection, "p", schema, filter_names, scope, kind, **kw
4451 )
4452
4453 # only a single pk can be present for each table. Return an entry
4454 # even if a table has no primary key
4455 default = ReflectionDefaults.pk_constraint
4456
4457 def pk_constraint(pk_name, cols, comment, opts):
4458 info = {
4459 "constrained_columns": cols,
4460 "name": pk_name,
4461 "comment": comment,
4462 }
4463 if opts:
4464 info["dialect_options"] = opts
4465 return info
4466
4467 return (
4468 (
4469 (schema, table_name),
4470 (
4471 pk_constraint(pk_name, cols, comment, opts)
4472 if pk_name is not None
4473 else default()
4474 ),
4475 )
4476 for table_name, cols, pk_name, comment, opts in result
4477 )
4478
4479 @reflection.cache
4480 def get_foreign_keys(
4481 self,
4482 connection,
4483 table_name,
4484 schema=None,
4485 postgresql_ignore_search_path=False,
4486 **kw,
4487 ):
4488 data = self.get_multi_foreign_keys(
4489 connection,
4490 schema=schema,
4491 filter_names=[table_name],
4492 postgresql_ignore_search_path=postgresql_ignore_search_path,
4493 scope=ObjectScope.ANY,
4494 kind=ObjectKind.ANY,
4495 **kw,
4496 )
4497 return self._value_or_raise(data, table_name, schema)
4498
4499 @lru_cache()
4500 def _foreing_key_query(self, schema, has_filter_names, scope, kind):
4501 pg_class_ref = pg_catalog.pg_class.alias("cls_ref")
4502 pg_namespace_ref = pg_catalog.pg_namespace.alias("nsp_ref")
4503 relkinds = self._kind_to_relkinds(kind)
4504 query = (
4505 select(
4506 pg_catalog.pg_class.c.relname,
4507 pg_catalog.pg_constraint.c.conname,
4508 # NOTE: avoid calling pg_get_constraintdef when not needed
4509 # to speed up the query
4510 sql.case(
4511 (
4512 pg_catalog.pg_constraint.c.oid.is_not(None),
4513 pg_catalog.pg_get_constraintdef(
4514 pg_catalog.pg_constraint.c.oid, True
4515 ),
4516 ),
4517 else_=None,
4518 ),
4519 pg_namespace_ref.c.nspname,
4520 pg_catalog.pg_description.c.description,
4521 )
4522 .select_from(pg_catalog.pg_class)
4523 .outerjoin(
4524 pg_catalog.pg_constraint,
4525 sql.and_(
4526 pg_catalog.pg_class.c.oid
4527 == pg_catalog.pg_constraint.c.conrelid,
4528 pg_catalog.pg_constraint.c.contype == "f",
4529 ),
4530 )
4531 .outerjoin(
4532 pg_class_ref,
4533 pg_class_ref.c.oid == pg_catalog.pg_constraint.c.confrelid,
4534 )
4535 .outerjoin(
4536 pg_namespace_ref,
4537 pg_class_ref.c.relnamespace == pg_namespace_ref.c.oid,
4538 )
4539 .outerjoin(
4540 pg_catalog.pg_description,
4541 pg_catalog.pg_description.c.objoid
4542 == pg_catalog.pg_constraint.c.oid,
4543 )
4544 .order_by(
4545 pg_catalog.pg_class.c.relname,
4546 pg_catalog.pg_constraint.c.conname,
4547 )
4548 .where(self._pg_class_relkind_condition(relkinds))
4549 )
4550 query = self._pg_class_filter_scope_schema(query, schema, scope)
4551 if has_filter_names:
4552 query = query.where(
4553 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
4554 )
4555 return query
4556
4557 @util.memoized_property
4558 def _fk_regex_pattern(self):
4559 # optionally quoted token
4560 qtoken = r'(?:"(?:[^"]|"")+"|[\w]+?)'
4561
4562 # https://www.postgresql.org/docs/current/static/sql-createtable.html
4563 return re.compile(
4564 r"FOREIGN KEY \((.*?)\) "
4565 rf"REFERENCES (?:({qtoken})\.)?({qtoken})\(((?:{qtoken}(?: *, *)?)+)\)" # noqa: E501
4566 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
4567 r"[\s]?(?:ON (UPDATE|DELETE) "
4568 r"(CASCADE|RESTRICT|NO ACTION|"
4569 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
4570 r"[\s]?(?:ON (UPDATE|DELETE) "
4571 r"(CASCADE|RESTRICT|NO ACTION|"
4572 r"SET (?:NULL|DEFAULT)(?:\s\(.+\))?)+)?"
4573 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
4574 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
4575 )
4576
4577 def _parse_fk(self, condef):
4578 FK_REGEX = self._fk_regex_pattern
4579 m = re.search(FK_REGEX, condef).groups()
4580
4581 (
4582 constrained_columns,
4583 referred_schema,
4584 referred_table,
4585 referred_columns,
4586 _,
4587 match,
4588 upddelkey1,
4589 upddelval1,
4590 upddelkey2,
4591 upddelval2,
4592 deferrable,
4593 _,
4594 initially,
4595 ) = m
4596
4597 onupdate = (
4598 upddelval1
4599 if upddelkey1 == "UPDATE"
4600 else upddelval2 if upddelkey2 == "UPDATE" else None
4601 )
4602 ondelete = (
4603 upddelval1
4604 if upddelkey1 == "DELETE"
4605 else upddelval2 if upddelkey2 == "DELETE" else None
4606 )
4607
4608 return (
4609 constrained_columns,
4610 referred_schema,
4611 referred_table,
4612 referred_columns,
4613 match,
4614 onupdate,
4615 ondelete,
4616 deferrable,
4617 initially,
4618 )
4619
4620 def get_multi_foreign_keys(
4621 self,
4622 connection,
4623 schema,
4624 filter_names,
4625 scope,
4626 kind,
4627 postgresql_ignore_search_path=False,
4628 **kw,
4629 ):
4630 preparer = self.identifier_preparer
4631
4632 has_filter_names, params = self._prepare_filter_names(filter_names)
4633 query = self._foreing_key_query(schema, has_filter_names, scope, kind)
4634 result = connection.execute(query, params)
4635
4636 fkeys = defaultdict(list)
4637 default = ReflectionDefaults.foreign_keys
4638 for table_name, conname, condef, conschema, comment in result:
4639 # ensure that each table has an entry, even if it has
4640 # no foreign keys
4641 if conname is None:
4642 fkeys[(schema, table_name)] = default()
4643 continue
4644 table_fks = fkeys[(schema, table_name)]
4645
4646 (
4647 constrained_columns,
4648 referred_schema,
4649 referred_table,
4650 referred_columns,
4651 match,
4652 onupdate,
4653 ondelete,
4654 deferrable,
4655 initially,
4656 ) = self._parse_fk(condef)
4657
4658 if deferrable is not None:
4659 deferrable = True if deferrable == "DEFERRABLE" else False
4660 constrained_columns = [
4661 preparer._unquote_identifier(x)
4662 for x in re.split(r"\s*,\s*", constrained_columns)
4663 ]
4664
4665 if postgresql_ignore_search_path:
4666 # when ignoring search path, we use the actual schema
4667 # provided it isn't the "default" schema
4668 if conschema != self.default_schema_name:
4669 referred_schema = conschema
4670 else:
4671 referred_schema = schema
4672 elif referred_schema:
4673 # referred_schema is the schema that we regexp'ed from
4674 # pg_get_constraintdef(). If the schema is in the search
4675 # path, pg_get_constraintdef() will give us None.
4676 referred_schema = preparer._unquote_identifier(referred_schema)
4677 elif schema is not None and schema == conschema:
4678 # If the actual schema matches the schema of the table
4679 # we're reflecting, then we will use that.
4680 referred_schema = schema
4681
4682 referred_table = preparer._unquote_identifier(referred_table)
4683 referred_columns = [
4684 preparer._unquote_identifier(x)
4685 for x in re.split(r"\s*,\s", referred_columns)
4686 ]
4687 options = {
4688 k: v
4689 for k, v in [
4690 ("onupdate", onupdate),
4691 ("ondelete", ondelete),
4692 ("initially", initially),
4693 ("deferrable", deferrable),
4694 ("match", match),
4695 ]
4696 if v is not None and v != "NO ACTION"
4697 }
4698 fkey_d = {
4699 "name": conname,
4700 "constrained_columns": constrained_columns,
4701 "referred_schema": referred_schema,
4702 "referred_table": referred_table,
4703 "referred_columns": referred_columns,
4704 "options": options,
4705 "comment": comment,
4706 }
4707 table_fks.append(fkey_d)
4708 return fkeys.items()
4709
4710 @reflection.cache
4711 def get_indexes(self, connection, table_name, schema=None, **kw):
4712 data = self.get_multi_indexes(
4713 connection,
4714 schema=schema,
4715 filter_names=[table_name],
4716 scope=ObjectScope.ANY,
4717 kind=ObjectKind.ANY,
4718 **kw,
4719 )
4720 return self._value_or_raise(data, table_name, schema)
4721
4722 @util.memoized_property
4723 def _index_query(self):
4724 # NOTE: pg_index is used as from two times to improve performance,
4725 # since extraing all the index information from `idx_sq` to avoid
4726 # the second pg_index use leads to a worse performing query in
4727 # particular when querying for a single table (as of pg 17)
4728 # NOTE: repeating oids clause improve query performance
4729
4730 # subquery to get the columns
4731 idx_sq = (
4732 select(
4733 pg_catalog.pg_index.c.indexrelid,
4734 pg_catalog.pg_index.c.indrelid,
4735 sql.func.unnest(pg_catalog.pg_index.c.indkey).label("attnum"),
4736 sql.func.unnest(pg_catalog.pg_index.c.indclass).label(
4737 "att_opclass"
4738 ),
4739 sql.func.generate_subscripts(
4740 pg_catalog.pg_index.c.indkey, 1
4741 ).label("ord"),
4742 )
4743 .where(
4744 ~pg_catalog.pg_index.c.indisprimary,
4745 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
4746 )
4747 .subquery("idx")
4748 )
4749
4750 attr_sq = (
4751 select(
4752 idx_sq.c.indexrelid,
4753 idx_sq.c.indrelid,
4754 idx_sq.c.ord,
4755 # NOTE: always using pg_get_indexdef is too slow so just
4756 # invoke when the element is an expression
4757 sql.case(
4758 (
4759 idx_sq.c.attnum == 0,
4760 pg_catalog.pg_get_indexdef(
4761 idx_sq.c.indexrelid, idx_sq.c.ord + 1, True
4762 ),
4763 ),
4764 # NOTE: need to cast this since attname is of type "name"
4765 # that's limited to 63 bytes, while pg_get_indexdef
4766 # returns "text" so its output may get cut
4767 else_=pg_catalog.pg_attribute.c.attname.cast(TEXT),
4768 ).label("element"),
4769 (idx_sq.c.attnum == 0).label("is_expr"),
4770 pg_catalog.pg_opclass.c.opcname,
4771 pg_catalog.pg_opclass.c.opcdefault,
4772 )
4773 .select_from(idx_sq)
4774 .outerjoin(
4775 # do not remove rows where idx_sq.c.attnum is 0
4776 pg_catalog.pg_attribute,
4777 sql.and_(
4778 pg_catalog.pg_attribute.c.attnum == idx_sq.c.attnum,
4779 pg_catalog.pg_attribute.c.attrelid == idx_sq.c.indrelid,
4780 ),
4781 )
4782 .outerjoin(
4783 pg_catalog.pg_opclass,
4784 pg_catalog.pg_opclass.c.oid == idx_sq.c.att_opclass,
4785 )
4786 .where(idx_sq.c.indrelid.in_(bindparam("oids")))
4787 .subquery("idx_attr")
4788 )
4789
4790 cols_sq = (
4791 select(
4792 attr_sq.c.indexrelid,
4793 sql.func.min(attr_sq.c.indrelid),
4794 sql.func.array_agg(
4795 aggregate_order_by(attr_sq.c.element, attr_sq.c.ord)
4796 ).label("elements"),
4797 sql.func.array_agg(
4798 aggregate_order_by(attr_sq.c.is_expr, attr_sq.c.ord)
4799 ).label("elements_is_expr"),
4800 sql.func.array_agg(
4801 aggregate_order_by(attr_sq.c.opcname, attr_sq.c.ord)
4802 ).label("elements_opclass"),
4803 sql.func.array_agg(
4804 aggregate_order_by(attr_sq.c.opcdefault, attr_sq.c.ord)
4805 ).label("elements_opdefault"),
4806 )
4807 .group_by(attr_sq.c.indexrelid)
4808 .subquery("idx_cols")
4809 )
4810
4811 if self.server_version_info >= (11, 0):
4812 indnkeyatts = pg_catalog.pg_index.c.indnkeyatts
4813 else:
4814 indnkeyatts = pg_catalog.pg_index.c.indnatts.label("indnkeyatts")
4815
4816 if self.server_version_info >= (15,):
4817 nulls_not_distinct = pg_catalog.pg_index.c.indnullsnotdistinct
4818 else:
4819 nulls_not_distinct = sql.false().label("indnullsnotdistinct")
4820
4821 return (
4822 select(
4823 pg_catalog.pg_index.c.indrelid,
4824 pg_catalog.pg_class.c.relname,
4825 pg_catalog.pg_index.c.indisunique,
4826 pg_catalog.pg_constraint.c.conrelid.is_not(None).label(
4827 "has_constraint"
4828 ),
4829 pg_catalog.pg_index.c.indoption,
4830 pg_catalog.pg_class.c.reloptions,
4831 pg_catalog.pg_am.c.amname,
4832 # NOTE: pg_get_expr is very fast so this case has almost no
4833 # performance impact
4834 sql.case(
4835 (
4836 pg_catalog.pg_index.c.indpred.is_not(None),
4837 pg_catalog.pg_get_expr(
4838 pg_catalog.pg_index.c.indpred,
4839 pg_catalog.pg_index.c.indrelid,
4840 ),
4841 ),
4842 else_=None,
4843 ).label("filter_definition"),
4844 indnkeyatts,
4845 nulls_not_distinct,
4846 cols_sq.c.elements,
4847 cols_sq.c.elements_is_expr,
4848 cols_sq.c.elements_opclass,
4849 cols_sq.c.elements_opdefault,
4850 )
4851 .select_from(pg_catalog.pg_index)
4852 .where(
4853 pg_catalog.pg_index.c.indrelid.in_(bindparam("oids")),
4854 ~pg_catalog.pg_index.c.indisprimary,
4855 )
4856 .join(
4857 pg_catalog.pg_class,
4858 pg_catalog.pg_index.c.indexrelid == pg_catalog.pg_class.c.oid,
4859 )
4860 .join(
4861 pg_catalog.pg_am,
4862 pg_catalog.pg_class.c.relam == pg_catalog.pg_am.c.oid,
4863 )
4864 .outerjoin(
4865 cols_sq,
4866 pg_catalog.pg_index.c.indexrelid == cols_sq.c.indexrelid,
4867 )
4868 .outerjoin(
4869 pg_catalog.pg_constraint,
4870 sql.and_(
4871 pg_catalog.pg_index.c.indrelid
4872 == pg_catalog.pg_constraint.c.conrelid,
4873 pg_catalog.pg_index.c.indexrelid
4874 == pg_catalog.pg_constraint.c.conindid,
4875 pg_catalog.pg_constraint.c.contype
4876 == sql.any_(_array.array(("p", "u", "x"))),
4877 ),
4878 )
4879 .order_by(
4880 pg_catalog.pg_index.c.indrelid, pg_catalog.pg_class.c.relname
4881 )
4882 )
4883
4884 def get_multi_indexes(
4885 self, connection, schema, filter_names, scope, kind, **kw
4886 ):
4887 table_oids = self._get_table_oids(
4888 connection, schema, filter_names, scope, kind, **kw
4889 )
4890
4891 indexes = defaultdict(list)
4892 default = ReflectionDefaults.indexes
4893
4894 batches = list(table_oids)
4895
4896 while batches:
4897 batch = batches[0:3000]
4898 batches[0:3000] = []
4899
4900 result = connection.execute(
4901 self._index_query, {"oids": [r[0] for r in batch]}
4902 ).mappings()
4903
4904 result_by_oid = defaultdict(list)
4905 for row_dict in result:
4906 result_by_oid[row_dict["indrelid"]].append(row_dict)
4907
4908 for oid, table_name in batch:
4909 if oid not in result_by_oid:
4910 # ensure that each table has an entry, even if reflection
4911 # is skipped because not supported
4912 indexes[(schema, table_name)] = default()
4913 continue
4914
4915 for row in result_by_oid[oid]:
4916 index_name = row["relname"]
4917
4918 table_indexes = indexes[(schema, table_name)]
4919
4920 all_elements = row["elements"]
4921 all_elements_is_expr = row["elements_is_expr"]
4922 all_elements_opclass = row["elements_opclass"]
4923 all_elements_opdefault = row["elements_opdefault"]
4924 indnkeyatts = row["indnkeyatts"]
4925 # "The number of key columns in the index, not counting any
4926 # included columns, which are merely stored and do not
4927 # participate in the index semantics"
4928 if len(all_elements) > indnkeyatts:
4929 # this is a "covering index" which has INCLUDE columns
4930 # as well as regular index columns
4931 inc_cols = all_elements[indnkeyatts:]
4932 idx_elements = all_elements[:indnkeyatts]
4933 idx_elements_is_expr = all_elements_is_expr[
4934 :indnkeyatts
4935 ]
4936 # postgresql does not support expression on included
4937 # columns as of v14: "ERROR: expressions are not
4938 # supported in included columns".
4939 assert all(
4940 not is_expr
4941 for is_expr in all_elements_is_expr[indnkeyatts:]
4942 )
4943 idx_elements_opclass = all_elements_opclass[
4944 :indnkeyatts
4945 ]
4946 idx_elements_opdefault = all_elements_opdefault[
4947 :indnkeyatts
4948 ]
4949 else:
4950 idx_elements = all_elements
4951 idx_elements_is_expr = all_elements_is_expr
4952 inc_cols = []
4953 idx_elements_opclass = all_elements_opclass
4954 idx_elements_opdefault = all_elements_opdefault
4955
4956 index = {"name": index_name, "unique": row["indisunique"]}
4957 if any(idx_elements_is_expr):
4958 index["column_names"] = [
4959 None if is_expr else expr
4960 for expr, is_expr in zip(
4961 idx_elements, idx_elements_is_expr
4962 )
4963 ]
4964 index["expressions"] = idx_elements
4965 else:
4966 index["column_names"] = idx_elements
4967
4968 dialect_options = {}
4969
4970 if not all(idx_elements_opdefault):
4971 dialect_options["postgresql_ops"] = {
4972 name: opclass
4973 for name, opclass, is_default in zip(
4974 idx_elements,
4975 idx_elements_opclass,
4976 idx_elements_opdefault,
4977 )
4978 if not is_default
4979 }
4980
4981 sorting = {}
4982 for col_index, col_flags in enumerate(row["indoption"]):
4983 col_sorting = ()
4984 # try to set flags only if they differ from PG
4985 # defaults...
4986 if col_flags & 0x01:
4987 col_sorting += ("desc",)
4988 if not (col_flags & 0x02):
4989 col_sorting += ("nulls_last",)
4990 else:
4991 if col_flags & 0x02:
4992 col_sorting += ("nulls_first",)
4993 if col_sorting:
4994 sorting[idx_elements[col_index]] = col_sorting
4995 if sorting:
4996 index["column_sorting"] = sorting
4997 if row["has_constraint"]:
4998 index["duplicates_constraint"] = index_name
4999
5000 if row["reloptions"]:
5001 dialect_options["postgresql_with"] = dict(
5002 [
5003 option.split("=", 1)
5004 for option in row["reloptions"]
5005 ]
5006 )
5007 # it *might* be nice to include that this is 'btree' in the
5008 # reflection info. But we don't want an Index object
5009 # to have a ``postgresql_using`` in it that is just the
5010 # default, so for the moment leaving this out.
5011 amname = row["amname"]
5012 if amname != "btree":
5013 dialect_options["postgresql_using"] = row["amname"]
5014 if row["filter_definition"]:
5015 dialect_options["postgresql_where"] = row[
5016 "filter_definition"
5017 ]
5018 if self.server_version_info >= (11,):
5019 # NOTE: this is legacy, this is part of
5020 # dialect_options now as of #7382
5021 index["include_columns"] = inc_cols
5022 dialect_options["postgresql_include"] = inc_cols
5023 if row["indnullsnotdistinct"]:
5024 # the default is False, so ignore it.
5025 dialect_options["postgresql_nulls_not_distinct"] = row[
5026 "indnullsnotdistinct"
5027 ]
5028
5029 if dialect_options:
5030 index["dialect_options"] = dialect_options
5031
5032 table_indexes.append(index)
5033 return indexes.items()
5034
5035 @reflection.cache
5036 def get_unique_constraints(
5037 self, connection, table_name, schema=None, **kw
5038 ):
5039 data = self.get_multi_unique_constraints(
5040 connection,
5041 schema=schema,
5042 filter_names=[table_name],
5043 scope=ObjectScope.ANY,
5044 kind=ObjectKind.ANY,
5045 **kw,
5046 )
5047 return self._value_or_raise(data, table_name, schema)
5048
5049 def get_multi_unique_constraints(
5050 self,
5051 connection,
5052 schema,
5053 filter_names,
5054 scope,
5055 kind,
5056 **kw,
5057 ):
5058 result = self._reflect_constraint(
5059 connection, "u", schema, filter_names, scope, kind, **kw
5060 )
5061
5062 # each table can have multiple unique constraints
5063 uniques = defaultdict(list)
5064 default = ReflectionDefaults.unique_constraints
5065 for table_name, cols, con_name, comment, options in result:
5066 # ensure a list is created for each table. leave it empty if
5067 # the table has no unique constraint
5068 if con_name is None:
5069 uniques[(schema, table_name)] = default()
5070 continue
5071
5072 uc_dict = {
5073 "column_names": cols,
5074 "name": con_name,
5075 "comment": comment,
5076 }
5077 if options:
5078 uc_dict["dialect_options"] = options
5079
5080 uniques[(schema, table_name)].append(uc_dict)
5081 return uniques.items()
5082
5083 @reflection.cache
5084 def get_table_comment(self, connection, table_name, schema=None, **kw):
5085 data = self.get_multi_table_comment(
5086 connection,
5087 schema,
5088 [table_name],
5089 scope=ObjectScope.ANY,
5090 kind=ObjectKind.ANY,
5091 **kw,
5092 )
5093 return self._value_or_raise(data, table_name, schema)
5094
5095 @lru_cache()
5096 def _comment_query(self, schema, has_filter_names, scope, kind):
5097 relkinds = self._kind_to_relkinds(kind)
5098 query = (
5099 select(
5100 pg_catalog.pg_class.c.relname,
5101 pg_catalog.pg_description.c.description,
5102 )
5103 .select_from(pg_catalog.pg_class)
5104 .outerjoin(
5105 pg_catalog.pg_description,
5106 sql.and_(
5107 pg_catalog.pg_class.c.oid
5108 == pg_catalog.pg_description.c.objoid,
5109 pg_catalog.pg_description.c.objsubid == 0,
5110 pg_catalog.pg_description.c.classoid
5111 == sql.func.cast("pg_catalog.pg_class", REGCLASS),
5112 ),
5113 )
5114 .where(self._pg_class_relkind_condition(relkinds))
5115 )
5116 query = self._pg_class_filter_scope_schema(query, schema, scope)
5117 if has_filter_names:
5118 query = query.where(
5119 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
5120 )
5121 return query
5122
5123 def get_multi_table_comment(
5124 self, connection, schema, filter_names, scope, kind, **kw
5125 ):
5126 has_filter_names, params = self._prepare_filter_names(filter_names)
5127 query = self._comment_query(schema, has_filter_names, scope, kind)
5128 result = connection.execute(query, params)
5129
5130 default = ReflectionDefaults.table_comment
5131 return (
5132 (
5133 (schema, table),
5134 {"text": comment} if comment is not None else default(),
5135 )
5136 for table, comment in result
5137 )
5138
5139 @reflection.cache
5140 def get_check_constraints(self, connection, table_name, schema=None, **kw):
5141 data = self.get_multi_check_constraints(
5142 connection,
5143 schema,
5144 [table_name],
5145 scope=ObjectScope.ANY,
5146 kind=ObjectKind.ANY,
5147 **kw,
5148 )
5149 return self._value_or_raise(data, table_name, schema)
5150
5151 @lru_cache()
5152 def _check_constraint_query(self, schema, has_filter_names, scope, kind):
5153 relkinds = self._kind_to_relkinds(kind)
5154 query = (
5155 select(
5156 pg_catalog.pg_class.c.relname,
5157 pg_catalog.pg_constraint.c.conname,
5158 # NOTE: avoid calling pg_get_constraintdef when not needed
5159 # to speed up the query
5160 sql.case(
5161 (
5162 pg_catalog.pg_constraint.c.oid.is_not(None),
5163 pg_catalog.pg_get_constraintdef(
5164 pg_catalog.pg_constraint.c.oid, True
5165 ),
5166 ),
5167 else_=None,
5168 ),
5169 pg_catalog.pg_description.c.description,
5170 )
5171 .select_from(pg_catalog.pg_class)
5172 .outerjoin(
5173 pg_catalog.pg_constraint,
5174 sql.and_(
5175 pg_catalog.pg_class.c.oid
5176 == pg_catalog.pg_constraint.c.conrelid,
5177 pg_catalog.pg_constraint.c.contype == "c",
5178 ),
5179 )
5180 .outerjoin(
5181 pg_catalog.pg_description,
5182 pg_catalog.pg_description.c.objoid
5183 == pg_catalog.pg_constraint.c.oid,
5184 )
5185 .order_by(
5186 pg_catalog.pg_class.c.relname,
5187 pg_catalog.pg_constraint.c.conname,
5188 )
5189 .where(self._pg_class_relkind_condition(relkinds))
5190 )
5191 query = self._pg_class_filter_scope_schema(query, schema, scope)
5192 if has_filter_names:
5193 query = query.where(
5194 pg_catalog.pg_class.c.relname.in_(bindparam("filter_names"))
5195 )
5196 return query
5197
5198 def get_multi_check_constraints(
5199 self, connection, schema, filter_names, scope, kind, **kw
5200 ):
5201 has_filter_names, params = self._prepare_filter_names(filter_names)
5202 query = self._check_constraint_query(
5203 schema, has_filter_names, scope, kind
5204 )
5205 result = connection.execute(query, params)
5206
5207 check_constraints = defaultdict(list)
5208 default = ReflectionDefaults.check_constraints
5209 for table_name, check_name, src, comment in result:
5210 # only two cases for check_name and src: both null or both defined
5211 if check_name is None and src is None:
5212 check_constraints[(schema, table_name)] = default()
5213 continue
5214 # samples:
5215 # "CHECK (((a > 1) AND (a < 5)))"
5216 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
5217 # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
5218 # "CHECK (some_boolean_function(a))"
5219 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
5220 # "CHECK (a NOT NULL) NO INHERIT"
5221 # "CHECK (a NOT NULL) NO INHERIT NOT VALID"
5222
5223 m = re.match(
5224 r"^CHECK *\((.+)\)( NO INHERIT)?( NOT VALID)?$",
5225 src,
5226 flags=re.DOTALL,
5227 )
5228 if not m:
5229 util.warn("Could not parse CHECK constraint text: %r" % src)
5230 sqltext = ""
5231 else:
5232 sqltext = re.compile(
5233 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
5234 ).sub(r"\1", m.group(1))
5235 entry = {
5236 "name": check_name,
5237 "sqltext": sqltext,
5238 "comment": comment,
5239 }
5240 if m:
5241 do = {}
5242 if " NOT VALID" in m.groups():
5243 do["not_valid"] = True
5244 if " NO INHERIT" in m.groups():
5245 do["no_inherit"] = True
5246 if do:
5247 entry["dialect_options"] = do
5248
5249 check_constraints[(schema, table_name)].append(entry)
5250 return check_constraints.items()
5251
5252 def _pg_type_filter_schema(self, query, schema):
5253 if schema is None:
5254 query = query.where(
5255 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid),
5256 # ignore pg_catalog schema
5257 pg_catalog.pg_namespace.c.nspname != "pg_catalog",
5258 )
5259 elif schema != "*":
5260 query = query.where(pg_catalog.pg_namespace.c.nspname == schema)
5261 return query
5262
5263 @lru_cache()
5264 def _enum_query(self, schema):
5265 lbl_agg_sq = (
5266 select(
5267 pg_catalog.pg_enum.c.enumtypid,
5268 sql.func.array_agg(
5269 aggregate_order_by(
5270 # NOTE: cast since some postgresql derivatives may
5271 # not support array_agg on the name type
5272 pg_catalog.pg_enum.c.enumlabel.cast(TEXT),
5273 pg_catalog.pg_enum.c.enumsortorder,
5274 )
5275 ).label("labels"),
5276 )
5277 .group_by(pg_catalog.pg_enum.c.enumtypid)
5278 .subquery("lbl_agg")
5279 )
5280
5281 query = (
5282 select(
5283 pg_catalog.pg_type.c.typname.label("name"),
5284 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
5285 "visible"
5286 ),
5287 pg_catalog.pg_namespace.c.nspname.label("schema"),
5288 lbl_agg_sq.c.labels.label("labels"),
5289 )
5290 .join(
5291 pg_catalog.pg_namespace,
5292 pg_catalog.pg_namespace.c.oid
5293 == pg_catalog.pg_type.c.typnamespace,
5294 )
5295 .outerjoin(
5296 lbl_agg_sq, pg_catalog.pg_type.c.oid == lbl_agg_sq.c.enumtypid
5297 )
5298 .where(pg_catalog.pg_type.c.typtype == "e")
5299 .order_by(
5300 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
5301 )
5302 )
5303
5304 return self._pg_type_filter_schema(query, schema)
5305
5306 @reflection.cache
5307 def _load_enums(self, connection, schema=None, **kw):
5308 if not self.supports_native_enum:
5309 return []
5310
5311 result = connection.execute(self._enum_query(schema))
5312
5313 enums = []
5314 for name, visible, schema, labels in result:
5315 enums.append(
5316 {
5317 "name": name,
5318 "schema": schema,
5319 "visible": visible,
5320 "labels": [] if labels is None else labels,
5321 }
5322 )
5323 return enums
5324
5325 @lru_cache()
5326 def _domain_query(self, schema):
5327 con_sq = (
5328 select(
5329 pg_catalog.pg_constraint.c.contypid,
5330 sql.func.array_agg(
5331 pg_catalog.pg_get_constraintdef(
5332 pg_catalog.pg_constraint.c.oid, True
5333 )
5334 ).label("condefs"),
5335 sql.func.array_agg(
5336 # NOTE: cast since some postgresql derivatives may
5337 # not support array_agg on the name type
5338 pg_catalog.pg_constraint.c.conname.cast(TEXT)
5339 ).label("connames"),
5340 )
5341 # The domain this constraint is on; zero if not a domain constraint
5342 .where(pg_catalog.pg_constraint.c.contypid != 0)
5343 .group_by(pg_catalog.pg_constraint.c.contypid)
5344 .subquery("domain_constraints")
5345 )
5346
5347 query = (
5348 select(
5349 pg_catalog.pg_type.c.typname.label("name"),
5350 pg_catalog.format_type(
5351 pg_catalog.pg_type.c.typbasetype,
5352 pg_catalog.pg_type.c.typtypmod,
5353 ).label("attype"),
5354 (~pg_catalog.pg_type.c.typnotnull).label("nullable"),
5355 pg_catalog.pg_type.c.typdefault.label("default"),
5356 pg_catalog.pg_type_is_visible(pg_catalog.pg_type.c.oid).label(
5357 "visible"
5358 ),
5359 pg_catalog.pg_namespace.c.nspname.label("schema"),
5360 con_sq.c.condefs,
5361 con_sq.c.connames,
5362 pg_catalog.pg_collation.c.collname,
5363 )
5364 .join(
5365 pg_catalog.pg_namespace,
5366 pg_catalog.pg_namespace.c.oid
5367 == pg_catalog.pg_type.c.typnamespace,
5368 )
5369 .outerjoin(
5370 pg_catalog.pg_collation,
5371 pg_catalog.pg_type.c.typcollation
5372 == pg_catalog.pg_collation.c.oid,
5373 )
5374 .outerjoin(
5375 con_sq,
5376 pg_catalog.pg_type.c.oid == con_sq.c.contypid,
5377 )
5378 .where(pg_catalog.pg_type.c.typtype == "d")
5379 .order_by(
5380 pg_catalog.pg_namespace.c.nspname, pg_catalog.pg_type.c.typname
5381 )
5382 )
5383 return self._pg_type_filter_schema(query, schema)
5384
5385 @reflection.cache
5386 def _load_domains(self, connection, schema=None, **kw):
5387 result = connection.execute(self._domain_query(schema))
5388
5389 domains: List[ReflectedDomain] = []
5390 for domain in result.mappings():
5391 # strip (30) from character varying(30)
5392 attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
5393 constraints: List[ReflectedDomainConstraint] = []
5394 if domain["connames"]:
5395 # When a domain has multiple CHECK constraints, they will
5396 # be tested in alphabetical order by name.
5397 sorted_constraints = sorted(
5398 zip(domain["connames"], domain["condefs"]),
5399 key=lambda t: t[0],
5400 )
5401 for name, def_ in sorted_constraints:
5402 # constraint is in the form "CHECK (expression)"
5403 # or "NOT NULL". Ignore the "NOT NULL" and
5404 # remove "CHECK (" and the tailing ")".
5405 if def_.casefold().startswith("check"):
5406 check = def_[7:-1]
5407 constraints.append({"name": name, "check": check})
5408 domain_rec: ReflectedDomain = {
5409 "name": domain["name"],
5410 "schema": domain["schema"],
5411 "visible": domain["visible"],
5412 "type": attype,
5413 "nullable": domain["nullable"],
5414 "default": domain["default"],
5415 "constraints": constraints,
5416 "collation": domain["collname"],
5417 }
5418 domains.append(domain_rec)
5419
5420 return domains
5421
5422 def _set_backslash_escapes(self, connection):
5423 # this method is provided as an override hook for descendant
5424 # dialects (e.g. Redshift), so removing it may break them
5425 std_string = connection.exec_driver_sql(
5426 "show standard_conforming_strings"
5427 ).scalar()
5428 self._backslash_escapes = std_string == "off"