Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/base.py: 26%
1201 statements
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
« prev ^ index » next coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
1# postgresql/base.py
2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
8r"""
9.. dialect:: postgresql
10 :name: PostgreSQL
11 :full_support: 9.6, 10, 11, 12, 13, 14
12 :normal_support: 9.6+
13 :best_effort: 8+
15.. _postgresql_sequences:
17Sequences/SERIAL/IDENTITY
18-------------------------
20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
21of creating new primary key values for integer-based primary key columns. When
22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
23integer-based primary key columns, which generates a sequence and server side
24default corresponding to the column.
26To specify a specific named sequence to be used for primary key generation,
27use the :func:`~sqlalchemy.schema.Sequence` construct::
29 Table('sometable', metadata,
30 Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
31 )
33When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
34having the "last insert identifier" available, a RETURNING clause is added to
35the INSERT statement which specifies the primary key columns should be
36returned after the statement completes. The RETURNING functionality only takes
37place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
38sequence, whether specified explicitly or implicitly via ``SERIAL``, is
39executed independently beforehand, the returned value to be used in the
40subsequent insert. Note that when an
41:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
42"executemany" semantics, the "last inserted identifier" functionality does not
43apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
44case.
46To turn off the default usage of RETURNING, specify the flag
47``implicit_returning=False`` to :func:`_sa.create_engine`.
49PostgreSQL 10 and above IDENTITY columns
50^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
52PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
53of SERIAL. The :class:`_schema.Identity` construct in a
54:class:`_schema.Column` can be used to control its behavior::
56 from sqlalchemy import Table, Column, MetaData, Integer, Identity, String
58 metadata = MetaData()
60 data = Table(
61 "data",
62 metadata,
63 Column(
64 'id', Integer, Identity(start=42, cycle=True), primary_key=True
65 ),
66 Column('data', String)
67 )
69The CREATE TABLE for the above :class:`_schema.Table` object would be:
71.. sourcecode:: sql
73 CREATE TABLE data (
74 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
75 data VARCHAR,
76 PRIMARY KEY (id)
77 )
79.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
80 in a :class:`_schema.Column` to specify the option of an autoincrementing
81 column.
83.. note::
85 Previous versions of SQLAlchemy did not have built-in support for rendering
86 of IDENTITY, and could use the following compilation hook to replace
87 occurrences of SERIAL with IDENTITY::
89 from sqlalchemy.schema import CreateColumn
90 from sqlalchemy.ext.compiler import compiles
93 @compiles(CreateColumn, 'postgresql')
94 def use_identity(element, compiler, **kw):
95 text = compiler.visit_create_column(element, **kw)
96 text = text.replace(
97 "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY"
98 )
99 return text
101 Using the above, a table such as::
103 t = Table(
104 't', m,
105 Column('id', Integer, primary_key=True),
106 Column('data', String)
107 )
109 Will generate on the backing database as::
111 CREATE TABLE t (
112 id INT GENERATED BY DEFAULT AS IDENTITY,
113 data VARCHAR,
114 PRIMARY KEY (id)
115 )
117.. _postgresql_ss_cursors:
119Server Side Cursors
120-------------------
122Server-side cursor support is available for the psycopg2, asyncpg
123dialects and may also be available in others.
125Server side cursors are enabled on a per-statement basis by using the
126:paramref:`.Connection.execution_options.stream_results` connection execution
127option::
129 with engine.connect() as conn:
130 result = conn.execution_options(stream_results=True).execute(text("select * from table"))
132Note that some kinds of SQL statements may not be supported with
133server side cursors; generally, only SQL statements that return rows should be
134used with this option.
136.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
137 and will be removed in a future release. Please use the
138 :paramref:`.Connection.execution_options.stream_results` execution option for
139 unbuffered cursor support.
141.. seealso::
143 :ref:`engine_stream_results`
145.. _postgresql_isolation_level:
147Transaction Isolation Level
148---------------------------
150Most SQLAlchemy dialects support setting of transaction isolation level
151using the :paramref:`_sa.create_engine.isolation_level` parameter
152at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
153level via the :paramref:`.Connection.execution_options.isolation_level`
154parameter.
156For PostgreSQL dialects, this feature works either by making use of the
157DBAPI-specific features, such as psycopg2's isolation level flags which will
158embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
159DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
160TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
161emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
162DBAPI-specific techniques are used which is typically an ``.autocommit``
163flag on the DBAPI connection object.
165To set isolation level using :func:`_sa.create_engine`::
167 engine = create_engine(
168 "postgresql+pg8000://scott:tiger@localhost/test",
169 isolation_level = "REPEATABLE READ"
170 )
172To set using per-connection execution options::
174 with engine.connect() as conn:
175 conn = conn.execution_options(
176 isolation_level="REPEATABLE READ"
177 )
178 with conn.begin():
179 # ... work with transaction
181There are also more options for isolation level configurations, such as
182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
183different isolation level settings. See the discussion at
184:ref:`dbapi_autocommit` for background.
186Valid values for ``isolation_level`` on most PostgreSQL dialects include:
188* ``READ COMMITTED``
189* ``READ UNCOMMITTED``
190* ``REPEATABLE READ``
191* ``SERIALIZABLE``
192* ``AUTOCOMMIT``
194.. seealso::
196 :ref:`dbapi_autocommit`
198 :ref:`postgresql_readonly_deferrable`
200 :ref:`psycopg2_isolation_level`
202 :ref:`pg8000_isolation_level`
204.. _postgresql_readonly_deferrable:
206Setting READ ONLY / DEFERRABLE
207------------------------------
209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
210characteristics of the transaction, which is in addition to the isolation level
211setting. These two attributes can be established either in conjunction with or
212independently of the isolation level by passing the ``postgresql_readonly`` and
213``postgresql_deferrable`` flags with
214:meth:`_engine.Connection.execution_options`. The example below illustrates
215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
216"READ ONLY" and "DEFERRABLE"::
218 with engine.connect() as conn:
219 conn = conn.execution_options(
220 isolation_level="SERIALIZABLE",
221 postgresql_readonly=True,
222 postgresql_deferrable=True
223 )
224 with conn.begin():
225 # ... work with transaction
227Note that some DBAPIs such as asyncpg only support "readonly" with
228SERIALIZABLE isolation.
230.. versionadded:: 1.4 added support for the ``postgresql_readonly``
231 and ``postgresql_deferrable`` execution options.
233.. _postgresql_reset_on_return:
235Temporary Table / Resource Reset for Connection Pooling
236-------------------------------------------------------
238The :class:`.QueuePool` connection pool implementation used
239by the SQLAlchemy :class:`_sa.Engine` object includes
240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
241the DBAPI ``.rollback()`` method when connections are returned to the pool.
242While this rollback will clear out the immediate state used by the previous
243transaction, it does not cover a wider range of session-level state, including
244temporary tables as well as other server state such as prepared statement
245handles and statement caches. The PostgreSQL database includes a variety
246of commands which may be used to reset this state, including
247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.
250To install
251one or more of these commands as the means of performing reset-on-return,
252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated
253in the example below (**requires SQLAlchemy 1.4.43 or greater**). The implementation
254will end transactions in progress as well as discard temporary tables
255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL
256documentation for background on what each of these statements does.
258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
259is set to ``None`` so that the custom scheme can replace the default behavior
260completely. The custom hook implementation calls ``.rollback()`` in any case,
261as it's usually important that the DBAPI's own tracking of commit/rollback
262will remain consistent with the state of the transaction::
265 from sqlalchemy import create_engine
266 from sqlalchemy import event
268 postgresql_engine = create_engine(
269 "postgresql+psycopg2://scott:tiger@hostname/dbname",
271 # disable default reset-on-return scheme
272 pool_reset_on_return=None,
273 )
276 @event.listens_for(postgresql_engine, "reset")
277 def _reset_postgresql(dbapi_connection, connection_record, reset_state):
278 dbapi_connection.execute("CLOSE ALL")
279 dbapi_connection.execute("RESET ALL")
280 dbapi_connection.execute("DISCARD TEMP")
282 # so that the DBAPI itself knows that the connection has been
283 # reset
284 dbapi_connection.rollback()
286.. versionchanged:: 1.4.43 Ensured the :meth:`.PoolEvents.reset` event
287 is invoked for all "reset" occurrences, so that it's appropriate
288 as a place for custom "reset" handlers. Previous schemes which
289 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
291.. seealso::
293 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
295.. _postgresql_alternate_search_path:
297Setting Alternate Search Paths on Connect
298------------------------------------------
300The PostgreSQL ``search_path`` variable refers to the list of schema names
301that will be implicitly referred to when a particular table or other
302object is referenced in a SQL statement. As detailed in the next section
303:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
304the concept of keeping this variable at its default value of ``public``,
305however, in order to have it set to any arbitrary name or names when connections
306are used automatically, the "SET SESSION search_path" command may be invoked
307for all connections in a pool using the following event handler, as discussed
308at :ref:`schema_set_default_connections`::
310 from sqlalchemy import event
311 from sqlalchemy import create_engine
313 engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
315 @event.listens_for(engine, "connect", insert=True)
316 def set_search_path(dbapi_connection, connection_record):
317 existing_autocommit = dbapi_connection.autocommit
318 dbapi_connection.autocommit = True
319 cursor = dbapi_connection.cursor()
320 cursor.execute("SET SESSION search_path='%s'" % schema_name)
321 cursor.close()
322 dbapi_connection.autocommit = existing_autocommit
324The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
325attribute is so that when the ``SET SESSION search_path`` directive is invoked,
326it is invoked outside of the scope of any transaction and therefore will not
327be reverted when the DBAPI connection has a rollback.
329.. seealso::
331 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation
336.. _postgresql_schema_reflection:
338Remote-Schema Table Introspection and PostgreSQL search_path
339------------------------------------------------------------
341.. admonition:: Section Best Practices Summarized
343 keep the ``search_path`` variable set to its default of ``public``, without
344 any other schema names. For other schema names, name these explicitly
345 within :class:`_schema.Table` definitions. Alternatively, the
346 ``postgresql_ignore_search_path`` option will cause all reflected
347 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
348 attribute set up.
350The PostgreSQL dialect can reflect tables from any schema, as outlined in
351:ref:`metadata_reflection_schemas`.
353With regards to tables which these :class:`_schema.Table`
354objects refer to via foreign key constraint, a decision must be made as to how
355the ``.schema`` is represented in those remote tables, in the case where that
356remote schema name is also a member of the current
357`PostgreSQL search path
358<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_.
360By default, the PostgreSQL dialect mimics the behavior encouraged by
361PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
362returns a sample definition for a particular foreign key constraint,
363omitting the referenced schema name from that definition when the name is
364also in the PostgreSQL schema search path. The interaction below
365illustrates this behavior::
367 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
368 CREATE TABLE
369 test=> CREATE TABLE referring(
370 test(> id INTEGER PRIMARY KEY,
371 test(> referred_id INTEGER REFERENCES test_schema.referred(id));
372 CREATE TABLE
373 test=> SET search_path TO public, test_schema;
374 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
375 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
376 test-> ON n.oid = c.relnamespace
377 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
378 test-> WHERE c.relname='referring' AND r.contype = 'f'
379 test-> ;
380 pg_get_constraintdef
381 ---------------------------------------------------
382 FOREIGN KEY (referred_id) REFERENCES referred(id)
383 (1 row)
385Above, we created a table ``referred`` as a member of the remote schema
386``test_schema``, however when we added ``test_schema`` to the
387PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
388``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
389the function.
391On the other hand, if we set the search path back to the typical default
392of ``public``::
394 test=> SET search_path TO public;
395 SET
397The same query against ``pg_get_constraintdef()`` now returns the fully
398schema-qualified name for us::
400 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
401 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
402 test-> ON n.oid = c.relnamespace
403 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
404 test-> WHERE c.relname='referring' AND r.contype = 'f';
405 pg_get_constraintdef
406 ---------------------------------------------------------------
407 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
408 (1 row)
410SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
411in order to determine the remote schema name. That is, if our ``search_path``
412were set to include ``test_schema``, and we invoked a table
413reflection process as follows::
415 >>> from sqlalchemy import Table, MetaData, create_engine, text
416 >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
417 >>> with engine.connect() as conn:
418 ... conn.execute(text("SET search_path TO test_schema, public"))
419 ... metadata_obj = MetaData()
420 ... referring = Table('referring', metadata_obj,
421 ... autoload_with=conn)
422 ...
423 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>
425The above process would deliver to the :attr:`_schema.MetaData.tables`
426collection the
427``referred`` table named **without** the schema::
429 >>> metadata_obj.tables['referred'].schema is None
430 True
432To alter the behavior of reflection such that the referred schema is
433maintained regardless of the ``search_path`` setting, use the
434``postgresql_ignore_search_path`` option, which can be specified as a
435dialect-specific argument to both :class:`_schema.Table` as well as
436:meth:`_schema.MetaData.reflect`::
438 >>> with engine.connect() as conn:
439 ... conn.execute(text("SET search_path TO test_schema, public"))
440 ... metadata_obj = MetaData()
441 ... referring = Table('referring', metadata_obj,
442 ... autoload_with=conn,
443 ... postgresql_ignore_search_path=True)
444 ...
445 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>
447We will now have ``test_schema.referred`` stored as schema-qualified::
449 >>> metadata_obj.tables['test_schema.referred'].schema
450 'test_schema'
452.. sidebar:: Best Practices for PostgreSQL Schema reflection
454 The description of PostgreSQL schema reflection behavior is complex, and
455 is the product of many years of dealing with widely varied use cases and
456 user preferences. But in fact, there's no need to understand any of it if
457 you just stick to the simplest use pattern: leave the ``search_path`` set
458 to its default of ``public`` only, never refer to the name ``public`` as
459 an explicit schema name otherwise, and refer to all other schema names
460 explicitly when building up a :class:`_schema.Table` object. The options
461 described here are only for those users who can't, or prefer not to, stay
462 within these guidelines.
464Note that **in all cases**, the "default" schema is always reflected as
465``None``. The "default" schema on PostgreSQL is that which is returned by the
466PostgreSQL ``current_schema()`` function. On a typical PostgreSQL
467installation, this is the name ``public``. So a table that refers to another
468which is in the ``public`` (i.e. default) schema will always have the
469``.schema`` attribute set to ``None``.
471.. seealso::
473 :ref:`reflection_schema_qualified_interaction` - discussion of the issue
474 from a backend-agnostic perspective
476 `The Schema Search Path
477 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
478 - on the PostgreSQL website.
480INSERT/UPDATE...RETURNING
481-------------------------
483The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
484``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
485for single-row INSERT statements in order to fetch newly generated
486primary key identifiers. To specify an explicit ``RETURNING`` clause,
487use the :meth:`._UpdateBase.returning` method on a per-statement basis::
489 # INSERT..RETURNING
490 result = table.insert().returning(table.c.col1, table.c.col2).\
491 values(name='foo')
492 print(result.fetchall())
494 # UPDATE..RETURNING
495 result = table.update().returning(table.c.col1, table.c.col2).\
496 where(table.c.name=='foo').values(name='bar')
497 print(result.fetchall())
499 # DELETE..RETURNING
500 result = table.delete().returning(table.c.col1, table.c.col2).\
501 where(table.c.name=='foo')
502 print(result.fetchall())
504.. _postgresql_insert_on_conflict:
506INSERT...ON CONFLICT (Upsert)
507------------------------------
509Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
510rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
511candidate row will only be inserted if that row does not violate any unique
512constraints. In the case of a unique constraint violation, a secondary action
513can occur which can be either "DO UPDATE", indicating that the data in the
514target row should be updated, or "DO NOTHING", which indicates to silently skip
515this row.
517Conflicts are determined using existing unique constraints and indexes. These
518constraints may be identified either using their name as stated in DDL,
519or they may be inferred by stating the columns and conditions that comprise
520the indexes.
522SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
523:func:`_postgresql.insert()` function, which provides
524the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
525and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:
527.. sourcecode:: pycon+sql
529 >>> from sqlalchemy.dialects.postgresql import insert
530 >>> insert_stmt = insert(my_table).values(
531 ... id='some_existing_id',
532 ... data='inserted value')
533 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
534 ... index_elements=['id']
535 ... )
536 >>> print(do_nothing_stmt)
537 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
538 ON CONFLICT (id) DO NOTHING
539 {stop}
541 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
542 ... constraint='pk_my_table',
543 ... set_=dict(data='updated value')
544 ... )
545 >>> print(do_update_stmt)
546 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
547 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
549.. versionadded:: 1.1
551.. seealso::
553 `INSERT .. ON CONFLICT
554 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
555 - in the PostgreSQL documentation.
557Specifying the Target
558^^^^^^^^^^^^^^^^^^^^^
560Both methods supply the "target" of the conflict using either the
561named constraint or by column inference:
563* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
564 specifies a sequence containing string column names, :class:`_schema.Column`
565 objects, and/or SQL expression elements, which would identify a unique
566 index:
568 .. sourcecode:: pycon+sql
570 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
571 ... index_elements=['id'],
572 ... set_=dict(data='updated value')
573 ... )
574 >>> print(do_update_stmt)
575 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
576 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
577 {stop}
579 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
580 ... index_elements=[my_table.c.id],
581 ... set_=dict(data='updated value')
582 ... )
583 >>> print(do_update_stmt)
584 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
585 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
587* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
588 infer an index, a partial index can be inferred by also specifying the
589 :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:
591 .. sourcecode:: pycon+sql
593 >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
594 >>> stmt = stmt.on_conflict_do_update(
595 ... index_elements=[my_table.c.user_email],
596 ... index_where=my_table.c.user_email.like('%@gmail.com'),
597 ... set_=dict(data=stmt.excluded.data)
598 ... )
599 >>> print(stmt)
600 {opensql}INSERT INTO my_table (data, user_email)
601 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
602 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data
604* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
605 used to specify an index directly rather than inferring it. This can be
606 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:
608 .. sourcecode:: pycon+sql
610 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
611 ... constraint='my_table_idx_1',
612 ... set_=dict(data='updated value')
613 ... )
614 >>> print(do_update_stmt)
615 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
616 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
617 {stop}
619 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
620 ... constraint='my_table_pk',
621 ... set_=dict(data='updated value')
622 ... )
623 >>> print(do_update_stmt)
624 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
625 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
626 {stop}
628* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
629 also refer to a SQLAlchemy construct representing a constraint,
630 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
631 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
632 if the constraint has a name, it is used directly. Otherwise, if the
633 constraint is unnamed, then inference will be used, where the expressions
634 and optional WHERE clause of the constraint will be spelled out in the
635 construct. This use is especially convenient
636 to refer to the named or unnamed primary key of a :class:`_schema.Table`
637 using the
638 :attr:`_schema.Table.primary_key` attribute:
640 .. sourcecode:: pycon+sql
642 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
643 ... constraint=my_table.primary_key,
644 ... set_=dict(data='updated value')
645 ... )
646 >>> print(do_update_stmt)
647 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
648 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
650The SET Clause
651^^^^^^^^^^^^^^^
653``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
654existing row, using any combination of new values as well as values
655from the proposed insertion. These values are specified using the
656:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
657parameter accepts a dictionary which consists of direct values
658for UPDATE:
660.. sourcecode:: pycon+sql
662 >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
663 >>> do_update_stmt = stmt.on_conflict_do_update(
664 ... index_elements=['id'],
665 ... set_=dict(data='updated value')
666 ... )
667 >>> print(do_update_stmt)
668 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
669 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
671.. warning::
673 The :meth:`_postgresql.Insert.on_conflict_do_update`
674 method does **not** take into
675 account Python-side default UPDATE values or generation functions, e.g.
676 those specified using :paramref:`_schema.Column.onupdate`.
677 These values will not be exercised for an ON CONFLICT style of UPDATE,
678 unless they are manually specified in the
679 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
681Updating using the Excluded INSERT Values
682^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
684In order to refer to the proposed insertion row, the special alias
685:attr:`~.postgresql.Insert.excluded` is available as an attribute on
686the :class:`_postgresql.Insert` object; this object is a
687:class:`_expression.ColumnCollection`
688which contains all columns of the target
689table:
691.. sourcecode:: pycon+sql
693 >>> stmt = insert(my_table).values(
694 ... id='some_id',
695 ... data='inserted value',
696 ... author='jlh'
697 ... )
698 >>> do_update_stmt = stmt.on_conflict_do_update(
699 ... index_elements=['id'],
700 ... set_=dict(data='updated value', author=stmt.excluded.author)
701 ... )
702 >>> print(do_update_stmt)
703 {opensql}INSERT INTO my_table (id, data, author)
704 VALUES (%(id)s, %(data)s, %(author)s)
705 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
707Additional WHERE Criteria
708^^^^^^^^^^^^^^^^^^^^^^^^^
710The :meth:`_postgresql.Insert.on_conflict_do_update` method also accepts
711a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
712parameter, which will limit those rows which receive an UPDATE:
714.. sourcecode:: pycon+sql
716 >>> stmt = insert(my_table).values(
717 ... id='some_id',
718 ... data='inserted value',
719 ... author='jlh'
720 ... )
721 >>> on_update_stmt = stmt.on_conflict_do_update(
722 ... index_elements=['id'],
723 ... set_=dict(data='updated value', author=stmt.excluded.author),
724 ... where=(my_table.c.status == 2)
725 ... )
726 >>> print(on_update_stmt)
727 {opensql}INSERT INTO my_table (id, data, author)
728 VALUES (%(id)s, %(data)s, %(author)s)
729 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
730 WHERE my_table.status = %(status_1)s
732Skipping Rows with DO NOTHING
733^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
735``ON CONFLICT`` may be used to skip inserting a row entirely
736if any conflict with a unique or exclusion constraint occurs; below
737this is illustrated using the
738:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:
740.. sourcecode:: pycon+sql
742 >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
743 >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
744 >>> print(stmt)
745 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
746 ON CONFLICT (id) DO NOTHING
748If ``DO NOTHING`` is used without specifying any columns or constraint,
749it has the effect of skipping the INSERT for any unique or exclusion
750constraint violation which occurs:
752.. sourcecode:: pycon+sql
754 >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
755 >>> stmt = stmt.on_conflict_do_nothing()
756 >>> print(stmt)
757 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
758 ON CONFLICT DO NOTHING
760.. _postgresql_match:
762Full Text Search
763----------------
765SQLAlchemy makes available the PostgreSQL ``@@`` operator via the
766:meth:`_expression.ColumnElement.match` method on any textual column expression.
768On the PostgreSQL dialect, an expression like the following::
770 select(sometable.c.text.match("search string"))
772will emit to the database::
774 SELECT text @@ to_tsquery('search string') FROM table
776Various other PostgreSQL text search functions such as ``to_tsquery()``,
777``to_tsvector()``, and ``plainto_tsquery()`` are available by explicitly using
778the standard SQLAlchemy :data:`.func` construct.
780For example::
782 select(func.to_tsvector('fat cats ate rats').match('cat & rat'))
784Emits the equivalent of::
786 SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat')
788The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::
790 from sqlalchemy.dialects.postgresql import TSVECTOR
791 from sqlalchemy import select, cast
792 select(cast("some text", TSVECTOR))
794produces a statement equivalent to::
796 SELECT CAST('some text' AS TSVECTOR) AS anon_1
798.. tip::
800 It's important to remember that text searching in PostgreSQL is powerful but complicated,
801 and SQLAlchemy users are advised to reference the PostgreSQL documentation
802 regarding
803 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_.
805 There are important differences between ``to_tsquery`` and
806 ``plainto_tsquery``, the most significant of which is that ``to_tsquery``
807 expects specially formatted "querytext" that is written to PostgreSQL's own
808 specification, while ``plainto_tsquery`` expects unformatted text that is
809 transformed into ``to_tsquery`` compatible querytext. This means the input to
810 ``.match()`` under PostgreSQL may be incompatible with the input to
811 ``.match()`` under another database backend. SQLAlchemy users who support
812 multiple backends are advised to carefully implement their usage of
813 ``.match()`` to work around these constraints.
815Full Text Searches in PostgreSQL are influenced by a combination of: the
816PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used
817to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in
818during a query.
820When performing a Full Text Search against a column that has a GIN or
821GiST index that is already pre-computed (which is common on full text
822searches) one may need to explicitly pass in a particular PostgreSQL
823``regconfig`` value to ensure the query-planner utilizes the index and does
824not re-compute the column on demand.
826In order to provide for this explicit query planning, or to use different
827search strategies, the ``match`` method accepts a ``postgresql_regconfig``
828keyword argument::
830 select(mytable.c.id).where(
831 mytable.c.title.match('somestring', postgresql_regconfig='english')
832 )
834Emits the equivalent of::
836 SELECT mytable.id FROM mytable
837 WHERE mytable.title @@ to_tsquery('english', 'somestring')
839One can also specifically pass in a ``'regconfig'`` value to the
840``to_tsvector()`` command as the initial argument::
842 select(mytable.c.id).where(
843 func.to_tsvector('english', mytable.c.title )\
844 .match('somestring', postgresql_regconfig='english')
845 )
847produces a statement equivalent to::
849 SELECT mytable.id FROM mytable
850 WHERE to_tsvector('english', mytable.title) @@
851 to_tsquery('english', 'somestring')
853It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
854PostgreSQL to ensure that you are generating queries with SQLAlchemy that
855take full advantage of any indexes you may have created for full text search.
857.. seealso::
859 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation
862FROM ONLY ...
863-------------
865The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
866table in an inheritance hierarchy. This can be used to produce the
867``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
868syntaxes. It uses SQLAlchemy's hints mechanism::
870 # SELECT ... FROM ONLY ...
871 result = table.select().with_hint(table, 'ONLY', 'postgresql')
872 print(result.fetchall())
874 # UPDATE ONLY ...
875 table.update(values=dict(foo='bar')).with_hint('ONLY',
876 dialect_name='postgresql')
878 # DELETE FROM ONLY ...
879 table.delete().with_hint('ONLY', dialect_name='postgresql')
882.. _postgresql_indexes:
884PostgreSQL-Specific Index Options
885---------------------------------
887Several extensions to the :class:`.Index` construct are available, specific
888to the PostgreSQL dialect.
890Covering Indexes
891^^^^^^^^^^^^^^^^
893The ``postgresql_include`` option renders INCLUDE(colname) for the given
894string names::
896 Index("my_index", table.c.x, postgresql_include=['y'])
898would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
900Note that this feature requires PostgreSQL 11 or later.
902.. versionadded:: 1.4
904.. _postgresql_partial_indexes:
906Partial Indexes
907^^^^^^^^^^^^^^^
909Partial indexes add criterion to the index definition so that the index is
910applied to a subset of rows. These can be specified on :class:`.Index`
911using the ``postgresql_where`` keyword argument::
913 Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10)
915.. _postgresql_operator_classes:
917Operator Classes
918^^^^^^^^^^^^^^^^
920PostgreSQL allows the specification of an *operator class* for each column of
921an index (see
922https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
923The :class:`.Index` construct allows these to be specified via the
924``postgresql_ops`` keyword argument::
926 Index(
927 'my_index', my_table.c.id, my_table.c.data,
928 postgresql_ops={
929 'data': 'text_pattern_ops',
930 'id': 'int4_ops'
931 })
933Note that the keys in the ``postgresql_ops`` dictionaries are the
934"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
935the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
936different than the actual name of the column as expressed in the database.
938If ``postgresql_ops`` is to be used against a complex SQL expression such
939as a function call, then to apply to the column it must be given a label
940that is identified in the dictionary by name, e.g.::
942 Index(
943 'my_index', my_table.c.id,
944 func.lower(my_table.c.data).label('data_lower'),
945 postgresql_ops={
946 'data_lower': 'text_pattern_ops',
947 'id': 'int4_ops'
948 })
950Operator classes are also supported by the
951:class:`_postgresql.ExcludeConstraint` construct using the
952:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
953details.
955.. versionadded:: 1.3.21 added support for operator classes with
956 :class:`_postgresql.ExcludeConstraint`.
959Index Types
960^^^^^^^^^^^
962PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
963as the ability for users to create their own (see
964https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
965specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
967 Index('my_index', my_table.c.data, postgresql_using='gin')
969The value passed to the keyword argument will be simply passed through to the
970underlying CREATE INDEX command, so it *must* be a valid index type for your
971version of PostgreSQL.
973.. _postgresql_index_storage:
975Index Storage Parameters
976^^^^^^^^^^^^^^^^^^^^^^^^
978PostgreSQL allows storage parameters to be set on indexes. The storage
979parameters available depend on the index method used by the index. Storage
980parameters can be specified on :class:`.Index` using the ``postgresql_with``
981keyword argument::
983 Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50})
985.. versionadded:: 1.0.6
987PostgreSQL allows the tablespace in which the index is created to be defined.
988The tablespace can be specified on :class:`.Index` using the
989``postgresql_tablespace`` keyword argument::
991 Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace')
993.. versionadded:: 1.1
995Note that the same option is available on :class:`_schema.Table` as well.
997.. _postgresql_index_concurrently:
999Indexes with CONCURRENTLY
1000^^^^^^^^^^^^^^^^^^^^^^^^^
1002The PostgreSQL index option CONCURRENTLY is supported by passing the
1003flag ``postgresql_concurrently`` to the :class:`.Index` construct::
1005 tbl = Table('testtbl', m, Column('data', Integer))
1007 idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True)
1009The above index construct will render DDL for CREATE INDEX, assuming
1010PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as::
1012 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
1014For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
1015a connection-less dialect, it will emit::
1017 DROP INDEX CONCURRENTLY test_idx1
1019.. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The
1020 CONCURRENTLY keyword is now only emitted if a high enough version
1021 of PostgreSQL is detected on the connection (or for a connection-less
1022 dialect).
1024When using CONCURRENTLY, the PostgreSQL database requires that the statement
1025be invoked outside of a transaction block. The Python DBAPI enforces that
1026even for a single statement, a transaction is present, so to use this
1027construct, the DBAPI's "autocommit" mode must be used::
1029 metadata = MetaData()
1030 table = Table(
1031 "foo", metadata,
1032 Column("id", String))
1033 index = Index(
1034 "foo_idx", table.c.id, postgresql_concurrently=True)
1036 with engine.connect() as conn:
1037 with conn.execution_options(isolation_level='AUTOCOMMIT'):
1038 table.create(conn)
1040.. seealso::
1042 :ref:`postgresql_isolation_level`
1044.. _postgresql_index_reflection:
1046PostgreSQL Index Reflection
1047---------------------------
1049The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
1050UNIQUE CONSTRAINT construct is used. When inspecting a table using
1051:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
1052and the :meth:`_reflection.Inspector.get_unique_constraints`
1053will report on these
1054two constructs distinctly; in the case of the index, the key
1055``duplicates_constraint`` will be present in the index entry if it is
1056detected as mirroring a constraint. When performing reflection using
1057``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
1058in :attr:`_schema.Table.indexes` when it is detected as mirroring a
1059:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection
1060.
1062.. versionchanged:: 1.0.0 - :class:`_schema.Table` reflection now includes
1063 :class:`.UniqueConstraint` objects present in the
1064 :attr:`_schema.Table.constraints`
1065 collection; the PostgreSQL backend will no longer include a "mirrored"
1066 :class:`.Index` construct in :attr:`_schema.Table.indexes`
1067 if it is detected
1068 as corresponding to a unique constraint.
1070Special Reflection Options
1071--------------------------
1073The :class:`_reflection.Inspector`
1074used for the PostgreSQL backend is an instance
1075of :class:`.PGInspector`, which offers additional methods::
1077 from sqlalchemy import create_engine, inspect
1079 engine = create_engine("postgresql+psycopg2://localhost/test")
1080 insp = inspect(engine) # will be a PGInspector
1082 print(insp.get_enums())
1084.. autoclass:: PGInspector
1085 :members:
1087.. _postgresql_table_options:
1089PostgreSQL Table Options
1090------------------------
1092Several options for CREATE TABLE are supported directly by the PostgreSQL
1093dialect in conjunction with the :class:`_schema.Table` construct:
1095* ``TABLESPACE``::
1097 Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace')
1099 The above option is also available on the :class:`.Index` construct.
1101* ``ON COMMIT``::
1103 Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS')
1105* ``WITH OIDS``::
1107 Table("some_table", metadata, ..., postgresql_with_oids=True)
1109* ``WITHOUT OIDS``::
1111 Table("some_table", metadata, ..., postgresql_with_oids=False)
1113* ``INHERITS``::
1115 Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
1117 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
1119 .. versionadded:: 1.0.0
1121* ``PARTITION BY``::
1123 Table("some_table", metadata, ...,
1124 postgresql_partition_by='LIST (part_column)')
1126 .. versionadded:: 1.2.6
1128.. seealso::
1130 `PostgreSQL CREATE TABLE options
1131 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
1132 in the PostgreSQL documentation.
1134.. _postgresql_constraint_options:
1136PostgreSQL Constraint Options
1137-----------------------------
1139The following option(s) are supported by the PostgreSQL dialect in conjunction
1140with selected constraint constructs:
1142* ``NOT VALID``: This option applies to CHECK and FOREIGN KEY constraints
1143 when the constraint is being added to an existing table via ALTER TABLE,
1144 and has the effect that existing rows are not scanned during the ALTER
1145 operation against the constraint being added.
1147 When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
1148 that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
1149 may be specified as an additional keyword argument within the operation
1150 that creates the constraint, as in the following Alembic example::
1152 def upgrade():
1153 op.create_foreign_key(
1154 "fk_user_address",
1155 "address",
1156 "user",
1157 ["user_id"],
1158 ["id"],
1159 postgresql_not_valid=True
1160 )
1162 The keyword is ultimately accepted directly by the
1163 :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
1164 and :class:`_schema.ForeignKey` constructs; when using a tool like
1165 Alembic, dialect-specific keyword arguments are passed through to
1166 these constructs from the migration operation directives::
1168 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)
1170 ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True)
1172 .. versionadded:: 1.4.32
1174 .. seealso::
1176 `PostgreSQL ALTER TABLE options
1177 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
1178 in the PostgreSQL documentation.
1180.. _postgresql_table_valued_overview:
1182Table values, Table and Column valued functions, Row and Tuple objects
1183-----------------------------------------------------------------------
1185PostgreSQL makes great use of modern SQL forms such as table-valued functions,
1186tables and rows as values. These constructs are commonly used as part
1187of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
1188datatypes. SQLAlchemy's SQL expression language has native support for
1189most table-valued and row-valued forms.
1191.. _postgresql_table_valued:
1193Table-Valued Functions
1194^^^^^^^^^^^^^^^^^^^^^^^
1196Many PostgreSQL built-in functions are intended to be used in the FROM clause
1197of a SELECT statement, and are capable of returning table rows or sets of table
1198rows. A large portion of PostgreSQL's JSON functions for example such as
1199``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
1200``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such
1201forms. These classes of SQL function calling forms in SQLAlchemy are available
1202using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
1203with :class:`_functions.Function` objects generated from the :data:`_sql.func`
1204namespace.
1206Examples from PostgreSQL's reference documentation follow below:
1208* ``json_each()``::
1210 >>> from sqlalchemy import select, func
1211 >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value"))
1212 >>> print(stmt)
1213 SELECT anon_1.key, anon_1.value
1214 FROM json_each(:json_each_1) AS anon_1
1216* ``json_populate_record()``::
1218 >>> from sqlalchemy import select, func, literal_column
1219 >>> stmt = select(
1220 ... func.json_populate_record(
1221 ... literal_column("null::myrowtype"),
1222 ... '{"a":1,"b":2}'
1223 ... ).table_valued("a", "b", name="x")
1224 ... )
1225 >>> print(stmt)
1226 SELECT x.a, x.b
1227 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x
1229* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
1230 columns in the alias, where we may make use of :func:`_sql.column` elements with
1231 types to produce them. The :meth:`_functions.FunctionElement.table_valued`
1232 method produces a :class:`_sql.TableValuedAlias` construct, and the
1233 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
1234 columns specification::
1236 >>> from sqlalchemy import select, func, column, Integer, Text
1237 >>> stmt = select(
1238 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued(
1239 ... column("a", Integer), column("b", Text), column("d", Text),
1240 ... ).render_derived(name="x", with_types=True)
1241 ... )
1242 >>> print(stmt)
1243 SELECT x.a, x.b, x.d
1244 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)
1246* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
1247 ordinal counter to the output of a function and is accepted by a limited set
1248 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
1249 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
1250 parameter ``with_ordinality`` for this purpose, which accepts the string name
1251 that will be applied to the "ordinality" column::
1253 >>> from sqlalchemy import select, func
1254 >>> stmt = select(
1255 ... func.generate_series(4, 1, -1).
1256 ... table_valued("value", with_ordinality="ordinality").
1257 ... render_derived()
1258 ... )
1259 >>> print(stmt)
1260 SELECT anon_1.value, anon_1.ordinality
1261 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
1262 WITH ORDINALITY AS anon_1(value, ordinality)
1264.. versionadded:: 1.4.0b2
1266.. seealso::
1268 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`
1270.. _postgresql_column_valued:
1272Column Valued Functions
1273^^^^^^^^^^^^^^^^^^^^^^^
1275Similar to the table valued function, a column valued function is present
1276in the FROM clause, but delivers itself to the columns clause as a single
1277scalar value. PostgreSQL functions such as ``json_array_elements()``,
1278``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the
1279:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`:
1281* ``json_array_elements()``::
1283 >>> from sqlalchemy import select, func
1284 >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x"))
1285 >>> print(stmt)
1286 SELECT x
1287 FROM json_array_elements(:json_array_elements_1) AS x
1289* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
1290 :func:`_postgresql.array` construct may be used::
1293 >>> from sqlalchemy.dialects.postgresql import array
1294 >>> from sqlalchemy import select, func
1295 >>> stmt = select(func.unnest(array([1, 2])).column_valued())
1296 >>> print(stmt)
1297 SELECT anon_1
1298 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1
1300 The function can of course be used against an existing table-bound column
1301 that's of type :class:`_types.ARRAY`::
1303 >>> from sqlalchemy import table, column, ARRAY, Integer
1304 >>> from sqlalchemy import select, func
1305 >>> t = table("t", column('value', ARRAY(Integer)))
1306 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
1307 >>> print(stmt)
1308 SELECT unnested_value
1309 FROM unnest(t.value) AS unnested_value
1311.. seealso::
1313 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`
1316Row Types
1317^^^^^^^^^
1319Built-in support for rendering a ``ROW`` may be approximated using
1320``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
1321:func:`_sql.tuple_` construct::
1323 >>> from sqlalchemy import table, column, func, tuple_
1324 >>> t = table("t", column("id"), column("fk"))
1325 >>> stmt = t.select().where(
1326 ... tuple_(t.c.id, t.c.fk) > (1,2)
1327 ... ).where(
1328 ... func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)
1329 ... )
1330 >>> print(stmt)
1331 SELECT t.id, t.fk
1332 FROM t
1333 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)
1335.. seealso::
1337 `PostgreSQL Row Constructors
1338 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_
1340 `PostgreSQL Row Constructor Comparison
1341 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
1343Table Types passed to Functions
1344^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1346PostgreSQL supports passing a table as an argument to a function, which it
1347refers to as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
1348such as :class:`_schema.Table` support this special form using the
1349:meth:`_sql.FromClause.table_valued` method, which is comparable to the
1350:meth:`_functions.FunctionElement.table_valued` method except that the collection
1351of columns is already established by that of the :class:`_sql.FromClause`
1352itself::
1355 >>> from sqlalchemy import table, column, func, select
1356 >>> a = table( "a", column("id"), column("x"), column("y"))
1357 >>> stmt = select(func.row_to_json(a.table_valued()))
1358 >>> print(stmt)
1359 SELECT row_to_json(a) AS row_to_json_1
1360 FROM a
1362.. versionadded:: 1.4.0b2
1365ARRAY Types
1366-----------
1368The PostgreSQL dialect supports arrays, both as multidimensional column types
1369as well as array literals:
1371* :class:`_postgresql.ARRAY` - ARRAY datatype
1373* :class:`_postgresql.array` - array literal
1375* :func:`_postgresql.array_agg` - ARRAY_AGG SQL function
1377* :class:`_postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate
1378 function syntax.
1380JSON Types
1381----------
1383The PostgreSQL dialect supports both JSON and JSONB datatypes, including
1384psycopg2's native support and support for all of PostgreSQL's special
1385operators:
1387* :class:`_postgresql.JSON`
1389* :class:`_postgresql.JSONB`
1391HSTORE Type
1392-----------
1394The PostgreSQL HSTORE type as well as hstore literals are supported:
1396* :class:`_postgresql.HSTORE` - HSTORE datatype
1398* :class:`_postgresql.hstore` - hstore literal
1400ENUM Types
1401----------
1403PostgreSQL has an independently creatable TYPE structure which is used
1404to implement an enumerated type. This approach introduces significant
1405complexity on the SQLAlchemy side in terms of when this type should be
1406CREATED and DROPPED. The type object is also an independently reflectable
1407entity. The following sections should be consulted:
1409* :class:`_postgresql.ENUM` - DDL and typing support for ENUM.
1411* :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types
1413* :meth:`.postgresql.ENUM.create` , :meth:`.postgresql.ENUM.drop` - individual
1414 CREATE and DROP commands for ENUM.
1416.. _postgresql_array_of_enum:
1418Using ENUM with ARRAY
1419^^^^^^^^^^^^^^^^^^^^^
1421The combination of ENUM and ARRAY is not directly supported by backend
1422DBAPIs at this time. Prior to SQLAlchemy 1.3.17, a special workaround
1423was needed in order to allow this combination to work, described below.
1425.. versionchanged:: 1.3.17 The combination of ENUM and ARRAY is now directly
1426 handled by SQLAlchemy's implementation without any workarounds needed.
1428.. sourcecode:: python
1430 from sqlalchemy import TypeDecorator
1431 from sqlalchemy.dialects.postgresql import ARRAY
1433 class ArrayOfEnum(TypeDecorator):
1434 impl = ARRAY
1436 def bind_expression(self, bindvalue):
1437 return sa.cast(bindvalue, self)
1439 def result_processor(self, dialect, coltype):
1440 super_rp = super(ArrayOfEnum, self).result_processor(
1441 dialect, coltype)
1443 def handle_raw_string(value):
1444 inner = re.match(r"^{(.*)}$", value).group(1)
1445 return inner.split(",") if inner else []
1447 def process(value):
1448 if value is None:
1449 return None
1450 return super_rp(handle_raw_string(value))
1451 return process
1453E.g.::
1455 Table(
1456 'mydata', metadata,
1457 Column('id', Integer, primary_key=True),
1458 Column('data', ArrayOfEnum(ENUM('a', 'b', 'c', name='myenum')))
1460 )
1462This type is not included as a built-in type as it would be incompatible
1463with a DBAPI that suddenly decides to support ARRAY of ENUM directly in
1464a new version.
1466.. _postgresql_array_of_json:
1468Using JSON/JSONB with ARRAY
1469^^^^^^^^^^^^^^^^^^^^^^^^^^^
1471Similar to using ENUM, prior to SQLAlchemy 1.3.17, for an ARRAY of JSON/JSONB
1472we need to render the appropriate CAST. Current psycopg2 drivers accommodate
1473the result set correctly without any special steps.
1475.. versionchanged:: 1.3.17 The combination of JSON/JSONB and ARRAY is now
1476 directly handled by SQLAlchemy's implementation without any workarounds
1477 needed.
1479.. sourcecode:: python
1481 class CastingArray(ARRAY):
1482 def bind_expression(self, bindvalue):
1483 return sa.cast(bindvalue, self)
1485E.g.::
1487 Table(
1488 'mydata', metadata,
1489 Column('id', Integer, primary_key=True),
1490 Column('data', CastingArray(JSONB))
1491 )
1494""" # noqa: E501
1496from collections import defaultdict
1497import datetime as dt
1498import re
1499from uuid import UUID as _python_UUID
1501from . import array as _array
1502from . import dml
1503from . import hstore as _hstore
1504from . import json as _json
1505from . import ranges as _ranges
1506from ... import exc
1507from ... import schema
1508from ... import sql
1509from ... import util
1510from ...engine import characteristics
1511from ...engine import default
1512from ...engine import reflection
1513from ...sql import coercions
1514from ...sql import compiler
1515from ...sql import elements
1516from ...sql import expression
1517from ...sql import roles
1518from ...sql import sqltypes
1519from ...sql import util as sql_util
1520from ...sql.ddl import DDLBase
1521from ...types import BIGINT
1522from ...types import BOOLEAN
1523from ...types import CHAR
1524from ...types import DATE
1525from ...types import FLOAT
1526from ...types import INTEGER
1527from ...types import NUMERIC
1528from ...types import REAL
1529from ...types import SMALLINT
1530from ...types import TEXT
1531from ...types import VARCHAR
# Matches a complete string made up of a single index-method token: one of
# the listed built-in names or any run of word characters, case-insensitively.
# NOTE(review): presumably used to validate ``postgresql_using`` values --
# confirm against the DDL compiler.
IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.IGNORECASE)

