# postgresql/base.py
# Copyright (C) 2005-2023 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php

r"""
.. dialect:: postgresql
    :name: PostgreSQL
    :full_support: 9.6, 10, 11, 12, 13, 14
    :normal_support: 9.6+
    :best_effort: 8+

.. _postgresql_sequences:

Sequences/SERIAL/IDENTITY
-------------------------

PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
of creating new primary key values for integer-based primary key columns. When
creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
integer-based primary key columns, which generates a sequence and server side
default corresponding to the column.

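As a quick way to preview this DDL, the ``CREATE TABLE`` statement can be
compiled against the PostgreSQL dialect without a database connection; a
minimal sketch, where the table and column names are illustrative only::

    from sqlalchemy import Column, Integer, MetaData, Table
    from sqlalchemy.dialects import postgresql
    from sqlalchemy.schema import CreateTable

    metadata = MetaData()
    sometable = Table(
        "sometable", metadata,
        Column("id", Integer, primary_key=True),
    )

    # renders "id SERIAL NOT NULL" along with "PRIMARY KEY (id)"
    print(CreateTable(sometable).compile(dialect=postgresql.dialect()))
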
To specify a specific named sequence to be used for primary key generation,
use the :func:`~sqlalchemy.schema.Sequence` construct::

    Table('sometable', metadata,
            Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
    )

When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
having the "last insert identifier" available, a RETURNING clause is added to
the INSERT statement which specifies the primary key columns should be
returned after the statement completes. The RETURNING functionality only takes
place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
sequence, whether specified explicitly or implicitly via ``SERIAL``, is
executed independently beforehand, the returned value to be used in the
subsequent insert. Note that when an
:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
"executemany" semantics, the "last inserted identifier" functionality does not
apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
case.

To disable the usage of RETURNING by default, specify the flag
``implicit_returning=False`` to :func:`_sa.create_engine`.

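A brief sketch of this flag in use; the database URL here is illustrative
only::

    from sqlalchemy import create_engine

    # RETURNING will not be used for single-row INSERT statements; the
    # sequence is instead pre-executed to obtain the primary key value
    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@localhost/test",
        implicit_returning=False,
    )
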
PostgreSQL 10 and above IDENTITY columns
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
of SERIAL. The :class:`_schema.Identity` construct in a
:class:`_schema.Column` can be used to control its behavior::

    from sqlalchemy import Table, Column, MetaData, Integer, String, Identity

    metadata = MetaData()

    data = Table(
        "data",
        metadata,
        Column(
            'id', Integer, Identity(start=42, cycle=True), primary_key=True
        ),
        Column('data', String)
    )

The CREATE TABLE for the above :class:`_schema.Table` object would be:

.. sourcecode:: sql

    CREATE TABLE data (
        id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
        data VARCHAR,
        PRIMARY KEY (id)
    )

.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
   in a :class:`_schema.Column` to specify the option of an autoincrementing
   column.

.. note::

    Previous versions of SQLAlchemy did not have built-in support for rendering
    of IDENTITY, and could use the following compilation hook to replace
    occurrences of SERIAL with IDENTITY::

        from sqlalchemy.schema import CreateColumn
        from sqlalchemy.ext.compiler import compiles


        @compiles(CreateColumn, 'postgresql')
        def use_identity(element, compiler, **kw):
            text = compiler.visit_create_column(element, **kw)
            text = text.replace(
                "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY"
            )
            return text

    Using the above, a table such as::

        t = Table(
            't', m,
            Column('id', Integer, primary_key=True),
            Column('data', String)
        )

    will generate on the backing database as::

        CREATE TABLE t (
            id INT GENERATED BY DEFAULT AS IDENTITY,
            data VARCHAR,
            PRIMARY KEY (id)
        )

.. _postgresql_ss_cursors:

Server Side Cursors
-------------------

Server-side cursor support is available for the psycopg2 and asyncpg
dialects and may also be available in others.

Server side cursors are enabled on a per-statement basis by using the
:paramref:`.Connection.execution_options.stream_results` connection execution
option::

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("select * from table")
        )

Note that some kinds of SQL statements may not be supported with
server side cursors; generally, only SQL statements that return rows should be
used with this option.

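For very large result sets, rows may additionally be consumed in batches
using the :meth:`_engine.Result.partitions` method; a brief sketch, where the
table name is illustrative only::

    with engine.connect() as conn:
        result = conn.execution_options(stream_results=True).execute(
            text("SELECT * FROM some_large_table")
        )
        # fetch and process rows 100 at a time, rather than buffering
        # the full result set in memory
        for partition in result.partitions(100):
            for row in partition:
                print(row)
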
.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
   and will be removed in a future release. Please use the
   :paramref:`_engine.Connection.stream_results` execution option for
   unbuffered cursor support.

.. seealso::

    :ref:`engine_stream_results`

.. _postgresql_isolation_level:

Transaction Isolation Level
---------------------------

Most SQLAlchemy dialects support setting of transaction isolation level
using the :paramref:`_sa.create_engine.isolation_level` parameter
at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
level via the :paramref:`.Connection.execution_options.isolation_level`
parameter.

For PostgreSQL dialects, this feature works either by making use of the
DBAPI-specific features, such as psycopg2's isolation level flags which will
embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
DBAPI-specific techniques are used, typically an ``.autocommit``
flag on the DBAPI connection object.

To set isolation level using :func:`_sa.create_engine`::

    engine = create_engine(
        "postgresql+pg8000://scott:tiger@localhost/test",
        isolation_level="REPEATABLE READ"
    )

To set using per-connection execution options::

    with engine.connect() as conn:
        conn = conn.execution_options(
            isolation_level="REPEATABLE READ"
        )
        with conn.begin():
            # ... work with transaction

There are also more options for isolation level configurations, such as
"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
different isolation level settings. See the discussion at
:ref:`dbapi_autocommit` for background.

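As a brief sketch of that pattern, a copy of an :class:`_engine.Engine` with
its own isolation level may be produced using
:meth:`_engine.Engine.execution_options`::

    # a new Engine sharing the same connection pool, where connections
    # are set to AUTOCOMMIT when checked out
    autocommit_engine = engine.execution_options(isolation_level="AUTOCOMMIT")
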
Valid values for ``isolation_level`` on most PostgreSQL dialects include:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. seealso::

    :ref:`dbapi_autocommit`

    :ref:`postgresql_readonly_deferrable`

    :ref:`psycopg2_isolation_level`

    :ref:`pg8000_isolation_level`

.. _postgresql_readonly_deferrable:

Setting READ ONLY / DEFERRABLE
------------------------------

Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
characteristics of the transaction, which is in addition to the isolation level
setting. These two attributes can be established either in conjunction with or
independently of the isolation level by passing the ``postgresql_readonly`` and
``postgresql_deferrable`` flags with
:meth:`_engine.Connection.execution_options`. The example below illustrates
passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
"READ ONLY" and "DEFERRABLE"::

    with engine.connect() as conn:
        conn = conn.execution_options(
            isolation_level="SERIALIZABLE",
            postgresql_readonly=True,
            postgresql_deferrable=True
        )
        with conn.begin():
            # ... work with transaction

Note that some DBAPIs such as asyncpg only support "readonly" with
SERIALIZABLE isolation.

.. versionadded:: 1.4 added support for the ``postgresql_readonly``
   and ``postgresql_deferrable`` execution options.

.. _postgresql_reset_on_return:

Temporary Table / Resource Reset for Connection Pooling
-------------------------------------------------------

The :class:`.QueuePool` connection pool implementation used
by the SQLAlchemy :class:`_sa.Engine` object includes
:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
the DBAPI ``.rollback()`` method when connections are returned to the pool.
While this rollback will clear out the immediate state used by the previous
transaction, it does not cover a wider range of session-level state, including
temporary tables as well as other server state such as prepared statement
handles and statement caches. The PostgreSQL database includes a variety
of commands which may be used to reset this state, including
``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.

To install
one or more of these commands as the means of performing reset-on-return,
the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated
in the example below (**requires SQLAlchemy 1.4.43 or greater**). The
implementation will end transactions in progress as well as discard temporary
tables using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the
PostgreSQL documentation for background on what each of these statements do.

The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
is set to ``None`` so that the custom scheme can replace the default behavior
completely. The custom hook implementation calls ``.rollback()`` in any case,
as it's usually important that the DBAPI's own tracking of commit/rollback
will remain consistent with the state of the transaction::

    from sqlalchemy import create_engine
    from sqlalchemy import event

    postgresql_engine = create_engine(
        "postgresql+psycopg2://scott:tiger@hostname/dbname",

        # disable default reset-on-return scheme
        pool_reset_on_return=None,
    )


    @event.listens_for(postgresql_engine, "reset")
    def _reset_postgresql(dbapi_connection, connection_record, reset_state):
        # run the reset commands on a plain DBAPI cursor
        cursor = dbapi_connection.cursor()
        cursor.execute("CLOSE ALL")
        cursor.execute("RESET ALL")
        cursor.execute("DISCARD TEMP")
        cursor.close()

        # so that the DBAPI itself knows that the connection has been
        # reset
        dbapi_connection.rollback()

.. versionchanged:: 1.4.43 Ensured the :meth:`.PoolEvents.reset` event
   is invoked for all "reset" occurrences, so that it's appropriate
   as a place for custom "reset" handlers. Previous schemes which
   use the :meth:`.PoolEvents.checkin` handler remain usable as well.

.. seealso::

    :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation

.. _postgresql_alternate_search_path:

Setting Alternate Search Paths on Connect
------------------------------------------

The PostgreSQL ``search_path`` variable refers to the list of schema names
that will be implicitly consulted when a particular table or other
object is referenced in a SQL statement. As detailed in the next section
:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
the concept of keeping this variable at its default value of ``public``;
however, in order to have it set to any arbitrary name or names automatically
as connections are created, the "SET SESSION search_path" command may be
invoked for all connections in a pool using the following event handler, as
discussed at :ref:`schema_set_default_connections`::

    from sqlalchemy import event
    from sqlalchemy import create_engine

    engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")

    @event.listens_for(engine, "connect", insert=True)
    def set_search_path(dbapi_connection, connection_record):
        existing_autocommit = dbapi_connection.autocommit
        dbapi_connection.autocommit = True
        cursor = dbapi_connection.cursor()
        # "schema_name" is a placeholder for the desired search path,
        # defined elsewhere in the application
        cursor.execute("SET SESSION search_path='%s'" % schema_name)
        cursor.close()
        dbapi_connection.autocommit = existing_autocommit

The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
attribute is so that when the ``SET SESSION search_path`` directive is invoked,
it is invoked outside of the scope of any transaction and therefore will not
be reverted when the DBAPI connection has a rollback.

.. seealso::

    :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation

.. _postgresql_schema_reflection:

Remote-Schema Table Introspection and PostgreSQL search_path
------------------------------------------------------------

.. admonition:: Section Best Practices Summarized

    Keep the ``search_path`` variable set to its default of ``public``, without
    any other schema names. For other schema names, name these explicitly
    within :class:`_schema.Table` definitions. Alternatively, the
    ``postgresql_ignore_search_path`` option will cause all reflected
    :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
    attribute set up.

The PostgreSQL dialect can reflect tables from any schema, as outlined in
:ref:`metadata_reflection_schemas`.

With regards to tables which these :class:`_schema.Table`
objects refer to via foreign key constraint, a decision must be made as to how
the ``.schema`` is represented in those remote tables, in the case where that
remote schema name is also a member of the current
`PostgreSQL search path
<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_.

By default, the PostgreSQL dialect mimics the behavior encouraged by
PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
returns a sample definition for a particular foreign key constraint,
omitting the referenced schema name from that definition when the name is
also in the PostgreSQL schema search path. The interaction below
illustrates this behavior::

    test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
    CREATE TABLE
    test=> CREATE TABLE referring(
    test(>         id INTEGER PRIMARY KEY,
    test(>         referred_id INTEGER REFERENCES test_schema.referred(id));
    CREATE TABLE
    test=> SET search_path TO public, test_schema;
    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f'
    test-> ;
                   pg_get_constraintdef
    ---------------------------------------------------
     FOREIGN KEY (referred_id) REFERENCES referred(id)
    (1 row)

Above, we created a table ``referred`` as a member of the remote schema
``test_schema``, however when we added ``test_schema`` to the
PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
the function.

On the other hand, if we set the search path back to the typical default
of ``public``::

    test=> SET search_path TO public;
    SET

The same query against ``pg_get_constraintdef()`` now returns the fully
schema-qualified name for us::

    test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
    test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
    test-> ON n.oid = c.relnamespace
    test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
    test-> WHERE c.relname='referring' AND r.contype = 'f';
                   pg_get_constraintdef
    ---------------------------------------------------------------
     FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
    (1 row)

SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
in order to determine the remote schema name. That is, if our ``search_path``
were set to include ``test_schema``, and we invoked a table
reflection process as follows::

    >>> from sqlalchemy import Table, MetaData, create_engine, text
    >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
    >>> with engine.connect() as conn:
    ...     conn.execute(text("SET search_path TO test_schema, public"))
    ...     metadata_obj = MetaData()
    ...     referring = Table('referring', metadata_obj,
    ...                       autoload_with=conn)
    ...
    <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>

The above process would deliver to the :attr:`_schema.MetaData.tables`
collection the ``referred`` table named **without** the schema::

    >>> metadata_obj.tables['referred'].schema is None
    True

To alter the behavior of reflection such that the referred schema is
maintained regardless of the ``search_path`` setting, use the
``postgresql_ignore_search_path`` option, which can be specified as a
dialect-specific argument to both :class:`_schema.Table` as well as
:meth:`_schema.MetaData.reflect`::

    >>> with engine.connect() as conn:
    ...     conn.execute(text("SET search_path TO test_schema, public"))
    ...     metadata_obj = MetaData()
    ...     referring = Table('referring', metadata_obj,
    ...                       autoload_with=conn,
    ...                       postgresql_ignore_search_path=True)
    ...
    <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>

We will now have ``test_schema.referred`` stored as schema-qualified::

    >>> metadata_obj.tables['test_schema.referred'].schema
    'test_schema'

.. sidebar:: Best Practices for PostgreSQL Schema reflection

    The description of PostgreSQL schema reflection behavior is complex, and
    is the product of many years of dealing with widely varied use cases and
    user preferences. But in fact, there's no need to understand any of it if
    you just stick to the simplest use pattern: leave the ``search_path`` set
    to its default of ``public`` only, never refer to the name ``public`` as
    an explicit schema name otherwise, and refer to all other schema names
    explicitly when building up a :class:`_schema.Table` object. The options
    described here are only for those users who can't, or prefer not to, stay
    within these guidelines.

Note that **in all cases**, the "default" schema is always reflected as
``None``. The "default" schema on PostgreSQL is that which is returned by the
PostgreSQL ``current_schema()`` function. On a typical PostgreSQL
installation, this is the name ``public``. So a table that refers to another
which is in the ``public`` (i.e. default) schema will always have the
``.schema`` attribute set to ``None``.

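As a quick sketch, the default schema name as SQLAlchemy sees it may be
observed through the inspection interface::

    from sqlalchemy import inspect

    # typically prints "public" on a default PostgreSQL installation
    print(inspect(engine).default_schema_name)
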
.. seealso::

    :ref:`reflection_schema_qualified_interaction` - discussion of the issue
    from a backend-agnostic perspective

    `The Schema Search Path
    <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
    - on the PostgreSQL website.

INSERT/UPDATE...RETURNING
-------------------------

The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
for single-row INSERT statements in order to fetch newly generated
primary key identifiers. To specify an explicit ``RETURNING`` clause,
use the :meth:`._UpdateBase.returning` method on a per-statement basis::

    # INSERT..RETURNING
    result = conn.execute(
        table.insert().returning(table.c.col1, table.c.col2).
        values(name='foo')
    )
    print(result.fetchall())

    # UPDATE..RETURNING
    result = conn.execute(
        table.update().returning(table.c.col1, table.c.col2).
        where(table.c.name == 'foo').values(name='bar')
    )
    print(result.fetchall())

    # DELETE..RETURNING
    result = conn.execute(
        table.delete().returning(table.c.col1, table.c.col2).
        where(table.c.name == 'foo')
    )
    print(result.fetchall())

.. _postgresql_insert_on_conflict:

INSERT...ON CONFLICT (Upsert)
------------------------------

Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
candidate row will only be inserted if that row does not violate any unique
constraints. In the case of a unique constraint violation, a secondary action
can occur which can be either "DO UPDATE", indicating that the data in the
target row should be updated, or "DO NOTHING", which indicates to silently skip
this row.

Conflicts are determined using existing unique constraints and indexes. These
constraints may be identified either using their name as stated in DDL,
or they may be inferred by stating the columns and conditions that comprise
the indexes.

SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
:func:`_postgresql.insert()` function, which provides
the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:

.. sourcecode:: pycon+sql

    >>> from sqlalchemy.dialects.postgresql import insert
    >>> insert_stmt = insert(my_table).values(
    ...     id='some_existing_id',
    ...     data='inserted value')
    >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
    ...     index_elements=['id']
    ... )
    >>> print(do_nothing_stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO NOTHING
    {stop}

    >>> do_update_stmt = insert_stmt.on_conflict_do_update(
    ...     constraint='pk_my_table',
    ...     set_=dict(data='updated value')
    ... )
    >>> print(do_update_stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s

.. versionadded:: 1.1

.. seealso::

    `INSERT .. ON CONFLICT
    <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
    - in the PostgreSQL documentation.

Specifying the Target
^^^^^^^^^^^^^^^^^^^^^

Both methods supply the "target" of the conflict using either a
named constraint or column inference:

* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
  specifies a sequence containing string column names, :class:`_schema.Column`
  objects, and/or SQL expression elements, which would identify a unique
  index:

  .. sourcecode:: pycon+sql

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     index_elements=['id'],
      ...     set_=dict(data='updated value')
      ... )
      >>> print(do_update_stmt)
      {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
      {stop}

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.id],
      ...     set_=dict(data='updated value')
      ... )
      >>> print(do_update_stmt)
      {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
  infer an index, a partial index can be inferred by also specifying the
  :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:

  .. sourcecode:: pycon+sql

      >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
      >>> stmt = stmt.on_conflict_do_update(
      ...     index_elements=[my_table.c.user_email],
      ...     index_where=my_table.c.user_email.like('%@gmail.com'),
      ...     set_=dict(data=stmt.excluded.data)
      ... )
      >>> print(stmt)
      {opensql}INSERT INTO my_table (data, user_email)
      VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
      WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data

* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
  used to specify an index directly rather than inferring it. This can be
  the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:

  .. sourcecode:: pycon+sql

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     constraint='my_table_idx_1',
      ...     set_=dict(data='updated value')
      ... )
      >>> print(do_update_stmt)
      {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
      {stop}

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     constraint='my_table_pk',
      ...     set_=dict(data='updated value')
      ... )
      >>> print(do_update_stmt)
      {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
      {stop}

* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
  also refer to a SQLAlchemy construct representing a constraint,
  e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
  :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
  if the constraint has a name, it is used directly. Otherwise, if the
  constraint is unnamed, then inference will be used, where the expressions
  and optional WHERE clause of the constraint will be spelled out in the
  construct. This use is especially convenient
  to refer to the named or unnamed primary key of a :class:`_schema.Table`
  using the
  :attr:`_schema.Table.primary_key` attribute:

  .. sourcecode:: pycon+sql

      >>> do_update_stmt = insert_stmt.on_conflict_do_update(
      ...     constraint=my_table.primary_key,
      ...     set_=dict(data='updated value')
      ... )
      >>> print(do_update_stmt)
      {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
      ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

The SET Clause
^^^^^^^^^^^^^^^

``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
existing row, using any combination of new values as well as values
from the proposed insertion. These values are specified using the
:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
parameter accepts a dictionary which consists of direct values
for UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value')
    ... )
    >>> print(do_update_stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s

.. warning::

    The :meth:`_expression.Insert.on_conflict_do_update`
    method does **not** take into
    account Python-side default UPDATE values or generation functions, e.g.
    those specified using :paramref:`_schema.Column.onupdate`.
    These values will not be exercised for an ON CONFLICT style of UPDATE,
    unless they are manually specified in the
    :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.

Updating using the Excluded INSERT Values
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

In order to refer to the proposed insertion row, the special alias
:attr:`~.postgresql.Insert.excluded` is available as an attribute on
the :class:`_postgresql.Insert` object; this object is a
:class:`_expression.ColumnCollection` alias that contains all columns
of the target table:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id='some_id',
    ...     data='inserted value',
    ...     author='jlh'
    ... )
    >>> do_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value', author=stmt.excluded.author)
    ... )
    >>> print(do_update_stmt)
    {opensql}INSERT INTO my_table (id, data, author)
    VALUES (%(id)s, %(data)s, %(author)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author

Additional WHERE Criteria
^^^^^^^^^^^^^^^^^^^^^^^^^

The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
parameter, which will limit those rows which receive an UPDATE:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(
    ...     id='some_id',
    ...     data='inserted value',
    ...     author='jlh'
    ... )
    >>> on_update_stmt = stmt.on_conflict_do_update(
    ...     index_elements=['id'],
    ...     set_=dict(data='updated value', author=stmt.excluded.author),
    ...     where=(my_table.c.status == 2)
    ... )
    >>> print(on_update_stmt)
    {opensql}INSERT INTO my_table (id, data, author)
    VALUES (%(id)s, %(data)s, %(author)s)
    ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
    WHERE my_table.status = %(status_1)s

Skipping Rows with DO NOTHING
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

``ON CONFLICT`` may be used to skip inserting a row entirely
if any conflict with a unique or exclusion constraint occurs; below
this is illustrated using the
:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
    >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
    >>> print(stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT (id) DO NOTHING

If ``DO NOTHING`` is used without specifying any columns or constraint,
it has the effect of skipping the INSERT for any unique or exclusion
constraint violation which occurs:

.. sourcecode:: pycon+sql

    >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
    >>> stmt = stmt.on_conflict_do_nothing()
    >>> print(stmt)
    {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
    ON CONFLICT DO NOTHING

.. _postgresql_match:

Full Text Search
----------------

SQLAlchemy makes available the PostgreSQL ``@@`` operator via the
:meth:`_expression.ColumnElement.match` method on any textual column
expression.

On the PostgreSQL dialect, an expression like the following::

    select(sometable.c.text.match("search string"))

will emit to the database::

    SELECT text @@ to_tsquery('search string') FROM table

Various other PostgreSQL text search functions such as ``to_tsquery()``,
``to_tsvector()``, and ``plainto_tsquery()`` are available by explicitly using
the standard SQLAlchemy :data:`.func` construct.

For example::

    select(func.to_tsvector('fat cats ate rats').match('cat & rat'))

Emits the equivalent of::

    SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat')

The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::

    from sqlalchemy.dialects.postgresql import TSVECTOR
    from sqlalchemy import select, cast

    select(cast("some text", TSVECTOR))

produces a statement equivalent to::

    SELECT CAST('some text' AS TSVECTOR) AS anon_1

.. tip::

    It's important to remember that text searching in PostgreSQL is powerful
    but complicated, and SQLAlchemy users are advised to reference the
    PostgreSQL documentation regarding
    `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_.

    There are important differences between ``to_tsquery`` and
    ``plainto_tsquery``, the most significant of which is that ``to_tsquery``
    expects specially formatted "querytext" that is written to PostgreSQL's own
    specification, while ``plainto_tsquery`` expects unformatted text that is
    transformed into ``to_tsquery`` compatible querytext. This means the input
    to ``.match()`` under PostgreSQL may be incompatible with the input to
    ``.match()`` under another database backend. SQLAlchemy users who support
    multiple backends are advised to carefully implement their usage of
    ``.match()`` to work around these constraints.

Full Text Searches in PostgreSQL are influenced by a combination of: the
PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used
to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in
during a query.

When performing a Full Text Search against a column that has a GIN or
GiST index that is already pre-computed (which is common on full text
searches) one may need to explicitly pass in a particular PostgreSQL
``regconfig`` value to ensure the query-planner utilizes the index and does
not re-compute the column on demand.

In order to provide for this explicit query planning, or to use different
search strategies, the ``match`` method accepts a ``postgresql_regconfig``
keyword argument::

    select(mytable.c.id).where(
        mytable.c.title.match('somestring', postgresql_regconfig='english')
    )

Emits the equivalent of::

    SELECT mytable.id FROM mytable
    WHERE mytable.title @@ to_tsquery('english', 'somestring')

One can also specifically pass in a ``'regconfig'`` value to the
``to_tsvector()`` command as the initial argument::

    select(mytable.c.id).where(
        func.to_tsvector('english', mytable.c.title)
        .match('somestring', postgresql_regconfig='english')
    )

produces a statement equivalent to::

    SELECT mytable.id FROM mytable
    WHERE to_tsvector('english', mytable.title) @@
        to_tsquery('english', 'somestring')

It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
PostgreSQL to ensure that you are generating queries with SQLAlchemy that
take full advantage of any indexes you may have created for full text search.

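As a hedged sketch of such an index, a GIN index may be declared against the
same ``to_tsvector()`` expression used in the queries above so that the two
match; the index name and ``regconfig`` here are illustrative only (see also
:ref:`postgresql_indexes`)::

    from sqlalchemy import Index, func

    # a functional GIN index over the computed tsvector expression
    Index(
        "ix_mytable_title_tsv",
        func.to_tsvector("english", mytable.c.title),
        postgresql_using="gin",
    )
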
.. seealso::

    `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation

FROM ONLY ...
-------------

The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
table in an inheritance hierarchy. This can be used to produce the
``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
syntaxes. It uses SQLAlchemy's hints mechanism::

    # SELECT ... FROM ONLY ...
    stmt = table.select().with_hint(table, 'ONLY', 'postgresql')
    result = conn.execute(stmt)
    print(result.fetchall())

    # UPDATE ONLY ...
    table.update(values=dict(foo='bar')).with_hint(
        'ONLY', dialect_name='postgresql'
    )

    # DELETE FROM ONLY ...
    table.delete().with_hint('ONLY', dialect_name='postgresql')

.. _postgresql_indexes:

PostgreSQL-Specific Index Options
---------------------------------

Several extensions to the :class:`.Index` construct are available, specific
to the PostgreSQL dialect.

Covering Indexes
^^^^^^^^^^^^^^^^

The ``postgresql_include`` option renders INCLUDE(colname) for the given
string names::

    Index("my_index", table.c.x, postgresql_include=['y'])

would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``.

Note that this feature requires PostgreSQL 11 or later.

.. versionadded:: 1.4

.. _postgresql_partial_indexes:

Partial Indexes
^^^^^^^^^^^^^^^

Partial indexes add criterion to the index definition so that the index is
applied to a subset of rows. These can be specified on :class:`.Index`
using the ``postgresql_where`` keyword argument::

    Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10)

.. _postgresql_operator_classes:

Operator Classes
^^^^^^^^^^^^^^^^

PostgreSQL allows the specification of an *operator class* for each column of
an index (see
https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
The :class:`.Index` construct allows these to be specified via the
``postgresql_ops`` keyword argument::

    Index(
        'my_index', my_table.c.id, my_table.c.data,
        postgresql_ops={
            'data': 'text_pattern_ops',
            'id': 'int4_ops'
        })

Note that the keys in the ``postgresql_ops`` dictionaries are the
"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
different than the actual name of the column as expressed in the database.

If ``postgresql_ops`` is to be used against a complex SQL expression such
as a function call, then to apply to the column it must be given a label
that is identified in the dictionary by name, e.g.::

    Index(
        'my_index', my_table.c.id,
        func.lower(my_table.c.data).label('data_lower'),
        postgresql_ops={
            'data_lower': 'text_pattern_ops',
            'id': 'int4_ops'
        })

Operator classes are also supported by the
:class:`_postgresql.ExcludeConstraint` construct using the
:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
details.

.. versionadded:: 1.3.21 added support for operator classes with
   :class:`_postgresql.ExcludeConstraint`.

Index Types
^^^^^^^^^^^

PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
as the ability for users to create their own (see
https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
specified on :class:`.Index` using the ``postgresql_using`` keyword argument::

    Index('my_index', my_table.c.data, postgresql_using='gin')

The value passed to the keyword argument will be simply passed through to the
underlying CREATE INDEX command, so it *must* be a valid index type for your
version of PostgreSQL.

.. _postgresql_index_storage:

Index Storage Parameters
^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL allows storage parameters to be set on indexes. The storage
parameters available depend on the index method used by the index. Storage
parameters can be specified on :class:`.Index` using the ``postgresql_with``
keyword argument::

    Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50})

.. versionadded:: 1.0.6

PostgreSQL allows the tablespace in which to create the index to be defined.
The tablespace can be specified on :class:`.Index` using the
``postgresql_tablespace`` keyword argument::

    Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace')

.. versionadded:: 1.1

Note that the same option is available on :class:`_schema.Table` as well.

.. _postgresql_index_concurrently:

Indexes with CONCURRENTLY
^^^^^^^^^^^^^^^^^^^^^^^^^

The PostgreSQL index option CONCURRENTLY is supported by passing the
flag ``postgresql_concurrently`` to the :class:`.Index` construct::

    tbl = Table('testtbl', m, Column('data', Integer))

    idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True)

The above index construct will render DDL for CREATE INDEX, assuming
PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as::

    CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)

For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
a connection-less dialect, it will emit::

    DROP INDEX CONCURRENTLY test_idx1

.. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The
   CONCURRENTLY keyword is now only emitted if a high enough version
   of PostgreSQL is detected on the connection (or for a connection-less
   dialect).

When using CONCURRENTLY, the PostgreSQL database requires that the statement
be invoked outside of a transaction block. The Python DBAPI enforces that
even for a single statement, a transaction is present, so to use this
construct, the DBAPI's "autocommit" mode must be used::

    metadata = MetaData()
    table = Table(
        "foo", metadata,
        Column("id", String))
    index = Index(
        "foo_idx", table.c.id, postgresql_concurrently=True)

    with engine.connect() as conn:
        conn = conn.execution_options(isolation_level='AUTOCOMMIT')
        table.create(conn)

.. seealso::

    :ref:`postgresql_isolation_level`

.. _postgresql_index_reflection:

PostgreSQL Index Reflection
---------------------------

The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
UNIQUE CONSTRAINT construct is used. When inspecting a table using
:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
and :meth:`_reflection.Inspector.get_unique_constraints` methods
will report on these
two constructs distinctly; in the case of the index, the key
``duplicates_constraint`` will be present in the index entry if it is
detected as mirroring a constraint. When performing reflection using
``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
in :attr:`_schema.Table.indexes` when it is detected as mirroring a
:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints`
collection.

.. versionchanged:: 1.0.0 - :class:`_schema.Table` reflection now includes
   :class:`.UniqueConstraint` objects present in the
   :attr:`_schema.Table.constraints`
   collection; the PostgreSQL backend will no longer include a "mirrored"
   :class:`.Index` construct in :attr:`_schema.Table.indexes`
   if it is detected
   as corresponding to a unique constraint.

Special Reflection Options
--------------------------

The :class:`_reflection.Inspector`
used for the PostgreSQL backend is an instance
of :class:`.PGInspector`, which offers additional methods::

    from sqlalchemy import create_engine, inspect

    engine = create_engine("postgresql+psycopg2://localhost/test")
    insp = inspect(engine)  # will be a PGInspector

    print(insp.get_enums())

.. autoclass:: PGInspector
    :members:

.. _postgresql_table_options:

PostgreSQL Table Options
------------------------

Several options for CREATE TABLE are supported directly by the PostgreSQL
dialect in conjunction with the :class:`_schema.Table` construct:

* ``TABLESPACE``::

    Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace')

  The above option is also available on the :class:`.Index` construct.

* ``ON COMMIT``::

    Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS')

* ``WITH OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=True)

* ``WITHOUT OIDS``::

    Table("some_table", metadata, ..., postgresql_with_oids=False)

* ``INHERITS``::

    Table("some_table", metadata, ..., postgresql_inherits="some_supertable")

    Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))

  .. versionadded:: 1.0.0

* ``PARTITION BY``::

    Table("some_table", metadata, ...,
          postgresql_partition_by='LIST (part_column)')

  .. versionadded:: 1.2.6

.. seealso::

    `PostgreSQL CREATE TABLE options
    <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
    in the PostgreSQL documentation.

.. _postgresql_constraint_options:

PostgreSQL Constraint Options
-----------------------------

The following option(s) are supported by the PostgreSQL dialect in conjunction
with selected constraint constructs:

* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints
  when the constraint is being added to an existing table via ALTER TABLE,
  and has the effect that existing rows are not scanned during the ALTER
  operation against the constraint being added.

  When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
  that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
  may be specified as an additional keyword argument within the operation
  that creates the constraint, as in the following Alembic example::

      def upgrade():
          op.create_foreign_key(
              "fk_user_address",
              "address",
              "user",
              ["user_id"],
              ["id"],
              postgresql_not_valid=True
          )

  The keyword is ultimately accepted directly by the
  :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
  and :class:`_schema.ForeignKey` constructs; when using a tool like
  Alembic, dialect-specific keyword arguments are passed through to
  these constructs from the migration operation directives::

      CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)

      ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True)

  .. versionadded:: 1.4.32

  .. seealso::

      `PostgreSQL ALTER TABLE options
      <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
      in the PostgreSQL documentation.

.. _postgresql_table_valued_overview:

Table values, Table and Column valued functions, Row and Tuple objects
-----------------------------------------------------------------------

PostgreSQL makes great use of modern SQL forms such as table-valued functions,
tables and rows as values. These constructs are commonly used as part
of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
datatypes. SQLAlchemy's SQL expression language has native support for
most table-valued and row-valued forms.

.. _postgresql_table_valued:

Table-Valued Functions
^^^^^^^^^^^^^^^^^^^^^^^

Many PostgreSQL built-in functions are intended to be used in the FROM clause
of a SELECT statement, and are capable of returning table rows or sets of table
rows. A large portion of PostgreSQL's JSON functions for example such as
``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such
forms. These classes of SQL function calling forms in SQLAlchemy are available
using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
with :class:`_functions.Function` objects generated from the :data:`_sql.func`
namespace.

Examples from PostgreSQL's reference documentation follow below:

* ``json_each()``::

    >>> from sqlalchemy import select, func
    >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value"))
    >>> print(stmt)
    SELECT anon_1.key, anon_1.value
    FROM json_each(:json_each_1) AS anon_1

* ``json_populate_record()``::

    >>> from sqlalchemy import select, func, literal_column
    >>> stmt = select(
    ...     func.json_populate_record(
    ...         literal_column("null::myrowtype"),
    ...         '{"a":1,"b":2}'
    ...     ).table_valued("a", "b", name="x")
    ... )
    >>> print(stmt)
    SELECT x.a, x.b
    FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x

* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
  columns in the alias, where we may make use of :func:`_sql.column` elements with
  types to produce them. The :meth:`_functions.FunctionElement.table_valued`
  method produces a :class:`_sql.TableValuedAlias` construct, and the
  :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
  columns specification::

    >>> from sqlalchemy import select, func, column, Integer, Text
    >>> stmt = select(
    ...     func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued(
    ...         column("a", Integer), column("b", Text), column("d", Text),
    ...     ).render_derived(name="x", with_types=True)
    ... )
    >>> print(stmt)
    SELECT x.a, x.b, x.d
    FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)

* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
  ordinal counter to the output of a function and is accepted by a limited set
  of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
  :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
  parameter ``with_ordinality`` for this purpose, which accepts the string name
  that will be applied to the "ordinality" column::

    >>> from sqlalchemy import select, func
    >>> stmt = select(
    ...     func.generate_series(4, 1, -1).
    ...     table_valued("value", with_ordinality="ordinality").
    ...     render_derived()
    ... )
    >>> print(stmt)
    SELECT anon_1.value, anon_1.ordinality
    FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
    WITH ORDINALITY AS anon_1(value, ordinality)

.. versionadded:: 1.4.0b2

.. seealso::

    :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`

.. _postgresql_column_valued:

Column Valued Functions
^^^^^^^^^^^^^^^^^^^^^^^

Similar to the table valued function, a column valued function is present
in the FROM clause, but delivers itself to the columns clause as a single
scalar value. PostgreSQL functions such as ``json_array_elements()``,
``unnest()`` and ``generate_series()`` may use this form. Column valued
functions are available using the
:meth:`_functions.FunctionElement.column_valued` method of
:class:`_functions.FunctionElement`:

* ``json_array_elements()``::

    >>> from sqlalchemy import select, func
    >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x"))
    >>> print(stmt)
    SELECT x
    FROM json_array_elements(:json_array_elements_1) AS x

* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
  :func:`_postgresql.array` construct may be used::

    >>> from sqlalchemy.dialects.postgresql import array
    >>> from sqlalchemy import select, func
    >>> stmt = select(func.unnest(array([1, 2])).column_valued())
    >>> print(stmt)
    SELECT anon_1
    FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1

  The function can of course be used against an existing table-bound column
  that's of type :class:`_types.ARRAY`::

    >>> from sqlalchemy import table, column, ARRAY, Integer
    >>> from sqlalchemy import select, func
    >>> t = table("t", column('value', ARRAY(Integer)))
    >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
    >>> print(stmt)
    SELECT unnested_value
    FROM unnest(t.value) AS unnested_value

.. seealso::

    :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`

Row Types
^^^^^^^^^

Built-in support for rendering a ``ROW`` may be approximated using
``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
:func:`_sql.tuple_` construct::

    >>> from sqlalchemy import table, column, func, tuple_
    >>> t = table("t", column("id"), column("fk"))
    >>> stmt = t.select().where(
    ...     tuple_(t.c.id, t.c.fk) > (1, 2)
    ... ).where(
    ...     func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)
    ... )
    >>> print(stmt)
    SELECT t.id, t.fk
    FROM t
    WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)

.. seealso::

    `PostgreSQL Row Constructors
    <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_

    `PostgreSQL Row Constructor Comparison
    <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_

Table Types passed to Functions
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

PostgreSQL supports passing a table as an argument to a function, which it
refers to as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
such as :class:`_schema.Table` support this special form using the
:meth:`_sql.FromClause.table_valued` method, which is comparable to the
:meth:`_functions.FunctionElement.table_valued` method except that the collection
of columns is already established by that of the :class:`_sql.FromClause`
itself::

    >>> from sqlalchemy import table, column, func, select
    >>> a = table("a", column("id"), column("x"), column("y"))
    >>> stmt = select(func.row_to_json(a.table_valued()))
    >>> print(stmt)
    SELECT row_to_json(a) AS row_to_json_1
    FROM a

.. versionadded:: 1.4.0b2

ARRAY Types
-----------

The PostgreSQL dialect supports arrays, both as multidimensional column types
as well as array literals:

* :class:`_postgresql.ARRAY` - ARRAY datatype

* :class:`_postgresql.array` - array literal

* :func:`_postgresql.array_agg` - ARRAY_AGG SQL function

* :class:`_postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate
  function syntax.

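A brief sketch of an ARRAY column together with an array literal in a query;
the table and column names here are illustrative only::

    from sqlalchemy import Column, Integer, MetaData, Table, select
    from sqlalchemy.dialects.postgresql import ARRAY, array

    metadata = MetaData()
    example = Table("example", metadata, Column("scores", ARRAY(Integer)))

    # renders: WHERE example.scores && ARRAY[%(param_1)s, %(param_2)s]
    stmt = select(example).where(example.c.scores.overlap(array([1, 2])))
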
JSON Types
----------

The PostgreSQL dialect supports both JSON and JSONB datatypes, including
psycopg2's native support and support for all of PostgreSQL's special
operators:

* :class:`_postgresql.JSON`

* :class:`_postgresql.JSONB`

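A brief sketch of the JSONB type with two of its native operators; the names
here are illustrative only::

    from sqlalchemy import Column, MetaData, Table, select
    from sqlalchemy.dialects.postgresql import JSONB

    metadata = MetaData()
    doc = Table("doc", metadata, Column("data", JSONB))

    # renders: doc.data ->> %(data_1)s = %(param_1)s
    stmt = select(doc).where(doc.c.data["name"].astext == "spot")

    # renders: doc.data @> %(data_1)s  (JSONB containment)
    stmt2 = select(doc).where(doc.c.data.contains({"name": "spot"}))
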
HSTORE Type
-----------

The PostgreSQL HSTORE type as well as hstore literals are supported:

* :class:`_postgresql.HSTORE` - HSTORE datatype

* :class:`_postgresql.hstore` - hstore literal

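A brief sketch of an HSTORE column, using indexed access along with the
``has_key`` comparator; the names here are illustrative only::

    from sqlalchemy import Column, MetaData, Table, select
    from sqlalchemy.dialects.postgresql import HSTORE

    metadata = MetaData()
    item = Table("item", metadata, Column("tags", HSTORE))

    # renders: item.tags -> %(tags_1)s ... WHERE item.tags ? %(tags_2)s
    stmt = select(item.c.tags["color"]).where(item.c.tags.has_key("color"))
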
ENUM Types
----------

PostgreSQL has an independently creatable TYPE structure which is used
to implement an enumerated type. This approach introduces significant
complexity on the SQLAlchemy side in terms of when this type should be
CREATED and DROPPED. The type object is also an independently reflectable
entity. The following sections should be consulted, and a brief usage
sketch follows the list below:

* :class:`_postgresql.ENUM` - DDL and typing support for ENUM.

* :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types

* :meth:`.postgresql.ENUM.create` , :meth:`.postgresql.ENUM.drop` - individual
  CREATE and DROP commands for ENUM.

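A brief usage sketch, where the type name and ``engine`` are illustrative
only; ``create_type=False`` defers CREATE TYPE so that it may be emitted
explicitly::

    from sqlalchemy.dialects.postgresql import ENUM

    mood = ENUM("sad", "ok", "happy", name="mood_enum", create_type=False)

    with engine.begin() as conn:
        mood.create(conn, checkfirst=True)   # emits CREATE TYPE mood_enum
        # ... tables using the type may now be created ...
        mood.drop(conn, checkfirst=True)     # emits DROP TYPE mood_enum
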
.. _postgresql_array_of_enum:

Using ENUM with ARRAY
^^^^^^^^^^^^^^^^^^^^^

The combination of ENUM and ARRAY is not directly supported by backend
DBAPIs at this time. Prior to SQLAlchemy 1.3.17, a special workaround
was needed in order to allow this combination to work, described below.

.. versionchanged:: 1.3.17 The combination of ENUM and ARRAY is now directly
   handled by SQLAlchemy's implementation without any workarounds needed.

.. sourcecode:: python

    import re

    import sqlalchemy as sa
    from sqlalchemy import TypeDecorator
    from sqlalchemy.dialects.postgresql import ARRAY

    class ArrayOfEnum(TypeDecorator):
        impl = ARRAY

        def bind_expression(self, bindvalue):
            return sa.cast(bindvalue, self)

        def result_processor(self, dialect, coltype):
            super_rp = super(ArrayOfEnum, self).result_processor(
                dialect, coltype)

            def handle_raw_string(value):
                inner = re.match(r"^{(.*)}$", value).group(1)
                return inner.split(",") if inner else []

            def process(value):
                if value is None:
                    return None
                return super_rp(handle_raw_string(value))

            return process

E.g.::

    Table(
        'mydata', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', ArrayOfEnum(ENUM('a', 'b', 'c', name='myenum')))
    )

This type is not included as a built-in type as it would be incompatible
with a DBAPI that suddenly decides to support ARRAY of ENUM directly in
a new version.

.. _postgresql_array_of_json:

Using JSON/JSONB with ARRAY
^^^^^^^^^^^^^^^^^^^^^^^^^^^

Similar to using ENUM, prior to SQLAlchemy 1.3.17, for an ARRAY of JSON/JSONB
it was necessary to render the appropriate CAST. Current psycopg2 drivers
accommodate the result set correctly without any special steps.

.. versionchanged:: 1.3.17 The combination of JSON/JSONB and ARRAY is now
   directly handled by SQLAlchemy's implementation without any workarounds
   needed.

.. sourcecode:: python

    import sqlalchemy as sa
    from sqlalchemy.dialects.postgresql import ARRAY

    class CastingArray(ARRAY):
        def bind_expression(self, bindvalue):
            return sa.cast(bindvalue, self)

E.g.::

    Table(
        'mydata', metadata,
        Column('id', Integer, primary_key=True),
        Column('data', CastingArray(JSONB))
    )

1494""" # noqa: E501
1496from collections import defaultdict
1497import datetime as dt
1498import re
1499from uuid import UUID as _python_UUID
1501from . import array as _array
1502from . import dml
1503from . import hstore as _hstore
1504from . import json as _json
1505from . import ranges as _ranges
1506from ... import exc
1507from ... import schema
1508from ... import sql
1509from ... import util
1510from ...engine import characteristics
1511from ...engine import default
1512from ...engine import reflection
1513from ...sql import coercions
1514from ...sql import compiler
1515from ...sql import elements
1516from ...sql import expression
1517from ...sql import roles
1518from ...sql import sqltypes
1519from ...sql import util as sql_util
1520from ...sql.ddl import DDLBase
1521from ...types import BIGINT
1522from ...types import BOOLEAN
1523from ...types import CHAR
1524from ...types import DATE
1525from ...types import FLOAT
1526from ...types import INTEGER
1527from ...types import NUMERIC
1528from ...types import REAL
1529from ...types import SMALLINT
1530from ...types import TEXT
1531from ...types import VARCHAR
IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)

AUTOCOMMIT_REGEXP = re.compile(
    r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|"
    "IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)",
    re.I | re.UNICODE,
)

1541RESERVED_WORDS = set(
1542 [
1543 "all",
1544 "analyse",
1545 "analyze",
1546 "and",
1547 "any",
1548 "array",
1549 "as",
1550 "asc",
1551 "asymmetric",
1552 "both",
1553 "case",
1554 "cast",
1555 "check",
1556 "collate",
1557 "column",
1558 "constraint",
1559 "create",
1560 "current_catalog",
1561 "current_date",
1562 "current_role",
1563 "current_time",
1564 "current_timestamp",
1565 "current_user",
1566 "default",
1567 "deferrable",
1568 "desc",
1569 "distinct",
1570 "do",
1571 "else",
1572 "end",
1573 "except",
1574 "false",
1575 "fetch",
1576 "for",
1577 "foreign",
1578 "from",
1579 "grant",
1580 "group",
1581 "having",
1582 "in",
1583 "initially",
1584 "intersect",
1585 "into",
1586 "leading",
1587 "limit",
1588 "localtime",
1589 "localtimestamp",
1590 "new",
1591 "not",
1592 "null",
1593 "of",
1594 "off",
1595 "offset",
1596 "old",
1597 "on",
1598 "only",
1599 "or",
1600 "order",
1601 "placing",
1602 "primary",
1603 "references",
1604 "returning",
1605 "select",
1606 "session_user",
1607 "some",
1608 "symmetric",
1609 "table",
1610 "then",
1611 "to",
1612 "trailing",
1613 "true",
1614 "union",
1615 "unique",
1616 "user",
1617 "using",
1618 "variadic",
1619 "when",
1620 "where",
1621 "window",
1622 "with",
1623 "authorization",
1624 "between",
1625 "binary",
1626 "cross",
1627 "current_schema",
1628 "freeze",
1629 "full",
1630 "ilike",
1631 "inner",
1632 "is",
1633 "isnull",
1634 "join",
1635 "left",
1636 "like",
1637 "natural",
1638 "notnull",
1639 "outer",
1640 "over",
1641 "overlaps",
1642 "right",
1643 "similar",
1644 "verbose",
1645 ]
1646)
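# these appear to be PostgreSQL pg_type OIDs as they arrive in cursor
# description type codes (e.g. 1700 = numeric, 700/701 = float4/float8,
# 20/21/23 = int8/int2/int4, with the remaining values assumed here to be
# the corresponding array types); driver-specific result processors use
# them to classify numeric result columns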
1648_DECIMAL_TYPES = (1231, 1700)
1649_FLOAT_TYPES = (700, 701, 1021, 1022)
1650_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
1653class BYTEA(sqltypes.LargeBinary):
1654 __visit_name__ = "BYTEA"
1657class DOUBLE_PRECISION(sqltypes.Float):
1658 __visit_name__ = "DOUBLE_PRECISION"
1661class INET(sqltypes.TypeEngine):
1662 __visit_name__ = "INET"
1665PGInet = INET
1668class CIDR(sqltypes.TypeEngine):
1669 __visit_name__ = "CIDR"
1672PGCidr = CIDR
1675class MACADDR(sqltypes.TypeEngine):
1676 __visit_name__ = "MACADDR"
1679PGMacAddr = MACADDR
1682class MACADDR8(sqltypes.TypeEngine):
1683 __visit_name__ = "MACADDR8"
1686PGMacAddr8 = MACADDR8
1689class MONEY(sqltypes.TypeEngine):
1691 r"""Provide the PostgreSQL MONEY type.
1693 Depending on the driver, result rows using this type may return a
1694 string value which includes currency symbols.
1696 For this reason, it may be preferable to provide conversion to a
1697 numerically-based currency datatype using :class:`_types.TypeDecorator`::
1699 import re
1700 import decimal
 from typing import Any
1701 from sqlalchemy import TypeDecorator
1703 class NumericMoney(TypeDecorator):
1704 impl = MONEY
1706 def process_result_value(self, value: Any, dialect: Any) -> Any:
1707 if value is not None:
1708 # adjust this for the currency and numeric
1709 m = re.match(r"\$([\d.]+)", value)
1710 if m:
1711 value = decimal.Decimal(m.group(1))
1712 return value
1714 Alternatively, the conversion may be applied as a CAST using
1715 the :meth:`_types.TypeDecorator.column_expression` method as follows::
1717 import decimal
 from typing import Any
1718 from sqlalchemy import cast
 from sqlalchemy import Numeric
1719 from sqlalchemy import TypeDecorator
1721 class NumericMoney(TypeDecorator):
1722 impl = MONEY
1724 def column_expression(self, column: Any):
1725 return cast(column, Numeric())
1727 .. versionadded:: 1.2
1729 """
1731 __visit_name__ = "MONEY"
1734class OID(sqltypes.TypeEngine):
1736 """Provide the PostgreSQL OID type.
1738 .. versionadded:: 0.9.5
1740 """
1742 __visit_name__ = "OID"
1745class REGCLASS(sqltypes.TypeEngine):
1747 """Provide the PostgreSQL REGCLASS type.
1749 .. versionadded:: 1.2.7
1751 """
1753 __visit_name__ = "REGCLASS"
1756class TIMESTAMP(sqltypes.TIMESTAMP):
1758 """Provide the PostgreSQL TIMESTAMP type."""
1760 __visit_name__ = "TIMESTAMP"
1762 def __init__(self, timezone=False, precision=None):
1763 """Construct a TIMESTAMP.
1765 :param timezone: boolean value if timezone present, default False
1766 :param precision: optional integer precision value
1768 .. versionadded:: 1.4
1770 """
1771 super(TIMESTAMP, self).__init__(timezone=timezone)
1772 self.precision = precision
1775class TIME(sqltypes.TIME):
1777 """PostgreSQL TIME type."""
1779 __visit_name__ = "TIME"
1781 def __init__(self, timezone=False, precision=None):
1782 """Construct a TIME.
1784 :param timezone: boolean value if timezone present, default False
1785 :param precision: optional integer precision value
1787 .. versionadded:: 1.4
1789 """
1790 super(TIME, self).__init__(timezone=timezone)
1791 self.precision = precision
1794class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):
1796 """PostgreSQL INTERVAL type."""
1798 __visit_name__ = "INTERVAL"
1799 native = True
1801 def __init__(self, precision=None, fields=None):
1802 """Construct an INTERVAL.
1804 :param precision: optional integer precision value
1805 :param fields: string fields specifier. Allows storage of fields
1806 to be limited, such as ``"YEAR"``, ``"MONTH"``, ``"DAY TO HOUR"``,
1807 etc.
1809 .. versionadded:: 1.2
1811 """
1812 self.precision = precision
1813 self.fields = fields
1815 @classmethod
1816 def adapt_emulated_to_native(cls, interval, **kw):
1817 return INTERVAL(precision=interval.second_precision)
1819 @property
1820 def _type_affinity(self):
1821 return sqltypes.Interval
1823 def as_generic(self, allow_nulltype=False):
1824 return sqltypes.Interval(native=True, second_precision=self.precision)
1826 @property
1827 def python_type(self):
1828 return dt.timedelta
1830 def coerce_compared_value(self, op, value):
1831 return self
1834PGInterval = INTERVAL
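# a sketch of the DDL rendered for BIT (see PGTypeCompiler.visit_BIT below):
#   BIT()                -> BIT(1)
#   BIT(5)               -> BIT(5)
#   BIT(varying=True)    -> BIT VARYING
#   BIT(5, varying=True) -> BIT VARYING(5)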
1837class BIT(sqltypes.TypeEngine):
1838 __visit_name__ = "BIT"
1840 def __init__(self, length=None, varying=False):
1841 if not varying:
1842 # BIT without VARYING defaults to length 1
1843 self.length = length or 1
1844 else:
1845 # but BIT VARYING can be unlimited-length, so no default
1846 self.length = length
1847 self.varying = varying
1850PGBit = BIT
1853class UUID(sqltypes.TypeEngine):
1855 """PostgreSQL UUID type.
1857 Represents the UUID column type, interpreting
1858 data either as natively returned by the DBAPI
1859 or as Python uuid objects.
1861 The UUID type is currently known to work within the prominent DBAPI
1862 drivers supported by SQLAlchemy including psycopg2, pg8000 and
1863 asyncpg. Support for other DBAPI drivers may be incomplete or absent.
1865 """
1867 __visit_name__ = "UUID"
1869 def __init__(self, as_uuid=False):
1870 """Construct a UUID type.
1873 :param as_uuid=False: if True, values will be interpreted
1874 as Python uuid objects, converting to/from string via the
1875 DBAPI.
1877 """
1878 self.as_uuid = as_uuid
1880 def coerce_compared_value(self, op, value):
1881 """See :meth:`.TypeEngine.coerce_compared_value` for a description."""
1883 if isinstance(value, util.string_types):
1884 return self
1885 else:
1886 return super(UUID, self).coerce_compared_value(op, value)
1888 def bind_processor(self, dialect):
1889 if self.as_uuid:
1891 def process(value):
1892 if value is not None:
1893 value = util.text_type(value)
1894 return value
1896 return process
1897 else:
1898 return None
1900 def result_processor(self, dialect, coltype):
1901 if self.as_uuid:
1903 def process(value):
1904 if value is not None:
1905 value = _python_UUID(value)
1906 return value
1908 return process
1909 else:
1910 return None
1912 def literal_processor(self, dialect):
1913 if self.as_uuid:
1915 def process(value):
1916 if value is not None:
1917 value = "'%s'::UUID" % value
1918 return value
1920 return process
1921 else:
1923 def process(value):
1924 if value is not None:
1925 value = "'%s'" % value
1926 return value
1928 return process
1930 @property
1931 def python_type(self):
1932 return _python_UUID if self.as_uuid else str
1935PGUuid = UUID
1938class TSVECTOR(sqltypes.TypeEngine):
1940 """The :class:`_postgresql.TSVECTOR` type implements the PostgreSQL
1941 text search type TSVECTOR.
1943 It can be used to do full text queries on natural language
1944 documents.
1946 .. versionadded:: 0.9.0
1948 .. seealso::
1950 :ref:`postgresql_match`
1952 """
1954 __visit_name__ = "TSVECTOR"
1957class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum):
1959 """PostgreSQL ENUM type.
1961 This is a subclass of :class:`_types.Enum` which includes
1962 support for PG's ``CREATE TYPE`` and ``DROP TYPE``.
1964 When the builtin type :class:`_types.Enum` is used and the
1965 :paramref:`.Enum.native_enum` flag is left at its default of
1966 True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
1967 type as the implementation, so the special create/drop rules
1968 will be used.
1970 The create/drop behavior of ENUM is necessarily intricate, due to the
1971 awkward relationship the ENUM type has to the
1972 parent table, in that it may be "owned" by just a single table, or
1973 may be shared among many tables.
1975 When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
1976 in an "inline" fashion, ``CREATE TYPE`` and ``DROP TYPE`` are emitted
1977 when the :meth:`_schema.Table.create` and
1978 :meth:`_schema.Table.drop`
1979 methods are called::
1981 table = Table('sometable', metadata,
1982 Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
1983 )
1985 table.create(engine) # will emit CREATE ENUM and CREATE TABLE
1986 table.drop(engine) # will emit DROP TABLE and DROP ENUM
1988 To use a common enumerated type between multiple tables, the best
1989 practice is to declare the :class:`_types.Enum` or
1990 :class:`_postgresql.ENUM` independently, and associate it with the
1991 :class:`_schema.MetaData` object itself::
1993 my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)
1995 t1 = Table('sometable_one', metadata,
1996 Column('some_enum', myenum)
1997 )
1999 t2 = Table('sometable_two', metadata,
2000 Column('some_enum', myenum)
2001 )
2003 When this pattern is used, care must still be taken at the level
2004 of individual table creates. Emitting CREATE TABLE without also
2005 specifying ``checkfirst=True`` will still cause issues::
2007 t1.create(engine) # will fail: no such type 'myenum'
2009 If we specify ``checkfirst=True``, the individual table-level create
2010 operation will check for the ``ENUM`` and create it if not present::
2012 # will check if enum exists, and emit CREATE TYPE if not
2013 t1.create(engine, checkfirst=True)
2015 When using a metadata-level ENUM type, the type will always be created
2016 and dropped whenever the metadata-wide create/drop is called::
2018 metadata.create_all(engine) # will emit CREATE TYPE
2019 metadata.drop_all(engine) # will emit DROP TYPE
2021 The type can also be created and dropped directly::
2023 my_enum.create(engine)
2024 my_enum.drop(engine)
2026 .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
2027 now behaves more strictly with regards to CREATE/DROP. A metadata-level
2028 ENUM type will only be created and dropped at the metadata level,
2029 not the table level, with the exception of
2030 ``table.create(checkfirst=True)``.
2031 The ``table.drop()`` call will now emit a DROP TYPE for a table-level
2032 enumerated type.
2034 """
2036 native_enum = True
2038 def __init__(self, *enums, **kw):
2039 """Construct an :class:`_postgresql.ENUM`.
2041 Arguments are the same as that of
2042 :class:`_types.Enum`, but also including
2043 the following parameters.
2045 :param create_type: Defaults to True.
2046 Indicates that ``CREATE TYPE`` should be
2047 emitted, after optionally checking for the
2048 presence of the type, when the parent
2049 table is being created; and additionally
2050 that ``DROP TYPE`` is called when the table
2051 is dropped. When ``False``, no check
2052 will be performed and no ``CREATE TYPE``
2053 or ``DROP TYPE`` is emitted, unless
2054 :meth:`~.postgresql.ENUM.create`
2055 or :meth:`~.postgresql.ENUM.drop`
2056 are called directly.
2057 Setting to ``False`` is helpful
2058 when emitting a creation scheme to a SQL file
2059 without access to the actual database -
2060 the :meth:`~.postgresql.ENUM.create` and
2061 :meth:`~.postgresql.ENUM.drop` methods can
2062 be used to emit SQL to a target bind.
2064 """
2065 native_enum = kw.pop("native_enum", None)
2066 if native_enum is False:
2067 util.warn(
2068 "the native_enum flag does not apply to the "
2069 "sqlalchemy.dialects.postgresql.ENUM datatype; this type "
2070 "always refers to ENUM. Use sqlalchemy.types.Enum for "
2071 "non-native enum."
2072 )
2073 self.create_type = kw.pop("create_type", True)
2074 super(ENUM, self).__init__(*enums, **kw)
2076 @classmethod
2077 def adapt_emulated_to_native(cls, impl, **kw):
2078 """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
2079 :class:`.Enum`.
2081 """
2082 kw.setdefault("validate_strings", impl.validate_strings)
2083 kw.setdefault("name", impl.name)
2084 kw.setdefault("schema", impl.schema)
2085 kw.setdefault("inherit_schema", impl.inherit_schema)
2086 kw.setdefault("metadata", impl.metadata)
2087 kw.setdefault("_create_events", False)
2088 kw.setdefault("values_callable", impl.values_callable)
2089 kw.setdefault("omit_aliases", impl._omit_aliases)
2090 return cls(**kw)
2092 def create(self, bind=None, checkfirst=True):
2093 """Emit ``CREATE TYPE`` for this
2094 :class:`_postgresql.ENUM`.
2096 If the underlying dialect does not support
2097 PostgreSQL CREATE TYPE, no action is taken.
2099 :param bind: a connectable :class:`_engine.Engine`,
2100 :class:`_engine.Connection`, or similar object to emit
2101 SQL.
2102 :param checkfirst: if ``True``, a query against
2103 the PG catalog will first be performed to see
2104 whether the type already exists before
2105 creating it.
2107 """
2108 if not bind.dialect.supports_native_enum:
2109 return
2111 bind._run_ddl_visitor(self.EnumGenerator, self, checkfirst=checkfirst)
2113 def drop(self, bind=None, checkfirst=True):
2114 """Emit ``DROP TYPE`` for this
2115 :class:`_postgresql.ENUM`.
2117 If the underlying dialect does not support
2118 PostgreSQL DROP TYPE, no action is taken.
2120 :param bind: a connectable :class:`_engine.Engine`,
2121 :class:`_engine.Connection`, or similar object to emit
2122 SQL.
2123 :param checkfirst: if ``True``, a query against
2124 the PG catalog will first be performed to see
2125 whether the type actually exists before dropping.
2127 """
2128 if not bind.dialect.supports_native_enum:
2129 return
2131 bind._run_ddl_visitor(self.EnumDropper, self, checkfirst=checkfirst)
2133 class EnumGenerator(DDLBase):
2134 def __init__(self, dialect, connection, checkfirst=False, **kwargs):
2135 super(ENUM.EnumGenerator, self).__init__(connection, **kwargs)
2136 self.checkfirst = checkfirst
2138 def _can_create_enum(self, enum):
2139 if not self.checkfirst:
2140 return True
2142 effective_schema = self.connection.schema_for_object(enum)
2144 return not self.connection.dialect.has_type(
2145 self.connection, enum.name, schema=effective_schema
2146 )
2148 def visit_enum(self, enum):
2149 if not self._can_create_enum(enum):
2150 return
2152 self.connection.execute(CreateEnumType(enum))
2154 class EnumDropper(DDLBase):
2155 def __init__(self, dialect, connection, checkfirst=False, **kwargs):
2156 super(ENUM.EnumDropper, self).__init__(connection, **kwargs)
2157 self.checkfirst = checkfirst
2159 def _can_drop_enum(self, enum):
2160 if not self.checkfirst:
2161 return True
2163 effective_schema = self.connection.schema_for_object(enum)
2165 return self.connection.dialect.has_type(
2166 self.connection, enum.name, schema=effective_schema
2167 )
2169 def visit_enum(self, enum):
2170 if not self._can_drop_enum(enum):
2171 return
2173 self.connection.execute(DropEnumType(enum))
2175 def _check_for_name_in_memos(self, checkfirst, kw):
2176 """Look in the 'ddl runner' for 'memos', then
2177 note our name in that collection.
2179 This is to ensure a particular named enum is operated
2180 upon only once within any kind of create/drop
2181 sequence without relying upon "checkfirst".
2183 """
2184 if not self.create_type:
2185 return True
2186 if "_ddl_runner" in kw:
2187 ddl_runner = kw["_ddl_runner"]
2188 if "_pg_enums" in ddl_runner.memo:
2189 pg_enums = ddl_runner.memo["_pg_enums"]
2190 else:
2191 pg_enums = ddl_runner.memo["_pg_enums"] = set()
2192 present = (self.schema, self.name) in pg_enums
2193 pg_enums.add((self.schema, self.name))
2194 return present
2195 else:
2196 return False
2198 def _on_table_create(self, target, bind, checkfirst=False, **kw):
2199 if (
2200 checkfirst
2201 or (
2202 not self.metadata
2203 and not kw.get("_is_metadata_operation", False)
2204 )
2205 ) and not self._check_for_name_in_memos(checkfirst, kw):
2206 self.create(bind=bind, checkfirst=checkfirst)
2208 def _on_table_drop(self, target, bind, checkfirst=False, **kw):
2209 if (
2210 not self.metadata
2211 and not kw.get("_is_metadata_operation", False)
2212 and not self._check_for_name_in_memos(checkfirst, kw)
2213 ):
2214 self.drop(bind=bind, checkfirst=checkfirst)
2216 def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
2217 if not self._check_for_name_in_memos(checkfirst, kw):
2218 self.create(bind=bind, checkfirst=checkfirst)
2220 def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
2221 if not self._check_for_name_in_memos(checkfirst, kw):
2222 self.drop(bind=bind, checkfirst=checkfirst)
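# a Cast variant rendering PostgreSQL's "expr::type" shorthand rather
# than CAST(expr AS type); see PGCompiler.visit_colon_cast below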
2225class _ColonCast(elements.Cast):
2226 __visit_name__ = "colon_cast"
2228 def __init__(self, expression, type_):
2229 self.type = type_
2230 self.clause = expression
2231 self.typeclause = elements.TypeClause(type_)
2234colspecs = {
2235 sqltypes.ARRAY: _array.ARRAY,
2236 sqltypes.Interval: INTERVAL,
2237 sqltypes.Enum: ENUM,
2238 sqltypes.JSON.JSONPathType: _json.JSONPathType,
2239 sqltypes.JSON: _json.JSON,
2240}
2243ischema_names = {
2244 "_array": _array.ARRAY,
2245 "hstore": _hstore.HSTORE,
2246 "json": _json.JSON,
2247 "jsonb": _json.JSONB,
2248 "int4range": _ranges.INT4RANGE,
2249 "int8range": _ranges.INT8RANGE,
2250 "numrange": _ranges.NUMRANGE,
2251 "daterange": _ranges.DATERANGE,
2252 "tsrange": _ranges.TSRANGE,
2253 "tstzrange": _ranges.TSTZRANGE,
2254 "integer": INTEGER,
2255 "bigint": BIGINT,
2256 "smallint": SMALLINT,
2257 "character varying": VARCHAR,
2258 "character": CHAR,
2259 '"char"': sqltypes.String,
2260 "name": sqltypes.String,
2261 "text": TEXT,
2262 "numeric": NUMERIC,
2263 "float": FLOAT,
2264 "real": REAL,
2265 "inet": INET,
2266 "cidr": CIDR,
2267 "uuid": UUID,
2268 "bit": BIT,
2269 "bit varying": BIT,
2270 "macaddr": MACADDR,
2271 "macaddr8": MACADDR8,
2272 "money": MONEY,
2273 "oid": OID,
2274 "regclass": REGCLASS,
2275 "double precision": DOUBLE_PRECISION,
2276 "timestamp": TIMESTAMP,
2277 "timestamp with time zone": TIMESTAMP,
2278 "timestamp without time zone": TIMESTAMP,
2279 "time with time zone": TIME,
2280 "time without time zone": TIME,
2281 "date": DATE,
2282 "time": TIME,
2283 "bytea": BYTEA,
2284 "boolean": BOOLEAN,
2285 "interval": INTERVAL,
2286 "tsvector": TSVECTOR,
2287}
2290class PGCompiler(compiler.SQLCompiler):
2291 def visit_colon_cast(self, element, **kw):
2292 return "%s::%s" % (
2293 element.clause._compiler_dispatch(self, **kw),
2294 element.typeclause._compiler_dispatch(self, **kw),
2295 )
2297 def visit_array(self, element, **kw):
2298 return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
2300 def visit_slice(self, element, **kw):
2301 return "%s:%s" % (
2302 self.process(element.start, **kw),
2303 self.process(element.stop, **kw),
2304 )
2306 def visit_json_getitem_op_binary(
2307 self, binary, operator, _cast_applied=False, **kw
2308 ):
2309 if (
2310 not _cast_applied
2311 and binary.type._type_affinity is not sqltypes.JSON
2312 ):
2313 kw["_cast_applied"] = True
2314 return self.process(sql.cast(binary, binary.type), **kw)
2316 kw["eager_grouping"] = True
2318 return self._generate_generic_binary(
2319 binary, " -> " if not _cast_applied else " ->> ", **kw
2320 )
2322 def visit_json_path_getitem_op_binary(
2323 self, binary, operator, _cast_applied=False, **kw
2324 ):
2325 if (
2326 not _cast_applied
2327 and binary.type._type_affinity is not sqltypes.JSON
2328 ):
2329 kw["_cast_applied"] = True
2330 return self.process(sql.cast(binary, binary.type), **kw)
2332 kw["eager_grouping"] = True
2333 return self._generate_generic_binary(
2334 binary, " #> " if not _cast_applied else " #>> ", **kw
2335 )
2337 def visit_getitem_binary(self, binary, operator, **kw):
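 # renders array indexing, e.g. (a sketch): col[3] -> "col[3]";
 # together with visit_slice above, col[2:5] -> "col[2:5]"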
2338 return "%s[%s]" % (
2339 self.process(binary.left, **kw),
2340 self.process(binary.right, **kw),
2341 )
2343 def visit_aggregate_order_by(self, element, **kw):
2344 return "%s ORDER BY %s" % (
2345 self.process(element.target, **kw),
2346 self.process(element.order_by, **kw),
2347 )
2349 def visit_match_op_binary(self, binary, operator, **kw):
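 # renders roughly (a sketch): col.match('terms') ->
 # "col @@ to_tsquery('terms')"; with a
 # postgresql_regconfig='english' modifier ->
 # "col @@ to_tsquery('english', 'terms')"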
2350 if "postgresql_regconfig" in binary.modifiers:
2351 regconfig = self.render_literal_value(
2352 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
2353 )
2354 if regconfig:
2355 return "%s @@ to_tsquery(%s, %s)" % (
2356 self.process(binary.left, **kw),
2357 regconfig,
2358 self.process(binary.right, **kw),
2359 )
2360 return "%s @@ to_tsquery(%s)" % (
2361 self.process(binary.left, **kw),
2362 self.process(binary.right, **kw),
2363 )
2365 def visit_ilike_op_binary(self, binary, operator, **kw):
2366 escape = binary.modifiers.get("escape", None)
2368 return "%s ILIKE %s" % (
2369 self.process(binary.left, **kw),
2370 self.process(binary.right, **kw),
2371 ) + (
2372 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2373 if escape
2374 else ""
2375 )
2377 def visit_not_ilike_op_binary(self, binary, operator, **kw):
2378 escape = binary.modifiers.get("escape", None)
2379 return "%s NOT ILIKE %s" % (
2380 self.process(binary.left, **kw),
2381 self.process(binary.right, **kw),
2382 ) + (
2383 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2384 if escape
2385 else ""
2386 )
2388 def _regexp_match(self, base_op, binary, operator, kw):
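 # a sketch of the rendering: with no flags, "col ~ 'pat'"; the single
 # flag 'i' maps to the case-insensitive operator variant "~*"; any
 # other flags are embedded inline as an (?flags) prefix via CONCAT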
2389 flags = binary.modifiers["flags"]
2390 if flags is None:
2391 return self._generate_generic_binary(
2392 binary, " %s " % base_op, **kw
2393 )
2394 if isinstance(flags, elements.BindParameter) and flags.value == "i":
2395 return self._generate_generic_binary(
2396 binary, " %s* " % base_op, **kw
2397 )
2398 return "%s %s CONCAT('(?', %s, ')', %s)" % (
2399 self.process(binary.left, **kw),
2400 base_op,
2401 self.process(flags, **kw),
2402 self.process(binary.right, **kw),
2403 )
2405 def visit_regexp_match_op_binary(self, binary, operator, **kw):
2406 return self._regexp_match("~", binary, operator, kw)
2408 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
2409 return self._regexp_match("!~", binary, operator, kw)
2411 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
2412 string = self.process(binary.left, **kw)
2413 pattern = self.process(binary.right, **kw)
2414 flags = binary.modifiers["flags"]
2415 replacement = self.process(binary.modifiers["replacement"], **kw)
2416 if flags is None:
2417 return "REGEXP_REPLACE(%s, %s, %s)" % (
2418 string,
2419 pattern,
2420 replacement,
2421 )
2422 else:
2423 return "REGEXP_REPLACE(%s, %s, %s, %s)" % (
2424 string,
2425 pattern,
2426 replacement,
2427 self.process(flags, **kw),
2428 )
2430 def visit_empty_set_expr(self, element_types):
2431 # cast the empty set to the type we are comparing against. if
2432 # we are comparing against the null type, pick an arbitrary
2433 # datatype for the empty set
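 # e.g. (a sketch): a single INTEGER element type yields
 # "SELECT CAST(NULL AS INTEGER) WHERE 1!=1"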
2434 return "SELECT %s WHERE 1!=1" % (
2435 ", ".join(
2436 "CAST(NULL AS %s)"
2437 % self.dialect.type_compiler.process(
2438 INTEGER() if type_._isnull else type_
2439 )
2440 for type_ in element_types or [INTEGER()]
2441 ),
2442 )
2444 def render_literal_value(self, value, type_):
2445 value = super(PGCompiler, self).render_literal_value(value, type_)
2447 if self.dialect._backslash_escapes:
2448 value = value.replace("\\", "\\\\")
2449 return value
2451 def visit_sequence(self, seq, **kw):
2452 return "nextval('%s')" % self.preparer.format_sequence(seq)
2454 def limit_clause(self, select, **kw):
2455 text = ""
2456 if select._limit_clause is not None:
2457 text += " \n LIMIT " + self.process(select._limit_clause, **kw)
2458 if select._offset_clause is not None:
2459 if select._limit_clause is None:
2460 text += "\n LIMIT ALL"
2461 text += " OFFSET " + self.process(select._offset_clause, **kw)
2462 return text
2464 def format_from_hint_text(self, sqltext, table, hint, iscrud):
2465 if hint.upper() != "ONLY":
2466 raise exc.CompileError("Unrecognized hint: %r" % hint)
2467 return "ONLY " + sqltext
2469 def get_select_precolumns(self, select, **kw):
2470 # Do not call super().get_select_precolumns because
2471 # it will warn/raise when distinct on is present
2472 if select._distinct or select._distinct_on:
2473 if select._distinct_on:
2474 return (
2475 "DISTINCT ON ("
2476 + ", ".join(
2477 [
2478 self.process(col, **kw)
2479 for col in select._distinct_on
2480 ]
2481 )
2482 + ") "
2483 )
2484 else:
2485 return "DISTINCT "
2486 else:
2487 return ""
2489 def for_update_clause(self, select, **kw):
2491 if select._for_update_arg.read:
2492 if select._for_update_arg.key_share:
2493 tmp = " FOR KEY SHARE"
2494 else:
2495 tmp = " FOR SHARE"
2496 elif select._for_update_arg.key_share:
2497 tmp = " FOR NO KEY UPDATE"
2498 else:
2499 tmp = " FOR UPDATE"
2501 if select._for_update_arg.of:
2503 tables = util.OrderedSet()
2504 for c in select._for_update_arg.of:
2505 tables.update(sql_util.surface_selectables_only(c))
2507 tmp += " OF " + ", ".join(
2508 self.process(table, ashint=True, use_schema=False, **kw)
2509 for table in tables
2510 )
2512 if select._for_update_arg.nowait:
2513 tmp += " NOWAIT"
2514 if select._for_update_arg.skip_locked:
2515 tmp += " SKIP LOCKED"
2517 return tmp
2519 def returning_clause(self, stmt, returning_cols):
2521 columns = [
2522 self._label_returning_column(
2523 stmt, c, fallback_label_name=c._non_anon_label
2524 )
2525 for c in expression._select_iterables(returning_cols)
2526 ]
2528 return "RETURNING " + ", ".join(columns)
2530 def visit_substring_func(self, func, **kw):
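 # renders the SQL-standard form, e.g. (a sketch):
 # func.substring(col, 2, 3) -> "SUBSTRING(col FROM 2 FOR 3)"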
2531 s = self.process(func.clauses.clauses[0], **kw)
2532 start = self.process(func.clauses.clauses[1], **kw)
2533 if len(func.clauses.clauses) > 2:
2534 length = self.process(func.clauses.clauses[2], **kw)
2535 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
2536 else:
2537 return "SUBSTRING(%s FROM %s)" % (s, start)
2539 def _on_conflict_target(self, clause, **kw):
2541 if clause.constraint_target is not None:
2542 # target may be a name of an Index, UniqueConstraint or
2543 # ExcludeConstraint. While there is a separate
2544 # "max_identifier_length" for indexes, PostgreSQL uses the same
2545 # length for all objects so we can use
2546 # truncate_and_render_constraint_name
2547 target_text = (
2548 "ON CONSTRAINT %s"
2549 % self.preparer.truncate_and_render_constraint_name(
2550 clause.constraint_target
2551 )
2552 )
2553 elif clause.inferred_target_elements is not None:
2554 target_text = "(%s)" % ", ".join(
2555 (
2556 self.preparer.quote(c)
2557 if isinstance(c, util.string_types)
2558 else self.process(c, include_table=False, use_schema=False)
2559 )
2560 for c in clause.inferred_target_elements
2561 )
2562 if clause.inferred_target_whereclause is not None:
2563 target_text += " WHERE %s" % self.process(
2564 clause.inferred_target_whereclause,
2565 include_table=False,
2566 use_schema=False,
2567 )
2568 else:
2569 target_text = ""
2571 return target_text
2573 @util.memoized_property
2574 def _is_safe_for_fast_insert_values_helper(self):
2575 # don't allow fast executemany if _post_values_clause is
2576 # present and is not an OnConflictDoNothing. what this means
2577 # concretely is that the
2578 # "fast insert executemany helper" won't be used, in other
2579 # words we won't convert "executemany()" of many parameter
2580 # sets into a single INSERT with many elements in VALUES.
2581 # We can't apply that optimization safely if for example the
2582 # statement includes a clause like "ON CONFLICT DO UPDATE"
2584 return self.insert_single_values_expr is not None and (
2585 self.statement._post_values_clause is None
2586 or isinstance(
2587 self.statement._post_values_clause, dml.OnConflictDoNothing
2588 )
2589 )
2591 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
2593 target_text = self._on_conflict_target(on_conflict, **kw)
2595 if target_text:
2596 return "ON CONFLICT %s DO NOTHING" % target_text
2597 else:
2598 return "ON CONFLICT DO NOTHING"
2600 def visit_on_conflict_do_update(self, on_conflict, **kw):
2602 clause = on_conflict
2604 target_text = self._on_conflict_target(on_conflict, **kw)
2606 action_set_ops = []
2608 set_parameters = dict(clause.update_values_to_set)
2609 # create a list of column assignment clauses as tuples
2611 insert_statement = self.stack[-1]["selectable"]
2612 cols = insert_statement.table.c
2613 for c in cols:
2614 col_key = c.key
2616 if col_key in set_parameters:
2617 value = set_parameters.pop(col_key)
2618 elif c in set_parameters:
2619 value = set_parameters.pop(c)
2620 else:
2621 continue
2623 if coercions._is_literal(value):
2624 value = elements.BindParameter(None, value, type_=c.type)
2626 else:
2627 if (
2628 isinstance(value, elements.BindParameter)
2629 and value.type._isnull
2630 ):
2631 value = value._clone()
2632 value.type = c.type
2633 value_text = self.process(value.self_group(), use_schema=False)
2635 key_text = self.preparer.quote(c.name)
2636 action_set_ops.append("%s = %s" % (key_text, value_text))
2638 # check for names that don't match columns
2639 if set_parameters:
2640 util.warn(
2641 "Additional column names not matching "
2642 "any column keys in table '%s': %s"
2643 % (
2644 self.current_executable.table.name,
2645 (", ".join("'%s'" % c for c in set_parameters)),
2646 )
2647 )
2648 for k, v in set_parameters.items():
2649 key_text = (
2650 self.preparer.quote(k)
2651 if isinstance(k, util.string_types)
2652 else self.process(k, use_schema=False)
2653 )
2654 value_text = self.process(
2655 coercions.expect(roles.ExpressionElementRole, v),
2656 use_schema=False,
2657 )
2658 action_set_ops.append("%s = %s" % (key_text, value_text))
2660 action_text = ", ".join(action_set_ops)
2661 if clause.update_whereclause is not None:
2662 action_text += " WHERE %s" % self.process(
2663 clause.update_whereclause, include_table=True, use_schema=False
2664 )
2666 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
2668 def update_from_clause(
2669 self, update_stmt, from_table, extra_froms, from_hints, **kw
2670 ):
2671 kw["asfrom"] = True
2672 return "FROM " + ", ".join(
2673 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2674 for t in extra_froms
2675 )
2677 def delete_extra_from_clause(
2678 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2679 ):
2680 """Render the DELETE .. USING clause specific to PostgreSQL."""
2681 kw["asfrom"] = True
2682 return "USING " + ", ".join(
2683 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2684 for t in extra_froms
2685 )
2687 def fetch_clause(self, select, **kw):
2688 # pg requires parens for non-literal clauses. They are also required
2689 # around bind parameters if a ::type cast is applied by the driver
2690 # (asyncpg), so it's easiest to just always add them
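 # e.g. (a sketch): .fetch(10) -> "FETCH FIRST (10) ROWS ONLY";
 # .fetch(10, percent=True, with_ties=True) ->
 # "FETCH FIRST (10) PERCENT ROWS WITH TIES"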
2691 text = ""
2692 if select._offset_clause is not None:
2693 text += "\n OFFSET (%s) ROWS" % self.process(
2694 select._offset_clause, **kw
2695 )
2696 if select._fetch_clause is not None:
2697 text += "\n FETCH FIRST (%s)%s ROWS %s" % (
2698 self.process(select._fetch_clause, **kw),
2699 " PERCENT" if select._fetch_clause_options["percent"] else "",
2700 "WITH TIES"
2701 if select._fetch_clause_options["with_ties"]
2702 else "ONLY",
2703 )
2704 return text
2707class PGDDLCompiler(compiler.DDLCompiler):
2708 def get_column_specification(self, column, **kwargs):
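 # a sketch of the outcome: an Integer autoincrement primary key with
 # no Identity and no non-optional default renders e.g.
 # "id SERIAL NOT NULL"; otherwise the full type is rendered, followed
 # by DEFAULT / computed / identity / nullability clauses as applicable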
2710 colspec = self.preparer.format_column(column)
2711 impl_type = column.type.dialect_impl(self.dialect)
2712 if isinstance(impl_type, sqltypes.TypeDecorator):
2713 impl_type = impl_type.impl
2715 has_identity = (
2716 column.identity is not None
2717 and self.dialect.supports_identity_columns
2718 )
2720 if (
2721 column.primary_key
2722 and column is column.table._autoincrement_column
2723 and (
2724 self.dialect.supports_smallserial
2725 or not isinstance(impl_type, sqltypes.SmallInteger)
2726 )
2727 and not has_identity
2728 and (
2729 column.default is None
2730 or (
2731 isinstance(column.default, schema.Sequence)
2732 and column.default.optional
2733 )
2734 )
2735 ):
2736 if isinstance(impl_type, sqltypes.BigInteger):
2737 colspec += " BIGSERIAL"
2738 elif isinstance(impl_type, sqltypes.SmallInteger):
2739 colspec += " SMALLSERIAL"
2740 else:
2741 colspec += " SERIAL"
2742 else:
2743 colspec += " " + self.dialect.type_compiler.process(
2744 column.type,
2745 type_expression=column,
2746 identifier_preparer=self.preparer,
2747 )
2748 default = self.get_column_default_string(column)
2749 if default is not None:
2750 colspec += " DEFAULT " + default
2752 if column.computed is not None:
2753 colspec += " " + self.process(column.computed)
2754 if has_identity:
2755 colspec += " " + self.process(column.identity)
2757 if not column.nullable and not has_identity:
2758 colspec += " NOT NULL"
2759 elif column.nullable and has_identity:
2760 colspec += " NULL"
2761 return colspec
2763 def _define_constraint_validity(self, constraint):
2764 not_valid = constraint.dialect_options["postgresql"]["not_valid"]
2765 return " NOT VALID" if not_valid else ""
2767 def visit_check_constraint(self, constraint):
2768 if constraint._type_bound:
2769 typ = list(constraint.columns)[0].type
2770 if (
2771 isinstance(typ, sqltypes.ARRAY)
2772 and isinstance(typ.item_type, sqltypes.Enum)
2773 and not typ.item_type.native_enum
2774 ):
2775 raise exc.CompileError(
2776 "PostgreSQL dialect cannot produce the CHECK constraint "
2777 "for ARRAY of non-native ENUM; please specify "
2778 "create_constraint=False on this Enum datatype."
2779 )
2781 text = super(PGDDLCompiler, self).visit_check_constraint(constraint)
2782 text += self._define_constraint_validity(constraint)
2783 return text
2785 def visit_foreign_key_constraint(self, constraint):
2786 text = super(PGDDLCompiler, self).visit_foreign_key_constraint(
2787 constraint
2788 )
2789 text += self._define_constraint_validity(constraint)
2790 return text
2792 def visit_drop_table_comment(self, drop):
2793 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
2794 drop.element
2795 )
2797 def visit_create_enum_type(self, create):
2798 type_ = create.element
2800 return "CREATE TYPE %s AS ENUM (%s)" % (
2801 self.preparer.format_type(type_),
2802 ", ".join(
2803 self.sql_compiler.process(sql.literal(e), literal_binds=True)
2804 for e in type_.enums
2805 ),
2806 )
2808 def visit_drop_enum_type(self, drop):
2809 type_ = drop.element
2811 return "DROP TYPE %s" % (self.preparer.format_type(type_))
2813 def visit_create_index(self, create):
2814 preparer = self.preparer
2815 index = create.element
2816 self._verify_index_table(index)
2817 text = "CREATE "
2818 if index.unique:
2819 text += "UNIQUE "
2820 text += "INDEX "
2822 if self.dialect._supports_create_index_concurrently:
2823 concurrently = index.dialect_options["postgresql"]["concurrently"]
2824 if concurrently:
2825 text += "CONCURRENTLY "
2827 if create.if_not_exists:
2828 text += "IF NOT EXISTS "
2830 text += "%s ON %s " % (
2831 self._prepared_index_name(index, include_schema=False),
2832 preparer.format_table(index.table),
2833 )
2835 using = index.dialect_options["postgresql"]["using"]
2836 if using:
2837 text += (
2838 "USING %s "
2839 % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
2840 )
2842 ops = index.dialect_options["postgresql"]["ops"]
2843 text += "(%s)" % (
2844 ", ".join(
2845 [
2846 self.sql_compiler.process(
2847 expr.self_group()
2848 if not isinstance(expr, expression.ColumnClause)
2849 else expr,
2850 include_table=False,
2851 literal_binds=True,
2852 )
2853 + (
2854 (" " + ops[expr.key])
2855 if hasattr(expr, "key") and expr.key in ops
2856 else ""
2857 )
2858 for expr in index.expressions
2859 ]
2860 )
2861 )
2863 includeclause = index.dialect_options["postgresql"]["include"]
2864 if includeclause:
2865 inclusions = [
2866 index.table.c[col]
2867 if isinstance(col, util.string_types)
2868 else col
2869 for col in includeclause
2870 ]
2871 text += " INCLUDE (%s)" % ", ".join(
2872 [preparer.quote(c.name) for c in inclusions]
2873 )
2875 withclause = index.dialect_options["postgresql"]["with"]
2876 if withclause:
2877 text += " WITH (%s)" % (
2878 ", ".join(
2879 [
2880 "%s = %s" % storage_parameter
2881 for storage_parameter in withclause.items()
2882 ]
2883 )
2884 )
2886 tablespace_name = index.dialect_options["postgresql"]["tablespace"]
2887 if tablespace_name:
2888 text += " TABLESPACE %s" % preparer.quote(tablespace_name)
2890 whereclause = index.dialect_options["postgresql"]["where"]
2891 if whereclause is not None:
2892 whereclause = coercions.expect(
2893 roles.DDLExpressionRole, whereclause
2894 )
2896 where_compiled = self.sql_compiler.process(
2897 whereclause, include_table=False, literal_binds=True
2898 )
2899 text += " WHERE " + where_compiled
2901 return text
2903 def visit_drop_index(self, drop):
2904 index = drop.element
2906 text = "\nDROP INDEX "
2908 if self.dialect._supports_drop_index_concurrently:
2909 concurrently = index.dialect_options["postgresql"]["concurrently"]
2910 if concurrently:
2911 text += "CONCURRENTLY "
2913 if drop.if_exists:
2914 text += "IF EXISTS "
2916 text += self._prepared_index_name(index, include_schema=True)
2917 return text
2919 def visit_exclude_constraint(self, constraint, **kw):
2920 text = ""
2921 if constraint.name is not None:
2922 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2923 constraint
2924 )
2925 elements = []
2926 for expr, name, op in constraint._render_exprs:
2927 kw["include_table"] = False
2928 exclude_element = self.sql_compiler.process(expr, **kw) + (
2929 (" " + constraint.ops[expr.key])
2930 if hasattr(expr, "key") and expr.key in constraint.ops
2931 else ""
2932 )
2934 elements.append("%s WITH %s" % (exclude_element, op))
2935 text += "EXCLUDE USING %s (%s)" % (
2936 self.preparer.validate_sql_phrase(
2937 constraint.using, IDX_USING
2938 ).lower(),
2939 ", ".join(elements),
2940 )
2941 if constraint.where is not None:
2942 text += " WHERE (%s)" % self.sql_compiler.process(
2943 constraint.where, literal_binds=True
2944 )
2945 text += self.define_constraint_deferrability(constraint)
2946 return text
2948 def post_create_table(self, table):
2949 table_opts = []
2950 pg_opts = table.dialect_options["postgresql"]
2952 inherits = pg_opts.get("inherits")
2953 if inherits is not None:
2954 if not isinstance(inherits, (list, tuple)):
2955 inherits = (inherits,)
2956 table_opts.append(
2957 "\n INHERITS ( "
2958 + ", ".join(self.preparer.quote(name) for name in inherits)
2959 + " )"
2960 )
2962 if pg_opts["partition_by"]:
2963 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
2965 if pg_opts["with_oids"] is True:
2966 table_opts.append("\n WITH OIDS")
2967 elif pg_opts["with_oids"] is False:
2968 table_opts.append("\n WITHOUT OIDS")
2970 if pg_opts["on_commit"]:
2971 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
2972 table_opts.append("\n ON COMMIT %s" % on_commit_options)
2974 if pg_opts["tablespace"]:
2975 tablespace_name = pg_opts["tablespace"]
2976 table_opts.append(
2977 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
2978 )
2980 return "".join(table_opts)
2982 def visit_computed_column(self, generated):
2983 if generated.persisted is False:
2984 raise exc.CompileError(
2985 "PostrgreSQL computed columns do not support 'virtual' "
2986 "persistence; set the 'persisted' flag to None or True for "
2987 "PostgreSQL support."
2988 )
2990 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
2991 generated.sqltext, include_table=False, literal_binds=True
2992 )
2994 def visit_create_sequence(self, create, **kw):
2995 prefix = None
2996 if create.element.data_type is not None:
2997 prefix = " AS %s" % self.type_compiler.process(
2998 create.element.data_type
2999 )
3001 return super(PGDDLCompiler, self).visit_create_sequence(
3002 create, prefix=prefix, **kw
3003 )
3006class PGTypeCompiler(compiler.GenericTypeCompiler):
3007 def visit_TSVECTOR(self, type_, **kw):
3008 return "TSVECTOR"
3010 def visit_INET(self, type_, **kw):
3011 return "INET"
3013 def visit_CIDR(self, type_, **kw):
3014 return "CIDR"
3016 def visit_MACADDR(self, type_, **kw):
3017 return "MACADDR"
3019 def visit_MACADDR8(self, type_, **kw):
3020 return "MACADDR8"
3022 def visit_MONEY(self, type_, **kw):
3023 return "MONEY"
3025 def visit_OID(self, type_, **kw):
3026 return "OID"
3028 def visit_REGCLASS(self, type_, **kw):
3029 return "REGCLASS"
3031 def visit_FLOAT(self, type_, **kw):
3032 if not type_.precision:
3033 return "FLOAT"
3034 else:
3035 return "FLOAT(%(precision)s)" % {"precision": type_.precision}
3037 def visit_DOUBLE_PRECISION(self, type_, **kw):
3038 return "DOUBLE PRECISION"
3040 def visit_BIGINT(self, type_, **kw):
3041 return "BIGINT"
3043 def visit_HSTORE(self, type_, **kw):
3044 return "HSTORE"
3046 def visit_JSON(self, type_, **kw):
3047 return "JSON"
3049 def visit_JSONB(self, type_, **kw):
3050 return "JSONB"
3052 def visit_INT4RANGE(self, type_, **kw):
3053 return "INT4RANGE"
3055 def visit_INT8RANGE(self, type_, **kw):
3056 return "INT8RANGE"
3058 def visit_NUMRANGE(self, type_, **kw):
3059 return "NUMRANGE"
3061 def visit_DATERANGE(self, type_, **kw):
3062 return "DATERANGE"
3064 def visit_TSRANGE(self, type_, **kw):
3065 return "TSRANGE"
3067 def visit_TSTZRANGE(self, type_, **kw):
3068 return "TSTZRANGE"
3070 def visit_datetime(self, type_, **kw):
3071 return self.visit_TIMESTAMP(type_, **kw)
3073 def visit_enum(self, type_, **kw):
3074 if not type_.native_enum or not self.dialect.supports_native_enum:
3075 return super(PGTypeCompiler, self).visit_enum(type_, **kw)
3076 else:
3077 return self.visit_ENUM(type_, **kw)
3079 def visit_ENUM(self, type_, identifier_preparer=None, **kw):
3080 if identifier_preparer is None:
3081 identifier_preparer = self.dialect.identifier_preparer
3082 return identifier_preparer.format_type(type_)
3084 def visit_TIMESTAMP(self, type_, **kw):
3085 return "TIMESTAMP%s %s" % (
3086 "(%d)" % type_.precision
3087 if getattr(type_, "precision", None) is not None
3088 else "",
3089 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
3090 )
3092 def visit_TIME(self, type_, **kw):
3093 return "TIME%s %s" % (
3094 "(%d)" % type_.precision
3095 if getattr(type_, "precision", None) is not None
3096 else "",
3097 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
3098 )
3100 def visit_INTERVAL(self, type_, **kw):
3101 text = "INTERVAL"
3102 if type_.fields is not None:
3103 text += " " + type_.fields
3104 if type_.precision is not None:
3105 text += " (%d)" % type_.precision
3106 return text
3108 def visit_BIT(self, type_, **kw):
3109 if type_.varying:
3110 compiled = "BIT VARYING"
3111 if type_.length is not None:
3112 compiled += "(%d)" % type_.length
3113 else:
3114 compiled = "BIT(%d)" % type_.length
3115 return compiled
3117 def visit_UUID(self, type_, **kw):
3118 return "UUID"
3120 def visit_large_binary(self, type_, **kw):
3121 return self.visit_BYTEA(type_, **kw)
3123 def visit_BYTEA(self, type_, **kw):
3124 return "BYTEA"
3126 def visit_ARRAY(self, type_, **kw):
3128 inner = self.process(type_.item_type, **kw)
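 # append one "[]" per dimension ahead of any trailing COLLATE clause,
 # e.g. (a sketch): VARCHAR(30) COLLATE "en_US" with dimensions=2
 # becomes VARCHAR(30)[][] COLLATE "en_US"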
3129 return re.sub(
3130 r"((?: COLLATE.*)?)$",
3131 (
3132 r"%s\1"
3133 % (
3134 "[]"
3135 * (type_.dimensions if type_.dimensions is not None else 1)
3136 )
3137 ),
3138 inner,
3139 count=1,
3140 )
3143class PGIdentifierPreparer(compiler.IdentifierPreparer):
3145 reserved_words = RESERVED_WORDS
3147 def _unquote_identifier(self, value):
3148 if value[0] == self.initial_quote:
3149 value = value[1:-1].replace(
3150 self.escape_to_quote, self.escape_quote
3151 )
3152 return value
3154 def format_type(self, type_, use_schema=True):
3155 if not type_.name:
3156 raise exc.CompileError("PostgreSQL ENUM type requires a name.")
3158 name = self.quote(type_.name)
3159 effective_schema = self.schema_for_object(type_)
3161 if (
3162 not self.omit_schema
3163 and use_schema
3164 and effective_schema is not None
3165 ):
3166 name = self.quote_schema(effective_schema) + "." + name
3167 return name
3170class PGInspector(reflection.Inspector):
3171 def get_table_oid(self, table_name, schema=None):
3172 """Return the OID for the given table name."""
3174 with self._operation_context() as conn:
3175 return self.dialect.get_table_oid(
3176 conn, table_name, schema, info_cache=self.info_cache
3177 )
3179 def get_enums(self, schema=None):
3180 """Return a list of ENUM objects.
3182 Each member is a dictionary containing these fields:
3184 * name - name of the enum
3185 * schema - the schema name for the enum.
3186 * visible - boolean, whether or not this enum is visible
3187 in the default search path.
3188 * labels - a list of string labels that apply to the enum.
3190 :param schema: schema name. If None, the default schema
3191 (typically 'public') is used. May also be set to '*' to
3192 indicate load enums for all schemas.
3194 .. versionadded:: 1.0.0
3196 """
3197 schema = schema or self.default_schema_name
3198 with self._operation_context() as conn:
3199 return self.dialect._load_enums(conn, schema)
3201 def get_foreign_table_names(self, schema=None):
3202 """Return a list of FOREIGN TABLE names.
3204 Behavior is similar to that of
3205 :meth:`_reflection.Inspector.get_table_names`,
3206 except that the list is limited to those tables that report a
3207 ``relkind`` value of ``f``.
3209 .. versionadded:: 1.0.0
3211 """
3212 schema = schema or self.default_schema_name
3213 with self._operation_context() as conn:
3214 return self.dialect._get_foreign_table_names(conn, schema)
3216 def get_view_names(self, schema=None, include=("plain", "materialized")):
3217 """Return all view names in `schema`.
3219 :param schema: Optional, retrieve names from a non-default schema.
3220 For special quoting, use :class:`.quoted_name`.
3222 :param include: specify which types of views to return. Passed
3223 as a string value (for a single type) or a tuple (for any number
3224 of types). Defaults to ``('plain', 'materialized')``.
3226 .. versionadded:: 1.1
3228 """
3230 with self._operation_context() as conn:
3231 return self.dialect.get_view_names(
3232 conn, schema, info_cache=self.info_cache, include=include
3233 )
3236class CreateEnumType(schema._CreateDropBase):
3237 __visit_name__ = "create_enum_type"
3240class DropEnumType(schema._CreateDropBase):
3241 __visit_name__ = "drop_enum_type"
3244class PGExecutionContext(default.DefaultExecutionContext):
3245 def fire_sequence(self, seq, type_):
3246 return self._execute_scalar(
3247 (
3248 "select nextval('%s')"
3249 % self.identifier_preparer.format_sequence(seq)
3250 ),
3251 type_,
3252 )
3254 def get_insert_default(self, column):
3255 if column.primary_key and column is column.table._autoincrement_column:
3256 if column.server_default and column.server_default.has_argument:
3258 # pre-execute passive defaults on primary key columns
3259 return self._execute_scalar(
3260 "select %s" % column.server_default.arg, column.type
3261 )
3263 elif column.default is None or (
3264 column.default.is_sequence and column.default.optional
3265 ):
3266 # execute the sequence associated with a SERIAL primary
3267 # key column. for non-primary-key SERIAL, the ID just
3268 # generates server side.
3270 try:
3271 seq_name = column._postgresql_seq_name
3272 except AttributeError:
3273 tab = column.table.name
3274 col = column.name
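 # derive the implicit SERIAL sequence name "<table>_<column>_seq",
 # truncating each part so the full name fits PostgreSQL's
 # 63-character identifier limit (29 + 1 + 29 + 4 == 63)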
3275 tab = tab[0 : 29 + max(0, (29 - len(col)))]
3276 col = col[0 : 29 + max(0, (29 - len(tab)))]
3277 name = "%s_%s_seq" % (tab, col)
3278 column._postgresql_seq_name = seq_name = name
3280 if column.table is not None:
3281 effective_schema = self.connection.schema_for_object(
3282 column.table
3283 )
3284 else:
3285 effective_schema = None
3287 if effective_schema is not None:
3288 exc = 'select nextval(\'"%s"."%s"\')' % (
3289 effective_schema,
3290 seq_name,
3291 )
3292 else:
3293 exc = "select nextval('\"%s\"')" % (seq_name,)
3295 return self._execute_scalar(exc, column.type)
3297 return super(PGExecutionContext, self).get_insert_default(column)
3299 def should_autocommit_text(self, statement):
3300 return AUTOCOMMIT_REGEXP.match(statement)
3303class PGReadOnlyConnectionCharacteristic(
3304 characteristics.ConnectionCharacteristic
3305):
3306 transactional = True
3308 def reset_characteristic(self, dialect, dbapi_conn):
3309 dialect.set_readonly(dbapi_conn, False)
3311 def set_characteristic(self, dialect, dbapi_conn, value):
3312 dialect.set_readonly(dbapi_conn, value)
3314 def get_characteristic(self, dialect, dbapi_conn):
3315 return dialect.get_readonly(dbapi_conn)
3318class PGDeferrableConnectionCharacteristic(
3319 characteristics.ConnectionCharacteristic
3320):
3321 transactional = True
3323 def reset_characteristic(self, dialect, dbapi_conn):
3324 dialect.set_deferrable(dbapi_conn, False)
3326 def set_characteristic(self, dialect, dbapi_conn, value):
3327 dialect.set_deferrable(dbapi_conn, value)
3329 def get_characteristic(self, dialect, dbapi_conn):
3330 return dialect.get_deferrable(dbapi_conn)
3333class PGDialect(default.DefaultDialect):
3334 name = "postgresql"
3335 supports_statement_cache = True
3336 supports_alter = True
3337 max_identifier_length = 63
3338 supports_sane_rowcount = True
3340 supports_native_enum = True
3341 supports_native_boolean = True
3342 supports_smallserial = True
3344 supports_sequences = True
3345 sequences_optional = True
3346 preexecute_autoincrement_sequences = True
3347 postfetch_lastrowid = False
3349 supports_comments = True
3350 supports_default_values = True
3352 supports_default_metavalue = True
3354 supports_empty_insert = False
3355 supports_multivalues_insert = True
3356 supports_identity_columns = True
3358 default_paramstyle = "pyformat"
3359 ischema_names = ischema_names
3360 colspecs = colspecs
3362 statement_compiler = PGCompiler
3363 ddl_compiler = PGDDLCompiler
3364 type_compiler = PGTypeCompiler
3365 preparer = PGIdentifierPreparer
3366 execution_ctx_cls = PGExecutionContext
3367 inspector = PGInspector
3368 isolation_level = None
3370 implicit_returning = True
3371 full_returning = True
3373 connection_characteristics = (
3374 default.DefaultDialect.connection_characteristics
3375 )
3376 connection_characteristics = connection_characteristics.union(
3377 {
3378 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
3379 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
3380 }
3381 )
3383 construct_arguments = [
3384 (
3385 schema.Index,
3386 {
3387 "using": False,
3388 "include": None,
3389 "where": None,
3390 "ops": {},
3391 "concurrently": False,
3392 "with": {},
3393 "tablespace": None,
3394 },
3395 ),
3396 (
3397 schema.Table,
3398 {
3399 "ignore_search_path": False,
3400 "tablespace": None,
3401 "partition_by": None,
3402 "with_oids": None,
3403 "on_commit": None,
3404 "inherits": None,
3405 },
3406 ),
3407 (
3408 schema.CheckConstraint,
3409 {
3410 "not_valid": False,
3411 },
3412 ),
3413 (
3414 schema.ForeignKeyConstraint,
3415 {
3416 "not_valid": False,
3417 },
3418 ),
3419 ]
3421 reflection_options = ("postgresql_ignore_search_path",)
3423 _backslash_escapes = True
3424 _supports_create_index_concurrently = True
3425 _supports_drop_index_concurrently = True
3427 def __init__(
3428 self,
3429 isolation_level=None,
3430 json_serializer=None,
3431 json_deserializer=None,
3432 **kwargs
3433 ):
3434 default.DefaultDialect.__init__(self, **kwargs)
3436 # the isolation_level parameter to the PGDialect itself is legacy.
3437 # still works however the execution_options method is the one that
3438 # is documented.
3439 self.isolation_level = isolation_level
3440 self._json_deserializer = json_deserializer
3441 self._json_serializer = json_serializer
3443 def initialize(self, connection):
3444 super(PGDialect, self).initialize(connection)
3446 if self.server_version_info <= (8, 2):
3447 self.full_returning = self.implicit_returning = False
3449 self.supports_native_enum = self.server_version_info >= (8, 3)
3450 if not self.supports_native_enum:
3451 self.colspecs = self.colspecs.copy()
3452 # pop base Enum type
3453 self.colspecs.pop(sqltypes.Enum, None)
3454 # psycopg2, others may have placed ENUM here as well
3455 self.colspecs.pop(ENUM, None)
3457 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
3458 self.supports_smallserial = self.server_version_info >= (9, 2)
3460 if self.server_version_info < (8, 2):
3461 self._backslash_escapes = False
3462 else:
3463 # ensure this query is not emitted on server version < 8.2
3464 # as it will fail
3465 std_string = connection.exec_driver_sql(
3466 "show standard_conforming_strings"
3467 ).scalar()
3468 self._backslash_escapes = std_string == "off"
3470 self._supports_create_index_concurrently = (
3471 self.server_version_info >= (8, 2)
3472 )
3473 self._supports_drop_index_concurrently = self.server_version_info >= (
3474 9,
3475 2,
3476 )
3477 self.supports_identity_columns = self.server_version_info >= (10,)
3479 def on_connect(self):
3480 if self.isolation_level is not None:
3482 def connect(conn):
3483 self.set_isolation_level(conn, self.isolation_level)
3485 return connect
3486 else:
3487 return None
3489 _isolation_lookup = set(
3490 [
3491 "SERIALIZABLE",
3492 "READ UNCOMMITTED",
3493 "READ COMMITTED",
3494 "REPEATABLE READ",
3495 ]
3496 )
3498 def set_isolation_level(self, connection, level):
3499 level = level.replace("_", " ")
3500 if level not in self._isolation_lookup:
3501 raise exc.ArgumentError(
3502 "Invalid value '%s' for isolation_level. "
3503 "Valid isolation levels for %s are %s"
3504 % (level, self.name, ", ".join(self._isolation_lookup))
3505 )
3506 cursor = connection.cursor()
3507 cursor.execute(
3508 "SET SESSION CHARACTERISTICS AS TRANSACTION "
3509 "ISOLATION LEVEL %s" % level
3510 )
3511 cursor.execute("COMMIT")
3512 cursor.close()
3514 def get_isolation_level(self, connection):
3515 cursor = connection.cursor()
3516 cursor.execute("show transaction isolation level")
3517 val = cursor.fetchone()[0]
3518 cursor.close()
3519 return val.upper()
3521 def set_readonly(self, connection, value):
3522 raise NotImplementedError()
3524 def get_readonly(self, connection):
3525 raise NotImplementedError()
3527 def set_deferrable(self, connection, value):
3528 raise NotImplementedError()
3530 def get_deferrable(self, connection):
3531 raise NotImplementedError()
3533 def do_begin_twophase(self, connection, xid):
3534 self.do_begin(connection.connection)
3536 def do_prepare_twophase(self, connection, xid):
3537 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
3539 def do_rollback_twophase(
3540 self, connection, xid, is_prepared=True, recover=False
3541 ):
3542 if is_prepared:
3543 if recover:
3544 # FIXME: ugly hack to get out of transaction
3545 # context when committing recoverable transactions.
3546 # We must find a way to make the dbapi not
3547 # open a transaction.
3548 connection.exec_driver_sql("ROLLBACK")
3549 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
3550 connection.exec_driver_sql("BEGIN")
3551 self.do_rollback(connection.connection)
3552 else:
3553 self.do_rollback(connection.connection)
3555 def do_commit_twophase(
3556 self, connection, xid, is_prepared=True, recover=False
3557 ):
3558 if is_prepared:
3559 if recover:
3560 connection.exec_driver_sql("ROLLBACK")
3561 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
3562 connection.exec_driver_sql("BEGIN")
3563 self.do_rollback(connection.connection)
3564 else:
3565 self.do_commit(connection.connection)
3567 def do_recover_twophase(self, connection):
3568 resultset = connection.execute(
3569 sql.text("SELECT gid FROM pg_prepared_xacts")
3570 )
3571 return [row[0] for row in resultset]
3573 def _get_default_schema_name(self, connection):
3574 return connection.exec_driver_sql("select current_schema()").scalar()
3576 def has_schema(self, connection, schema):
3577 query = (
3578 "select nspname from pg_namespace " "where lower(nspname)=:schema"
3579 )
3580 cursor = connection.execute(
3581 sql.text(query).bindparams(
3582 sql.bindparam(
3583 "schema",
3584 util.text_type(schema.lower()),
3585 type_=sqltypes.Unicode,
3586 )
3587 )
3588 )
3590 return bool(cursor.first())
3592 def has_table(self, connection, table_name, schema=None):
3593 self._ensure_has_table_connection(connection)
3594 # unquoted names are case-folded to lower case in pg_class
3595 if schema is None:
3596 cursor = connection.execute(
3597 sql.text(
3598 "select relname from pg_class c join pg_namespace n on "
3599 "n.oid=c.relnamespace where "
3600 "pg_catalog.pg_table_is_visible(c.oid) "
3601 "and relname=:name"
3602 ).bindparams(
3603 sql.bindparam(
3604 "name",
3605 util.text_type(table_name),
3606 type_=sqltypes.Unicode,
3607 )
3608 )
3609 )
3610 else:
3611 cursor = connection.execute(
3612 sql.text(
3613 "select relname from pg_class c join pg_namespace n on "
3614 "n.oid=c.relnamespace where n.nspname=:schema and "
3615 "relname=:name"
3616 ).bindparams(
3617 sql.bindparam(
3618 "name",
3619 util.text_type(table_name),
3620 type_=sqltypes.Unicode,
3621 ),
3622 sql.bindparam(
3623 "schema",
3624 util.text_type(schema),
3625 type_=sqltypes.Unicode,
3626 ),
3627 )
3628 )
3629 return bool(cursor.first())
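# Usage sketch (editorial illustration): has_table() is normally
# reached through the runtime inspection interface.
#
#     from sqlalchemy import create_engine, inspect
#     engine = create_engine("postgresql://scott:tiger@localhost/test")
#     insp = inspect(engine)
#     insp.has_table("sometable")                   # via search path
#     insp.has_table("sometable", schema="public")  # schema-qualified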
3631 def has_sequence(self, connection, sequence_name, schema=None):
3632 if schema is None:
3633 schema = self.default_schema_name
3634 cursor = connection.execute(
3635 sql.text(
3636 "SELECT relname FROM pg_class c join pg_namespace n on "
3637 "n.oid=c.relnamespace where relkind='S' and "
3638 "n.nspname=:schema and relname=:name"
3639 ).bindparams(
3640 sql.bindparam(
3641 "name",
3642 util.text_type(sequence_name),
3643 type_=sqltypes.Unicode,
3644 ),
3645 sql.bindparam(
3646 "schema",
3647 util.text_type(schema),
3648 type_=sqltypes.Unicode,
3649 ),
3650 )
3651 )
3653 return bool(cursor.first())
3655 def has_type(self, connection, type_name, schema=None):
3656 if schema is not None:
3657 query = """
3658 SELECT EXISTS (
3659 SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n
3660 WHERE t.typnamespace = n.oid
3661 AND t.typname = :typname
3662 AND n.nspname = :nspname
3663 )
3664 """
3665 query = sql.text(query)
3666 else:
3667 query = """
3668 SELECT EXISTS (
3669 SELECT * FROM pg_catalog.pg_type t
3670 WHERE t.typname = :typname
3671 AND pg_type_is_visible(t.oid)
3672 )
3673 """
3674 query = sql.text(query)
3675 query = query.bindparams(
3676 sql.bindparam(
3677 "typname", util.text_type(type_name), type_=sqltypes.Unicode
3678 )
3679 )
3680 if schema is not None:
3681 query = query.bindparams(
3682 sql.bindparam(
3683 "nspname", util.text_type(schema), type_=sqltypes.Unicode
3684 )
3685 )
3686 cursor = connection.execute(query)
3687 return bool(cursor.scalar())
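# Usage sketch (editorial illustration): has_type() backs the
# checkfirst behavior of the PostgreSQL ENUM type, among others.
#
#     from sqlalchemy import create_engine
#     from sqlalchemy.dialects.postgresql import ENUM
#     engine = create_engine("postgresql://scott:tiger@localhost/test")
#     mood = ENUM("happy", "sad", name="mood_enum")
#     mood.create(engine, checkfirst=True)  # CREATE TYPE only if absent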
3689 def _get_server_version_info(self, connection):
3690 v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
3691 m = re.match(
3692 r".*(?:PostgreSQL|EnterpriseDB) "
3693 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
3694 v,
3695 )
3696 if not m:
3697 raise AssertionError(
3698 "Could not determine version from string '%s'" % v
3699 )
3700 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
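# Behavior sketch (editorial illustration) of the version regex above,
# runnable with the stdlib alone:
#
#     import re
#     banner = ("PostgreSQL 13.4 (Debian 13.4-1.pgdg100+1) on "
#               "x86_64-pc-linux-gnu, compiled by gcc, 64-bit")
#     m = re.match(
#         r".*(?:PostgreSQL|EnterpriseDB) "
#         r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
#         banner,
#     )
#     tuple(int(x) for x in m.group(1, 2, 3) if x is not None)  # (13, 4)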
3702 @reflection.cache
3703 def get_table_oid(self, connection, table_name, schema=None, **kw):
3704 """Fetch the oid for schema.table_name.
3706 Several reflection methods require the table oid, so this
3707 method allows it to be fetched one time and cached for
3708 subsequent calls.
3710 """
3711 table_oid = None
3712 if schema is not None:
3713 schema_where_clause = "n.nspname = :schema"
3714 else:
3715 schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)"
3716 query = (
3717 """
3718 SELECT c.oid
3719 FROM pg_catalog.pg_class c
3720 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
3721 WHERE (%s)
3722 AND c.relname = :table_name AND c.relkind in
3723 ('r', 'v', 'm', 'f', 'p')
3724 """
3725 % schema_where_clause
3726 )
3727 # Since we're binding to unicode, table_name and schema must
3728 # be unicode.
3729 table_name = util.text_type(table_name)
3730 if schema is not None:
3731 schema = util.text_type(schema)
3732 s = sql.text(query).bindparams(table_name=sqltypes.Unicode)
3733 s = s.columns(oid=sqltypes.Integer)
3734 if schema:
3735 s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode))
3736 c = connection.execute(s, dict(table_name=table_name, schema=schema))
3737 table_oid = c.scalar()
3738 if table_oid is None:
3739 raise exc.NoSuchTableError(table_name)
3740 return table_oid
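# Usage sketch (editorial illustration): reflection methods share the
# cached oid by passing the same info_cache dict; "conn" is assumed to
# be an established Connection.
#
#     info_cache = {}
#     oid = conn.dialect.get_table_oid(
#         conn, "sometable", schema=None, info_cache=info_cache
#     )
#     # later calls with the same info_cache return the cached oid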
3742 @reflection.cache
3743 def get_schema_names(self, connection, **kw):
3744 result = connection.execute(
3745 sql.text(
3746 "SELECT nspname FROM pg_namespace "
3747 "WHERE nspname NOT LIKE 'pg_%' "
3748 "ORDER BY nspname"
3749 ).columns(nspname=sqltypes.Unicode)
3750 )
3751 return [name for name, in result]
3753 @reflection.cache
3754 def get_table_names(self, connection, schema=None, **kw):
3755 result = connection.execute(
3756 sql.text(
3757 "SELECT c.relname FROM pg_class c "
3758 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3759 "WHERE n.nspname = :schema AND c.relkind in ('r', 'p')"
3760 ).columns(relname=sqltypes.Unicode),
3761 dict(
3762 schema=schema
3763 if schema is not None
3764 else self.default_schema_name
3765 ),
3766 )
3767 return [name for name, in result]
3769 @reflection.cache
3770 def _get_foreign_table_names(self, connection, schema=None, **kw):
3771 result = connection.execute(
3772 sql.text(
3773 "SELECT c.relname FROM pg_class c "
3774 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3775 "WHERE n.nspname = :schema AND c.relkind = 'f'"
3776 ).columns(relname=sqltypes.Unicode),
3777 dict(
3778 schema=schema
3779 if schema is not None
3780 else self.default_schema_name
3781 ),
3782 )
3783 return [name for name, in result]
3785 @reflection.cache
3786 def get_view_names(
3787 self, connection, schema=None, include=("plain", "materialized"), **kw
3788 ):
3790 include_kind = {"plain": "v", "materialized": "m"}
3791 try:
3792 kinds = [include_kind[i] for i in util.to_list(include)]
3793 except KeyError:
3794 raise ValueError(
3795 "include %r unknown, needs to be a sequence containing "
3796 "one or both of 'plain' and 'materialized'" % (include,)
3797 )
3798 if not kinds:
3799 raise ValueError(
3800 "empty include, needs to be a sequence containing "
3801 "one or both of 'plain' and 'materialized'"
3802 )
3804 result = connection.execute(
3805 sql.text(
3806 "SELECT c.relname FROM pg_class c "
3807 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3808 "WHERE n.nspname = :schema AND c.relkind IN (%s)"
3809 % (", ".join("'%s'" % elem for elem in kinds))
3810 ).columns(relname=sqltypes.Unicode),
3811 dict(
3812 schema=schema
3813 if schema is not None
3814 else self.default_schema_name
3815 ),
3816 )
3817 return [name for name, in result]
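# Usage sketch (editorial illustration): the include filter is a
# dialect-level argument; "conn" is assumed to be an established
# Connection.
#
#     engine.dialect.get_view_names(conn, "public", include="materialized")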
3819 @reflection.cache
3820 def get_sequence_names(self, connection, schema=None, **kw):
3821 if not schema:
3822 schema = self.default_schema_name
3823 cursor = connection.execute(
3824 sql.text(
3825 "SELECT relname FROM pg_class c join pg_namespace n on "
3826 "n.oid=c.relnamespace where relkind='S' and "
3827 "n.nspname=:schema"
3828 ).bindparams(
3829 sql.bindparam(
3830 "schema",
3831 util.text_type(schema),
3832 type_=sqltypes.Unicode,
3833 ),
3834 )
3835 )
3836 return [row[0] for row in cursor]
3838 @reflection.cache
3839 def get_view_definition(self, connection, view_name, schema=None, **kw):
3840 view_def = connection.scalar(
3841 sql.text(
3842 "SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c "
3843 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3844 "WHERE n.nspname = :schema AND c.relname = :view_name "
3845 "AND c.relkind IN ('v', 'm')"
3846 ).columns(view_def=sqltypes.Unicode),
3847 dict(
3848 schema=schema
3849 if schema is not None
3850 else self.default_schema_name,
3851 view_name=view_name,
3852 ),
3853 )
3854 return view_def
3856 @reflection.cache
3857 def get_columns(self, connection, table_name, schema=None, **kw):
3859 table_oid = self.get_table_oid(
3860 connection, table_name, schema, info_cache=kw.get("info_cache")
3861 )
3863 generated = (
3864 "a.attgenerated as generated"
3865 if self.server_version_info >= (12,)
3866 else "NULL as generated"
3867 )
3868 if self.server_version_info >= (10,):
3869 # the a.attidentity != '' check is required, or SERIAL
3870 # columns would also be reflected as identity columns.
3871 identity = """\
3872 (SELECT json_build_object(
3873 'always', a.attidentity = 'a',
3874 'start', s.seqstart,
3875 'increment', s.seqincrement,
3876 'minvalue', s.seqmin,
3877 'maxvalue', s.seqmax,
3878 'cache', s.seqcache,
3879 'cycle', s.seqcycle)
3880 FROM pg_catalog.pg_sequence s
3881 JOIN pg_catalog.pg_class c on s.seqrelid = c."oid"
3882 WHERE c.relkind = 'S'
3883 AND a.attidentity != ''
3884 AND s.seqrelid = pg_catalog.pg_get_serial_sequence(
3885 a.attrelid::regclass::text, a.attname
3886 )::regclass::oid
3887 ) as identity_options\
3888 """
3889 else:
3890 identity = "NULL as identity_options"
3892 SQL_COLS = """
3893 SELECT a.attname,
3894 pg_catalog.format_type(a.atttypid, a.atttypmod),
3895 (
3896 SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
3897 FROM pg_catalog.pg_attrdef d
3898 WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
3899 AND a.atthasdef
3900 ) AS DEFAULT,
3901 a.attnotnull,
3902 a.attrelid as table_oid,
3903 pgd.description as comment,
3904 %s,
3905 %s
3906 FROM pg_catalog.pg_attribute a
3907 LEFT JOIN pg_catalog.pg_description pgd ON (
3908 pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
3909 WHERE a.attrelid = :table_oid
3910 AND a.attnum > 0 AND NOT a.attisdropped
3911 ORDER BY a.attnum
3912 """ % (
3913 generated,
3914 identity,
3915 )
3916 s = (
3917 sql.text(SQL_COLS)
3918 .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
3919 .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
3920 )
3921 c = connection.execute(s, dict(table_oid=table_oid))
3922 rows = c.fetchall()
3924 # dictionary keyed by (name, ) for domains on the default
3925 # search path, or by (schema, name) otherwise
3926 domains = self._load_domains(connection)
3928 # dictionary keyed by (name, ) for enums on the default
3929 # search path, or by (schema, name) otherwise
3930 enums = dict(
3931 ((rec["name"],), rec)
3932 if rec["visible"]
3933 else ((rec["schema"], rec["name"]), rec)
3934 for rec in self._load_enums(connection, schema="*")
3935 )
3937 # format columns
3938 columns = []
3940 for (
3941 name,
3942 format_type,
3943 default_,
3944 notnull,
3945 table_oid,
3946 comment,
3947 generated,
3948 identity,
3949 ) in rows:
3950 column_info = self._get_column_info(
3951 name,
3952 format_type,
3953 default_,
3954 notnull,
3955 domains,
3956 enums,
3957 schema,
3958 comment,
3959 generated,
3960 identity,
3961 )
3962 columns.append(column_info)
3963 return columns
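# Usage sketch (editorial illustration): each returned dict carries
# name/type/nullable/default/autoincrement/comment, plus "computed"
# and/or "identity" when applicable.
#
#     from sqlalchemy import create_engine, inspect
#     insp = inspect(create_engine("postgresql://scott:tiger@localhost/test"))
#     for col in insp.get_columns("sometable"):
#         print(col["name"], col["type"], col["nullable"])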
3965 def _get_column_info(
3966 self,
3967 name,
3968 format_type,
3969 default,
3970 notnull,
3971 domains,
3972 enums,
3973 schema,
3974 comment,
3975 generated,
3976 identity,
3977 ):
3978 def _handle_array_type(attype):
3979 return (
3980 # strip '[]' from integer[], etc.
3981 re.sub(r"\[\]$", "", attype),
3982 attype.endswith("[]"),
3983 )
3985 if format_type is None:
3986 no_format_type = True
3987 attype = format_type = "no format_type()"
3988 is_array = False
3989 else:
3990 no_format_type = False
3992 # strip (*) from character varying(5), timestamp(5)
3993 # with time zone, geometry(POLYGON), etc.
3994 attype = re.sub(r"\(.*\)", "", format_type)
3996 # strip '[]' from integer[], etc. and check if an array
3997 attype, is_array = _handle_array_type(attype)
3999 # strip quotes from case sensitive enum or domain names
4000 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
4002 nullable = not notnull
4004 charlen = re.search(r"\(([\d,]+)\)", format_type)
4005 if charlen:
4006 charlen = charlen.group(1)
4007 args = re.search(r"\((.*)\)", format_type)
4008 if args and args.group(1):
4009 args = tuple(re.split(r"\s*,\s*", args.group(1)))
4010 else:
4011 args = ()
4012 kwargs = {}
4014 if attype == "numeric":
4015 if charlen:
4016 prec, scale = charlen.split(",")
4017 args = (int(prec), int(scale))
4018 else:
4019 args = ()
4020 elif attype == "double precision":
4021 args = (53,)
4022 elif attype == "integer":
4023 args = ()
4024 elif attype in ("timestamp with time zone", "time with time zone"):
4025 kwargs["timezone"] = True
4026 if charlen:
4027 kwargs["precision"] = int(charlen)
4028 args = ()
4029 elif attype in (
4030 "timestamp without time zone",
4031 "time without time zone",
4032 "time",
4033 ):
4034 kwargs["timezone"] = False
4035 if charlen:
4036 kwargs["precision"] = int(charlen)
4037 args = ()
4038 elif attype == "bit varying":
4039 kwargs["varying"] = True
4040 if charlen:
4041 args = (int(charlen),)
4042 else:
4043 args = ()
4044 elif attype.startswith("interval"):
4045 field_match = re.match(r"interval (.+)", attype, re.I)
4046 if charlen:
4047 kwargs["precision"] = int(charlen)
4048 if field_match:
4049 kwargs["fields"] = field_match.group(1)
4050 attype = "interval"
4051 args = ()
4052 elif charlen:
4053 args = (int(charlen),)
4055 while True:
4056 # loop in order to resolve nested domains down to a base type
4057 if attype in self.ischema_names:
4058 coltype = self.ischema_names[attype]
4059 break
4060 elif enum_or_domain_key in enums:
4061 enum = enums[enum_or_domain_key]
4062 coltype = ENUM
4063 kwargs["name"] = enum["name"]
4064 if not enum["visible"]:
4065 kwargs["schema"] = enum["schema"]
4066 args = tuple(enum["labels"])
4067 break
4068 elif enum_or_domain_key in domains:
4069 domain = domains[enum_or_domain_key]
4070 attype = domain["attype"]
4071 attype, is_array = _handle_array_type(attype)
4072 # strip quotes from case sensitive enum or domain names
4073 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
4074 # a column can't relax a NOT NULL declared on its domain,
4075 # but it can be stricter than a nullable domain
4076 nullable = nullable and domain["nullable"]
4077 if domain["default"] and not default:
4078 # It can, however, override the default
4079 # value, but can't set it to null.
4080 default = domain["default"]
4081 continue
4082 else:
4083 coltype = None
4084 break
4086 if coltype:
4087 coltype = coltype(*args, **kwargs)
4088 if is_array:
4089 coltype = self.ischema_names["_array"](coltype)
4090 elif no_format_type:
4091 util.warn(
4092 "PostgreSQL format_type() returned NULL for column '%s'"
4093 % (name,)
4094 )
4095 coltype = sqltypes.NULLTYPE
4096 else:
4097 util.warn(
4098 "Did not recognize type '%s' of column '%s'" % (attype, name)
4099 )
4100 coltype = sqltypes.NULLTYPE
4102 # attgenerated is empty ('' or b'\x00' depending on the
4103 # driver; also absent on older PG versions) for a column that
4104 # is not generated; 's' = stored. (Other values may be added.)
4105 if generated not in (None, "", b"\x00"):
4106 computed = dict(
4107 sqltext=default, persisted=generated in ("s", b"s")
4108 )
4109 default = None
4110 else:
4111 computed = None
4113 # adjust the default value
4114 autoincrement = False
4115 if default is not None:
4116 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
4117 if match is not None:
4118 if issubclass(coltype._type_affinity, sqltypes.Integer):
4119 autoincrement = True
4120 # the default is related to a Sequence
4121 sch = schema
4122 if "." not in match.group(2) and sch is not None:
4123 # unconditionally quote the schema name; this could
4124 # later be enhanced to obey per-identifier quoting
4125 # rules / "quote schema"
4126 default = (
4127 match.group(1)
4128 + ('"%s"' % sch)
4129 + "."
4130 + match.group(2)
4131 + match.group(3)
4132 )
4134 column_info = dict(
4135 name=name,
4136 type=coltype,
4137 nullable=nullable,
4138 default=default,
4139 autoincrement=autoincrement or identity is not None,
4140 comment=comment,
4141 )
4142 if computed is not None:
4143 column_info["computed"] = computed
4144 if identity is not None:
4145 column_info["identity"] = identity
4146 return column_info
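# Behavior sketch (editorial illustration) of the format_type parsing
# above, runnable with the stdlib alone:
#
#     import re
#     format_type = "character varying(30)[]"
#     attype = re.sub(r"\(.*\)", "", format_type)  # 'character varying[]'
#     is_array = attype.endswith("[]")             # True
#     attype = re.sub(r"\[\]$", "", attype)        # 'character varying'
#     re.search(r"\(([\d,]+)\)", format_type).group(1)  # '30'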
4148 @reflection.cache
4149 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
4150 table_oid = self.get_table_oid(
4151 connection, table_name, schema, info_cache=kw.get("info_cache")
4152 )
4154 if self.server_version_info < (8, 4):
4155 PK_SQL = """
4156 SELECT a.attname
4157 FROM
4158 pg_class t
4159 join pg_index ix on t.oid = ix.indrelid
4160 join pg_attribute a
4161 on t.oid=a.attrelid AND %s
4162 WHERE
4163 t.oid = :table_oid and ix.indisprimary = 't'
4164 ORDER BY a.attnum
4165 """ % self._pg_index_any(
4166 "a.attnum", "ix.indkey"
4167 )
4169 else:
4170 # unnest() and generate_subscripts() both introduced in
4171 # version 8.4
4172 PK_SQL = """
4173 SELECT a.attname
4174 FROM pg_attribute a JOIN (
4175 SELECT unnest(ix.indkey) attnum,
4176 generate_subscripts(ix.indkey, 1) ord
4177 FROM pg_index ix
4178 WHERE ix.indrelid = :table_oid AND ix.indisprimary
4179 ) k ON a.attnum=k.attnum
4180 WHERE a.attrelid = :table_oid
4181 ORDER BY k.ord
4182 """
4183 t = sql.text(PK_SQL).columns(attname=sqltypes.Unicode)
4184 c = connection.execute(t, dict(table_oid=table_oid))
4185 cols = [r[0] for r in c.fetchall()]
4187 PK_CONS_SQL = """
4188 SELECT conname
4189 FROM pg_catalog.pg_constraint r
4190 WHERE r.conrelid = :table_oid AND r.contype = 'p'
4191 ORDER BY 1
4192 """
4193 t = sql.text(PK_CONS_SQL).columns(conname=sqltypes.Unicode)
4194 c = connection.execute(t, dict(table_oid=table_oid))
4195 name = c.scalar()
4197 return {"constrained_columns": cols, "name": name}
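# Usage sketch (editorial illustration):
#
#     from sqlalchemy import create_engine, inspect
#     insp = inspect(create_engine("postgresql://scott:tiger@localhost/test"))
#     insp.get_pk_constraint("sometable")
#     # e.g. {'constrained_columns': ['id'], 'name': 'sometable_pkey'}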
4199 @reflection.cache
4200 def get_foreign_keys(
4201 self,
4202 connection,
4203 table_name,
4204 schema=None,
4205 postgresql_ignore_search_path=False,
4206 **kw
4207 ):
4208 preparer = self.identifier_preparer
4209 table_oid = self.get_table_oid(
4210 connection, table_name, schema, info_cache=kw.get("info_cache")
4211 )
4213 FK_SQL = """
4214 SELECT r.conname,
4215 pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
4216 n.nspname as conschema
4217 FROM pg_catalog.pg_constraint r,
4218 pg_namespace n,
4219 pg_class c
4221 WHERE r.conrelid = :table AND
4222 r.contype = 'f' AND
4223 c.oid = confrelid AND
4224 n.oid = c.relnamespace
4225 ORDER BY 1
4226 """
4227 # https://www.postgresql.org/docs/9.0/static/sql-createtable.html
4228 FK_REGEX = re.compile(
4229 r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
4230 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
4231 r"[\s]?(ON UPDATE "
4232 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
4233 r"[\s]?(ON DELETE "
4234 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
4235 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
4236 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
4237 )
4239 t = sql.text(FK_SQL).columns(
4240 conname=sqltypes.Unicode, condef=sqltypes.Unicode
4241 )
4242 c = connection.execute(t, dict(table=table_oid))
4243 fkeys = []
4244 for conname, condef, conschema in c.fetchall():
4245 m = re.search(FK_REGEX, condef).groups()
4247 (
4248 constrained_columns,
4249 referred_schema,
4250 referred_table,
4251 referred_columns,
4252 _,
4253 match,
4254 _,
4255 onupdate,
4256 _,
4257 ondelete,
4258 deferrable,
4259 _,
4260 initially,
4261 ) = m
4263 if deferrable is not None:
4264 deferrable = True if deferrable == "DEFERRABLE" else False
4265 constrained_columns = [
4266 preparer._unquote_identifier(x)
4267 for x in re.split(r"\s*,\s*", constrained_columns)
4268 ]
4270 if postgresql_ignore_search_path:
4271 # when ignoring search path, we use the actual schema
4272 # provided it isn't the "default" schema
4273 if conschema != self.default_schema_name:
4274 referred_schema = conschema
4275 else:
4276 referred_schema = schema
4277 elif referred_schema:
4278 # referred_schema is the schema that we regexp'ed from
4279 # pg_get_constraintdef(). If the schema is in the search
4280 # path, pg_get_constraintdef() will give us None.
4281 referred_schema = preparer._unquote_identifier(referred_schema)
4282 elif schema is not None and schema == conschema:
4283 # If the actual schema matches the schema of the table
4284 # we're reflecting, then we will use that.
4285 referred_schema = schema
4287 referred_table = preparer._unquote_identifier(referred_table)
4288 referred_columns = [
4289 preparer._unquote_identifier(x)
4290 for x in re.split(r"\s*,\s*", referred_columns)
4291 ]
4292 options = {
4293 k: v
4294 for k, v in [
4295 ("onupdate", onupdate),
4296 ("ondelete", ondelete),
4297 ("initially", initially),
4298 ("deferrable", deferrable),
4299 ("match", match),
4300 ]
4301 if v is not None and v != "NO ACTION"
4302 }
4303 fkey_d = {
4304 "name": conname,
4305 "constrained_columns": constrained_columns,
4306 "referred_schema": referred_schema,
4307 "referred_table": referred_table,
4308 "referred_columns": referred_columns,
4309 "options": options,
4310 }
4311 fkeys.append(fkey_d)
4312 return fkeys
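# Behavior sketch (editorial illustration) of FK_REGEX (the pattern
# compiled above) against a typical pg_get_constraintdef() string:
#
#     condef = "FOREIGN KEY (user_id) REFERENCES users(id) ON DELETE CASCADE"
#     re.search(FK_REGEX, condef).groups()
#     # ('user_id', None, 'users', 'id', None, None, None, None,
#     #  'ON DELETE CASCADE', 'CASCADE', None, None, None)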
4314 def _pg_index_any(self, col, compare_to):
4315 if self.server_version_info < (8, 1):
4316 # https://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us
4317 # "In CVS tip you could replace this with "attnum = ANY (indkey)".
4318 # Unfortunately, most array support doesn't work on int2vector in
4319 # pre-8.1 releases, so I think you're kinda stuck with the above
4320 # for now.
4321 # regards, tom lane"
4322 return "(%s)" % " OR ".join(
4323 "%s[%d] = %s" % (compare_to, ind, col) for ind in range(0, 10)
4324 )
4325 else:
4326 return "%s = ANY(%s)" % (col, compare_to)
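# Behavior sketch (editorial illustration): on pre-8.1 servers the
# fallback expands to a fixed OR list, e.g. for the arguments
# ("a.attnum", "ix.indkey"):
#
#     (ix.indkey[0] = a.attnum OR ix.indkey[1] = a.attnum OR ...)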
4328 @reflection.cache
4329 def get_indexes(self, connection, table_name, schema, **kw):
4330 table_oid = self.get_table_oid(
4331 connection, table_name, schema, info_cache=kw.get("info_cache")
4332 )
4334 # cast indkey as varchar since it's an int2vector,
4335 # returned as a list by some drivers such as pypostgresql
4337 if self.server_version_info < (8, 5):
4338 IDX_SQL = """
4339 SELECT
4340 i.relname as relname,
4341 ix.indisunique, ix.indexprs,
4342 a.attname, a.attnum, NULL, ix.indkey%s,
4343 %s, %s, am.amname, ix.indpred,
4344 NULL as indnkeyatts
4345 FROM
4346 pg_class t
4347 join pg_index ix on t.oid = ix.indrelid
4348 join pg_class i on i.oid = ix.indexrelid
4349 left outer join
4350 pg_attribute a
4351 on t.oid = a.attrelid and %s
4352 left outer join
4353 pg_am am
4354 on i.relam = am.oid
4355 WHERE
4356 t.relkind IN ('r', 'v', 'f', 'm')
4357 and t.oid = :table_oid
4358 and ix.indisprimary = 'f'
4359 ORDER BY
4360 t.relname,
4361 i.relname
4362 """ % (
4363 # the 8.3 cutoff here is empirical: the cast was observed
4364 # to fail on PG 8.2.4 and work on 8.3.0, with nothing in
4365 # the PG changelogs regarding the change.
4366 "::varchar" if self.server_version_info >= (8, 3) else "",
4367 "ix.indoption::varchar"
4368 if self.server_version_info >= (8, 3)
4369 else "NULL",
4370 "i.reloptions"
4371 if self.server_version_info >= (8, 2)
4372 else "NULL",
4373 self._pg_index_any("a.attnum", "ix.indkey"),
4374 )
4375 else:
4376 IDX_SQL = """
4377 SELECT
4378 i.relname as relname,
4379 ix.indisunique, ix.indexprs,
4380 a.attname, a.attnum, c.conrelid, ix.indkey::varchar,
4381 ix.indoption::varchar, i.reloptions, am.amname,
4382 pg_get_expr(ix.indpred, ix.indrelid),
4383 %s as indnkeyatts
4384 FROM
4385 pg_class t
4386 join pg_index ix on t.oid = ix.indrelid
4387 join pg_class i on i.oid = ix.indexrelid
4388 left outer join
4389 pg_attribute a
4390 on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
4391 left outer join
4392 pg_constraint c
4393 on (ix.indrelid = c.conrelid and
4394 ix.indexrelid = c.conindid and
4395 c.contype in ('p', 'u', 'x'))
4396 left outer join
4397 pg_am am
4398 on i.relam = am.oid
4399 WHERE
4400 t.relkind IN ('r', 'v', 'f', 'm', 'p')
4401 and t.oid = :table_oid
4402 and ix.indisprimary = 'f'
4403 ORDER BY
4404 t.relname,
4405 i.relname
4406 """ % (
4407 "ix.indnkeyatts"
4408 if self.server_version_info >= (11, 0)
4409 else "NULL",
4410 )
4412 t = sql.text(IDX_SQL).columns(
4413 relname=sqltypes.Unicode, attname=sqltypes.Unicode
4414 )
4415 c = connection.execute(t, dict(table_oid=table_oid))
4417 indexes = defaultdict(lambda: defaultdict(dict))
4419 sv_idx_name = None
4420 for row in c.fetchall():
4421 (
4422 idx_name,
4423 unique,
4424 expr,
4425 col,
4426 col_num,
4427 conrelid,
4428 idx_key,
4429 idx_option,
4430 options,
4431 amname,
4432 filter_definition,
4433 indnkeyatts,
4434 ) = row
4436 if expr:
4437 if idx_name != sv_idx_name:
4438 util.warn(
4439 "Skipped unsupported reflection of "
4440 "expression-based index %s" % idx_name
4441 )
4442 sv_idx_name = idx_name
4443 continue
4445 has_idx = idx_name in indexes
4446 index = indexes[idx_name]
4447 if col is not None:
4448 index["cols"][col_num] = col
4449 if not has_idx:
4450 idx_keys = idx_key.split()
4451 # "The number of key columns in the index, not counting any
4452 # included columns, which are merely stored and do not
4453 # participate in the index semantics"
4454 if indnkeyatts and idx_keys[indnkeyatts:]:
4455 # this is a "covering index" which has INCLUDE columns
4456 # as well as regular index columns
4457 inc_keys = idx_keys[indnkeyatts:]
4458 idx_keys = idx_keys[:indnkeyatts]
4459 else:
4460 inc_keys = []
4462 index["key"] = [int(k.strip()) for k in idx_keys]
4463 index["inc"] = [int(k.strip()) for k in inc_keys]
4465 # (new in pg 8.3)
4466 # "pg_index.indoption" is list of ints, one per column/expr.
4467 # int acts as bitmask: 0x01=DESC, 0x02=NULLSFIRST
4468 sorting = {}
4469 for col_idx, col_flags in enumerate(
4470 (idx_option or "").split()
4471 ):
4472 col_flags = int(col_flags.strip())
4473 col_sorting = ()
4474 # try to set flags only if they differ from PG defaults...
4475 if col_flags & 0x01:
4476 col_sorting += ("desc",)
4477 if not (col_flags & 0x02):
4478 col_sorting += ("nulls_last",)
4479 else:
4480 if col_flags & 0x02:
4481 col_sorting += ("nulls_first",)
4482 if col_sorting:
4483 sorting[col_idx] = col_sorting
4484 if sorting:
4485 index["sorting"] = sorting
4487 index["unique"] = unique
4488 if conrelid is not None:
4489 index["duplicates_constraint"] = idx_name
4490 if options:
4491 index["options"] = dict(
4492 [option.split("=") for option in options]
4493 )
4495 # it *might* be nice to include that this is 'btree' in the
4496 # reflection info. But we don't want an Index object
4497 # to have a ``postgresql_using`` in it that is just the
4498 # default, so for the moment leaving this out.
4499 if amname and amname != "btree":
4500 index["amname"] = amname
4502 if filter_definition:
4503 index["postgresql_where"] = filter_definition
4505 result = []
4506 for name, idx in indexes.items():
4507 entry = {
4508 "name": name,
4509 "unique": idx["unique"],
4510 "column_names": [idx["cols"][i] for i in idx["key"]],
4511 }
4512 if self.server_version_info >= (11, 0):
4513 # NOTE: this is legacy, this is part of dialect_options now
4514 # as of #7382
4515 entry["include_columns"] = [idx["cols"][i] for i in idx["inc"]]
4516 if "duplicates_constraint" in idx:
4517 entry["duplicates_constraint"] = idx["duplicates_constraint"]
4518 if "sorting" in idx:
4519 entry["column_sorting"] = dict(
4520 (idx["cols"][idx["key"][i]], value)
4521 for i, value in idx["sorting"].items()
4522 )
4523 if "include_columns" in entry:
4524 entry.setdefault("dialect_options", {})[
4525 "postgresql_include"
4526 ] = entry["include_columns"]
4527 if "options" in idx:
4528 entry.setdefault("dialect_options", {})[
4529 "postgresql_with"
4530 ] = idx["options"]
4531 if "amname" in idx:
4532 entry.setdefault("dialect_options", {})[
4533 "postgresql_using"
4534 ] = idx["amname"]
4535 if "postgresql_where" in idx:
4536 entry.setdefault("dialect_options", {})[
4537 "postgresql_where"
4538 ] = idx["postgresql_where"]
4539 result.append(entry)
4540 return result
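# Usage sketch (editorial illustration):
#
#     from sqlalchemy import create_engine, inspect
#     insp = inspect(create_engine("postgresql://scott:tiger@localhost/test"))
#     insp.get_indexes("sometable")
#     # e.g. [{'name': 'ix_sometable_data', 'unique': False,
#     #        'column_names': ['data']}]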
4542 @reflection.cache
4543 def get_unique_constraints(
4544 self, connection, table_name, schema=None, **kw
4545 ):
4546 table_oid = self.get_table_oid(
4547 connection, table_name, schema, info_cache=kw.get("info_cache")
4548 )
4550 UNIQUE_SQL = """
4551 SELECT
4552 cons.conname as name,
4553 cons.conkey as key,
4554 a.attnum as col_num,
4555 a.attname as col_name
4556 FROM
4557 pg_catalog.pg_constraint cons
4558 join pg_attribute a
4559 on cons.conrelid = a.attrelid AND
4560 a.attnum = ANY(cons.conkey)
4561 WHERE
4562 cons.conrelid = :table_oid AND
4563 cons.contype = 'u'
4564 """
4566 t = sql.text(UNIQUE_SQL).columns(col_name=sqltypes.Unicode)
4567 c = connection.execute(t, dict(table_oid=table_oid))
4569 uniques = defaultdict(lambda: defaultdict(dict))
4570 for row in c.fetchall():
4571 uc = uniques[row.name]
4572 uc["key"] = row.key
4573 uc["cols"][row.col_num] = row.col_name
4575 return [
4576 {"name": name, "column_names": [uc["cols"][i] for i in uc["key"]]}
4577 for name, uc in uniques.items()
4578 ]
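# Usage sketch (editorial illustration):
#
#     from sqlalchemy import create_engine, inspect
#     insp = inspect(create_engine("postgresql://scott:tiger@localhost/test"))
#     insp.get_unique_constraints("sometable")
#     # e.g. [{'name': 'uq_sometable_data', 'column_names': ['data']}]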
4580 @reflection.cache
4581 def get_table_comment(self, connection, table_name, schema=None, **kw):
4582 table_oid = self.get_table_oid(
4583 connection, table_name, schema, info_cache=kw.get("info_cache")
4584 )
4586 COMMENT_SQL = """
4587 SELECT
4588 pgd.description as table_comment
4589 FROM
4590 pg_catalog.pg_description pgd
4591 WHERE
4592 pgd.objsubid = 0 AND
4593 pgd.objoid = :table_oid
4594 """
4596 c = connection.execute(
4597 sql.text(COMMENT_SQL), dict(table_oid=table_oid)
4598 )
4599 return {"text": c.scalar()}
4601 @reflection.cache
4602 def get_check_constraints(self, connection, table_name, schema=None, **kw):
4603 table_oid = self.get_table_oid(
4604 connection, table_name, schema, info_cache=kw.get("info_cache")
4605 )
4607 CHECK_SQL = """
4608 SELECT
4609 cons.conname as name,
4610 pg_get_constraintdef(cons.oid) as src
4611 FROM
4612 pg_catalog.pg_constraint cons
4613 WHERE
4614 cons.conrelid = :table_oid AND
4615 cons.contype = 'c'
4616 """
4618 c = connection.execute(sql.text(CHECK_SQL), dict(table_oid=table_oid))
4620 ret = []
4621 for name, src in c:
4622 # samples:
4623 # "CHECK (((a > 1) AND (a < 5)))"
4624 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
4625 # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
4626 # "CHECK (some_boolean_function(a))"
4627 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
4629 m = re.match(
4630 r"^CHECK *\((.+)\)( NOT VALID)?$", src, flags=re.DOTALL
4631 )
4632 if not m:
4633 util.warn("Could not parse CHECK constraint text: %r" % src)
4634 sqltext = ""
4635 else:
4636 sqltext = re.compile(
4637 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
4638 ).sub(r"\1", m.group(1))
4639 entry = {"name": name, "sqltext": sqltext}
4640 if m and m.group(2):
4641 entry["dialect_options"] = {"not_valid": True}
4643 ret.append(entry)
4644 return ret
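# Behavior sketch (editorial illustration) of the CHECK parsing above,
# runnable with the stdlib alone:
#
#     import re
#     src = "CHECK (((a > 1) AND (a < 5))) NOT VALID"
#     m = re.match(r"^CHECK *\((.+)\)( NOT VALID)?$", src, flags=re.DOTALL)
#     sqltext = re.compile(
#         r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
#     ).sub(r"\1", m.group(1))
#     # sqltext == '(a > 1) AND (a < 5)'; m.group(2) == ' NOT VALID'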
4646 def _load_enums(self, connection, schema=None):
4647 schema = schema or self.default_schema_name
4648 if not self.supports_native_enum:
4649 return []
4651 # Load data types for enums:
4652 SQL_ENUMS = """
4653 SELECT t.typname as "name",
4654 -- no enum defaults in 8.4 at least
4655 -- t.typdefault as "default",
4656 pg_catalog.pg_type_is_visible(t.oid) as "visible",
4657 n.nspname as "schema",
4658 e.enumlabel as "label"
4659 FROM pg_catalog.pg_type t
4660 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
4661 LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
4662 WHERE t.typtype = 'e'
4663 """
4665 if schema != "*":
4666 SQL_ENUMS += "AND n.nspname = :schema "
4668 # e.oid gives us label order within an enum
4669 SQL_ENUMS += 'ORDER BY "schema", "name", e.oid'
4671 s = sql.text(SQL_ENUMS).columns(
4672 attname=sqltypes.Unicode, label=sqltypes.Unicode
4673 )
4675 if schema != "*":
4676 s = s.bindparams(schema=schema)
4678 c = connection.execute(s)
4680 enums = []
4681 enum_by_name = {}
4682 for enum in c.fetchall():
4683 key = (enum.schema, enum.name)
4684 if key in enum_by_name:
4685 enum_by_name[key]["labels"].append(enum.label)
4686 else:
4687 enum_by_name[key] = enum_rec = {
4688 "name": enum.name,
4689 "schema": enum.schema,
4690 "visible": enum.visible,
4691 "labels": [],
4692 }
4693 if enum.label is not None:
4694 enum_rec["labels"].append(enum.label)
4695 enums.append(enum_rec)
4696 return enums
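# Return shape sketch (editorial illustration, names hypothetical):
#
#     [{'name': 'mood_enum', 'schema': 'public', 'visible': True,
#       'labels': ['happy', 'sad']}]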
4698 def _load_domains(self, connection):
4699 # Load data types for domains:
4700 SQL_DOMAINS = """
4701 SELECT t.typname as "name",
4702 pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
4703 not t.typnotnull as "nullable",
4704 t.typdefault as "default",
4705 pg_catalog.pg_type_is_visible(t.oid) as "visible",
4706 n.nspname as "schema"
4707 FROM pg_catalog.pg_type t
4708 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
4709 WHERE t.typtype = 'd'
4710 """
4712 s = sql.text(SQL_DOMAINS)
4713 c = connection.execution_options(future_result=True).execute(s)
4715 domains = {}
4716 for domain in c.mappings():
4718 # strip (30) from character varying(30)
4719 attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
4720 # 'visible' just means whether or not the domain is in a
4721 # schema that's on the search path -- or not overridden by
4722 # a schema with higher precedence. If it's not visible,
4723 # it will be prefixed with the schema-name when it's used.
4724 if domain["visible"]:
4725 key = (domain["name"],)
4726 else:
4727 key = (domain["schema"], domain["name"])
4729 domains[key] = {
4730 "attype": attype,
4731 "nullable": domain["nullable"],
4732 "default": domain["default"],
4733 }
4735 return domains
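# Return shape sketch (editorial illustration, names hypothetical);
# keys follow the visibility rule commented above.
#
#     {('positive_int',): {'attype': 'integer', 'nullable': True,
#                          'default': None}}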