# Matches statements that begin (after optional whitespace) with one of the
# listed data- or schema-changing keywords, case-insensitively.
AUTOCOMMIT_REGEXP = re.compile(
    r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|"
    "IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)",
    re.IGNORECASE | re.UNICODE,
)
# Lower-case words treated as reserved by this dialect.  The entries are
# listed in the same order as before; being a set, order has no effect.
RESERVED_WORDS = {
    "all", "analyse", "analyze", "and", "any", "array", "as", "asc",
    "asymmetric", "both", "case", "cast", "check", "collate", "column",
    "constraint", "create", "current_catalog", "current_date",
    "current_role", "current_time", "current_timestamp", "current_user",
    "default", "deferrable", "desc", "distinct", "do", "else", "end",
    "except", "false", "fetch", "for", "foreign", "from", "grant", "group",
    "having", "in", "initially", "intersect", "into", "leading", "limit",
    "localtime", "localtimestamp", "new", "not", "null", "of", "off",
    "offset", "old", "on", "only", "or", "order", "placing", "primary",
    "references", "returning", "select", "session_user", "some",
    "symmetric", "table", "then", "to", "trailing", "true", "union",
    "unique", "user", "using", "variadic", "when", "where", "window",
    "with",
    "authorization", "between", "binary", "cross", "current_schema",
    "freeze", "full", "ilike", "inner", "is", "isnull", "join", "left",
    "like", "natural", "notnull", "outer", "over", "overlaps", "right",
    "similar", "verbose",
}
# Integer codes grouped by numeric category.
# NOTE(review): presumably PostgreSQL type OIDs as reported by the DBAPI
# (e.g. in cursor.description) -- confirm against the driver modules that
# consume these tuples.
_DECIMAL_TYPES = (1231, 1700)
_FLOAT_TYPES = (700, 701, 1021, 1022)
_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
class BYTEA(sqltypes.LargeBinary):
    """PostgreSQL BYTEA type; a ``LargeBinary`` with the ``BYTEA`` visit name."""

    __visit_name__ = "BYTEA"
class DOUBLE_PRECISION(sqltypes.Float):
    """PostgreSQL DOUBLE PRECISION type; a ``Float`` with its own visit name."""

    __visit_name__ = "DOUBLE_PRECISION"
class INET(sqltypes.TypeEngine):
    """PostgreSQL INET type."""

    __visit_name__ = "INET"


# alternate module-level name bound to the same class
PGInet = INET
class CIDR(sqltypes.TypeEngine):
    """PostgreSQL CIDR type."""

    __visit_name__ = "CIDR"


# alternate module-level name bound to the same class
PGCidr = CIDR
class MACADDR(sqltypes.TypeEngine):
    """PostgreSQL MACADDR type."""

    __visit_name__ = "MACADDR"


# alternate module-level name bound to the same class
PGMacAddr = MACADDR
class MONEY(sqltypes.TypeEngine):

    r"""Provide the PostgreSQL MONEY type.

    Depending on driver, result rows using this type may return a
    string value which includes currency symbols.

    For this reason, it may be preferable to provide conversion to a
    numerically-based currency datatype using :class:`_types.TypeDecorator`::

        import re
        import decimal
        from typing import Any
        from sqlalchemy import TypeDecorator
        from sqlalchemy.dialects.postgresql import MONEY

        class NumericMoney(TypeDecorator):
            impl = MONEY

            def process_result_value(self, value: Any, dialect: Any) -> Any:
                if value is not None:
                    # adjust this for the currency and numeric
                    m = re.match(r"\$([\d.]+)", value)
                    if m:
                        value = decimal.Decimal(m.group(1))
                return value

    Alternatively, the conversion may be applied as a CAST using
    the :meth:`_types.TypeDecorator.column_expression` method as follows::

        import decimal
        from typing import Any
        from sqlalchemy import cast
        from sqlalchemy import Numeric
        from sqlalchemy import TypeDecorator
        from sqlalchemy.dialects.postgresql import MONEY

        class NumericMoney(TypeDecorator):
            impl = MONEY

            def column_expression(self, column: Any):
                return cast(column, Numeric())

    .. versionadded:: 1.2

    """

    __visit_name__ = "MONEY"
class OID(sqltypes.TypeEngine):

    """Provide the PostgreSQL OID type.

    The class adds no behavior of its own beyond its ``OID`` visit name.

    .. versionadded:: 0.9.5

    """

    __visit_name__ = "OID"
class REGCLASS(sqltypes.TypeEngine):

    """Provide the PostgreSQL REGCLASS type.

    The class adds no behavior of its own beyond its ``REGCLASS`` visit name.

    .. versionadded:: 1.2.7

    """

    __visit_name__ = "REGCLASS"
class TIMESTAMP(sqltypes.TIMESTAMP):

    """Provide the PostgreSQL TIMESTAMP type.

    Extends the generic ``sqltypes.TIMESTAMP`` with an optional
    ``precision`` attribute.
    """

    __visit_name__ = "TIMESTAMP"

    def __init__(self, timezone=False, precision=None):
        """Construct a TIMESTAMP.

        :param timezone: boolean value if timezone present, default False
        :param precision: optional integer precision value; stored on the
         instance as ``self.precision``

         .. versionadded:: 1.4

        """
        # only ``timezone`` is forwarded to the base constructor
        super(TIMESTAMP, self).__init__(timezone=timezone)
        self.precision = precision
class TIME(sqltypes.TIME):

    """PostgreSQL TIME type.

    Extends the generic ``sqltypes.TIME`` with an optional ``precision``
    attribute.
    """

    __visit_name__ = "TIME"

    def __init__(self, timezone=False, precision=None):
        """Construct a TIME.

        :param timezone: boolean value if timezone present, default False
        :param precision: optional integer precision value; stored on the
         instance as ``self.precision``

         .. versionadded:: 1.4

        """
        # only ``timezone`` is forwarded to the base constructor
        super(TIME, self).__init__(timezone=timezone)
        self.precision = precision
class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):

    """PostgreSQL INTERVAL type."""

    __visit_name__ = "INTERVAL"
    native = True

    def __init__(self, precision=None, fields=None):
        """Construct an INTERVAL.

        :param precision: optional integer precision value
        :param fields: string fields specifier.  allows storage of fields
         to be limited, such as ``"YEAR"``, ``"MONTH"``, ``"DAY TO HOUR"``,
         etc.

         .. versionadded:: 1.2

        """
        self.precision = precision
        self.fields = fields

    @classmethod
    def adapt_emulated_to_native(cls, interval, **kw):
        """Return an :class:`.INTERVAL` for the given emulated interval type.

        Only ``interval.second_precision`` is carried over (as
        ``precision``); additional keyword arguments are ignored.
        """
        return INTERVAL(precision=interval.second_precision)

    @property
    def _type_affinity(self):
        """Report ``sqltypes.Interval`` as this type's affinity."""
        return sqltypes.Interval

    def as_generic(self, allow_nulltype=False):
        """Return a native ``sqltypes.Interval`` with this type's precision.

        ``self.fields`` is not part of the returned type, and
        ``allow_nulltype`` is not consulted.
        """
        return sqltypes.Interval(native=True, second_precision=self.precision)

    @property
    def python_type(self):
        """The Python type for this datatype, ``datetime.timedelta``."""
        return dt.timedelta

    def coerce_compared_value(self, op, value):
        """Return this same type regardless of the operator or value given."""
        return self


# alternate module-level name bound to the same class
PGInterval = INTERVAL
class BIT(sqltypes.TypeEngine):
    """PostgreSQL BIT type, optionally BIT VARYING.

    :param length: optional length.  When ``varying`` is false, a missing
     (or otherwise falsy) length is stored as 1.
    :param varying: when true the type is BIT VARYING and ``length`` is
     stored exactly as given, so that ``None`` can mean "unlimited".
    """

    __visit_name__ = "BIT"

    def __init__(self, length=None, varying=False):
        # plain BIT defaults to a single bit; BIT VARYING has no default
        # because it may be of unlimited length
        self.length = length if varying else (length or 1)
        self.varying = varying


PGBit = BIT
class UUID(sqltypes.TypeEngine):

    """PostgreSQL UUID type.

    Represents the UUID column type, interpreting
    data either as natively returned by the DBAPI
    or as Python uuid objects.

    The UUID type is currently known to work within the prominent DBAPI
    drivers supported by SQLAlchemy including psycopg2, pg8000 and
    asyncpg. Support for other DBAPI drivers may be incomplete or non-present.

    """

    __visit_name__ = "UUID"

    def __init__(self, as_uuid=False):
        """Construct a UUID type.


        :param as_uuid=False: if True, values will be interpreted
         as Python uuid objects, converting to/from string via the
         DBAPI.

        """
        self.as_uuid = as_uuid

    def coerce_compared_value(self, op, value):
        """See :meth:`.TypeEngine.coerce_compared_value` for a description."""

        # a plain string compared against a UUID column keeps the UUID type
        if isinstance(value, util.string_types):
            return self
        else:
            return super(UUID, self).coerce_compared_value(op, value)

    def bind_processor(self, dialect):
        """Return a converter for bound values, or None when not needed.

        With ``as_uuid`` set, non-None values are converted to text before
        being handed on; otherwise no processing is applied.
        """
        if self.as_uuid:

            def process(value):
                if value is not None:
                    value = util.text_type(value)
                return value

            return process
        else:
            return None

    def result_processor(self, dialect, coltype):
        """Return a converter for result values, or None when not needed.

        With ``as_uuid`` set, non-None values are passed to the Python
        ``uuid.UUID`` constructor; otherwise no processing is applied.
        """
        if self.as_uuid:

            def process(value):
                if value is not None:
                    value = _python_UUID(value)
                return value

            return process
        else:
            return None

    def literal_processor(self, dialect):
        """Return a function rendering a value as an inline SQL literal.

        The value is rendered inside single quotes, followed by a
        ``::UUID`` cast when ``as_uuid`` is set.  Single quotes embedded
        in the value are doubled so the result is always one well-formed
        string literal.  ``None`` is passed through unchanged.
        """
        template = "'%s'::UUID" if self.as_uuid else "'%s'"

        def process(value):
            if value is not None:
                # format first, then double any quote characters, so the
                # text is the same as before for values without quotes
                value = template % ("%s" % value).replace("'", "''")
            return value

        return process

    @property
    def python_type(self):
        """``uuid.UUID`` when ``as_uuid`` is set, otherwise ``str``."""
        return _python_UUID if self.as_uuid else str


PGUuid = UUID
class TSVECTOR(sqltypes.TypeEngine):

    """Type object for the PostgreSQL text search type ``TSVECTOR``.

    The :class:`_postgresql.TSVECTOR` type can be used to run full text
    queries over natural language documents.

    .. versionadded:: 0.9.0

    .. seealso::

        :ref:`postgresql_match`

    """

    __visit_name__ = "TSVECTOR"
class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum):

    """PostgreSQL ENUM type.

    This is a subclass of :class:`_types.Enum` which includes
    support for PG's ``CREATE TYPE`` and ``DROP TYPE``.

    When the builtin type :class:`_types.Enum` is used and the
    :paramref:`.Enum.native_enum` flag is left at its default of
    True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
    type as the implementation, so the special create/drop rules
    will be used.

    The create/drop behavior of ENUM is necessarily intricate, because an
    ENUM type may be "owned" by just a single table, or may be shared
    among many tables.

    When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
    in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
    corresponding to when the :meth:`_schema.Table.create` and
    :meth:`_schema.Table.drop`
    methods are called::

        table = Table('sometable', metadata,
            Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
        )

        table.create(engine)  # will emit CREATE TYPE and CREATE TABLE
        table.drop(engine)  # will emit DROP TABLE and DROP TYPE

    To use a common enumerated type between multiple tables, the best
    practice is to declare the :class:`_types.Enum` or
    :class:`_postgresql.ENUM` independently, and associate it with the
    :class:`_schema.MetaData` object itself::

        my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)

        t1 = Table('sometable_one', metadata,
            Column('some_enum', my_enum)
        )

        t2 = Table('sometable_two', metadata,
            Column('some_enum', my_enum)
        )

    When this pattern is used, care must still be taken at the level
    of individual table creates.  Emitting CREATE TABLE without also
    specifying ``checkfirst=True`` will still cause issues::

        t1.create(engine)  # will fail: no such type 'myenum'

    If we specify ``checkfirst=True``, the individual table-level create
    operation will check for the ``ENUM`` and create if not exists::

        # will check if enum exists, and emit CREATE TYPE if not
        t1.create(engine, checkfirst=True)

    When using a metadata-level ENUM type, the type will always be created
    and dropped if either the metadata-wide create/drop is called::

        metadata.create_all(engine)  # will emit CREATE TYPE
        metadata.drop_all(engine)  # will emit DROP TYPE

    The type can also be created and dropped directly::

        my_enum.create(engine)
        my_enum.drop(engine)

    .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
       now behaves more strictly with regards to CREATE/DROP.  A metadata-level
       ENUM type will only be created and dropped at the metadata level,
       not the table level, with the exception of
       ``table.create(checkfirst=True)``.
       The ``table.drop()`` call will now emit a DROP TYPE for a table-level
       enumerated type.

    """

    native_enum = True

    def __init__(self, *enums, **kw):
        """Construct an :class:`_postgresql.ENUM`.

        Arguments are the same as that of
        :class:`_types.Enum`, but also including
        the following parameters.

        :param create_type: Defaults to True.
         Indicates that ``CREATE TYPE`` should be
         emitted, after optionally checking for the
         presence of the type, when the parent
         table is being created; and additionally
         that ``DROP TYPE`` is called when the table
         is dropped.    When ``False``, no check
         will be performed and no ``CREATE TYPE``
         or ``DROP TYPE`` is emitted, unless
         :meth:`~.postgresql.ENUM.create`
         or :meth:`~.postgresql.ENUM.drop`
         are called directly.
         Setting to ``False`` is helpful
         when invoking a creation scheme to a SQL file
         without access to the actual database -
         the :meth:`~.postgresql.ENUM.create` and
         :meth:`~.postgresql.ENUM.drop` methods can
         be used to emit SQL to a target bind.

        """
        # ``native_enum`` is consumed here and never forwarded; an explicit
        # False only produces a warning.
        if kw.pop("native_enum", None) is False:
            util.warn(
                "the native_enum flag does not apply to the "
                "sqlalchemy.dialects.postgresql.ENUM datatype; this type "
                "always refers to ENUM. Use sqlalchemy.types.Enum for "
                "non-native enum."
            )
        self.create_type = kw.pop("create_type", True)
        super(ENUM, self).__init__(*enums, **kw)

    @classmethod
    def adapt_emulated_to_native(cls, impl, **kw):
        """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
        :class:`.Enum`.

        Settings of ``impl`` are used as defaults; anything already present
        in ``kw`` takes precedence.

        """
        inherited_defaults = (
            ("validate_strings", impl.validate_strings),
            ("name", impl.name),
            ("schema", impl.schema),
            ("inherit_schema", impl.inherit_schema),
            ("metadata", impl.metadata),
            ("_create_events", False),
            ("values_callable", impl.values_callable),
            ("omit_aliases", impl._omit_aliases),
        )
        for option_name, option_value in inherited_defaults:
            kw.setdefault(option_name, option_value)
        return cls(**kw)

    def create(self, bind=None, checkfirst=True):
        """Emit ``CREATE TYPE`` for this
        :class:`_postgresql.ENUM`.

        If the underlying dialect does not support
        PostgreSQL CREATE TYPE, no action is taken.

        :param bind: a connectable :class:`_engine.Engine`,
         :class:`_engine.Connection`, or similar object to emit
         SQL.
        :param checkfirst: if ``True``, a query against
         the PG catalog will be first performed to see
         if the type does not exist already before
         creating.

        """
        if bind.dialect.supports_native_enum:
            bind._run_ddl_visitor(
                self.EnumGenerator, self, checkfirst=checkfirst
            )

    def drop(self, bind=None, checkfirst=True):
        """Emit ``DROP TYPE`` for this
        :class:`_postgresql.ENUM`.

        If the underlying dialect does not support
        PostgreSQL DROP TYPE, no action is taken.

        :param bind: a connectable :class:`_engine.Engine`,
         :class:`_engine.Connection`, or similar object to emit
         SQL.
        :param checkfirst: if ``True``, a query against
         the PG catalog will be first performed to see
         if the type actually exists before dropping.

        """
        if bind.dialect.supports_native_enum:
            bind._run_ddl_visitor(
                self.EnumDropper, self, checkfirst=checkfirst
            )

    class EnumGenerator(DDLBase):
        """DDL visitor that executes ``CreateEnumType`` for an enum."""

        def __init__(self, dialect, connection, checkfirst=False, **kwargs):
            super(ENUM.EnumGenerator, self).__init__(connection, **kwargs)
            self.checkfirst = checkfirst

        def _can_create_enum(self, enum):
            """True unless ``checkfirst`` is set and the type is present."""
            if self.checkfirst:
                conn = self.connection
                effective_schema = conn.schema_for_object(enum)
                already_present = conn.dialect.has_type(
                    conn, enum.name, schema=effective_schema
                )
                return not already_present
            return True

        def visit_enum(self, enum):
            if self._can_create_enum(enum):
                self.connection.execute(CreateEnumType(enum))

    class EnumDropper(DDLBase):
        """DDL visitor that executes ``DropEnumType`` for an enum."""

        def __init__(self, dialect, connection, checkfirst=False, **kwargs):
            super(ENUM.EnumDropper, self).__init__(connection, **kwargs)
            self.checkfirst = checkfirst

        def _can_drop_enum(self, enum):
            """True unless ``checkfirst`` is set and the type is absent."""
            if self.checkfirst:
                conn = self.connection
                effective_schema = conn.schema_for_object(enum)
                return conn.dialect.has_type(
                    conn, enum.name, schema=effective_schema
                )
            return True

        def visit_enum(self, enum):
            if self._can_drop_enum(enum):
                self.connection.execute(DropEnumType(enum))

    def _check_for_name_in_memos(self, checkfirst, kw):
        """Look in the 'ddl runner' for 'memos', then
        note our name in that collection.

        This to ensure a particular named enum is operated
        upon only once within any kind of create/drop
        sequence without relying upon "checkfirst".

        Returns True when the enum should be left alone (``create_type`` is
        off, or the name was already noted), False otherwise.

        """
        if not self.create_type:
            return True
        if "_ddl_runner" not in kw:
            return False

        memo = kw["_ddl_runner"].memo
        if "_pg_enums" in memo:
            seen_enums = memo["_pg_enums"]
        else:
            seen_enums = memo["_pg_enums"] = set()
        already_seen = (self.schema, self.name) in seen_enums
        seen_enums.add((self.schema, self.name))
        return already_seen

    def _on_table_create(self, target, bind, checkfirst=False, **kw):
        # table-level create applies when checkfirst is requested, or when
        # the enum is not metadata-owned and this is not a metadata-wide
        # operation; the memo check only runs (and records the name) when
        # that first condition holds
        table_level = checkfirst or (
            not self.metadata
            and not kw.get("_is_metadata_operation", False)
        )
        if table_level and not self._check_for_name_in_memos(checkfirst, kw):
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_table_drop(self, target, bind, checkfirst=False, **kw):
        if self.metadata or kw.get("_is_metadata_operation", False):
            return
        if not self._check_for_name_in_memos(checkfirst, kw):
            self.drop(bind=bind, checkfirst=checkfirst)

    def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
        already_handled = self._check_for_name_in_memos(checkfirst, kw)
        if not already_handled:
            self.create(bind=bind, checkfirst=checkfirst)

    def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
        already_handled = self._check_for_name_in_memos(checkfirst, kw)
        if not already_handled:
            self.drop(bind=bind, checkfirst=checkfirst)
class _ColonCast(elements.Cast):
    """Cast element dispatched under the ``colon_cast`` visit name.

    Stores the expression, the target type, and a ``TypeClause`` wrapping
    that type.  The base class constructor is not invoked.
    """

    __visit_name__ = "colon_cast"

    def __init__(self, expression, type_):
        self.clause = expression
        self.type = type_
        self.typeclause = elements.TypeClause(type_)
# Maps generic ``sqltypes`` classes (keys) to the classes defined for this
# dialect (values).
# NOTE(review): presumably consulted when adapting a generic type to its
# dialect-level implementation -- the lookup site is outside this section.
colspecs = {
    sqltypes.ARRAY: _array.ARRAY,
    sqltypes.Interval: INTERVAL,
    sqltypes.Enum: ENUM,
    sqltypes.JSON.JSONPathType: _json.JSONPathType,
    sqltypes.JSON: _json.JSON,
}
# Maps type names, as strings, to the type classes that represent them.
# Several names may share one class (e.g. every "timestamp ..." spelling
# maps to TIMESTAMP, and both "bit" and "bit varying" map to BIT).
# NOTE(review): presumably used during reflection to turn catalog type names
# into type objects -- the consumer is outside this section.
ischema_names = {
    # array, hstore, JSON and range types from the dialect submodules
    "_array": _array.ARRAY,
    "hstore": _hstore.HSTORE,
    "json": _json.JSON,
    "jsonb": _json.JSONB,
    "int4range": _ranges.INT4RANGE,
    "int8range": _ranges.INT8RANGE,
    "numrange": _ranges.NUMRANGE,
    "daterange": _ranges.DATERANGE,
    "tsrange": _ranges.TSRANGE,
    "tstzrange": _ranges.TSTZRANGE,
    # integer and character types
    "integer": INTEGER,
    "bigint": BIGINT,
    "smallint": SMALLINT,
    "character varying": VARCHAR,
    "character": CHAR,
    '"char"': sqltypes.String,
    "name": sqltypes.String,
    "text": TEXT,
    # numeric types
    "numeric": NUMERIC,
    "float": FLOAT,
    "real": REAL,
    # network, uuid, bit and miscellaneous types
    "inet": INET,
    "cidr": CIDR,
    "uuid": UUID,
    "bit": BIT,
    "bit varying": BIT,
    "macaddr": MACADDR,
    "money": MONEY,
    "oid": OID,
    "regclass": REGCLASS,
    "double precision": DOUBLE_PRECISION,
    # date and time types
    "timestamp": TIMESTAMP,
    "timestamp with time zone": TIMESTAMP,
    "timestamp without time zone": TIMESTAMP,
    "time with time zone": TIME,
    "time without time zone": TIME,
    "date": DATE,
    "time": TIME,
    # binary, boolean, interval and text search types
    "bytea": BYTEA,
    "boolean": BOOLEAN,
    "interval": INTERVAL,
    "tsvector": TSVECTOR,
}
2281class PGCompiler(compiler.SQLCompiler):
def visit_colon_cast(self, element, **kw):
    """Render ``element`` as ``<expression>::<type>``."""
    operand_sql = element.clause._compiler_dispatch(self, **kw)
    type_sql = element.typeclause._compiler_dispatch(self, **kw)
    return "%s::%s" % (operand_sql, type_sql)
def visit_array(self, element, **kw):
    """Render ``element`` as ``ARRAY[...]`` around its clause list."""
    members_sql = self.visit_clauselist(element, **kw)
    return "ARRAY[%s]" % members_sql
def visit_slice(self, element, **kw):
    """Render a slice as ``<start>:<stop>``."""
    lower = self.process(element.start, **kw)
    upper = self.process(element.stop, **kw)
    return "%s:%s" % (lower, upper)
def visit_json_getitem_op_binary(
    self, binary, operator, _cast_applied=False, **kw
):
    """Render a JSON index access with ``->`` or ``->>``.

    When the expression's type affinity is not JSON and no cast has been
    applied yet, the expression is wrapped in a CAST to its own type and
    re-processed with ``_cast_applied`` set; on that second pass ``->>``
    is used instead of ``->``.
    """
    needs_cast = (
        not _cast_applied
        and binary.type._type_affinity is not sqltypes.JSON
    )
    if needs_cast:
        kw["_cast_applied"] = True
        return self.process(sql.cast(binary, binary.type), **kw)

    kw["eager_grouping"] = True
    opstring = " ->> " if _cast_applied else " -> "
    return self._generate_generic_binary(binary, opstring, **kw)
def visit_json_path_getitem_op_binary(
    self, binary, operator, _cast_applied=False, **kw
):
    """Render a JSON path access with ``#>`` or ``#>>``.

    When the expression's type affinity is not JSON and no cast has been
    applied yet, the expression is wrapped in a CAST to its own type and
    re-processed with ``_cast_applied`` set; on that second pass ``#>>``
    is used instead of ``#>``.
    """
    needs_cast = (
        not _cast_applied
        and binary.type._type_affinity is not sqltypes.JSON
    )
    if needs_cast:
        kw["_cast_applied"] = True
        return self.process(sql.cast(binary, binary.type), **kw)

    kw["eager_grouping"] = True
    opstring = " #>> " if _cast_applied else " #> "
    return self._generate_generic_binary(binary, opstring, **kw)
def visit_getitem_binary(self, binary, operator, **kw):
    """Render an index access as ``<left>[<right>]``."""
    container_sql = self.process(binary.left, **kw)
    index_sql = self.process(binary.right, **kw)
    return "%s[%s]" % (container_sql, index_sql)
def visit_aggregate_order_by(self, element, **kw):
    """Render ``<target> ORDER BY <order_by>``."""
    target_sql = self.process(element.target, **kw)
    ordering_sql = self.process(element.order_by, **kw)
    return "%s ORDER BY %s" % (target_sql, ordering_sql)
def visit_match_op_binary(self, binary, operator, **kw):
    """Render a match operation as ``<left> @@ to_tsquery(...)``.

    When the ``postgresql_regconfig`` modifier is present and renders to
    a non-empty literal, it becomes the first argument of ``to_tsquery``.
    """
    regconfig = None
    if "postgresql_regconfig" in binary.modifiers:
        regconfig = self.render_literal_value(
            binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
        )

    left_sql = self.process(binary.left, **kw)
    right_sql = self.process(binary.right, **kw)
    if regconfig:
        return "%s @@ to_tsquery(%s, %s)" % (left_sql, regconfig, right_sql)
    return "%s @@ to_tsquery(%s)" % (left_sql, right_sql)
def visit_ilike_op_binary(self, binary, operator, **kw):
    """Render ``<left> ILIKE <right>``, plus ``ESCAPE`` when one is set."""
    escape = binary.modifiers.get("escape", None)

    rendered = "%s ILIKE %s" % (
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    )
    if escape:
        rendered += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return rendered
def visit_not_ilike_op_binary(self, binary, operator, **kw):
    """Render ``<left> NOT ILIKE <right>``, plus ``ESCAPE`` when set."""
    escape = binary.modifiers.get("escape", None)

    rendered = "%s NOT ILIKE %s" % (
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    )
    if escape:
        rendered += " ESCAPE " + self.render_literal_value(
            escape, sqltypes.STRINGTYPE
        )
    return rendered
2379 def _regexp_match(self, base_op, binary, operator, kw):
2380 flags = binary.modifiers["flags"]
2381 if flags is None:
2382 return self._generate_generic_binary(
2383 binary, " %s " % base_op, **kw
2384 )
2385 if isinstance(flags, elements.BindParameter) and flags.value == "i":
2386 return self._generate_generic_binary(
2387 binary, " %s* " % base_op, **kw
2388 )
2389 return "%s %s CONCAT('(?', %s, ')', %s)" % (
2390 self.process(binary.left, **kw),
2391 base_op,
2392 self.process(flags, **kw),
2393 self.process(binary.right, **kw),
2394 )
def visit_regexp_match_op_binary(self, binary, operator, **kw):
    """Render a regular expression match using the ``~`` operator."""
    base_op = "~"
    return self._regexp_match(base_op, binary, operator, kw)
def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
    """Render a negated regular expression match using ``!~``."""
    base_op = "!~"
    return self._regexp_match(base_op, binary, operator, kw)
def visit_regexp_replace_op_binary(self, binary, operator, **kw):
    """Render ``REGEXP_REPLACE(string, pattern, replacement[, flags])``.

    The flags argument is only emitted when the ``flags`` modifier is not
    ``None``.
    """
    arguments = [
        self.process(binary.left, **kw),
        self.process(binary.right, **kw),
    ]
    flags = binary.modifiers["flags"]
    arguments.append(self.process(binary.modifiers["replacement"], **kw))
    if flags is not None:
        arguments.append(self.process(flags, **kw))
    return "REGEXP_REPLACE(%s)" % ", ".join(arguments)
def visit_empty_set_expr(self, element_types):
    """Render a SELECT that yields no rows, typed per ``element_types``.

    Each column is ``CAST(NULL AS <type>)``.  A null type, or an empty
    ``element_types``, falls back to ``INTEGER`` as an arbitrary choice.
    """
    null_casts = []
    for type_ in element_types or [INTEGER()]:
        cast_target = INTEGER() if type_._isnull else type_
        null_casts.append(
            "CAST(NULL AS %s)"
            % self.dialect.type_compiler.process(cast_target)
        )
    return "SELECT %s WHERE 1!=1" % (", ".join(null_casts),)
def render_literal_value(self, value, type_):
    """Render a literal via the parent compiler, then escape backslashes.

    Backslashes are doubled only when ``self.dialect._backslash_escapes``
    is set.
    """
    rendered = super(PGCompiler, self).render_literal_value(value, type_)
    if not self.dialect._backslash_escapes:
        return rendered
    return rendered.replace("\\", "\\\\")
def visit_sequence(self, seq, **kw):
    """Render ``nextval('<sequence name>')`` for ``seq``."""
    sequence_name = self.preparer.format_sequence(seq)
    return "nextval('%s')" % sequence_name
def limit_clause(self, select, **kw):
    """Render the LIMIT / OFFSET portion of ``select``.

    An OFFSET without a LIMIT is preceded by ``LIMIT ALL``.  Returns an
    empty string when neither is set.
    """
    limit = select._limit_clause
    offset = select._offset_clause

    fragments = []
    if limit is not None:
        fragments.append(" \n LIMIT " + self.process(limit, **kw))
    if offset is not None:
        if limit is None:
            fragments.append("\n LIMIT ALL")
        fragments.append(" OFFSET " + self.process(offset, **kw))
    return "".join(fragments)
def format_from_hint_text(self, sqltext, table, hint, iscrud):
    """Prefix ``sqltext`` with ``ONLY``, the single hint accepted here.

    The hint is compared case-insensitively; anything else raises
    ``exc.CompileError``.
    """
    if hint.upper() == "ONLY":
        return "ONLY " + sqltext
    raise exc.CompileError("Unrecognized hint: %r" % hint)
def get_select_precolumns(self, select, **kw):
    """Render ``DISTINCT`` / ``DISTINCT ON (...)`` for ``select``.

    Returns an empty string when neither form applies.
    """
    # Do not call super().get_select_precolumns because
    # it will warn/raise when distinct on is present
    if select._distinct_on:
        on_columns = ", ".join(
            [self.process(col, **kw) for col in select._distinct_on]
        )
        return "DISTINCT ON (" + on_columns + ") "
    if select._distinct:
        return "DISTINCT "
    return ""
def for_update_clause(self, select, **kw):
    """Render the row-locking clause described by ``_for_update_arg``.

    Chooses between ``FOR KEY SHARE``, ``FOR SHARE``,
    ``FOR NO KEY UPDATE`` and ``FOR UPDATE`` from the ``read`` and
    ``key_share`` flags, then appends ``OF``, ``NOWAIT`` and
    ``SKIP LOCKED`` as configured.
    """
    lock_arg = select._for_update_arg

    if lock_arg.read:
        clause = " FOR KEY SHARE" if lock_arg.key_share else " FOR SHARE"
    elif lock_arg.key_share:
        clause = " FOR NO KEY UPDATE"
    else:
        clause = " FOR UPDATE"

    if lock_arg.of:
        # collect the selectables behind each "of" element, without
        # duplicates and in first-seen order
        tables = util.OrderedSet()
        for c in lock_arg.of:
            tables.update(sql_util.surface_selectables_only(c))

        clause += " OF " + ", ".join(
            self.process(table, ashint=True, use_schema=False, **kw)
            for table in tables
        )

    if lock_arg.nowait:
        clause += " NOWAIT"
    if lock_arg.skip_locked:
        clause += " SKIP LOCKED"

    return clause
def returning_clause(self, stmt, returning_cols):
    """Render ``RETURNING <col>, <col>, ...`` for ``returning_cols``."""
    labeled = []
    for c in expression._select_iterables(returning_cols):
        labeled.append(
            self._label_returning_column(
                stmt, c, fallback_label_name=c._non_anon_label
            )
        )
    return "RETURNING " + ", ".join(labeled)
def visit_substring_func(self, func, **kw):
    """Render ``SUBSTRING(<s> FROM <start>[ FOR <length>])``.

    The ``FOR`` part is emitted only when a third argument is present.
    """
    arguments = func.clauses.clauses
    s = self.process(arguments[0], **kw)
    start = self.process(arguments[1], **kw)
    if len(arguments) <= 2:
        return "SUBSTRING(%s FROM %s)" % (s, start)
    length = self.process(arguments[2], **kw)
    return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
2530 def _on_conflict_target(self, clause, **kw):
2532 if clause.constraint_target is not None:
2533 # target may be a name of an Index, UniqueConstraint or
2534 # ExcludeConstraint. While there is a separate
2535 # "max_identifier_length" for indexes, PostgreSQL uses the same
2536 # length for all objects so we can use
2537 # truncate_and_render_constraint_name
2538 target_text = (
2539 "ON CONSTRAINT %s"
2540 % self.preparer.truncate_and_render_constraint_name(
2541 clause.constraint_target
2542 )
2543 )
2544 elif clause.inferred_target_elements is not None:
2545 target_text = "(%s)" % ", ".join(
2546 (
2547 self.preparer.quote(c)
2548 if isinstance(c, util.string_types)
2549 else self.process(c, include_table=False, use_schema=False)
2550 )
2551 for c in clause.inferred_target_elements
2552 )
2553 if clause.inferred_target_whereclause is not None:
2554 target_text += " WHERE %s" % self.process(
2555 clause.inferred_target_whereclause,
2556 include_table=False,
2557 use_schema=False,
2558 )
2559 else:
2560 target_text = ""
2562 return target_text
@util.memoized_property
def _is_safe_for_fast_insert_values_helper(self):
    """Whether the "fast insert executemany helper" may be used.

    True only when a single VALUES expression is available and the
    statement's post-values clause is absent or an ``OnConflictDoNothing``.
    """
    # don't allow fast executemany if _post_values_clause is
    # present and is not an OnConflictDoNothing. what this means
    # concretely is that the
    # "fast insert executemany helper" won't be used, in other
    # words we won't convert "executemany()" of many parameter
    # sets into a single INSERT with many elements in VALUES.
    # We can't apply that optimization safely if for example the
    # statement includes a clause like "ON CONFLICT DO UPDATE"
    if self.insert_single_values_expr is None:
        return False
    if self.statement._post_values_clause is None:
        return True
    return isinstance(
        self.statement._post_values_clause, dml.OnConflictDoNothing
    )
def visit_on_conflict_do_nothing(self, on_conflict, **kw):
    """Render ``ON CONFLICT [<target>] DO NOTHING``."""
    target_text = self._on_conflict_target(on_conflict, **kw)
    if not target_text:
        return "ON CONFLICT DO NOTHING"
    return "ON CONFLICT %s DO NOTHING" % target_text
def visit_on_conflict_do_update(self, on_conflict, **kw):
    """Render ``ON CONFLICT <target> DO UPDATE SET ...``.

    Assignments are emitted first for the columns of the table being
    inserted into, in table column order, matching each column by key or
    by the column object itself.  Entries left over afterwards trigger a
    warning and are rendered after the matched ones.  An optional
    ``WHERE`` follows the assignments.
    """
    target_text = self._on_conflict_target(on_conflict, **kw)

    assignments = []
    remaining = dict(on_conflict.update_values_to_set)

    insert_statement = self.stack[-1]["selectable"]
    for column in insert_statement.table.c:
        column_key = column.key

        if column_key in remaining:
            value = remaining.pop(column_key)
        elif column in remaining:
            value = remaining.pop(column)
        else:
            continue

        if coercions._is_literal(value):
            value = elements.BindParameter(None, value, type_=column.type)
        elif (
            isinstance(value, elements.BindParameter)
            and value.type._isnull
        ):
            # give an untyped bind parameter the column's type, working
            # on a copy so the caller's object is not modified
            value = value._clone()
            value.type = column.type
        value_text = self.process(value.self_group(), use_schema=False)

        key_text = self.preparer.quote(column.name)
        assignments.append("%s = %s" % (key_text, value_text))

    # check for names that don't match columns
    if remaining:
        util.warn(
            "Additional column names not matching "
            "any column keys in table '%s': %s"
            % (
                self.current_executable.table.name,
                (", ".join("'%s'" % c for c in remaining)),
            )
        )
        for extra_key, extra_value in remaining.items():
            if isinstance(extra_key, util.string_types):
                key_text = self.preparer.quote(extra_key)
            else:
                key_text = self.process(extra_key, use_schema=False)
            value_text = self.process(
                coercions.expect(roles.ExpressionElementRole, extra_value),
                use_schema=False,
            )
            assignments.append("%s = %s" % (key_text, value_text))

    action_text = ", ".join(assignments)
    if on_conflict.update_whereclause is not None:
        action_text += " WHERE %s" % self.process(
            on_conflict.update_whereclause,
            include_table=True,
            use_schema=False,
        )

    return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
def update_from_clause(
    self, update_stmt, from_table, extra_froms, from_hints, **kw
):
    """Render the ``FROM`` clause of an UPDATE from ``extra_froms``."""
    kw["asfrom"] = True
    rendered_froms = (
        t._compiler_dispatch(self, fromhints=from_hints, **kw)
        for t in extra_froms
    )
    return "FROM " + ", ".join(rendered_froms)
def delete_extra_from_clause(
    self, delete_stmt, from_table, extra_froms, from_hints, **kw
):
    """Render the DELETE .. USING clause specific to PostgreSQL."""
    kw["asfrom"] = True
    rendered_froms = (
        t._compiler_dispatch(self, fromhints=from_hints, **kw)
        for t in extra_froms
    )
    return "USING " + ", ".join(rendered_froms)
def fetch_clause(self, select, **kw):
    """Render ``OFFSET (...) ROWS`` and ``FETCH FIRST (...) ROWS ...``.

    Returns an empty string when neither an offset nor a fetch clause is
    set on ``select``.
    """
    # pg requires parens for non literal clauses. It's also required for
    # bind parameters if a ::type casts is used by the driver (asyncpg),
    # so it's easiest to just always add it
    fragments = []
    if select._offset_clause is not None:
        offset_sql = self.process(select._offset_clause, **kw)
        fragments.append("\n OFFSET (%s) ROWS" % offset_sql)
    if select._fetch_clause is not None:
        fetch_sql = self.process(select._fetch_clause, **kw)
        options = select._fetch_clause_options
        percent = " PERCENT" if options["percent"] else ""
        ties = "WITH TIES" if options["with_ties"] else "ONLY"
        fragments.append(
            "\n FETCH FIRST (%s)%s ROWS %s" % (fetch_sql, percent, ties)
        )
    return "".join(fragments)
2698class PGDDLCompiler(compiler.DDLCompiler):
def get_column_specification(self, column, **kwargs):
    """Render the DDL fragment describing ``column``.

    The auto-increment primary key column is rendered with one of the
    SERIAL pseudo-types when it has no identity and no default other than
    an optional ``Sequence``; otherwise the compiled type plus any
    ``DEFAULT`` is used.  Computed / identity clauses and the
    ``NULL`` / ``NOT NULL`` marker follow.
    """
    colspec = self.preparer.format_column(column)

    impl_type = column.type.dialect_impl(self.dialect)
    if isinstance(impl_type, sqltypes.TypeDecorator):
        # look through a TypeDecorator to the type it wraps
        impl_type = impl_type.impl

    has_identity = (
        column.identity is not None
        and self.dialect.supports_identity_columns
    )

    use_serial = (
        column.primary_key
        and column is column.table._autoincrement_column
        and (
            self.dialect.supports_smallserial
            or not isinstance(impl_type, sqltypes.SmallInteger)
        )
        and not has_identity
        and (
            column.default is None
            or (
                isinstance(column.default, schema.Sequence)
                and column.default.optional
            )
        )
    )

    if use_serial:
        if isinstance(impl_type, sqltypes.BigInteger):
            serial_keyword = " BIGSERIAL"
        elif isinstance(impl_type, sqltypes.SmallInteger):
            serial_keyword = " SMALLSERIAL"
        else:
            serial_keyword = " SERIAL"
        colspec += serial_keyword
    else:
        colspec += " " + self.dialect.type_compiler.process(
            column.type,
            type_expression=column,
            identifier_preparer=self.preparer,
        )
        default = self.get_column_default_string(column)
        if default is not None:
            colspec += " DEFAULT " + default

    if column.computed is not None:
        colspec += " " + self.process(column.computed)
    if has_identity:
        colspec += " " + self.process(column.identity)

    # identity columns only get an explicit marker when nullable;
    # all other columns only when not nullable
    if has_identity:
        if column.nullable:
            colspec += " NULL"
    elif not column.nullable:
        colspec += " NOT NULL"
    return colspec
2754 def _define_constraint_validity(self, constraint):
2755 not_valid = constraint.dialect_options["postgresql"]["not_valid"]
2756 return " NOT VALID" if not_valid else ""
def visit_check_constraint(self, constraint):
    """Render a CHECK constraint, with ``NOT VALID`` appended if set.

    :raises CompileError: for a type-bound constraint whose first column
     is an ARRAY of a non-native Enum.
    """
    if constraint._type_bound:
        typ = list(constraint.columns)[0].type
        is_array_of_non_native_enum = (
            isinstance(typ, sqltypes.ARRAY)
            and isinstance(typ.item_type, sqltypes.Enum)
            and not typ.item_type.native_enum
        )
        if is_array_of_non_native_enum:
            raise exc.CompileError(
                "PostgreSQL dialect cannot produce the CHECK constraint "
                "for ARRAY of non-native ENUM; please specify "
                "create_constraint=False on this Enum datatype."
            )

    base_text = super(PGDDLCompiler, self).visit_check_constraint(constraint)
    return base_text + self._define_constraint_validity(constraint)
def visit_foreign_key_constraint(self, constraint):
    """Render a FOREIGN KEY constraint, with ``NOT VALID`` appended if set."""
    base_text = super(PGDDLCompiler, self).visit_foreign_key_constraint(
        constraint
    )
    return base_text + self._define_constraint_validity(constraint)
def visit_drop_table_comment(self, drop):
    """Render ``COMMENT ON TABLE <table> IS NULL`` for ``drop.element``."""
    table_name = self.preparer.format_table(drop.element)
    return "COMMENT ON TABLE %s IS NULL" % table_name
def visit_create_enum_type(self, create):
    """Render ``CREATE TYPE <name> AS ENUM (<labels>)``.

    Each entry of the type's ``enums`` is rendered as an inline literal.
    """
    type_ = create.element

    labels = ", ".join(
        self.sql_compiler.process(sql.literal(e), literal_binds=True)
        for e in type_.enums
    )
    type_name = self.preparer.format_type(type_)
    return "CREATE TYPE %s AS ENUM (%s)" % (type_name, labels)
def visit_drop_enum_type(self, drop):
    """Render ``DROP TYPE <name>`` for ``drop.element``."""
    type_name = self.preparer.format_type(drop.element)
    return "DROP TYPE %s" % (type_name)
def visit_create_index(self, create):
    """Render ``CREATE INDEX`` including the ``postgresql`` index options.

    Options read from ``index.dialect_options["postgresql"]``:
    ``concurrently``, ``using``, ``ops``, ``include``, ``with``,
    ``tablespace`` and ``where``.
    """
    preparer = self.preparer
    index = create.element
    self._verify_index_table(index)
    pg_opts = index.dialect_options["postgresql"]

    pieces = ["CREATE "]
    if index.unique:
        pieces.append("UNIQUE ")
    pieces.append("INDEX ")

    if (
        self.dialect._supports_create_index_concurrently
        and pg_opts["concurrently"]
    ):
        pieces.append("CONCURRENTLY ")

    if create.if_not_exists:
        pieces.append("IF NOT EXISTS ")

    pieces.append(
        "%s ON %s "
        % (
            self._prepared_index_name(index, include_schema=False),
            preparer.format_table(index.table),
        )
    )

    using = pg_opts["using"]
    if using:
        pieces.append(
            "USING %s "
            % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
        )

    ops = pg_opts["ops"]
    rendered_exprs = []
    for expr in index.expressions:
        # anything that is not a plain column is grouped first
        if isinstance(expr, expression.ColumnClause):
            target = expr
        else:
            target = expr.self_group()
        rendered = self.sql_compiler.process(
            target,
            include_table=False,
            literal_binds=True,
        )
        # append the operator class registered under the expression's key
        if hasattr(expr, "key") and expr.key in ops:
            rendered += " " + ops[expr.key]
        rendered_exprs.append(rendered)
    pieces.append("(%s)" % (", ".join(rendered_exprs)))

    includeclause = pg_opts["include"]
    if includeclause:
        # entries may be column names or column objects
        inclusions = [
            index.table.c[col]
            if isinstance(col, util.string_types)
            else col
            for col in includeclause
        ]
        pieces.append(
            " INCLUDE (%s)"
            % ", ".join([preparer.quote(c.name) for c in inclusions])
        )

    withclause = pg_opts["with"]
    if withclause:
        storage_parameters = [
            "%s = %s" % storage_parameter
            for storage_parameter in withclause.items()
        ]
        pieces.append(" WITH (%s)" % (", ".join(storage_parameters)))

    tablespace_name = pg_opts["tablespace"]
    if tablespace_name:
        pieces.append(" TABLESPACE %s" % preparer.quote(tablespace_name))

    whereclause = pg_opts["where"]
    if whereclause is not None:
        whereclause = coercions.expect(
            roles.DDLExpressionRole, whereclause
        )

        where_compiled = self.sql_compiler.process(
            whereclause, include_table=False, literal_binds=True
        )
        pieces.append(" WHERE " + where_compiled)

    return "".join(pieces)
def visit_drop_index(self, drop):
    """Render ``DROP INDEX [CONCURRENTLY] [IF EXISTS] <name>``.

    ``CONCURRENTLY`` requires both dialect support and the index's
    ``postgresql`` ``concurrently`` option.
    """
    index = drop.element

    pieces = ["\nDROP INDEX "]

    if (
        self.dialect._supports_drop_index_concurrently
        and index.dialect_options["postgresql"]["concurrently"]
    ):
        pieces.append("CONCURRENTLY ")

    if drop.if_exists:
        pieces.append("IF EXISTS ")

    pieces.append(self._prepared_index_name(index, include_schema=True))
    return "".join(pieces)
def visit_exclude_constraint(self, constraint, **kw):
    """Render an ``EXCLUDE USING <method> (...)`` constraint.

    Each rendered expression is followed by its operator class, when one
    is registered in ``constraint.ops`` under the expression's key, and
    by ``WITH <operator>``.  An optional ``WHERE`` and the deferrability
    clause come last.
    """
    pieces = []
    if constraint.name is not None:
        pieces.append(
            "CONSTRAINT %s " % self.preparer.format_constraint(constraint)
        )

    rendered = []
    for expr, name, op in constraint._render_exprs:
        kw["include_table"] = False
        exclude_element = self.sql_compiler.process(expr, **kw)
        if hasattr(expr, "key") and expr.key in constraint.ops:
            exclude_element += " " + constraint.ops[expr.key]

        rendered.append("%s WITH %s" % (exclude_element, op))

    pieces.append(
        "EXCLUDE USING %s (%s)"
        % (
            self.preparer.validate_sql_phrase(
                constraint.using, IDX_USING
            ).lower(),
            ", ".join(rendered),
        )
    )
    if constraint.where is not None:
        pieces.append(
            " WHERE (%s)"
            % self.sql_compiler.process(
                constraint.where, literal_binds=True
            )
        )
    pieces.append(self.define_constraint_deferrability(constraint))
    return "".join(pieces)
def post_create_table(self, table):
    """Render the options that follow the column list of CREATE TABLE.

    Reads ``inherits``, ``partition_by``, ``with_oids``, ``on_commit``
    and ``tablespace`` from the table's ``postgresql`` dialect options
    and emits the corresponding clauses in that order.
    """
    pg_opts = table.dialect_options["postgresql"]
    quote = self.preparer.quote
    clauses = []

    inherits = pg_opts.get("inherits")
    if inherits is not None:
        # a single parent name is accepted as well as a list or tuple
        if not isinstance(inherits, (list, tuple)):
            inherits = (inherits,)
        parents = ", ".join(quote(name) for name in inherits)
        clauses.append("\n INHERITS ( " + parents + " )")

    if pg_opts["partition_by"]:
        clauses.append("\n PARTITION BY %s" % pg_opts["partition_by"])

    # with_oids is tri-state: True, False, or anything else for "omit"
    with_oids = pg_opts["with_oids"]
    if with_oids is True:
        clauses.append("\n WITH OIDS")
    elif with_oids is False:
        clauses.append("\n WITHOUT OIDS")

    if pg_opts["on_commit"]:
        on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
        clauses.append("\n ON COMMIT %s" % on_commit_options)

    if pg_opts["tablespace"]:
        clauses.append("\n TABLESPACE %s" % quote(pg_opts["tablespace"]))

    return "".join(clauses)
def visit_computed_column(self, generated):
    """Render the ``GENERATED ALWAYS AS (...) STORED`` clause.

    :param generated: computed-column construct; its ``sqltext`` is
     compiled without table qualification and with literal binds.
    :raises CompileError: when ``generated.persisted`` is exactly
     ``False``, since only the ``STORED`` form is rendered here.
    """
    if generated.persisted is False:
        raise exc.CompileError(
            "PostgreSQL computed columns do not support 'virtual' "
            "persistence; set the 'persisted' flag to None or True for "
            "PostgreSQL support."
        )

    return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
        generated.sqltext, include_table=False, literal_binds=True
    )
def visit_create_sequence(self, create, **kw):
    """Render ``CREATE SEQUENCE``, adding ``AS <type>`` when typed.

    When the sequence has a ``data_type`` the compiled type is handed
    to the parent implementation as ``prefix``; otherwise ``prefix`` is
    ``None``.
    """
    data_type = create.element.data_type
    if data_type is None:
        prefix = None
    else:
        prefix = " AS %s" % self.type_compiler.process(data_type)

    return super(PGDDLCompiler, self).visit_create_sequence(
        create, prefix=prefix, **kw
    )
class PGTypeCompiler(compiler.GenericTypeCompiler):
    """Type compiler returning PostgreSQL spellings of type names."""

    # -- types rendered as a fixed keyword ---------------------------------

    def visit_TSVECTOR(self, type_, **kw):
        return "TSVECTOR"

    def visit_INET(self, type_, **kw):
        return "INET"

    def visit_CIDR(self, type_, **kw):
        return "CIDR"

    def visit_MACADDR(self, type_, **kw):
        return "MACADDR"

    def visit_MONEY(self, type_, **kw):
        return "MONEY"

    def visit_OID(self, type_, **kw):
        return "OID"

    def visit_REGCLASS(self, type_, **kw):
        return "REGCLASS"

    def visit_FLOAT(self, type_, **kw):
        """Render ``FLOAT``, with ``(precision)`` when one is set."""
        if type_.precision:
            return "FLOAT(%(precision)s)" % {"precision": type_.precision}
        return "FLOAT"

    def visit_DOUBLE_PRECISION(self, type_, **kw):
        return "DOUBLE PRECISION"

    def visit_BIGINT(self, type_, **kw):
        return "BIGINT"

    def visit_HSTORE(self, type_, **kw):
        return "HSTORE"

    def visit_JSON(self, type_, **kw):
        return "JSON"

    def visit_JSONB(self, type_, **kw):
        return "JSONB"

    def visit_INT4RANGE(self, type_, **kw):
        return "INT4RANGE"

    def visit_INT8RANGE(self, type_, **kw):
        return "INT8RANGE"

    def visit_NUMRANGE(self, type_, **kw):
        return "NUMRANGE"

    def visit_DATERANGE(self, type_, **kw):
        return "DATERANGE"

    def visit_TSRANGE(self, type_, **kw):
        return "TSRANGE"

    def visit_TSTZRANGE(self, type_, **kw):
        return "TSTZRANGE"

    # -- types needing some logic ------------------------------------------

    def visit_datetime(self, type_, **kw):
        """Generic datetime is rendered exactly like ``TIMESTAMP``."""
        return self.visit_TIMESTAMP(type_, **kw)

    def visit_enum(self, type_, **kw):
        """Use the native ENUM rendering only when both the type and the
        dialect allow it; otherwise defer to the generic implementation.
        """
        if type_.native_enum and self.dialect.supports_native_enum:
            return self.visit_ENUM(type_, **kw)
        return super(PGTypeCompiler, self).visit_enum(type_, **kw)

    def visit_ENUM(self, type_, identifier_preparer=None, **kw):
        """Render the (possibly schema-qualified) name of the enum type."""
        preparer = identifier_preparer
        if preparer is None:
            preparer = self.dialect.identifier_preparer
        return preparer.format_type(type_)

    def _render_time_type(self, keyword, type_):
        """Render ``<keyword>[(p)] WITH|WITHOUT TIME ZONE``."""
        precision = getattr(type_, "precision", None)
        precision_sql = "" if precision is None else "(%d)" % precision
        zone = "WITH" if type_.timezone else "WITHOUT"
        return "%s%s %s" % (keyword, precision_sql, zone + " TIME ZONE")

    def visit_TIMESTAMP(self, type_, **kw):
        return self._render_time_type("TIMESTAMP", type_)

    def visit_TIME(self, type_, **kw):
        return self._render_time_type("TIME", type_)

    def visit_INTERVAL(self, type_, **kw):
        """Render ``INTERVAL [fields] [(precision)]``."""
        tokens = ["INTERVAL"]
        if type_.fields is not None:
            tokens.append(" " + type_.fields)
        if type_.precision is not None:
            tokens.append(" (%d)" % type_.precision)
        return "".join(tokens)

    def visit_BIT(self, type_, **kw):
        """Render ``BIT(n)`` or ``BIT VARYING[(n)]``."""
        if not type_.varying:
            return "BIT(%d)" % type_.length
        compiled = "BIT VARYING"
        if type_.length is not None:
            compiled += "(%d)" % type_.length
        return compiled

    def visit_UUID(self, type_, **kw):
        return "UUID"

    def visit_large_binary(self, type_, **kw):
        """Generic large binary is rendered as ``BYTEA``."""
        return self.visit_BYTEA(type_, **kw)

    def visit_BYTEA(self, type_, **kw):
        return "BYTEA"

    def visit_ARRAY(self, type_, **kw):
        """Render the item type followed by one ``[]`` per dimension.

        The brackets are inserted before a trailing `` COLLATE ...``
        suffix of the item type when present, otherwise at the end.
        A ``dimensions`` of ``None`` is treated as one dimension.
        """
        inner = self.process(type_.item_type, **kw)
        dimensions = type_.dimensions if type_.dimensions is not None else 1
        replacement = r"%s\1" % ("[]" * dimensions)
        return re.sub(r"((?: COLLATE.*)?)$", replacement, inner, count=1)
class PGIdentifierPreparer(compiler.IdentifierPreparer):
    """Identifier preparer using the PostgreSQL reserved-word list."""

    reserved_words = RESERVED_WORDS

    def _unquote_identifier(self, value):
        """Strip surrounding quotes from ``value`` and un-escape it.

        A value that does not start with ``initial_quote`` (including
        the empty string) is returned unchanged.
        """
        # Slice instead of indexing so that "" does not raise IndexError.
        if value[:1] == self.initial_quote:
            value = value[1:-1].replace(
                self.escape_to_quote, self.escape_quote
            )
        return value

    def format_type(self, type_, use_schema=True):
        """Return the quoted, optionally schema-qualified, name of a type.

        :param type_: type object providing ``name``.
        :param use_schema: when false the schema prefix is never added.
        :raises CompileError: if ``type_.name`` is empty or ``None``.
        """
        if not type_.name:
            raise exc.CompileError("PostgreSQL ENUM type requires a name.")

        name = self.quote(type_.name)
        effective_schema = self.schema_for_object(type_)

        if (
            not self.omit_schema
            and use_schema
            and effective_schema is not None
        ):
            name = self.quote_schema(effective_schema) + "." + name
        return name
class PGInspector(reflection.Inspector):
    """Inspector with additional PostgreSQL-specific reflection calls.

    Every method opens an operation context and forwards to the
    matching method on ``self.dialect``.
    """

    def get_table_oid(self, table_name, schema=None):
        """Return the OID for the given table name."""
        with self._operation_context() as conn:
            oid = self.dialect.get_table_oid(
                conn, table_name, schema, info_cache=self.info_cache
            )
            return oid

    def get_enums(self, schema=None):
        """Return a list of ENUM objects.

        Each member is a dictionary containing these fields:

            * name - name of the enum
            * schema - the schema name for the enum.
            * visible - boolean, whether or not this enum is visible
              in the default search path.
            * labels - a list of string labels that apply to the enum.

        :param schema: schema name.  If None, the default schema
         (typically 'public') is used.  May also be set to '*' to
         indicate load enums for all schemas.

        .. versionadded:: 1.0.0

        """
        if not schema:
            schema = self.default_schema_name
        with self._operation_context() as conn:
            enums = self.dialect._load_enums(conn, schema)
            return enums

    def get_foreign_table_names(self, schema=None):
        """Return a list of FOREIGN TABLE names.

        Behavior is similar to that of
        :meth:`_reflection.Inspector.get_table_names`,
        except that the list is limited to those tables that report a
        ``relkind`` value of ``f``.

        .. versionadded:: 1.0.0

        """
        if not schema:
            schema = self.default_schema_name
        with self._operation_context() as conn:
            names = self.dialect._get_foreign_table_names(conn, schema)
            return names

    def get_view_names(self, schema=None, include=("plain", "materialized")):
        """Return all view names in `schema`.

        :param schema: Optional, retrieve names from a non-default schema.
         For special quoting, use :class:`.quoted_name`.

        :param include: specify which types of views to return.  Passed
         as a string value (for a single type) or a tuple (for any number
         of types).  Defaults to ``('plain', 'materialized')``.

         .. versionadded:: 1.1

        """
        with self._operation_context() as conn:
            names = self.dialect.get_view_names(
                conn, schema, info_cache=self.info_cache, include=include
            )
            return names
class CreateEnumType(schema._CreateDropBase):
    """DDL element whose visit name is ``create_enum_type``.

    NOTE(review): presumably rendered by a ``visit_create_enum_type``
    method on the DDL compiler; that method is not visible from here.
    """

    __visit_name__ = "create_enum_type"
class DropEnumType(schema._CreateDropBase):
    """DDL element whose visit name is ``drop_enum_type``.

    NOTE(review): presumably rendered by a ``visit_drop_enum_type``
    method on the DDL compiler; that method is not visible from here.
    """

    __visit_name__ = "drop_enum_type"
class PGExecutionContext(default.DefaultExecutionContext):
    """Execution context that pre-executes sequences / server defaults."""

    def fire_sequence(self, seq, type_):
        """Run ``select nextval('<seq>')`` and return the scalar result."""
        statement = "select nextval('%s')" % (
            self.identifier_preparer.format_sequence(seq)
        )
        return self._execute_scalar(statement, type_)

    def get_insert_default(self, column):
        """Return the INSERT default for ``column``.

        For the table's autoincrement primary key column the value is
        fetched up front: either by selecting the server default
        expression, or by calling ``nextval()`` on the column's
        sequence.  Every other case is delegated to the parent class.
        """
        if column.primary_key and column is column.table._autoincrement_column:
            if column.server_default and column.server_default.has_argument:

                # pre-execute passive defaults on primary key columns
                return self._execute_scalar(
                    "select %s" % column.server_default.arg, column.type
                )

            if column.default is None or (
                column.default.is_sequence and column.default.optional
            ):
                # execute the sequence associated with a SERIAL primary
                # key column. for non-primary-key SERIAL, the ID just
                # generates server side.

                try:
                    seq_name = column._postgresql_seq_name
                except AttributeError:
                    # Derive "<table>_<column>_seq", truncating both
                    # parts, and memoize the result on the column.
                    table_part = column.table.name
                    column_part = column.name
                    table_part = table_part[
                        0 : 29 + max(0, (29 - len(column_part)))
                    ]
                    column_part = column_part[
                        0 : 29 + max(0, (29 - len(table_part)))
                    ]
                    seq_name = "%s_%s_seq" % (table_part, column_part)
                    column._postgresql_seq_name = seq_name

                effective_schema = (
                    self.connection.schema_for_object(column.table)
                    if column.table is not None
                    else None
                )

                if effective_schema is not None:
                    statement = "select nextval('\"%s\".\"%s\"')" % (
                        effective_schema,
                        seq_name,
                    )
                else:
                    statement = "select nextval('\"%s\"')" % (seq_name,)

                return self._execute_scalar(statement, column.type)

        return super(PGExecutionContext, self).get_insert_default(column)

    def should_autocommit_text(self, statement):
        """Return the ``AUTOCOMMIT_REGEXP`` match for ``statement``."""
        return AUTOCOMMIT_REGEXP.match(statement)
class PGReadOnlyConnectionCharacteristic(
    characteristics.ConnectionCharacteristic
):
    """Connection characteristic backed by the dialect's read-only hooks.

    Each method forwards to ``dialect.set_readonly`` /
    ``dialect.get_readonly`` with the raw DBAPI connection.
    """

    transactional = True

    def reset_characteristic(self, dialect, dbapi_conn):
        # Resetting means switching read-only mode off.
        dialect.set_readonly(dbapi_conn, False)

    def set_characteristic(self, dialect, dbapi_conn, value):
        dialect.set_readonly(dbapi_conn, value)

    def get_characteristic(self, dialect, dbapi_conn):
        return dialect.get_readonly(dbapi_conn)
class PGDeferrableConnectionCharacteristic(
    characteristics.ConnectionCharacteristic
):
    """Connection characteristic backed by the dialect's deferrable hooks.

    Each method forwards to ``dialect.set_deferrable`` /
    ``dialect.get_deferrable`` with the raw DBAPI connection.
    """

    transactional = True

    def reset_characteristic(self, dialect, dbapi_conn):
        # Resetting means switching deferrable mode off.
        dialect.set_deferrable(dbapi_conn, False)

    def set_characteristic(self, dialect, dbapi_conn, value):
        dialect.set_deferrable(dbapi_conn, value)

    def get_characteristic(self, dialect, dbapi_conn):
        return dialect.get_deferrable(dbapi_conn)
3321class PGDialect(default.DefaultDialect):
# Class-level defaults of PGDialect.  Several of the capability flags
# below are re-assigned per connection by initialize() according to the
# server version.
name = "postgresql"
supports_statement_cache = True
supports_alter = True
max_identifier_length = 63
supports_sane_rowcount = True

# initialize() recomputes both of these from server_version_info.
supports_native_enum = True
supports_native_boolean = True
supports_smallserial = True

supports_sequences = True
sequences_optional = True
preexecute_autoincrement_sequences = True
postfetch_lastrowid = False

supports_comments = True
supports_default_values = True

supports_default_metavalue = True

supports_empty_insert = False
supports_multivalues_insert = True
# initialize() recomputes this from server_version_info.
supports_identity_columns = True

default_paramstyle = "pyformat"
# Module-level lookup tables re-exported on the dialect.
ischema_names = ischema_names
colspecs = colspecs

# Collaborator classes defined in this module.
statement_compiler = PGCompiler
ddl_compiler = PGDDLCompiler
type_compiler = PGTypeCompiler
preparer = PGIdentifierPreparer
execution_ctx_cls = PGExecutionContext
inspector = PGInspector
isolation_level = None

# Both are switched off by initialize() for old server versions.
implicit_returning = True
full_returning = True

# Start from the default dialect's characteristics, then add the two
# PostgreSQL-specific ones defined above.
connection_characteristics = (
    default.DefaultDialect.connection_characteristics
)
connection_characteristics = connection_characteristics.union(
    {
        "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
        "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
    }
)

# Option names and default values per schema construct.
# NOTE(review): presumably these are what ``dialect_options["postgresql"]``
# exposes on Index / Table objects -- confirm against the base dialect.
construct_arguments = [
    (
        schema.Index,
        {
            "using": False,
            "include": None,
            "where": None,
            "ops": {},
            "concurrently": False,
            "with": {},
            "tablespace": None,
        },
    ),
    (
        schema.Table,
        {
            "ignore_search_path": False,
            "tablespace": None,
            "partition_by": None,
            "with_oids": None,
            "on_commit": None,
            "inherits": None,
        },
    ),
    (
        schema.CheckConstraint,
        {
            "not_valid": False,
        },
    ),
    (
        schema.ForeignKeyConstraint,
        {
            "not_valid": False,
        },
    ),
]

reflection_options = ("postgresql_ignore_search_path",)

# Private capability flags; all three are recomputed in initialize().
_backslash_escapes = True
_supports_create_index_concurrently = True
_supports_drop_index_concurrently = True
def __init__(
    self,
    isolation_level=None,
    json_serializer=None,
    json_deserializer=None,
    **kwargs
):
    """Construct the dialect.

    :param isolation_level: legacy; stored on the dialect and applied
     to new connections by ``on_connect()`` when not ``None``.
    :param json_serializer: stored as ``_json_serializer``.
    :param json_deserializer: stored as ``_json_deserializer``.
    :param kwargs: forwarded to ``DefaultDialect.__init__``.
    """
    default.DefaultDialect.__init__(self, **kwargs)

    # the isolation_level parameter to the PGDialect itself is legacy.
    # still works however the execution_options method is the one that
    # is documented.
    self.isolation_level = isolation_level
    self._json_deserializer = json_deserializer
    self._json_serializer = json_serializer
def initialize(self, connection):
    """Adjust capability flags to the connected server's version.

    Runs the parent ``initialize`` first, then derives the RETURNING,
    native ENUM, SMALLSERIAL, backslash-escape, CONCURRENTLY and
    IDENTITY flags from ``self.server_version_info``.
    """
    super(PGDialect, self).initialize(connection)

    if self.server_version_info <= (8, 2):
        self.implicit_returning = False
        self.full_returning = False

    native_enum = self.server_version_info >= (8, 3)
    self.supports_native_enum = native_enum
    if not native_enum:
        # Work on a per-instance copy so the shared mapping is untouched.
        self.colspecs = self.colspecs.copy()
        # pop base Enum type
        self.colspecs.pop(sqltypes.Enum, None)
        # psycopg2, others may have placed ENUM here as well
        self.colspecs.pop(ENUM, None)

    # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
    self.supports_smallserial = self.server_version_info >= (9, 2)

    if self.server_version_info >= (8, 2):
        # ensure this query is not emitted on server version < 8.2
        # as it will fail
        std_string = connection.exec_driver_sql(
            "show standard_conforming_strings"
        ).scalar()
        self._backslash_escapes = std_string == "off"
    else:
        self._backslash_escapes = False

    self._supports_create_index_concurrently = (
        self.server_version_info >= (8, 2)
    )
    self._supports_drop_index_concurrently = (
        self.server_version_info >= (9, 2)
    )
    self.supports_identity_columns = self.server_version_info >= (10,)
def on_connect(self):
    """Return a per-connection hook applying the legacy isolation level.

    Returns ``None`` when no ``isolation_level`` was configured on the
    dialect; otherwise a callable taking the new connection and passing
    it to ``set_isolation_level``.
    """
    if self.isolation_level is None:
        return None

    def connect(conn):
        self.set_isolation_level(conn, self.isolation_level)

    return connect
3477 _isolation_lookup = set(
3478 [
3479 "SERIALIZABLE",
3480 "READ UNCOMMITTED",
3481 "READ COMMITTED",
3482 "REPEATABLE READ",
3483 ]
3484 )
def set_isolation_level(self, connection, level):
    """Set the session-wide transaction isolation level.

    :param connection: DBAPI connection; a cursor is opened on it.
    :param level: isolation level name; underscores are converted to
     spaces before validation against ``_isolation_lookup``.
    :raises ArgumentError: if the level is not a known name.
    """
    level = level.replace("_", " ")
    if level not in self._isolation_lookup:
        raise exc.ArgumentError(
            "Invalid value '%s' for isolation_level. "
            "Valid isolation levels for %s are %s"
            % (level, self.name, ", ".join(self._isolation_lookup))
        )
    cursor = connection.cursor()
    try:
        cursor.execute(
            "SET SESSION CHARACTERISTICS AS TRANSACTION "
            "ISOLATION LEVEL %s" % level
        )
        cursor.execute("COMMIT")
    finally:
        # Release the cursor even when one of the statements fails.
        cursor.close()
def get_isolation_level(self, connection):
    """Return the current transaction isolation level, upper-cased.

    :param connection: DBAPI connection; a cursor is opened on it and
     ``show transaction isolation level`` is executed.
    """
    cursor = connection.cursor()
    try:
        cursor.execute("show transaction isolation level")
        val = cursor.fetchone()[0]
    finally:
        # Release the cursor even when the query or fetch fails.
        cursor.close()
    return val.upper()
def set_readonly(self, connection, value):
    """Not implemented at this level; always raises NotImplementedError.

    Called by ``PGReadOnlyConnectionCharacteristic``.
    NOTE(review): presumably overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def get_readonly(self, connection):
    """Not implemented at this level; always raises NotImplementedError.

    Called by ``PGReadOnlyConnectionCharacteristic``.
    NOTE(review): presumably overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def set_deferrable(self, connection, value):
    """Not implemented at this level; always raises NotImplementedError.

    Called by ``PGDeferrableConnectionCharacteristic``.
    NOTE(review): presumably overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def get_deferrable(self, connection):
    """Not implemented at this level; always raises NotImplementedError.

    Called by ``PGDeferrableConnectionCharacteristic``.
    NOTE(review): presumably overridden by driver-specific dialects.
    """
    raise NotImplementedError()
def do_begin_twophase(self, connection, xid):
    """Begin a two-phase transaction.

    Delegates to ``do_begin`` on the underlying ``connection.connection``;
    ``xid`` is not used at this stage.
    """
    self.do_begin(connection.connection)
def do_prepare_twophase(self, connection, xid):
    """Emit ``PREPARE TRANSACTION '<xid>'`` on the connection.

    NOTE(review): ``xid`` is interpolated directly into the SQL string;
    callers are assumed to pass a trusted identifier -- confirm.
    """
    connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
def do_rollback_twophase(
    self, connection, xid, is_prepared=True, recover=False
):
    """Roll back a two-phase transaction.

    A transaction that was not prepared is rolled back with a plain
    ``do_rollback``.  A prepared one is rolled back with
    ``ROLLBACK PREPARED``, followed by ``BEGIN`` and a ``do_rollback``
    on the underlying connection.  With ``recover`` a ``ROLLBACK`` is
    emitted first.
    """
    if not is_prepared:
        self.do_rollback(connection.connection)
        return

    emit = connection.exec_driver_sql
    if recover:
        # FIXME: ugly hack to get out of transaction
        # context when committing recoverable transactions
        # Must find out a way how to make the dbapi not
        # open a transaction.
        emit("ROLLBACK")
    emit("ROLLBACK PREPARED '%s'" % xid)
    emit("BEGIN")
    self.do_rollback(connection.connection)
def do_commit_twophase(
    self, connection, xid, is_prepared=True, recover=False
):
    """Commit a two-phase transaction.

    A transaction that was not prepared is committed with a plain
    ``do_commit``.  A prepared one is committed with
    ``COMMIT PREPARED``, followed by ``BEGIN`` and a ``do_rollback``
    on the underlying connection.  With ``recover`` a ``ROLLBACK`` is
    emitted first.
    """
    if not is_prepared:
        self.do_commit(connection.connection)
        return

    emit = connection.exec_driver_sql
    if recover:
        emit("ROLLBACK")
    emit("COMMIT PREPARED '%s'" % xid)
    emit("BEGIN")
    self.do_rollback(connection.connection)
def do_recover_twophase(self, connection):
    """Return the ``gid`` of every row in ``pg_prepared_xacts``."""
    resultset = connection.execute(
        sql.text("SELECT gid FROM pg_prepared_xacts")
    )
    return [row[0] for row in resultset]
def _get_default_schema_name(self, connection):
    """Return the scalar result of ``select current_schema()``."""
    return connection.exec_driver_sql("select current_schema()").scalar()
def has_schema(self, connection, schema):
    """Return True if a namespace matching ``schema`` exists.

    The comparison is case-insensitive: the argument is lower-cased
    and matched against ``lower(nspname)``.
    """
    statement = sql.text(
        "select nspname from pg_namespace "
        "where lower(nspname)=:schema"
    ).bindparams(
        sql.bindparam(
            "schema",
            util.text_type(schema.lower()),
            type_=sqltypes.Unicode,
        )
    )
    row = connection.execute(statement).first()
    return bool(row)
def has_table(self, connection, table_name, schema=None):
    """Return True if ``pg_class`` has a relation named ``table_name``.

    Without ``schema`` the relation must be visible on the search path
    (``pg_table_is_visible``); with ``schema`` it must live in that
    namespace.
    """
    self._ensure_has_table_connection(connection)
    # seems like case gets folded in pg_class...
    select_from = (
        "select relname from pg_class c join pg_namespace n on "
        "n.oid=c.relnamespace where "
    )
    binds = [
        sql.bindparam(
            "name",
            util.text_type(table_name),
            type_=sqltypes.Unicode,
        )
    ]
    if schema is None:
        criteria = (
            "pg_catalog.pg_table_is_visible(c.oid) " "and relname=:name"
        )
    else:
        criteria = "n.nspname=:schema and " "relname=:name"
        binds.append(
            sql.bindparam(
                "schema",
                util.text_type(schema),
                type_=sqltypes.Unicode,
            )
        )

    statement = sql.text(select_from + criteria).bindparams(*binds)
    return bool(connection.execute(statement).first())
def has_sequence(self, connection, sequence_name, schema=None):
    """Return True if a sequence (``relkind='S'``) with the given name
    exists in ``schema``, defaulting to the dialect's default schema.
    """
    effective_schema = (
        self.default_schema_name if schema is None else schema
    )
    name_param = sql.bindparam(
        "name",
        util.text_type(sequence_name),
        type_=sqltypes.Unicode,
    )
    schema_param = sql.bindparam(
        "schema",
        util.text_type(effective_schema),
        type_=sqltypes.Unicode,
    )
    statement = sql.text(
        "SELECT relname FROM pg_class c join pg_namespace n on "
        "n.oid=c.relnamespace where relkind='S' and "
        "n.nspname=:schema and relname=:name"
    ).bindparams(name_param, schema_param)

    return bool(connection.execute(statement).first())
def has_type(self, connection, type_name, schema=None):
    """Return True if ``pg_type`` contains a type named ``type_name``.

    With ``schema`` the type must belong to that namespace; without it
    the type must be visible on the search path
    (``pg_type_is_visible``).
    """
    binds = [
        sql.bindparam(
            "typname", util.text_type(type_name), type_=sqltypes.Unicode
        )
    ]
    if schema is not None:
        query = """
        SELECT EXISTS (
            SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n
            WHERE t.typnamespace = n.oid
            AND t.typname = :typname
            AND n.nspname = :nspname
            )
            """
        binds.append(
            sql.bindparam(
                "nspname", util.text_type(schema), type_=sqltypes.Unicode
            )
        )
    else:
        query = """
        SELECT EXISTS (
            SELECT * FROM pg_catalog.pg_type t
            WHERE t.typname = :typname
            AND pg_type_is_visible(t.oid)
            )
            """

    statement = sql.text(query).bindparams(*binds)
    return bool(connection.execute(statement).scalar())
3677 def _get_server_version_info(self, connection):
3678 v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
3679 m = re.match(
3680 r".*(?:PostgreSQL|EnterpriseDB) "
3681 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
3682 v,
3683 )
3684 if not m:
3685 raise AssertionError(
3686 "Could not determine version from string '%s'" % v
3687 )
3688 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
@reflection.cache
def get_table_oid(self, connection, table_name, schema=None, **kw):
    """Fetch the oid for schema.table_name.

    Several reflection methods require the table oid.  The idea for using
    this method is that it can be fetched one time and cached for
    subsequent calls.

    :raises NoSuchTableError: if no matching relation is found.
    """
    schema_filter = (
        "n.nspname = :schema"
        if schema is not None
        else "pg_catalog.pg_table_is_visible(c.oid)"
    )
    query = (
        """
        SELECT c.oid
        FROM pg_catalog.pg_class c
        LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
        WHERE (%s)
        AND c.relname = :table_name AND c.relkind in
        ('r', 'v', 'm', 'f', 'p')
    """
        % schema_filter
    )
    # Since we're binding to unicode, table_name and schema_name must be
    # unicode.
    table_name = util.text_type(table_name)
    if schema is not None:
        schema = util.text_type(schema)

    statement = (
        sql.text(query)
        .bindparams(table_name=sqltypes.Unicode)
        .columns(oid=sqltypes.Integer)
    )
    if schema:
        statement = statement.bindparams(
            sql.bindparam("schema", type_=sqltypes.Unicode)
        )

    params = dict(table_name=table_name, schema=schema)
    table_oid = connection.execute(statement, params).scalar()
    if table_oid is None:
        raise exc.NoSuchTableError(table_name)
    return table_oid
@reflection.cache
def get_schema_names(self, connection, **kw):
    """Return namespace names not starting with ``pg_``, sorted by name."""
    statement = sql.text(
        "SELECT nspname FROM pg_namespace "
        "WHERE nspname NOT LIKE 'pg_%' "
        "ORDER BY nspname"
    ).columns(nspname=sqltypes.Unicode)
    rows = connection.execute(statement)
    return [row[0] for row in rows]
@reflection.cache
def get_table_names(self, connection, schema=None, **kw):
    """Return names of relations of kind ``r`` or ``p`` in ``schema``.

    ``schema`` falls back to the dialect's default schema when ``None``.
    """
    if schema is not None:
        target_schema = schema
    else:
        target_schema = self.default_schema_name
    statement = sql.text(
        "SELECT c.relname FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relkind in ('r', 'p')"
    ).columns(relname=sqltypes.Unicode)
    rows = connection.execute(statement, dict(schema=target_schema))
    return [row[0] for row in rows]
@reflection.cache
def _get_foreign_table_names(self, connection, schema=None, **kw):
    """Return names of relations of kind ``f`` in ``schema``.

    ``schema`` falls back to the dialect's default schema when ``None``.
    """
    if schema is not None:
        target_schema = schema
    else:
        target_schema = self.default_schema_name
    statement = sql.text(
        "SELECT c.relname FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relkind = 'f'"
    ).columns(relname=sqltypes.Unicode)
    rows = connection.execute(statement, dict(schema=target_schema))
    return [row[0] for row in rows]
@reflection.cache
def get_view_names(
    self, connection, schema=None, include=("plain", "materialized"), **kw
):
    """Return names of views in ``schema``.

    :param include: ``"plain"``, ``"materialized"`` or a sequence of
     them; mapped to relkinds ``v`` and ``m`` respectively.
    :raises ValueError: if ``include`` is empty or holds an unknown
     value.
    """
    relkind_for = {"plain": "v", "materialized": "m"}
    try:
        kinds = [relkind_for[item] for item in util.to_list(include)]
    except KeyError:
        raise ValueError(
            "include %r unknown, needs to be a sequence containing "
            "one or both of 'plain' and 'materialized'" % (include,)
        )
    if not kinds:
        raise ValueError(
            "empty include, needs to be a sequence containing "
            "one or both of 'plain' and 'materialized'"
        )

    # The relkind codes come from the fixed mapping above, so they are
    # inlined as quoted literals.
    relkind_list = ", ".join("'%s'" % kind for kind in kinds)
    statement = sql.text(
        "SELECT c.relname FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relkind IN (%s)" % (relkind_list)
    ).columns(relname=sqltypes.Unicode)

    if schema is not None:
        target_schema = schema
    else:
        target_schema = self.default_schema_name
    rows = connection.execute(statement, dict(schema=target_schema))
    return [row[0] for row in rows]
@reflection.cache
def get_sequence_names(self, connection, schema=None, **kw):
    """Return names of sequences (``relkind='S'``) in ``schema``.

    A falsy ``schema`` is replaced by the dialect's default schema.
    """
    target_schema = schema if schema else self.default_schema_name
    schema_param = sql.bindparam(
        "schema",
        util.text_type(target_schema),
        type_=sqltypes.Unicode,
    )
    statement = sql.text(
        "SELECT relname FROM pg_class c join pg_namespace n on "
        "n.oid=c.relnamespace where relkind='S' and "
        "n.nspname=:schema"
    ).bindparams(schema_param)
    return [row[0] for row in connection.execute(statement)]
@reflection.cache
def get_view_definition(self, connection, view_name, schema=None, **kw):
    """Return ``pg_get_viewdef()`` for a plain or materialized view.

    ``schema`` falls back to the dialect's default schema when ``None``.
    """
    if schema is not None:
        target_schema = schema
    else:
        target_schema = self.default_schema_name
    statement = sql.text(
        "SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c "
        "JOIN pg_namespace n ON n.oid = c.relnamespace "
        "WHERE n.nspname = :schema AND c.relname = :view_name "
        "AND c.relkind IN ('v', 'm')"
    ).columns(view_def=sqltypes.Unicode)
    return connection.scalar(
        statement,
        dict(schema=target_schema, view_name=view_name),
    )
@reflection.cache
def get_columns(self, connection, table_name, schema=None, **kw):
    """Reflect the columns of a table.

    Looks up the table oid, queries ``pg_attribute`` for its live
    columns, and converts each row into a column dictionary via
    ``_get_column_info``.  Returns the list of those dictionaries in
    ``attnum`` order.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    # attgenerated is only selected for server version 12 and later.
    if self.server_version_info >= (12,):
        generated_expr = "a.attgenerated as generated"
    else:
        generated_expr = "NULL as generated"

    if self.server_version_info >= (10,):
        # a.attidentity != '' is required or it will reflect also
        # serial columns as identity.
        identity_expr = """\
            (SELECT json_build_object(
                'always', a.attidentity = 'a',
                'start', s.seqstart,
                'increment', s.seqincrement,
                'minvalue', s.seqmin,
                'maxvalue', s.seqmax,
                'cache', s.seqcache,
                'cycle', s.seqcycle)
            FROM pg_catalog.pg_sequence s
            JOIN pg_catalog.pg_class c on s.seqrelid = c."oid"
            WHERE c.relkind = 'S'
            AND a.attidentity != ''
            AND s.seqrelid = pg_catalog.pg_get_serial_sequence(
                a.attrelid::regclass::text, a.attname
            )::regclass::oid
            ) as identity_options\
            """
    else:
        identity_expr = "NULL as identity_options"

    SQL_COLS = """
        SELECT a.attname,
          pg_catalog.format_type(a.atttypid, a.atttypmod),
          (
            SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
            FROM pg_catalog.pg_attrdef d
            WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
            AND a.atthasdef
          ) AS DEFAULT,
          a.attnotnull,
          a.attrelid as table_oid,
          pgd.description as comment,
          %s,
          %s
        FROM pg_catalog.pg_attribute a
        LEFT JOIN pg_catalog.pg_description pgd ON (
            pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
        WHERE a.attrelid = :table_oid
        AND a.attnum > 0 AND NOT a.attisdropped
        ORDER BY a.attnum
    """ % (
        generated_expr,
        identity_expr,
    )
    statement = (
        sql.text(SQL_COLS)
        .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
        .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
    )
    rows = connection.execute(statement, dict(table_oid=table_oid)).fetchall()

    # dictionary with (name, ) if default search path or (schema, name)
    # as keys
    domains = self._load_domains(connection)

    # dictionary with (name, ) if default search path or (schema, name)
    # as keys
    enums = {}
    for rec in self._load_enums(connection, schema="*"):
        if rec["visible"]:
            key = (rec["name"],)
        else:
            key = (rec["schema"], rec["name"])
        enums[key] = rec

    # format columns; the row's own table_oid value is not needed
    return [
        self._get_column_info(
            col_name,
            format_type,
            default_,
            notnull,
            domains,
            enums,
            schema,
            comment,
            row_generated,
            row_identity,
        )
        for (
            col_name,
            format_type,
            default_,
            notnull,
            _row_table_oid,
            comment,
            row_generated,
            row_identity,
        ) in rows
    ]
def _get_column_info(
    self,
    name,
    format_type,
    default,
    notnull,
    domains,
    enums,
    schema,
    comment,
    generated,
    identity,
):
    """Build the reflected column dictionary for one ``pg_attribute`` row.

    :param name: column name.
    :param format_type: ``format_type()`` output for the column, or
     ``None``.
    :param default: default expression text, or ``None``.
    :param notnull: the column's ``attnotnull`` flag.
    :param domains: mapping of domain key tuples to records providing
     ``attype``, ``nullable`` and ``default``.
    :param enums: mapping of enum key tuples to records providing
     ``name``, ``schema``, ``visible`` and ``labels``.
    :param schema: schema used to qualify an unqualified sequence name
     found in a ``nextval()`` default.
    :param comment: column comment, passed through unchanged.
    :param generated: ``attgenerated`` value, or ``None``.
    :param identity: identity options, or ``None``.
    :return: dict with keys ``name``, ``type``, ``nullable``,
     ``default``, ``autoincrement`` and ``comment``, plus ``computed``
     and ``identity`` when applicable.
    """

    def _handle_array_type(attype):
        return (
            # strip '[]' from integer[], etc.
            re.sub(r"\[\]$", "", attype),
            attype.endswith("[]"),
        )

    if format_type is None:
        no_format_type = True
        # Placeholder text; it will not match any known type name below.
        attype = format_type = "no format_type()"
        is_array = False
    else:
        no_format_type = False

        # strip (*) from character varying(5), timestamp(5)
        # with time zone, geometry(POLYGON), etc.
        attype = re.sub(r"\(.*\)", "", format_type)

        # strip '[]' from integer[], etc. and check if an array
        attype, is_array = _handle_array_type(attype)

    # strip quotes from case sensitive enum or domain names
    enum_or_domain_key = tuple(util.quoted_token_parser(attype))

    nullable = not notnull

    # charlen: the digits/commas inside the parentheses, if any;
    # args: every comma-separated token inside the parentheses.
    charlen = re.search(r"\(([\d,]+)\)", format_type)
    if charlen:
        charlen = charlen.group(1)
    args = re.search(r"\((.*)\)", format_type)
    if args and args.group(1):
        args = tuple(re.split(r"\s*,\s*", args.group(1)))
    else:
        args = ()
    kwargs = {}

    # Per-type adjustment of the constructor arguments.
    if attype == "numeric":
        if charlen:
            prec, scale = charlen.split(",")
            args = (int(prec), int(scale))
        else:
            args = ()
    elif attype == "double precision":
        args = (53,)
    elif attype == "integer":
        args = ()
    elif attype in ("timestamp with time zone", "time with time zone"):
        kwargs["timezone"] = True
        if charlen:
            kwargs["precision"] = int(charlen)
        args = ()
    elif attype in (
        "timestamp without time zone",
        "time without time zone",
        "time",
    ):
        kwargs["timezone"] = False
        if charlen:
            kwargs["precision"] = int(charlen)
        args = ()
    elif attype == "bit varying":
        kwargs["varying"] = True
        if charlen:
            args = (int(charlen),)
        else:
            args = ()
    elif attype.startswith("interval"):
        field_match = re.match(r"interval (.+)", attype, re.I)
        if charlen:
            kwargs["precision"] = int(charlen)
        if field_match:
            kwargs["fields"] = field_match.group(1)
        attype = "interval"
        args = ()
    elif charlen:
        args = (int(charlen),)

    # Resolve the type name to a type class: built-in names first,
    # then enums, then domains (which are unwrapped and retried).
    while True:
        # looping here to suit nested domains
        if attype in self.ischema_names:
            coltype = self.ischema_names[attype]
            break
        elif enum_or_domain_key in enums:
            enum = enums[enum_or_domain_key]
            coltype = ENUM
            kwargs["name"] = enum["name"]
            if not enum["visible"]:
                kwargs["schema"] = enum["schema"]
            args = tuple(enum["labels"])
            break
        elif enum_or_domain_key in domains:
            domain = domains[enum_or_domain_key]
            attype = domain["attype"]
            attype, is_array = _handle_array_type(attype)
            # strip quotes from case sensitive enum or domain names
            enum_or_domain_key = tuple(util.quoted_token_parser(attype))
            # A table can't override a not null on the domain,
            # but can override nullable
            nullable = nullable and domain["nullable"]
            if domain["default"] and not default:
                # It can, however, override the default
                # value, but can't set it to null.
                default = domain["default"]
            continue
        else:
            coltype = None
            break

    if coltype:
        coltype = coltype(*args, **kwargs)
        if is_array:
            coltype = self.ischema_names["_array"](coltype)
    elif no_format_type:
        util.warn(
            "PostgreSQL format_type() returned NULL for column '%s'"
            % (name,)
        )
        coltype = sqltypes.NULLTYPE
    else:
        util.warn(
            "Did not recognize type '%s' of column '%s'" % (attype, name)
        )
        coltype = sqltypes.NULLTYPE

    # If a zero byte or blank string depending on driver (is also absent
    # for older PG versions), then not a generated column. Otherwise, s =
    # stored. (Other values might be added in the future.)
    if generated not in (None, "", b"\x00"):
        # For a generated column the "default" text is its expression.
        computed = dict(
            sqltext=default, persisted=generated in ("s", b"s")
        )
        default = None
    else:
        computed = None

    # adjust the default value
    autoincrement = False
    if default is not None:
        match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
        if match is not None:
            if issubclass(coltype._type_affinity, sqltypes.Integer):
                autoincrement = True
            # the default is related to a Sequence
            sch = schema
            if "." not in match.group(2) and sch is not None:
                # unconditionally quote the schema name.  this could
                # later be enhanced to obey quoting rules /
                # "quote schema"
                default = (
                    match.group(1)
                    + ('"%s"' % sch)
                    + "."
                    + match.group(2)
                    + match.group(3)
                )

    column_info = dict(
        name=name,
        type=coltype,
        nullable=nullable,
        default=default,
        # Identity columns count as autoincrement as well.
        autoincrement=autoincrement or identity is not None,
        comment=comment,
    )
    if computed is not None:
        column_info["computed"] = computed
    if identity is not None:
        column_info["identity"] = identity
    return column_info
@reflection.cache
def get_pk_constraint(self, connection, table_name, schema=None, **kw):
    """Reflect the primary key of ``table_name``.

    Returns a dict with two entries: ``constrained_columns``, the list of
    primary key column names in the order produced by the column query,
    and ``name``, the scalar result of the constraint-name query.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )
    params = dict(table_oid=table_oid)

    if self.server_version_info >= (8, 4):
        # unnest() and generate_subscripts() both introduced in
        # version 8.4
        columns_sql = """
            SELECT a.attname
                FROM pg_attribute a JOIN (
                    SELECT unnest(ix.indkey) attnum,
                           generate_subscripts(ix.indkey, 1) ord
                    FROM pg_index ix
                    WHERE ix.indrelid = :table_oid AND ix.indisprimary
                    ) k ON a.attnum=k.attnum
                WHERE a.attrelid = :table_oid
                ORDER BY k.ord
        """
    else:
        # older servers: match attributes against indkey using the
        # version-dependent expression from _pg_index_any()
        membership = self._pg_index_any("a.attnum", "ix.indkey")
        columns_sql = """
            SELECT a.attname
                FROM
                    pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_attribute a
                        on t.oid=a.attrelid AND %s
                 WHERE
                  t.oid = :table_oid and ix.indisprimary = 't'
                ORDER BY a.attnum
        """ % (
            membership,
        )

    columns_stmt = sql.text(columns_sql).columns(attname=sqltypes.Unicode)
    column_rows = connection.execute(columns_stmt, params).fetchall()
    pk_columns = [row[0] for row in column_rows]

    name_sql = """
        SELECT conname
           FROM  pg_catalog.pg_constraint r
           WHERE r.conrelid = :table_oid AND r.contype = 'p'
           ORDER BY 1
    """
    name_stmt = sql.text(name_sql).columns(conname=sqltypes.Unicode)
    pk_name = connection.execute(name_stmt, params).scalar()

    return {"constrained_columns": pk_columns, "name": pk_name}
@reflection.cache
def get_foreign_keys(
    self,
    connection,
    table_name,
    schema=None,
    postgresql_ignore_search_path=False,
    **kw
):
    """Reflect the foreign key constraints of ``table_name``.

    Each constraint definition is obtained as text from
    ``pg_get_constraintdef()`` and parsed with a regular expression.

    :param postgresql_ignore_search_path: when true, the referred schema
     is taken from the catalog (``conschema``) unless it equals the
     dialect's default schema name, in which case the ``schema`` argument
     is used instead.

    :return: a list of dicts with the keys ``name``,
     ``constrained_columns``, ``referred_schema``, ``referred_table``,
     ``referred_columns`` and ``options``.
    """
    preparer = self.identifier_preparer
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    FK_SQL = """
      SELECT r.conname,
            pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
            n.nspname as conschema
      FROM  pg_catalog.pg_constraint r,
            pg_namespace n,
            pg_class c

      WHERE r.conrelid = :table AND
            r.contype = 'f' AND
            c.oid = confrelid AND
            n.oid = c.relnamespace
      ORDER BY 1
    """
    # https://www.postgresql.org/docs/9.0/static/sql-createtable.html
    FK_REGEX = re.compile(
        r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
        r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
        r"[\s]?(ON UPDATE "
        r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
        r"[\s]?(ON DELETE "
        r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
        r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
        r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
    )
    # Both column lists are split with the same pattern; whitespace on
    # either side of the comma is optional.
    column_separator = re.compile(r"\s*,\s*")

    t = sql.text(FK_SQL).columns(
        conname=sqltypes.Unicode, condef=sqltypes.Unicode
    )
    c = connection.execute(t, dict(table=table_oid))
    fkeys = []
    for conname, condef, conschema in c.fetchall():
        m = FK_REGEX.search(condef).groups()

        (
            constrained_columns,
            referred_schema,
            referred_table,
            referred_columns,
            _,
            match,
            _,
            onupdate,
            _,
            ondelete,
            deferrable,
            _,
            initially,
        ) = m

        # None means the clause was absent; otherwise map the keyword
        # to a boolean.
        if deferrable is not None:
            deferrable = deferrable == "DEFERRABLE"
        constrained_columns = [
            preparer._unquote_identifier(x)
            for x in column_separator.split(constrained_columns)
        ]

        if postgresql_ignore_search_path:
            # when ignoring search path, we use the actual schema
            # provided it isn't the "default" schema
            if conschema != self.default_schema_name:
                referred_schema = conschema
            else:
                referred_schema = schema
        elif referred_schema:
            # referred_schema is the schema that we regexp'ed from
            # pg_get_constraintdef().  If the schema is in the search
            # path, pg_get_constraintdef() will give us None.
            referred_schema = preparer._unquote_identifier(referred_schema)
        elif schema is not None and schema == conschema:
            # If the actual schema matches the schema of the table
            # we're reflecting, then we will use that.
            referred_schema = schema

        referred_table = preparer._unquote_identifier(referred_table)
        referred_columns = [
            preparer._unquote_identifier(x)
            for x in column_separator.split(referred_columns)
        ]
        # Only options that were present and differ from "NO ACTION"
        # are reported.
        options = {
            k: v
            for k, v in [
                ("onupdate", onupdate),
                ("ondelete", ondelete),
                ("initially", initially),
                ("deferrable", deferrable),
                ("match", match),
            ]
            if v is not None and v != "NO ACTION"
        }
        fkey_d = {
            "name": conname,
            "constrained_columns": constrained_columns,
            "referred_schema": referred_schema,
            "referred_table": referred_table,
            "referred_columns": referred_columns,
            "options": options,
        }
        fkeys.append(fkey_d)
    return fkeys
def _pg_index_any(self, col, compare_to):
    """Return a SQL fragment testing whether ``col`` occurs in ``compare_to``.

    Servers at 8.1 or later get ``col = ANY(compare_to)``.  Earlier
    servers get a parenthesized OR-chain comparing ``col`` against
    subscripts 0 through 9 of ``compare_to``.
    """
    if self.server_version_info >= (8, 1):
        return "%s = ANY(%s)" % (col, compare_to)

    # https://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us
    # "In CVS tip you could replace this with "attnum = ANY (indkey)".
    # Unfortunately, most array support doesn't work on int2vector in
    # pre-8.1 releases, so I think you're kinda stuck with the above
    # for now.
    # regards, tom lane"
    comparisons = [
        "%s[%d] = %s" % (compare_to, subscript, col)
        for subscript in range(0, 10)
    ]
    return "(%s)" % " OR ".join(comparisons)
@reflection.cache
def get_indexes(self, connection, table_name, schema, **kw):
    """Reflect the non-primary-key indexes of ``table_name``.

    One row is fetched per (index, attribute) pair and folded into a
    per-index record.  Expression-based indexes are skipped, with one
    warning emitted per index.

    :return: a list of dicts, each with ``name``, ``unique`` and
     ``column_names``; ``include_columns`` is added when the server
     version is 11 or later, and ``duplicates_constraint``,
     ``column_sorting`` and ``dialect_options`` are added when the
     corresponding information is present.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    # cast indkey as varchar since it's an int2vector,
    # returned as a list by some drivers such as pypostgresql

    # NOTE: both queries below must produce their columns in exactly the
    # order of the 12-name tuple unpacked from each row further down.
    if self.server_version_info < (8, 5):
        IDX_SQL = """
          SELECT
              i.relname as relname,
              ix.indisunique, ix.indexprs,
              a.attname, a.attnum, NULL, ix.indkey%s,
              %s, %s, am.amname,
              pg_get_expr(ix.indpred, ix.indrelid),
              NULL as indnkeyatts
          FROM
              pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_class i on i.oid = ix.indexrelid
                    left outer join
                        pg_attribute a
                        on t.oid = a.attrelid and %s
                    left outer join
                        pg_am am
                        on i.relam = am.oid
          WHERE
              t.relkind IN ('r', 'v', 'f', 'm')
              and t.oid = :table_oid
              and ix.indisprimary = 'f'
          ORDER BY
              t.relname,
              i.relname
        """ % (
            # version 8.3 here was based on observing the
            # cast does not work in PG 8.2.4, does work in 8.3.0.
            # nothing in PG changelogs regarding this.
            "::varchar" if self.server_version_info >= (8, 3) else "",
            "ix.indoption::varchar"
            if self.server_version_info >= (8, 3)
            else "NULL",
            "i.reloptions"
            if self.server_version_info >= (8, 2)
            else "NULL",
            self._pg_index_any("a.attnum", "ix.indkey"),
        )
    else:
        IDX_SQL = """
          SELECT
              i.relname as relname,
              ix.indisunique, ix.indexprs,
              a.attname, a.attnum, c.conrelid, ix.indkey::varchar,
              ix.indoption::varchar, i.reloptions, am.amname,
              pg_get_expr(ix.indpred, ix.indrelid),
              %s as indnkeyatts
          FROM
              pg_class t
                    join pg_index ix on t.oid = ix.indrelid
                    join pg_class i on i.oid = ix.indexrelid
                    left outer join
                        pg_attribute a
                        on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
                    left outer join
                        pg_constraint c
                        on (ix.indrelid = c.conrelid and
                            ix.indexrelid = c.conindid and
                            c.contype in ('p', 'u', 'x'))
                    left outer join
                        pg_am am
                        on i.relam = am.oid
          WHERE
              t.relkind IN ('r', 'v', 'f', 'm', 'p')
              and t.oid = :table_oid
              and ix.indisprimary = 'f'
          ORDER BY
              t.relname,
              i.relname
        """ % (
            "ix.indnkeyatts"
            if self.server_version_info >= (11, 0)
            else "NULL",
        )

    t = sql.text(IDX_SQL).columns(
        relname=sqltypes.Unicode, attname=sqltypes.Unicode
    )
    c = connection.execute(t, dict(table_oid=table_oid))

    indexes = defaultdict(lambda: defaultdict(dict))

    sv_idx_name = None
    for row in c.fetchall():
        (
            idx_name,
            unique,
            expr,
            col,
            col_num,
            conrelid,
            idx_key,
            idx_option,
            options,
            amname,
            filter_definition,
            indnkeyatts,
        ) = row

        if expr:
            # warn only once per index; rows for one index are adjacent
            # because the query orders by index name
            if idx_name != sv_idx_name:
                util.warn(
                    "Skipped unsupported reflection of "
                    "expression-based index %s" % idx_name
                )
            sv_idx_name = idx_name
            continue

        has_idx = idx_name in indexes
        index = indexes[idx_name]
        if col is not None:
            index["cols"][col_num] = col
        if not has_idx:
            # index-level values are identical on every row of the same
            # index, so they are recorded from the first row only
            idx_keys = idx_key.split()
            # "The number of key columns in the index, not counting any
            # included columns, which are merely stored and do not
            # participate in the index semantics"
            if indnkeyatts and idx_keys[indnkeyatts:]:
                # this is a "covering index" which has INCLUDE columns
                # as well as regular index columns
                inc_keys = idx_keys[indnkeyatts:]
                idx_keys = idx_keys[:indnkeyatts]
            else:
                inc_keys = []

            index["key"] = [int(k.strip()) for k in idx_keys]
            index["inc"] = [int(k.strip()) for k in inc_keys]

            # (new in pg 8.3)
            # "pg_index.indoption" is list of ints, one per column/expr.
            # int acts as bitmask: 0x01=DESC, 0x02=NULLSFIRST
            sorting = {}
            for col_idx, col_flags in enumerate(
                (idx_option or "").split()
            ):
                col_flags = int(col_flags.strip())
                col_sorting = ()
                # try to set flags only if they differ from PG defaults...
                if col_flags & 0x01:
                    col_sorting += ("desc",)
                    if not (col_flags & 0x02):
                        col_sorting += ("nulls_last",)
                else:
                    if col_flags & 0x02:
                        col_sorting += ("nulls_first",)
                if col_sorting:
                    sorting[col_idx] = col_sorting
            if sorting:
                index["sorting"] = sorting

            index["unique"] = unique
            if conrelid is not None:
                index["duplicates_constraint"] = idx_name
            if options:
                # split on the first "=" only, so that a value which
                # itself contains "=" stays intact
                index["options"] = dict(
                    option.split("=", 1) for option in options
                )

            # it *might* be nice to include that this is 'btree' in the
            # reflection info.  But we don't want an Index object
            # to have a ``postgresql_using`` in it that is just the
            # default, so for the moment leaving this out.
            if amname and amname != "btree":
                index["amname"] = amname

            if filter_definition:
                index["postgresql_where"] = filter_definition

    result = []
    for name, idx in indexes.items():
        entry = {
            "name": name,
            "unique": idx["unique"],
            "column_names": [idx["cols"][i] for i in idx["key"]],
        }
        if self.server_version_info >= (11, 0):
            # NOTE: this is legacy, this is part of dialect_options now
            # as of #7382
            entry["include_columns"] = [idx["cols"][i] for i in idx["inc"]]
        if "duplicates_constraint" in idx:
            entry["duplicates_constraint"] = idx["duplicates_constraint"]
        if "sorting" in idx:
            entry["column_sorting"] = dict(
                (idx["cols"][idx["key"][i]], value)
                for i, value in idx["sorting"].items()
            )
        if "include_columns" in entry:
            entry.setdefault("dialect_options", {})[
                "postgresql_include"
            ] = entry["include_columns"]
        if "options" in idx:
            entry.setdefault("dialect_options", {})[
                "postgresql_with"
            ] = idx["options"]
        if "amname" in idx:
            entry.setdefault("dialect_options", {})[
                "postgresql_using"
            ] = idx["amname"]
        if "postgresql_where" in idx:
            entry.setdefault("dialect_options", {})[
                "postgresql_where"
            ] = idx["postgresql_where"]
        result.append(entry)
    return result
@reflection.cache
def get_unique_constraints(
    self, connection, table_name, schema=None, **kw
):
    """Reflect the UNIQUE constraints of ``table_name``.

    Returns a list of ``{"name": ..., "column_names": [...]}`` dicts,
    where the column names follow the order of the constraint's
    ``conkey`` array.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    unique_sql = """
        SELECT
            cons.conname as name,
            cons.conkey as key,
            a.attnum as col_num,
            a.attname as col_name
        FROM
            pg_catalog.pg_constraint cons
            join pg_attribute a
              on cons.conrelid = a.attrelid AND
                a.attnum = ANY(cons.conkey)
        WHERE
            cons.conrelid = :table_oid AND
            cons.contype = 'u'
    """

    stmt = sql.text(unique_sql).columns(col_name=sqltypes.Unicode)
    rows = connection.execute(stmt, dict(table_oid=table_oid)).fetchall()

    # one row arrives per (constraint, column); gather them per constraint
    by_name = {}
    for row in rows:
        record = by_name.setdefault(row.name, {"cols": {}})
        record["key"] = row.key
        record["cols"][row.col_num] = row.col_name

    constraints = []
    for cons_name, record in by_name.items():
        attnum_to_name = record["cols"]
        ordered_names = [attnum_to_name[attnum] for attnum in record["key"]]
        constraints.append(
            {"name": cons_name, "column_names": ordered_names}
        )
    return constraints
@reflection.cache
def get_table_comment(self, connection, table_name, schema=None, **kw):
    """Reflect the comment attached to ``table_name``.

    Returns ``{"text": value}`` where ``value`` is the scalar result of
    the ``pg_description`` lookup for the table's oid.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    comment_sql = """
        SELECT
            pgd.description as table_comment
        FROM
            pg_catalog.pg_description pgd
        WHERE
            pgd.objsubid = 0 AND
            pgd.objoid = :table_oid
    """

    stmt = sql.text(comment_sql)
    result = connection.execute(stmt, dict(table_oid=table_oid))
    comment_text = result.scalar()
    return {"text": comment_text}
@reflection.cache
def get_check_constraints(self, connection, table_name, schema=None, **kw):
    """Reflect the CHECK constraints of ``table_name``.

    The text from ``pg_get_constraintdef()`` is parsed to extract the
    constraint expression.  Returns a list of dicts with ``name`` and
    ``sqltext``; ``dialect_options`` is set to ``{"not_valid": True}``
    when the definition ends in ``NOT VALID``.  Text that cannot be
    parsed produces a warning and an empty ``sqltext``.
    """
    table_oid = self.get_table_oid(
        connection, table_name, schema, info_cache=kw.get("info_cache")
    )

    check_sql = """
        SELECT
            cons.conname as name,
            pg_get_constraintdef(cons.oid) as src
        FROM
            pg_catalog.pg_constraint cons
        WHERE
            cons.conrelid = :table_oid AND
            cons.contype = 'c'
    """

    rows = connection.execute(
        sql.text(check_sql), dict(table_oid=table_oid)
    )

    # removes one enclosing pair of parentheses, plus surrounding
    # whitespace, from the captured expression
    outer_parens = re.compile(r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL)

    constraints = []
    for cons_name, definition in rows:
        # samples:
        # "CHECK (((a > 1) AND (a < 5)))"
        # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
        # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
        # "CHECK (some_boolean_function(a))"
        # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"

        parsed = re.match(
            r"^CHECK *\((.+)\)( NOT VALID)?$", definition, flags=re.DOTALL
        )
        if parsed is None:
            util.warn(
                "Could not parse CHECK constraint text: %r" % definition
            )
            constraints.append({"name": cons_name, "sqltext": ""})
            continue

        record = {
            "name": cons_name,
            "sqltext": outer_parens.sub(r"\1", parsed.group(1)),
        }
        if parsed.group(2):
            record["dialect_options"] = {"not_valid": True}
        constraints.append(record)
    return constraints
def _load_enums(self, connection, schema=None):
    """Load the enum types defined on the server.

    ``schema`` falls back to the dialect's default schema name when not
    given; the value ``"*"`` disables the schema filter.  Returns ``{}``
    when the dialect does not support native enums, otherwise a list of
    dicts with ``name``, ``schema``, ``visible`` and ``labels``.
    """
    schema = schema or self.default_schema_name
    if not self.supports_native_enum:
        return {}

    all_schemas = schema == "*"

    # Load data types for enums:
    enum_sql = """
        SELECT t.typname as "name",
           -- no enum defaults in 8.4 at least
           -- t.typdefault as "default",
           pg_catalog.pg_type_is_visible(t.oid) as "visible",
           n.nspname as "schema",
           e.enumlabel as "label"
        FROM pg_catalog.pg_type t
             LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
             LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
        WHERE t.typtype = 'e'
    """

    if not all_schemas:
        enum_sql += "AND n.nspname = :schema "

    # e.oid gives us label order within an enum
    enum_sql += 'ORDER BY "schema", "name", e.oid'

    stmt = sql.text(enum_sql).columns(
        attname=sqltypes.Unicode, label=sqltypes.Unicode
    )

    if not all_schemas:
        stmt = stmt.bindparams(schema=schema)

    rows = connection.execute(stmt).fetchall()

    ordered = []
    seen = {}
    for row in rows:
        ident = (row.schema, row.name)
        record = seen.get(ident)
        if record is not None:
            # further label of an enum that was already recorded
            record["labels"].append(row.label)
            continue

        record = {
            "name": row.name,
            "schema": row.schema,
            "visible": row.visible,
            "labels": [],
        }
        seen[ident] = record
        # the LEFT JOIN yields a NULL label when the enum has no labels
        if row.label is not None:
            record["labels"].append(row.label)
        ordered.append(record)
    return ordered
def _load_domains(self, connection):
    """Load the domain types defined on the server.

    Returns a dict mapping a key tuple to a dict with ``attype``,
    ``nullable`` and ``default``.  The key is ``(name,)`` for a visible
    domain and ``(schema, name)`` otherwise.
    """
    # Load data types for domains:
    domain_sql = """
        SELECT t.typname as "name",
           pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
           not t.typnotnull as "nullable",
           t.typdefault as "default",
           pg_catalog.pg_type_is_visible(t.oid) as "visible",
           n.nspname as "schema"
        FROM pg_catalog.pg_type t
           LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
        WHERE t.typtype = 'd'
    """

    stmt = sql.text(domain_sql)
    result = connection.execution_options(future_result=True).execute(stmt)

    loaded = {}
    for row in result.mappings():
        # strip (30) from character varying(30)
        base_type = re.search(r"([^\(]+)", row["attype"]).group(1)

        # 'visible' just means whether or not the domain is in a
        # schema that's on the search path -- or not overridden by
        # a schema with higher precedence. If it's not visible,
        # it will be prefixed with the schema-name when it's used.
        ident = (
            (row["name"],)
            if row["visible"]
            else (row["schema"], row["name"])
        )

        loaded[ident] = {
            "attype": base_type,
            "nullable": row["nullable"],
            "default": row["default"],
        }

    return loaded