1# dialects/postgresql/base.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7
8r"""
9.. dialect:: postgresql
10 :name: PostgreSQL
11 :full_support: 12, 13, 14, 15
12 :normal_support: 9.6+
13 :best_effort: 8+
14
15.. _postgresql_sequences:
16
17Sequences/SERIAL/IDENTITY
18-------------------------
19
20PostgreSQL supports sequences, and SQLAlchemy uses these as the default means
21of creating new primary key values for integer-based primary key columns. When
22creating tables, SQLAlchemy will issue the ``SERIAL`` datatype for
23integer-based primary key columns, which generates a sequence and server side
24default corresponding to the column.
25
26To specify a specific named sequence to be used for primary key generation,
27use the :func:`~sqlalchemy.schema.Sequence` construct::
28
29 Table('sometable', metadata,
30 Column('id', Integer, Sequence('some_id_seq'), primary_key=True)
31 )
32
33When SQLAlchemy issues a single INSERT statement, to fulfill the contract of
34having the "last insert identifier" available, a RETURNING clause is added to
35the INSERT statement which specifies the primary key columns should be
36returned after the statement completes. The RETURNING functionality only takes
37place if PostgreSQL 8.2 or later is in use. As a fallback approach, the
38sequence, whether specified explicitly or implicitly via ``SERIAL``, is
39executed independently beforehand, the returned value to be used in the
40subsequent insert. Note that when an
41:func:`~sqlalchemy.sql.expression.insert()` construct is executed using
42"executemany" semantics, the "last inserted identifier" functionality does not
43apply; no RETURNING clause is emitted nor is the sequence pre-executed in this
44case.
45
46To force the usage of RETURNING by default off, specify the flag
47``implicit_returning=False`` to :func:`_sa.create_engine`.
48
49PostgreSQL 10 and above IDENTITY columns
50^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
51
52PostgreSQL 10 and above have a new IDENTITY feature that supersedes the use
53of SERIAL. The :class:`_schema.Identity` construct in a
54:class:`_schema.Column` can be used to control its behavior::
55
56 from sqlalchemy import Table, Column, MetaData, Integer, Computed
57
58 metadata = MetaData()
59
60 data = Table(
61 "data",
62 metadata,
63 Column(
64 'id', Integer, Identity(start=42, cycle=True), primary_key=True
65 ),
66 Column('data', String)
67 )
68
69The CREATE TABLE for the above :class:`_schema.Table` object would be:
70
71.. sourcecode:: sql
72
73 CREATE TABLE data (
74 id INTEGER GENERATED BY DEFAULT AS IDENTITY (START WITH 42 CYCLE),
75 data VARCHAR,
76 PRIMARY KEY (id)
77 )
78
79.. versionchanged:: 1.4 Added :class:`_schema.Identity` construct
80 in a :class:`_schema.Column` to specify the option of an autoincrementing
81 column.
82
83.. note::
84
85 Previous versions of SQLAlchemy did not have built-in support for rendering
86 of IDENTITY, and could use the following compilation hook to replace
87 occurrences of SERIAL with IDENTITY::
88
89 from sqlalchemy.schema import CreateColumn
90 from sqlalchemy.ext.compiler import compiles
91
92
93 @compiles(CreateColumn, 'postgresql')
94 def use_identity(element, compiler, **kw):
95 text = compiler.visit_create_column(element, **kw)
96 text = text.replace(
97 "SERIAL", "INT GENERATED BY DEFAULT AS IDENTITY"
98 )
99 return text
100
101 Using the above, a table such as::
102
103 t = Table(
104 't', m,
105 Column('id', Integer, primary_key=True),
106 Column('data', String)
107 )
108
109 Will generate on the backing database as::
110
111 CREATE TABLE t (
112 id INT GENERATED BY DEFAULT AS IDENTITY,
113 data VARCHAR,
114 PRIMARY KEY (id)
115 )
116
117.. _postgresql_ss_cursors:
118
119Server Side Cursors
120-------------------
121
122Server-side cursor support is available for the psycopg2, asyncpg
123dialects and may also be available in others.
124
125Server side cursors are enabled on a per-statement basis by using the
126:paramref:`.Connection.execution_options.stream_results` connection execution
127option::
128
129 with engine.connect() as conn:
130 result = conn.execution_options(stream_results=True).execute(text("select * from table"))
131
132Note that some kinds of SQL statements may not be supported with
133server side cursors; generally, only SQL statements that return rows should be
134used with this option.
135
136.. deprecated:: 1.4 The dialect-level server_side_cursors flag is deprecated
137 and will be removed in a future release. Please use the
138 :paramref:`_engine.Connection.stream_results` execution option for
139 unbuffered cursor support.
140
141.. seealso::
142
143 :ref:`engine_stream_results`
144
145.. _postgresql_isolation_level:
146
147Transaction Isolation Level
148---------------------------
149
150Most SQLAlchemy dialects support setting of transaction isolation level
151using the :paramref:`_sa.create_engine.isolation_level` parameter
152at the :func:`_sa.create_engine` level, and at the :class:`_engine.Connection`
153level via the :paramref:`.Connection.execution_options.isolation_level`
154parameter.
155
156For PostgreSQL dialects, this feature works either by making use of the
157DBAPI-specific features, such as psycopg2's isolation level flags which will
158embed the isolation level setting inline with the ``"BEGIN"`` statement, or for
159DBAPIs with no direct support by emitting ``SET SESSION CHARACTERISTICS AS
160TRANSACTION ISOLATION LEVEL <level>`` ahead of the ``"BEGIN"`` statement
161emitted by the DBAPI. For the special AUTOCOMMIT isolation level,
162DBAPI-specific techniques are used which is typically an ``.autocommit``
163flag on the DBAPI connection object.
164
165To set isolation level using :func:`_sa.create_engine`::
166
167 engine = create_engine(
168 "postgresql+pg8000://scott:tiger@localhost/test",
169 isolation_level = "REPEATABLE READ"
170 )
171
172To set using per-connection execution options::
173
174 with engine.connect() as conn:
175 conn = conn.execution_options(
176 isolation_level="REPEATABLE READ"
177 )
178 with conn.begin():
179 # ... work with transaction
180
181There are also more options for isolation level configurations, such as
182"sub-engine" objects linked to a main :class:`_engine.Engine` which each apply
183different isolation level settings. See the discussion at
184:ref:`dbapi_autocommit` for background.
185
186Valid values for ``isolation_level`` on most PostgreSQL dialects include:
187
188* ``READ COMMITTED``
189* ``READ UNCOMMITTED``
190* ``REPEATABLE READ``
191* ``SERIALIZABLE``
192* ``AUTOCOMMIT``
193
194.. seealso::
195
196 :ref:`dbapi_autocommit`
197
198 :ref:`postgresql_readonly_deferrable`
199
200 :ref:`psycopg2_isolation_level`
201
202 :ref:`pg8000_isolation_level`
203
204.. _postgresql_readonly_deferrable:
205
206Setting READ ONLY / DEFERRABLE
207------------------------------
208
209Most PostgreSQL dialects support setting the "READ ONLY" and "DEFERRABLE"
210characteristics of the transaction, which is in addition to the isolation level
211setting. These two attributes can be established either in conjunction with or
212independently of the isolation level by passing the ``postgresql_readonly`` and
213``postgresql_deferrable`` flags with
214:meth:`_engine.Connection.execution_options`. The example below illustrates
215passing the ``"SERIALIZABLE"`` isolation level at the same time as setting
216"READ ONLY" and "DEFERRABLE"::
217
218 with engine.connect() as conn:
219 conn = conn.execution_options(
220 isolation_level="SERIALIZABLE",
221 postgresql_readonly=True,
222 postgresql_deferrable=True
223 )
224 with conn.begin():
225 # ... work with transaction
226
227Note that some DBAPIs such as asyncpg only support "readonly" with
228SERIALIZABLE isolation.
229
230.. versionadded:: 1.4 added support for the ``postgresql_readonly``
231 and ``postgresql_deferrable`` execution options.
232
233.. _postgresql_reset_on_return:
234
235Temporary Table / Resource Reset for Connection Pooling
236-------------------------------------------------------
237
238The :class:`.QueuePool` connection pool implementation used
239by the SQLAlchemy :class:`_sa.Engine` object includes
240:ref:`reset on return <pool_reset_on_return>` behavior that will invoke
241the DBAPI ``.rollback()`` method when connections are returned to the pool.
242While this rollback will clear out the immediate state used by the previous
243transaction, it does not cover a wider range of session-level state, including
244temporary tables as well as other server state such as prepared statement
245handles and statement caches. The PostgreSQL database includes a variety
246of commands which may be used to reset this state, including
247``DISCARD``, ``RESET``, ``DEALLOCATE``, and ``UNLISTEN``.
248
249
250To install
251one or more of these commands as the means of performing reset-on-return,
252the :meth:`.PoolEvents.reset` event hook may be used, as demonstrated
253in the example below (**requires SQLAlchemy 1.4.43 or greater**). The implementation
254will end transactions in progress as well as discard temporary tables
255using the ``CLOSE``, ``RESET`` and ``DISCARD`` commands; see the PostgreSQL
256documentation for background on what each of these statements do.
257
258The :paramref:`_sa.create_engine.pool_reset_on_return` parameter
259is set to ``None`` so that the custom scheme can replace the default behavior
260completely. The custom hook implementation calls ``.rollback()`` in any case,
261as it's usually important that the DBAPI's own tracking of commit/rollback
262will remain consistent with the state of the transaction::
263
264
265 from sqlalchemy import create_engine
266 from sqlalchemy import event
267
268 postgresql_engine = create_engine(
269 "postgresql+pyscopg2://scott:tiger@hostname/dbname",
270
271 # disable default reset-on-return scheme
272 pool_reset_on_return=None,
273 )
274
275
276 @event.listens_for(postgresql_engine, "reset")
277 def _reset_postgresql(dbapi_connection, connection_record, reset_state):
278 dbapi_connection.execute("CLOSE ALL")
279 dbapi_connection.execute("RESET ALL")
280 dbapi_connection.execute("DISCARD TEMP")
281
282 # so that the DBAPI itself knows that the connection has been
283 # reset
284 dbapi_connection.rollback()
285
286.. versionchanged:: 1.4.43 Ensured the :meth:`.PoolEvents.reset` event
287 is invoked for all "reset" occurrences, so that it's appropriate
288 as a place for custom "reset" handlers. Previous schemes which
289 use the :meth:`.PoolEvents.checkin` handler remain usable as well.
290
291.. seealso::
292
293 :ref:`pool_reset_on_return` - in the :ref:`pooling_toplevel` documentation
294
295.. _postgresql_alternate_search_path:
296
297Setting Alternate Search Paths on Connect
298------------------------------------------
299
300The PostgreSQL ``search_path`` variable refers to the list of schema names
301that will be implicitly referred towards when a particular table or other
302object is referenced in a SQL statement. As detailed in the next section
303:ref:`postgresql_schema_reflection`, SQLAlchemy is generally organized around
304the concept of keeping this variable at its default value of ``public``,
305however, in order to have it set to any arbitrary name or names when connections
306are used automatically, the "SET SESSION search_path" command may be invoked
307for all connections in a pool using the following event handler, as discussed
308at :ref:`schema_set_default_connections`::
309
310 from sqlalchemy import event
311 from sqlalchemy import create_engine
312
313 engine = create_engine("postgresql+psycopg2://scott:tiger@host/dbname")
314
315 @event.listens_for(engine, "connect", insert=True)
316 def set_search_path(dbapi_connection, connection_record):
317 existing_autocommit = dbapi_connection.autocommit
318 dbapi_connection.autocommit = True
319 cursor = dbapi_connection.cursor()
320 cursor.execute("SET SESSION search_path='%s'" % schema_name)
321 cursor.close()
322 dbapi_connection.autocommit = existing_autocommit
323
324The reason the recipe is complicated by use of the ``.autocommit`` DBAPI
325attribute is so that when the ``SET SESSION search_path`` directive is invoked,
326it is invoked outside of the scope of any transaction and therefore will not
327be reverted when the DBAPI connection has a rollback.
328
329.. seealso::
330
331 :ref:`schema_set_default_connections` - in the :ref:`metadata_toplevel` documentation
332
333
334
335
336.. _postgresql_schema_reflection:
337
338Remote-Schema Table Introspection and PostgreSQL search_path
339------------------------------------------------------------
340
341.. admonition:: Section Best Practices Summarized
342
343 keep the ``search_path`` variable set to its default of ``public``, without
344 any other schema names. For other schema names, name these explicitly
345 within :class:`_schema.Table` definitions. Alternatively, the
346 ``postgresql_ignore_search_path`` option will cause all reflected
347 :class:`_schema.Table` objects to have a :attr:`_schema.Table.schema`
348 attribute set up.
349
350The PostgreSQL dialect can reflect tables from any schema, as outlined in
351:ref:`metadata_reflection_schemas`.
352
353With regards to tables which these :class:`_schema.Table`
354objects refer to via foreign key constraint, a decision must be made as to how
355the ``.schema`` is represented in those remote tables, in the case where that
356remote schema name is also a member of the current
357`PostgreSQL search path
358<https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_.
359
360By default, the PostgreSQL dialect mimics the behavior encouraged by
361PostgreSQL's own ``pg_get_constraintdef()`` builtin procedure. This function
362returns a sample definition for a particular foreign key constraint,
363omitting the referenced schema name from that definition when the name is
364also in the PostgreSQL schema search path. The interaction below
365illustrates this behavior::
366
367 test=> CREATE TABLE test_schema.referred(id INTEGER PRIMARY KEY);
368 CREATE TABLE
369 test=> CREATE TABLE referring(
370 test(> id INTEGER PRIMARY KEY,
371 test(> referred_id INTEGER REFERENCES test_schema.referred(id));
372 CREATE TABLE
373 test=> SET search_path TO public, test_schema;
374 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
375 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
376 test-> ON n.oid = c.relnamespace
377 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
378 test-> WHERE c.relname='referring' AND r.contype = 'f'
379 test-> ;
380 pg_get_constraintdef
381 ---------------------------------------------------
382 FOREIGN KEY (referred_id) REFERENCES referred(id)
383 (1 row)
384
385Above, we created a table ``referred`` as a member of the remote schema
386``test_schema``, however when we added ``test_schema`` to the
387PG ``search_path`` and then asked ``pg_get_constraintdef()`` for the
388``FOREIGN KEY`` syntax, ``test_schema`` was not included in the output of
389the function.
390
391On the other hand, if we set the search path back to the typical default
392of ``public``::
393
394 test=> SET search_path TO public;
395 SET
396
397The same query against ``pg_get_constraintdef()`` now returns the fully
398schema-qualified name for us::
399
400 test=> SELECT pg_catalog.pg_get_constraintdef(r.oid, true) FROM
401 test-> pg_catalog.pg_class c JOIN pg_catalog.pg_namespace n
402 test-> ON n.oid = c.relnamespace
403 test-> JOIN pg_catalog.pg_constraint r ON c.oid = r.conrelid
404 test-> WHERE c.relname='referring' AND r.contype = 'f';
405 pg_get_constraintdef
406 ---------------------------------------------------------------
407 FOREIGN KEY (referred_id) REFERENCES test_schema.referred(id)
408 (1 row)
409
410SQLAlchemy will by default use the return value of ``pg_get_constraintdef()``
411in order to determine the remote schema name. That is, if our ``search_path``
412were set to include ``test_schema``, and we invoked a table
413reflection process as follows::
414
415 >>> from sqlalchemy import Table, MetaData, create_engine, text
416 >>> engine = create_engine("postgresql://scott:tiger@localhost/test")
417 >>> with engine.connect() as conn:
418 ... conn.execute(text("SET search_path TO test_schema, public"))
419 ... metadata_obj = MetaData()
420 ... referring = Table('referring', metadata_obj,
421 ... autoload_with=conn)
422 ...
423 <sqlalchemy.engine.result.CursorResult object at 0x101612ed0>
424
425The above process would deliver to the :attr:`_schema.MetaData.tables`
426collection
427``referred`` table named **without** the schema::
428
429 >>> metadata_obj.tables['referred'].schema is None
430 True
431
432To alter the behavior of reflection such that the referred schema is
433maintained regardless of the ``search_path`` setting, use the
434``postgresql_ignore_search_path`` option, which can be specified as a
435dialect-specific argument to both :class:`_schema.Table` as well as
436:meth:`_schema.MetaData.reflect`::
437
438 >>> with engine.connect() as conn:
439 ... conn.execute(text("SET search_path TO test_schema, public"))
440 ... metadata_obj = MetaData()
441 ... referring = Table('referring', metadata_obj,
442 ... autoload_with=conn,
443 ... postgresql_ignore_search_path=True)
444 ...
445 <sqlalchemy.engine.result.CursorResult object at 0x1016126d0>
446
447We will now have ``test_schema.referred`` stored as schema-qualified::
448
449 >>> metadata_obj.tables['test_schema.referred'].schema
450 'test_schema'
451
452.. sidebar:: Best Practices for PostgreSQL Schema reflection
453
454 The description of PostgreSQL schema reflection behavior is complex, and
455 is the product of many years of dealing with widely varied use cases and
456 user preferences. But in fact, there's no need to understand any of it if
457 you just stick to the simplest use pattern: leave the ``search_path`` set
458 to its default of ``public`` only, never refer to the name ``public`` as
459 an explicit schema name otherwise, and refer to all other schema names
460 explicitly when building up a :class:`_schema.Table` object. The options
461 described here are only for those users who can't, or prefer not to, stay
462 within these guidelines.
463
464Note that **in all cases**, the "default" schema is always reflected as
465``None``. The "default" schema on PostgreSQL is that which is returned by the
466PostgreSQL ``current_schema()`` function. On a typical PostgreSQL
467installation, this is the name ``public``. So a table that refers to another
468which is in the ``public`` (i.e. default) schema will always have the
469``.schema`` attribute set to ``None``.
470
471.. seealso::
472
473 :ref:`reflection_schema_qualified_interaction` - discussion of the issue
474 from a backend-agnostic perspective
475
476 `The Schema Search Path
477 <https://www.postgresql.org/docs/current/static/ddl-schemas.html#DDL-SCHEMAS-PATH>`_
478 - on the PostgreSQL website.
479
480INSERT/UPDATE...RETURNING
481-------------------------
482
483The dialect supports PG 8.2's ``INSERT..RETURNING``, ``UPDATE..RETURNING`` and
484``DELETE..RETURNING`` syntaxes. ``INSERT..RETURNING`` is used by default
485for single-row INSERT statements in order to fetch newly generated
486primary key identifiers. To specify an explicit ``RETURNING`` clause,
487use the :meth:`._UpdateBase.returning` method on a per-statement basis::
488
489 # INSERT..RETURNING
490 result = table.insert().returning(table.c.col1, table.c.col2).\
491 values(name='foo')
492 print(result.fetchall())
493
494 # UPDATE..RETURNING
495 result = table.update().returning(table.c.col1, table.c.col2).\
496 where(table.c.name=='foo').values(name='bar')
497 print(result.fetchall())
498
499 # DELETE..RETURNING
500 result = table.delete().returning(table.c.col1, table.c.col2).\
501 where(table.c.name=='foo')
502 print(result.fetchall())
503
504.. _postgresql_insert_on_conflict:
505
506INSERT...ON CONFLICT (Upsert)
507------------------------------
508
509Starting with version 9.5, PostgreSQL allows "upserts" (update or insert) of
510rows into a table via the ``ON CONFLICT`` clause of the ``INSERT`` statement. A
511candidate row will only be inserted if that row does not violate any unique
512constraints. In the case of a unique constraint violation, a secondary action
513can occur which can be either "DO UPDATE", indicating that the data in the
514target row should be updated, or "DO NOTHING", which indicates to silently skip
515this row.
516
517Conflicts are determined using existing unique constraints and indexes. These
518constraints may be identified either using their name as stated in DDL,
519or they may be inferred by stating the columns and conditions that comprise
520the indexes.
521
522SQLAlchemy provides ``ON CONFLICT`` support via the PostgreSQL-specific
523:func:`_postgresql.insert()` function, which provides
524the generative methods :meth:`_postgresql.Insert.on_conflict_do_update`
525and :meth:`~.postgresql.Insert.on_conflict_do_nothing`:
526
527.. sourcecode:: pycon+sql
528
529 >>> from sqlalchemy.dialects.postgresql import insert
530 >>> insert_stmt = insert(my_table).values(
531 ... id='some_existing_id',
532 ... data='inserted value')
533 >>> do_nothing_stmt = insert_stmt.on_conflict_do_nothing(
534 ... index_elements=['id']
535 ... )
536 >>> print(do_nothing_stmt)
537 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
538 ON CONFLICT (id) DO NOTHING
539 {stop}
540
541 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
542 ... constraint='pk_my_table',
543 ... set_=dict(data='updated value')
544 ... )
545 >>> print(do_update_stmt)
546 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
547 ON CONFLICT ON CONSTRAINT pk_my_table DO UPDATE SET data = %(param_1)s
548
549.. versionadded:: 1.1
550
551.. seealso::
552
553 `INSERT .. ON CONFLICT
554 <https://www.postgresql.org/docs/current/static/sql-insert.html#SQL-ON-CONFLICT>`_
555 - in the PostgreSQL documentation.
556
557Specifying the Target
558^^^^^^^^^^^^^^^^^^^^^
559
560Both methods supply the "target" of the conflict using either the
561named constraint or by column inference:
562
563* The :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` argument
564 specifies a sequence containing string column names, :class:`_schema.Column`
565 objects, and/or SQL expression elements, which would identify a unique
566 index:
567
568 .. sourcecode:: pycon+sql
569
570 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
571 ... index_elements=['id'],
572 ... set_=dict(data='updated value')
573 ... )
574 >>> print(do_update_stmt)
575 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
576 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
577 {stop}
578
579 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
580 ... index_elements=[my_table.c.id],
581 ... set_=dict(data='updated value')
582 ... )
583 >>> print(do_update_stmt)
584 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
585 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
586
587* When using :paramref:`_postgresql.Insert.on_conflict_do_update.index_elements` to
588 infer an index, a partial index can be inferred by also specifying the
589 use the :paramref:`_postgresql.Insert.on_conflict_do_update.index_where` parameter:
590
591 .. sourcecode:: pycon+sql
592
593 >>> stmt = insert(my_table).values(user_email='a@b.com', data='inserted data')
594 >>> stmt = stmt.on_conflict_do_update(
595 ... index_elements=[my_table.c.user_email],
596 ... index_where=my_table.c.user_email.like('%@gmail.com'),
597 ... set_=dict(data=stmt.excluded.data)
598 ... )
599 >>> print(stmt)
600 {opensql}INSERT INTO my_table (data, user_email)
601 VALUES (%(data)s, %(user_email)s) ON CONFLICT (user_email)
602 WHERE user_email LIKE %(user_email_1)s DO UPDATE SET data = excluded.data
603
604* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument is
605 used to specify an index directly rather than inferring it. This can be
606 the name of a UNIQUE constraint, a PRIMARY KEY constraint, or an INDEX:
607
608 .. sourcecode:: pycon+sql
609
610 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
611 ... constraint='my_table_idx_1',
612 ... set_=dict(data='updated value')
613 ... )
614 >>> print(do_update_stmt)
615 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
616 ON CONFLICT ON CONSTRAINT my_table_idx_1 DO UPDATE SET data = %(param_1)s
617 {stop}
618
619 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
620 ... constraint='my_table_pk',
621 ... set_=dict(data='updated value')
622 ... )
623 >>> print(do_update_stmt)
624 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
625 ON CONFLICT ON CONSTRAINT my_table_pk DO UPDATE SET data = %(param_1)s
626 {stop}
627
628* The :paramref:`_postgresql.Insert.on_conflict_do_update.constraint` argument may
629 also refer to a SQLAlchemy construct representing a constraint,
630 e.g. :class:`.UniqueConstraint`, :class:`.PrimaryKeyConstraint`,
631 :class:`.Index`, or :class:`.ExcludeConstraint`. In this use,
632 if the constraint has a name, it is used directly. Otherwise, if the
633 constraint is unnamed, then inference will be used, where the expressions
634 and optional WHERE clause of the constraint will be spelled out in the
635 construct. This use is especially convenient
636 to refer to the named or unnamed primary key of a :class:`_schema.Table`
637 using the
638 :attr:`_schema.Table.primary_key` attribute:
639
640 .. sourcecode:: pycon+sql
641
642 >>> do_update_stmt = insert_stmt.on_conflict_do_update(
643 ... constraint=my_table.primary_key,
644 ... set_=dict(data='updated value')
645 ... )
646 >>> print(do_update_stmt)
647 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
648 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
649
650The SET Clause
651^^^^^^^^^^^^^^^
652
653``ON CONFLICT...DO UPDATE`` is used to perform an update of the already
654existing row, using any combination of new values as well as values
655from the proposed insertion. These values are specified using the
656:paramref:`_postgresql.Insert.on_conflict_do_update.set_` parameter. This
657parameter accepts a dictionary which consists of direct values
658for UPDATE:
659
660.. sourcecode:: pycon+sql
661
662 >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
663 >>> do_update_stmt = stmt.on_conflict_do_update(
664 ... index_elements=['id'],
665 ... set_=dict(data='updated value')
666 ... )
667 >>> print(do_update_stmt)
668 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
669 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s
670
671.. warning::
672
673 The :meth:`_expression.Insert.on_conflict_do_update`
674 method does **not** take into
675 account Python-side default UPDATE values or generation functions, e.g.
676 those specified using :paramref:`_schema.Column.onupdate`.
677 These values will not be exercised for an ON CONFLICT style of UPDATE,
678 unless they are manually specified in the
679 :paramref:`_postgresql.Insert.on_conflict_do_update.set_` dictionary.
680
681Updating using the Excluded INSERT Values
682^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
683
684In order to refer to the proposed insertion row, the special alias
685:attr:`~.postgresql.Insert.excluded` is available as an attribute on
686the :class:`_postgresql.Insert` object; this object is a
687:class:`_expression.ColumnCollection`
688which alias contains all columns of the target
689table:
690
691.. sourcecode:: pycon+sql
692
693 >>> stmt = insert(my_table).values(
694 ... id='some_id',
695 ... data='inserted value',
696 ... author='jlh'
697 ... )
698 >>> do_update_stmt = stmt.on_conflict_do_update(
699 ... index_elements=['id'],
700 ... set_=dict(data='updated value', author=stmt.excluded.author)
701 ... )
702 >>> print(do_update_stmt)
703 {opensql}INSERT INTO my_table (id, data, author)
704 VALUES (%(id)s, %(data)s, %(author)s)
705 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
706
707Additional WHERE Criteria
708^^^^^^^^^^^^^^^^^^^^^^^^^
709
710The :meth:`_expression.Insert.on_conflict_do_update` method also accepts
711a WHERE clause using the :paramref:`_postgresql.Insert.on_conflict_do_update.where`
712parameter, which will limit those rows which receive an UPDATE:
713
714.. sourcecode:: pycon+sql
715
716 >>> stmt = insert(my_table).values(
717 ... id='some_id',
718 ... data='inserted value',
719 ... author='jlh'
720 ... )
721 >>> on_update_stmt = stmt.on_conflict_do_update(
722 ... index_elements=['id'],
723 ... set_=dict(data='updated value', author=stmt.excluded.author),
724 ... where=(my_table.c.status == 2)
725 ... )
726 >>> print(on_update_stmt)
727 {opensql}INSERT INTO my_table (id, data, author)
728 VALUES (%(id)s, %(data)s, %(author)s)
729 ON CONFLICT (id) DO UPDATE SET data = %(param_1)s, author = excluded.author
730 WHERE my_table.status = %(status_1)s
731
732Skipping Rows with DO NOTHING
733^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
734
735``ON CONFLICT`` may be used to skip inserting a row entirely
736if any conflict with a unique or exclusion constraint occurs; below
737this is illustrated using the
738:meth:`~.postgresql.Insert.on_conflict_do_nothing` method:
739
740.. sourcecode:: pycon+sql
741
742 >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
743 >>> stmt = stmt.on_conflict_do_nothing(index_elements=['id'])
744 >>> print(stmt)
745 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
746 ON CONFLICT (id) DO NOTHING
747
748If ``DO NOTHING`` is used without specifying any columns or constraint,
749it has the effect of skipping the INSERT for any unique or exclusion
750constraint violation which occurs:
751
752.. sourcecode:: pycon+sql
753
754 >>> stmt = insert(my_table).values(id='some_id', data='inserted value')
755 >>> stmt = stmt.on_conflict_do_nothing()
756 >>> print(stmt)
757 {opensql}INSERT INTO my_table (id, data) VALUES (%(id)s, %(data)s)
758 ON CONFLICT DO NOTHING
759
760.. _postgresql_match:
761
762Full Text Search
763----------------
764
765SQLAlchemy makes available the PostgreSQL ``@@`` operator via the
766:meth:`_expression.ColumnElement.match` method on any textual column expression.
767
768On the PostgreSQL dialect, an expression like the following::
769
770 select(sometable.c.text.match("search string"))
771
772will emit to the database::
773
774 SELECT text @@ to_tsquery('search string') FROM table
775
776Various other PostgreSQL text search functions such as ``to_tsquery()``,
777``to_tsvector()``, and ``plainto_tsquery()`` are available by explicitly using
778the standard SQLAlchemy :data:`.func` construct.
779
780For example::
781
782 select(func.to_tsvector('fat cats ate rats').match('cat & rat'))
783
784Emits the equivalent of::
785
786 SELECT to_tsvector('fat cats ate rats') @@ to_tsquery('cat & rat')
787
788The :class:`_postgresql.TSVECTOR` type can provide for explicit CAST::
789
790 from sqlalchemy.dialects.postgresql import TSVECTOR
791 from sqlalchemy import select, cast
792 select(cast("some text", TSVECTOR))
793
794produces a statement equivalent to::
795
796 SELECT CAST('some text' AS TSVECTOR) AS anon_1
797
798.. tip::
799
800 It's important to remember that text searching in PostgreSQL is powerful but complicated,
801 and SQLAlchemy users are advised to reference the PostgreSQL documentation
802 regarding
803 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_.
804
805 There are important differences between ``to_tsquery`` and
806 ``plainto_tsquery``, the most significant of which is that ``to_tsquery``
807 expects specially formatted "querytext" that is written to PostgreSQL's own
808 specification, while ``plainto_tsquery`` expects unformatted text that is
809 transformed into ``to_tsquery`` compatible querytext. This means the input to
810 ``.match()`` under PostgreSQL may be incompatible with the input to
811 ``.match()`` under another database backend. SQLAlchemy users who support
812 multiple backends are advised to carefully implement their usage of
813 ``.match()`` to work around these constraints.
814
815Full Text Searches in PostgreSQL are influenced by a combination of: the
816PostgreSQL setting of ``default_text_search_config``, the ``regconfig`` used
817to build the GIN/GiST indexes, and the ``regconfig`` optionally passed in
818during a query.
819
820When performing a Full Text Search against a column that has a GIN or
821GiST index that is already pre-computed (which is common on full text
822searches) one may need to explicitly pass in a particular PostgreSQL
823``regconfig`` value to ensure the query-planner utilizes the index and does
824not re-compute the column on demand.
825
826In order to provide for this explicit query planning, or to use different
827search strategies, the ``match`` method accepts a ``postgresql_regconfig``
828keyword argument::
829
830 select(mytable.c.id).where(
831 mytable.c.title.match('somestring', postgresql_regconfig='english')
832 )
833
834Emits the equivalent of::
835
836 SELECT mytable.id FROM mytable
837 WHERE mytable.title @@ to_tsquery('english', 'somestring')
838
839One can also specifically pass in a `'regconfig'` value to the
840``to_tsvector()`` command as the initial argument::
841
842 select(mytable.c.id).where(
843 func.to_tsvector('english', mytable.c.title )\
844 .match('somestring', postgresql_regconfig='english')
845 )
846
847produces a statement equivalent to::
848
849 SELECT mytable.id FROM mytable
850 WHERE to_tsvector('english', mytable.title) @@
851 to_tsquery('english', 'somestring')
852
853It is recommended that you use the ``EXPLAIN ANALYZE...`` tool from
854PostgreSQL to ensure that you are generating queries with SQLAlchemy that
855take full advantage of any indexes you may have created for full text search.
856
857.. seealso::
858
859 `Full Text Search <https://www.postgresql.org/docs/current/textsearch-controls.html>`_ - in the PostgreSQL documentation
860
861
862FROM ONLY ...
863-------------
864
865The dialect supports PostgreSQL's ONLY keyword for targeting only a particular
866table in an inheritance hierarchy. This can be used to produce the
867``SELECT ... FROM ONLY``, ``UPDATE ONLY ...``, and ``DELETE FROM ONLY ...``
868syntaxes. It uses SQLAlchemy's hints mechanism::
869
870 # SELECT ... FROM ONLY ...
871 result = table.select().with_hint(table, 'ONLY', 'postgresql')
872 print(result.fetchall())
873
874 # UPDATE ONLY ...
875 table.update(values=dict(foo='bar')).with_hint('ONLY',
876 dialect_name='postgresql')
877
878 # DELETE FROM ONLY ...
879 table.delete().with_hint('ONLY', dialect_name='postgresql')
880
881
882.. _postgresql_indexes:
883
884PostgreSQL-Specific Index Options
885---------------------------------
886
887Several extensions to the :class:`.Index` construct are available, specific
888to the PostgreSQL dialect.
889
890Covering Indexes
891^^^^^^^^^^^^^^^^
892
893The ``postgresql_include`` option renders INCLUDE(colname) for the given
894string names::
895
896 Index("my_index", table.c.x, postgresql_include=['y'])
897
898would render the index as ``CREATE INDEX my_index ON table (x) INCLUDE (y)``
899
900Note that this feature requires PostgreSQL 11 or later.
901
902.. versionadded:: 1.4
903
904.. _postgresql_partial_indexes:
905
906Partial Indexes
907^^^^^^^^^^^^^^^
908
909Partial indexes add criterion to the index definition so that the index is
910applied to a subset of rows. These can be specified on :class:`.Index`
911using the ``postgresql_where`` keyword argument::
912
913 Index('my_index', my_table.c.id, postgresql_where=my_table.c.value > 10)
914
915.. _postgresql_operator_classes:
916
917Operator Classes
918^^^^^^^^^^^^^^^^
919
920PostgreSQL allows the specification of an *operator class* for each column of
921an index (see
922https://www.postgresql.org/docs/current/interactive/indexes-opclass.html).
923The :class:`.Index` construct allows these to be specified via the
924``postgresql_ops`` keyword argument::
925
926 Index(
927 'my_index', my_table.c.id, my_table.c.data,
928 postgresql_ops={
929 'data': 'text_pattern_ops',
930 'id': 'int4_ops'
931 })
932
933Note that the keys in the ``postgresql_ops`` dictionaries are the
934"key" name of the :class:`_schema.Column`, i.e. the name used to access it from
935the ``.c`` collection of :class:`_schema.Table`, which can be configured to be
936different than the actual name of the column as expressed in the database.
937
938If ``postgresql_ops`` is to be used against a complex SQL expression such
939as a function call, then to apply to the column it must be given a label
940that is identified in the dictionary by name, e.g.::
941
942 Index(
943 'my_index', my_table.c.id,
944 func.lower(my_table.c.data).label('data_lower'),
945 postgresql_ops={
946 'data_lower': 'text_pattern_ops',
947 'id': 'int4_ops'
948 })
949
950Operator classes are also supported by the
951:class:`_postgresql.ExcludeConstraint` construct using the
952:paramref:`_postgresql.ExcludeConstraint.ops` parameter. See that parameter for
953details.
954
955.. versionadded:: 1.3.21 added support for operator classes with
956 :class:`_postgresql.ExcludeConstraint`.
957
958
959Index Types
960^^^^^^^^^^^
961
962PostgreSQL provides several index types: B-Tree, Hash, GiST, and GIN, as well
963as the ability for users to create their own (see
964https://www.postgresql.org/docs/current/static/indexes-types.html). These can be
965specified on :class:`.Index` using the ``postgresql_using`` keyword argument::
966
967 Index('my_index', my_table.c.data, postgresql_using='gin')
968
969The value passed to the keyword argument will be simply passed through to the
970underlying CREATE INDEX command, so it *must* be a valid index type for your
971version of PostgreSQL.
972
973.. _postgresql_index_storage:
974
975Index Storage Parameters
976^^^^^^^^^^^^^^^^^^^^^^^^
977
978PostgreSQL allows storage parameters to be set on indexes. The storage
979parameters available depend on the index method used by the index. Storage
980parameters can be specified on :class:`.Index` using the ``postgresql_with``
981keyword argument::
982
983 Index('my_index', my_table.c.data, postgresql_with={"fillfactor": 50})
984
985.. versionadded:: 1.0.6
986
987PostgreSQL allows to define the tablespace in which to create the index.
988The tablespace can be specified on :class:`.Index` using the
989``postgresql_tablespace`` keyword argument::
990
991 Index('my_index', my_table.c.data, postgresql_tablespace='my_tablespace')
992
993.. versionadded:: 1.1
994
995Note that the same option is available on :class:`_schema.Table` as well.
996
997.. _postgresql_index_concurrently:
998
999Indexes with CONCURRENTLY
1000^^^^^^^^^^^^^^^^^^^^^^^^^
1001
1002The PostgreSQL index option CONCURRENTLY is supported by passing the
1003flag ``postgresql_concurrently`` to the :class:`.Index` construct::
1004
1005 tbl = Table('testtbl', m, Column('data', Integer))
1006
1007 idx1 = Index('test_idx1', tbl.c.data, postgresql_concurrently=True)
1008
1009The above index construct will render DDL for CREATE INDEX, assuming
1010PostgreSQL 8.2 or higher is detected or for a connection-less dialect, as::
1011
1012 CREATE INDEX CONCURRENTLY test_idx1 ON testtbl (data)
1013
1014For DROP INDEX, assuming PostgreSQL 9.2 or higher is detected or for
1015a connection-less dialect, it will emit::
1016
1017 DROP INDEX CONCURRENTLY test_idx1
1018
1019.. versionadded:: 1.1 support for CONCURRENTLY on DROP INDEX. The
1020 CONCURRENTLY keyword is now only emitted if a high enough version
1021 of PostgreSQL is detected on the connection (or for a connection-less
1022 dialect).
1023
1024When using CONCURRENTLY, the PostgreSQL database requires that the statement
1025be invoked outside of a transaction block. The Python DBAPI enforces that
1026even for a single statement, a transaction is present, so to use this
1027construct, the DBAPI's "autocommit" mode must be used::
1028
1029 metadata = MetaData()
1030 table = Table(
1031 "foo", metadata,
1032 Column("id", String))
1033 index = Index(
1034 "foo_idx", table.c.id, postgresql_concurrently=True)
1035
1036 with engine.connect() as conn:
1037 with conn.execution_options(isolation_level='AUTOCOMMIT'):
1038 table.create(conn)
1039
1040.. seealso::
1041
1042 :ref:`postgresql_isolation_level`
1043
1044.. _postgresql_index_reflection:
1045
1046PostgreSQL Index Reflection
1047---------------------------
1048
1049The PostgreSQL database creates a UNIQUE INDEX implicitly whenever the
1050UNIQUE CONSTRAINT construct is used. When inspecting a table using
1051:class:`_reflection.Inspector`, the :meth:`_reflection.Inspector.get_indexes`
1052and the :meth:`_reflection.Inspector.get_unique_constraints`
1053will report on these
1054two constructs distinctly; in the case of the index, the key
1055``duplicates_constraint`` will be present in the index entry if it is
1056detected as mirroring a constraint. When performing reflection using
1057``Table(..., autoload_with=engine)``, the UNIQUE INDEX is **not** returned
1058in :attr:`_schema.Table.indexes` when it is detected as mirroring a
1059:class:`.UniqueConstraint` in the :attr:`_schema.Table.constraints` collection
1060.
1061
1062.. versionchanged:: 1.0.0 - :class:`_schema.Table` reflection now includes
1063 :class:`.UniqueConstraint` objects present in the
1064 :attr:`_schema.Table.constraints`
1065 collection; the PostgreSQL backend will no longer include a "mirrored"
1066 :class:`.Index` construct in :attr:`_schema.Table.indexes`
1067 if it is detected
1068 as corresponding to a unique constraint.
1069
1070Special Reflection Options
1071--------------------------
1072
1073The :class:`_reflection.Inspector`
1074used for the PostgreSQL backend is an instance
1075of :class:`.PGInspector`, which offers additional methods::
1076
1077 from sqlalchemy import create_engine, inspect
1078
1079 engine = create_engine("postgresql+psycopg2://localhost/test")
1080 insp = inspect(engine) # will be a PGInspector
1081
1082 print(insp.get_enums())
1083
1084.. autoclass:: PGInspector
1085 :members:
1086
1087.. _postgresql_table_options:
1088
1089PostgreSQL Table Options
1090------------------------
1091
1092Several options for CREATE TABLE are supported directly by the PostgreSQL
1093dialect in conjunction with the :class:`_schema.Table` construct:
1094
1095* ``TABLESPACE``::
1096
1097 Table("some_table", metadata, ..., postgresql_tablespace='some_tablespace')
1098
1099 The above option is also available on the :class:`.Index` construct.
1100
1101* ``ON COMMIT``::
1102
1103 Table("some_table", metadata, ..., postgresql_on_commit='PRESERVE ROWS')
1104
1105* ``WITH OIDS``::
1106
1107 Table("some_table", metadata, ..., postgresql_with_oids=True)
1108
1109* ``WITHOUT OIDS``::
1110
1111 Table("some_table", metadata, ..., postgresql_with_oids=False)
1112
1113* ``INHERITS``::
1114
1115 Table("some_table", metadata, ..., postgresql_inherits="some_supertable")
1116
1117 Table("some_table", metadata, ..., postgresql_inherits=("t1", "t2", ...))
1118
1119 .. versionadded:: 1.0.0
1120
1121* ``PARTITION BY``::
1122
1123 Table("some_table", metadata, ...,
1124 postgresql_partition_by='LIST (part_column)')
1125
1126 .. versionadded:: 1.2.6
1127
1128.. seealso::
1129
1130 `PostgreSQL CREATE TABLE options
1131 <https://www.postgresql.org/docs/current/static/sql-createtable.html>`_ -
1132 in the PostgreSQL documentation.
1133
1134.. _postgresql_constraint_options:
1135
1136PostgreSQL Constraint Options
1137-----------------------------
1138
1139The following option(s) are supported by the PostgreSQL dialect in conjunction
1140with selected constraint constructs:
1141
1142* ``NOT VALID``: This option applies towards CHECK and FOREIGN KEY constraints
1143 when the constraint is being added to an existing table via ALTER TABLE,
1144 and has the effect that existing rows are not scanned during the ALTER
1145 operation against the constraint being added.
1146
1147 When using a SQL migration tool such as `Alembic <https://alembic.sqlalchemy.org>`_
1148 that renders ALTER TABLE constructs, the ``postgresql_not_valid`` argument
1149 may be specified as an additional keyword argument within the operation
1150 that creates the constraint, as in the following Alembic example::
1151
1152 def update():
1153 op.create_foreign_key(
1154 "fk_user_address",
1155 "address",
1156 "user",
1157 ["user_id"],
1158 ["id"],
1159 postgresql_not_valid=True
1160 )
1161
1162 The keyword is ultimately accepted directly by the
1163 :class:`_schema.CheckConstraint`, :class:`_schema.ForeignKeyConstraint`
1164 and :class:`_schema.ForeignKey` constructs; when using a tool like
1165 Alembic, dialect-specific keyword arguments are passed through to
1166 these constructs from the migration operation directives::
1167
1168 CheckConstraint("some_field IS NOT NULL", postgresql_not_valid=True)
1169
1170 ForeignKeyConstraint(["some_id"], ["some_table.some_id"], postgresql_not_valid=True)
1171
1172 .. versionadded:: 1.4.32
1173
1174 .. seealso::
1175
1176 `PostgreSQL ALTER TABLE options
1177 <https://www.postgresql.org/docs/current/static/sql-altertable.html>`_ -
1178 in the PostgreSQL documentation.
1179
1180.. _postgresql_table_valued_overview:
1181
1182Table values, Table and Column valued functions, Row and Tuple objects
1183-----------------------------------------------------------------------
1184
1185PostgreSQL makes great use of modern SQL forms such as table-valued functions,
1186tables and rows as values. These constructs are commonly used as part
1187of PostgreSQL's support for complex datatypes such as JSON, ARRAY, and other
1188datatypes. SQLAlchemy's SQL expression language has native support for
1189most table-valued and row-valued forms.
1190
1191.. _postgresql_table_valued:
1192
1193Table-Valued Functions
1194^^^^^^^^^^^^^^^^^^^^^^^
1195
1196Many PostgreSQL built-in functions are intended to be used in the FROM clause
1197of a SELECT statement, and are capable of returning table rows or sets of table
1198rows. A large portion of PostgreSQL's JSON functions for example such as
1199``json_array_elements()``, ``json_object_keys()``, ``json_each_text()``,
1200``json_each()``, ``json_to_record()``, ``json_populate_recordset()`` use such
1201forms. These classes of SQL function calling forms in SQLAlchemy are available
1202using the :meth:`_functions.FunctionElement.table_valued` method in conjunction
1203with :class:`_functions.Function` objects generated from the :data:`_sql.func`
1204namespace.
1205
1206Examples from PostgreSQL's reference documentation follow below:
1207
1208* ``json_each()``::
1209
1210 >>> from sqlalchemy import select, func
1211 >>> stmt = select(func.json_each('{"a":"foo", "b":"bar"}').table_valued("key", "value"))
1212 >>> print(stmt)
1213 SELECT anon_1.key, anon_1.value
1214 FROM json_each(:json_each_1) AS anon_1
1215
1216* ``json_populate_record()``::
1217
1218 >>> from sqlalchemy import select, func, literal_column
1219 >>> stmt = select(
1220 ... func.json_populate_record(
1221 ... literal_column("null::myrowtype"),
1222 ... '{"a":1,"b":2}'
1223 ... ).table_valued("a", "b", name="x")
1224 ... )
1225 >>> print(stmt)
1226 SELECT x.a, x.b
1227 FROM json_populate_record(null::myrowtype, :json_populate_record_1) AS x
1228
1229* ``json_to_record()`` - this form uses a PostgreSQL specific form of derived
1230 columns in the alias, where we may make use of :func:`_sql.column` elements with
1231 types to produce them. The :meth:`_functions.FunctionElement.table_valued`
1232 method produces a :class:`_sql.TableValuedAlias` construct, and the method
1233 :meth:`_sql.TableValuedAlias.render_derived` method sets up the derived
1234 columns specification::
1235
1236 >>> from sqlalchemy import select, func, column, Integer, Text
1237 >>> stmt = select(
1238 ... func.json_to_record('{"a":1,"b":[1,2,3],"c":"bar"}').table_valued(
1239 ... column("a", Integer), column("b", Text), column("d", Text),
1240 ... ).render_derived(name="x", with_types=True)
1241 ... )
1242 >>> print(stmt)
1243 SELECT x.a, x.b, x.d
1244 FROM json_to_record(:json_to_record_1) AS x(a INTEGER, b TEXT, d TEXT)
1245
1246* ``WITH ORDINALITY`` - part of the SQL standard, ``WITH ORDINALITY`` adds an
1247 ordinal counter to the output of a function and is accepted by a limited set
1248 of PostgreSQL functions including ``unnest()`` and ``generate_series()``. The
1249 :meth:`_functions.FunctionElement.table_valued` method accepts a keyword
1250 parameter ``with_ordinality`` for this purpose, which accepts the string name
1251 that will be applied to the "ordinality" column::
1252
1253 >>> from sqlalchemy import select, func
1254 >>> stmt = select(
1255 ... func.generate_series(4, 1, -1).
1256 ... table_valued("value", with_ordinality="ordinality").
1257 ... render_derived()
1258 ... )
1259 >>> print(stmt)
1260 SELECT anon_1.value, anon_1.ordinality
1261 FROM generate_series(:generate_series_1, :generate_series_2, :generate_series_3)
1262 WITH ORDINALITY AS anon_1(value, ordinality)
1263
1264.. versionadded:: 1.4.0b2
1265
1266.. seealso::
1267
1268 :ref:`tutorial_functions_table_valued` - in the :ref:`unified_tutorial`
1269
1270.. _postgresql_column_valued:
1271
1272Column Valued Functions
1273^^^^^^^^^^^^^^^^^^^^^^^
1274
1275Similar to the table valued function, a column valued function is present
1276in the FROM clause, but delivers itself to the columns clause as a single
1277scalar value. PostgreSQL functions such as ``json_array_elements()``,
1278``unnest()`` and ``generate_series()`` may use this form. Column valued functions are available using the
1279:meth:`_functions.FunctionElement.column_valued` method of :class:`_functions.FunctionElement`:
1280
1281* ``json_array_elements()``::
1282
1283 >>> from sqlalchemy import select, func
1284 >>> stmt = select(func.json_array_elements('["one", "two"]').column_valued("x"))
1285 >>> print(stmt)
1286 SELECT x
1287 FROM json_array_elements(:json_array_elements_1) AS x
1288
1289* ``unnest()`` - in order to generate a PostgreSQL ARRAY literal, the
1290 :func:`_postgresql.array` construct may be used::
1291
1292
1293 >>> from sqlalchemy.dialects.postgresql import array
1294 >>> from sqlalchemy import select, func
1295 >>> stmt = select(func.unnest(array([1, 2])).column_valued())
1296 >>> print(stmt)
1297 SELECT anon_1
1298 FROM unnest(ARRAY[%(param_1)s, %(param_2)s]) AS anon_1
1299
1300 The function can of course be used against an existing table-bound column
1301 that's of type :class:`_types.ARRAY`::
1302
1303 >>> from sqlalchemy import table, column, ARRAY, Integer
1304 >>> from sqlalchemy import select, func
1305 >>> t = table("t", column('value', ARRAY(Integer)))
1306 >>> stmt = select(func.unnest(t.c.value).column_valued("unnested_value"))
1307 >>> print(stmt)
1308 SELECT unnested_value
1309 FROM unnest(t.value) AS unnested_value
1310
1311.. seealso::
1312
1313 :ref:`tutorial_functions_column_valued` - in the :ref:`unified_tutorial`
1314
1315
1316Row Types
1317^^^^^^^^^
1318
1319Built-in support for rendering a ``ROW`` may be approximated using
1320``func.ROW`` with the :attr:`_sa.func` namespace, or by using the
1321:func:`_sql.tuple_` construct::
1322
1323 >>> from sqlalchemy import table, column, func, tuple_
1324 >>> t = table("t", column("id"), column("fk"))
1325 >>> stmt = t.select().where(
1326 ... tuple_(t.c.id, t.c.fk) > (1,2)
1327 ... ).where(
1328 ... func.ROW(t.c.id, t.c.fk) < func.ROW(3, 7)
1329 ... )
1330 >>> print(stmt)
1331 SELECT t.id, t.fk
1332 FROM t
1333 WHERE (t.id, t.fk) > (:param_1, :param_2) AND ROW(t.id, t.fk) < ROW(:ROW_1, :ROW_2)
1334
1335.. seealso::
1336
1337 `PostgreSQL Row Constructors
1338 <https://www.postgresql.org/docs/current/sql-expressions.html#SQL-SYNTAX-ROW-CONSTRUCTORS>`_
1339
1340 `PostgreSQL Row Constructor Comparison
1341 <https://www.postgresql.org/docs/current/functions-comparisons.html#ROW-WISE-COMPARISON>`_
1342
1343Table Types passed to Functions
1344^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1345
1346PostgreSQL supports passing a table as an argument to a function, which it
1347refers towards as a "record" type. SQLAlchemy :class:`_sql.FromClause` objects
1348such as :class:`_schema.Table` support this special form using the
1349:meth:`_sql.FromClause.table_valued` method, which is comparable to the
1350:meth:`_functions.FunctionElement.table_valued` method except that the collection
1351of columns is already established by that of the :class:`_sql.FromClause`
1352itself::
1353
1354
1355 >>> from sqlalchemy import table, column, func, select
1356 >>> a = table( "a", column("id"), column("x"), column("y"))
1357 >>> stmt = select(func.row_to_json(a.table_valued()))
1358 >>> print(stmt)
1359 SELECT row_to_json(a) AS row_to_json_1
1360 FROM a
1361
1362.. versionadded:: 1.4.0b2
1363
1364
1365ARRAY Types
1366-----------
1367
1368The PostgreSQL dialect supports arrays, both as multidimensional column types
1369as well as array literals:
1370
1371* :class:`_postgresql.ARRAY` - ARRAY datatype
1372
1373* :class:`_postgresql.array` - array literal
1374
1375* :func:`_postgresql.array_agg` - ARRAY_AGG SQL function
1376
1377* :class:`_postgresql.aggregate_order_by` - helper for PG's ORDER BY aggregate
1378 function syntax.
1379
1380JSON Types
1381----------
1382
1383The PostgreSQL dialect supports both JSON and JSONB datatypes, including
1384psycopg2's native support and support for all of PostgreSQL's special
1385operators:
1386
1387* :class:`_postgresql.JSON`
1388
1389* :class:`_postgresql.JSONB`
1390
1391HSTORE Type
1392-----------
1393
1394The PostgreSQL HSTORE type as well as hstore literals are supported:
1395
1396* :class:`_postgresql.HSTORE` - HSTORE datatype
1397
1398* :class:`_postgresql.hstore` - hstore literal
1399
1400ENUM Types
1401----------
1402
1403PostgreSQL has an independently creatable TYPE structure which is used
1404to implement an enumerated type. This approach introduces significant
1405complexity on the SQLAlchemy side in terms of when this type should be
1406CREATED and DROPPED. The type object is also an independently reflectable
1407entity. The following sections should be consulted:
1408
1409* :class:`_postgresql.ENUM` - DDL and typing support for ENUM.
1410
1411* :meth:`.PGInspector.get_enums` - retrieve a listing of current ENUM types
1412
1413* :meth:`.postgresql.ENUM.create` , :meth:`.postgresql.ENUM.drop` - individual
1414 CREATE and DROP commands for ENUM.
1415
1416.. _postgresql_array_of_enum:
1417
1418Using ENUM with ARRAY
1419^^^^^^^^^^^^^^^^^^^^^
1420
1421The combination of ENUM and ARRAY is not directly supported by backend
1422DBAPIs at this time. Prior to SQLAlchemy 1.3.17, a special workaround
1423was needed in order to allow this combination to work, described below.
1424
1425.. versionchanged:: 1.3.17 The combination of ENUM and ARRAY is now directly
1426 handled by SQLAlchemy's implementation without any workarounds needed.
1427
1428.. sourcecode:: python
1429
1430 from sqlalchemy import TypeDecorator
1431 from sqlalchemy.dialects.postgresql import ARRAY
1432
1433 class ArrayOfEnum(TypeDecorator):
1434 impl = ARRAY
1435
1436 def bind_expression(self, bindvalue):
1437 return sa.cast(bindvalue, self)
1438
1439 def result_processor(self, dialect, coltype):
1440 super_rp = super(ArrayOfEnum, self).result_processor(
1441 dialect, coltype)
1442
1443 def handle_raw_string(value):
1444 inner = re.match(r"^{(.*)}$", value).group(1)
1445 return inner.split(",") if inner else []
1446
1447 def process(value):
1448 if value is None:
1449 return None
1450 return super_rp(handle_raw_string(value))
1451 return process
1452
1453E.g.::
1454
1455 Table(
1456 'mydata', metadata,
1457 Column('id', Integer, primary_key=True),
1458 Column('data', ArrayOfEnum(ENUM('a', 'b, 'c', name='myenum')))
1459
1460 )
1461
1462This type is not included as a built-in type as it would be incompatible
1463with a DBAPI that suddenly decides to support ARRAY of ENUM directly in
1464a new version.
1465
1466.. _postgresql_array_of_json:
1467
1468Using JSON/JSONB with ARRAY
1469^^^^^^^^^^^^^^^^^^^^^^^^^^^
1470
1471Similar to using ENUM, prior to SQLAlchemy 1.3.17, for an ARRAY of JSON/JSONB
1472we need to render the appropriate CAST. Current psycopg2 drivers accommodate
1473the result set correctly without any special steps.
1474
1475.. versionchanged:: 1.3.17 The combination of JSON/JSONB and ARRAY is now
1476 directly handled by SQLAlchemy's implementation without any workarounds
1477 needed.
1478
1479.. sourcecode:: python
1480
1481 class CastingArray(ARRAY):
1482 def bind_expression(self, bindvalue):
1483 return sa.cast(bindvalue, self)
1484
1485E.g.::
1486
1487 Table(
1488 'mydata', metadata,
1489 Column('id', Integer, primary_key=True),
1490 Column('data', CastingArray(JSONB))
1491 )
1492
1493
1494""" # noqa: E501
1495
1496from collections import defaultdict
1497import datetime as dt
1498import re
1499from uuid import UUID as _python_UUID
1500
1501from . import array as _array
1502from . import dml
1503from . import hstore as _hstore
1504from . import json as _json
1505from . import ranges as _ranges
1506from ... import exc
1507from ... import schema
1508from ... import sql
1509from ... import util
1510from ...engine import characteristics
1511from ...engine import default
1512from ...engine import reflection
1513from ...sql import coercions
1514from ...sql import compiler
1515from ...sql import elements
1516from ...sql import expression
1517from ...sql import roles
1518from ...sql import sqltypes
1519from ...sql import util as sql_util
1520from ...sql.ddl import DDLBase
1521from ...types import BIGINT
1522from ...types import BOOLEAN
1523from ...types import CHAR
1524from ...types import DATE
1525from ...types import FLOAT
1526from ...types import INTEGER
1527from ...types import NUMERIC
1528from ...types import REAL
1529from ...types import SMALLINT
1530from ...types import TEXT
1531from ...types import VARCHAR
1532
1533IDX_USING = re.compile(r"^(?:btree|hash|gist|gin|[\w_]+)$", re.I)
1534
1535AUTOCOMMIT_REGEXP = re.compile(
1536 r"\s*(?:UPDATE|INSERT|CREATE|DELETE|DROP|ALTER|GRANT|REVOKE|"
1537 "IMPORT FOREIGN SCHEMA|REFRESH MATERIALIZED VIEW|TRUNCATE)",
1538 re.I | re.UNICODE,
1539)
1540
1541RESERVED_WORDS = set(
1542 [
1543 "all",
1544 "analyse",
1545 "analyze",
1546 "and",
1547 "any",
1548 "array",
1549 "as",
1550 "asc",
1551 "asymmetric",
1552 "both",
1553 "case",
1554 "cast",
1555 "check",
1556 "collate",
1557 "column",
1558 "constraint",
1559 "create",
1560 "current_catalog",
1561 "current_date",
1562 "current_role",
1563 "current_time",
1564 "current_timestamp",
1565 "current_user",
1566 "default",
1567 "deferrable",
1568 "desc",
1569 "distinct",
1570 "do",
1571 "else",
1572 "end",
1573 "except",
1574 "false",
1575 "fetch",
1576 "for",
1577 "foreign",
1578 "from",
1579 "grant",
1580 "group",
1581 "having",
1582 "in",
1583 "initially",
1584 "intersect",
1585 "into",
1586 "leading",
1587 "limit",
1588 "localtime",
1589 "localtimestamp",
1590 "new",
1591 "not",
1592 "null",
1593 "of",
1594 "off",
1595 "offset",
1596 "old",
1597 "on",
1598 "only",
1599 "or",
1600 "order",
1601 "placing",
1602 "primary",
1603 "references",
1604 "returning",
1605 "select",
1606 "session_user",
1607 "some",
1608 "symmetric",
1609 "table",
1610 "then",
1611 "to",
1612 "trailing",
1613 "true",
1614 "union",
1615 "unique",
1616 "user",
1617 "using",
1618 "variadic",
1619 "when",
1620 "where",
1621 "window",
1622 "with",
1623 "authorization",
1624 "between",
1625 "binary",
1626 "cross",
1627 "current_schema",
1628 "freeze",
1629 "full",
1630 "ilike",
1631 "inner",
1632 "is",
1633 "isnull",
1634 "join",
1635 "left",
1636 "like",
1637 "natural",
1638 "notnull",
1639 "outer",
1640 "over",
1641 "overlaps",
1642 "right",
1643 "similar",
1644 "verbose",
1645 ]
1646)
1647
1648_DECIMAL_TYPES = (1231, 1700)
1649_FLOAT_TYPES = (700, 701, 1021, 1022)
1650_INT_TYPES = (20, 21, 23, 26, 1005, 1007, 1016)
1651
1652
1653class BYTEA(sqltypes.LargeBinary):
1654 __visit_name__ = "BYTEA"
1655
1656
1657class DOUBLE_PRECISION(sqltypes.Float):
1658 __visit_name__ = "DOUBLE_PRECISION"
1659
1660
1661class INET(sqltypes.TypeEngine):
1662 __visit_name__ = "INET"
1663
1664
1665PGInet = INET
1666
1667
1668class CIDR(sqltypes.TypeEngine):
1669 __visit_name__ = "CIDR"
1670
1671
1672PGCidr = CIDR
1673
1674
1675class MACADDR(sqltypes.TypeEngine):
1676 __visit_name__ = "MACADDR"
1677
1678
1679PGMacAddr = MACADDR
1680
1681
1682class MACADDR8(sqltypes.TypeEngine):
1683 __visit_name__ = "MACADDR8"
1684
1685
1686PGMacAddr8 = MACADDR8
1687
1688
1689class MONEY(sqltypes.TypeEngine):
1690
1691 r"""Provide the PostgreSQL MONEY type.
1692
1693 Depending on driver, result rows using this type may return a
1694 string value which includes currency symbols.
1695
1696 For this reason, it may be preferable to provide conversion to a
1697 numerically-based currency datatype using :class:`_types.TypeDecorator`::
1698
1699 import re
1700 import decimal
1701 from sqlalchemy import TypeDecorator
1702
1703 class NumericMoney(TypeDecorator):
1704 impl = MONEY
1705
1706 def process_result_value(self, value: Any, dialect: Any) -> None:
1707 if value is not None:
1708 # adjust this for the currency and numeric
1709 m = re.match(r"\$([\d.]+)", value)
1710 if m:
1711 value = decimal.Decimal(m.group(1))
1712 return value
1713
1714 Alternatively, the conversion may be applied as a CAST using
1715 the :meth:`_types.TypeDecorator.column_expression` method as follows::
1716
1717 import decimal
1718 from sqlalchemy import cast
1719 from sqlalchemy import TypeDecorator
1720
1721 class NumericMoney(TypeDecorator):
1722 impl = MONEY
1723
1724 def column_expression(self, column: Any):
1725 return cast(column, Numeric())
1726
1727 .. versionadded:: 1.2
1728
1729 """
1730
1731 __visit_name__ = "MONEY"
1732
1733
1734class OID(sqltypes.TypeEngine):
1735
1736 """Provide the PostgreSQL OID type.
1737
1738 .. versionadded:: 0.9.5
1739
1740 """
1741
1742 __visit_name__ = "OID"
1743
1744
1745class REGCLASS(sqltypes.TypeEngine):
1746
1747 """Provide the PostgreSQL REGCLASS type.
1748
1749 .. versionadded:: 1.2.7
1750
1751 """
1752
1753 __visit_name__ = "REGCLASS"
1754
1755
1756class TIMESTAMP(sqltypes.TIMESTAMP):
1757
1758 """Provide the PostgreSQL TIMESTAMP type."""
1759
1760 __visit_name__ = "TIMESTAMP"
1761
1762 def __init__(self, timezone=False, precision=None):
1763 """Construct a TIMESTAMP.
1764
1765 :param timezone: boolean value if timezone present, default False
1766 :param precision: optional integer precision value
1767
1768 .. versionadded:: 1.4
1769
1770 """
1771 super(TIMESTAMP, self).__init__(timezone=timezone)
1772 self.precision = precision
1773
1774
1775class TIME(sqltypes.TIME):
1776
1777 """PostgreSQL TIME type."""
1778
1779 __visit_name__ = "TIME"
1780
1781 def __init__(self, timezone=False, precision=None):
1782 """Construct a TIME.
1783
1784 :param timezone: boolean value if timezone present, default False
1785 :param precision: optional integer precision value
1786
1787 .. versionadded:: 1.4
1788
1789 """
1790 super(TIME, self).__init__(timezone=timezone)
1791 self.precision = precision
1792
1793
1794class INTERVAL(sqltypes.NativeForEmulated, sqltypes._AbstractInterval):
1795
1796 """PostgreSQL INTERVAL type."""
1797
1798 __visit_name__ = "INTERVAL"
1799 native = True
1800
1801 def __init__(self, precision=None, fields=None):
1802 """Construct an INTERVAL.
1803
1804 :param precision: optional integer precision value
1805 :param fields: string fields specifier. allows storage of fields
1806 to be limited, such as ``"YEAR"``, ``"MONTH"``, ``"DAY TO HOUR"``,
1807 etc.
1808
1809 .. versionadded:: 1.2
1810
1811 """
1812 self.precision = precision
1813 self.fields = fields
1814
1815 @classmethod
1816 def adapt_emulated_to_native(cls, interval, **kw):
1817 return INTERVAL(precision=interval.second_precision)
1818
1819 @property
1820 def _type_affinity(self):
1821 return sqltypes.Interval
1822
1823 def as_generic(self, allow_nulltype=False):
1824 return sqltypes.Interval(native=True, second_precision=self.precision)
1825
1826 @property
1827 def python_type(self):
1828 return dt.timedelta
1829
1830 def coerce_compared_value(self, op, value):
1831 return self
1832
1833
1834PGInterval = INTERVAL
1835
1836
1837class BIT(sqltypes.TypeEngine):
1838 __visit_name__ = "BIT"
1839
1840 def __init__(self, length=None, varying=False):
1841 if not varying:
1842 # BIT without VARYING defaults to length 1
1843 self.length = length or 1
1844 else:
1845 # but BIT VARYING can be unlimited-length, so no default
1846 self.length = length
1847 self.varying = varying
1848
1849
1850PGBit = BIT
1851
1852
1853class UUID(sqltypes.TypeEngine):
1854
1855 """PostgreSQL UUID type.
1856
1857 Represents the UUID column type, interpreting
1858 data either as natively returned by the DBAPI
1859 or as Python uuid objects.
1860
1861 The UUID type is currently known to work within the prominent DBAPI
1862 drivers supported by SQLAlchemy including psycopg2, pg8000 and
1863 asyncpg. Support for other DBAPI drivers may be incomplete or non-present.
1864
1865 """
1866
1867 __visit_name__ = "UUID"
1868
1869 def __init__(self, as_uuid=False):
1870 """Construct a UUID type.
1871
1872
1873 :param as_uuid=False: if True, values will be interpreted
1874 as Python uuid objects, converting to/from string via the
1875 DBAPI.
1876
1877 """
1878 self.as_uuid = as_uuid
1879
1880 def coerce_compared_value(self, op, value):
1881 """See :meth:`.TypeEngine.coerce_compared_value` for a description."""
1882
1883 if isinstance(value, util.string_types):
1884 return self
1885 else:
1886 return super(UUID, self).coerce_compared_value(op, value)
1887
1888 def bind_processor(self, dialect):
1889 if self.as_uuid:
1890
1891 def process(value):
1892 if value is not None:
1893 value = util.text_type(value)
1894 return value
1895
1896 return process
1897 else:
1898 return None
1899
1900 def result_processor(self, dialect, coltype):
1901 if self.as_uuid:
1902
1903 def process(value):
1904 if value is not None:
1905 value = _python_UUID(value)
1906 return value
1907
1908 return process
1909 else:
1910 return None
1911
1912 def literal_processor(self, dialect):
1913 if self.as_uuid:
1914
1915 def process(value):
1916 if value is not None:
1917 value = "'%s'::UUID" % value
1918 return value
1919
1920 return process
1921 else:
1922
1923 def process(value):
1924 if value is not None:
1925 value = "'%s'" % value
1926 return value
1927
1928 return process
1929
1930 @property
1931 def python_type(self):
1932 return _python_UUID if self.as_uuid else str
1933
1934
1935PGUuid = UUID
1936
1937
1938class TSVECTOR(sqltypes.TypeEngine):
1939
1940 """The :class:`_postgresql.TSVECTOR` type implements the PostgreSQL
1941 text search type TSVECTOR.
1942
1943 It can be used to do full text queries on natural language
1944 documents.
1945
1946 .. versionadded:: 0.9.0
1947
1948 .. seealso::
1949
1950 :ref:`postgresql_match`
1951
1952 """
1953
1954 __visit_name__ = "TSVECTOR"
1955
1956
1957class ENUM(sqltypes.NativeForEmulated, sqltypes.Enum):
1958
1959 """PostgreSQL ENUM type.
1960
1961 This is a subclass of :class:`_types.Enum` which includes
1962 support for PG's ``CREATE TYPE`` and ``DROP TYPE``.
1963
1964 When the builtin type :class:`_types.Enum` is used and the
1965 :paramref:`.Enum.native_enum` flag is left at its default of
1966 True, the PostgreSQL backend will use a :class:`_postgresql.ENUM`
1967 type as the implementation, so the special create/drop rules
1968 will be used.
1969
1970 The create/drop behavior of ENUM is necessarily intricate, due to the
1971 awkward relationship the ENUM type has in relationship to the
1972 parent table, in that it may be "owned" by just a single table, or
1973 may be shared among many tables.
1974
1975 When using :class:`_types.Enum` or :class:`_postgresql.ENUM`
1976 in an "inline" fashion, the ``CREATE TYPE`` and ``DROP TYPE`` is emitted
1977 corresponding to when the :meth:`_schema.Table.create` and
1978 :meth:`_schema.Table.drop`
1979 methods are called::
1980
1981 table = Table('sometable', metadata,
1982 Column('some_enum', ENUM('a', 'b', 'c', name='myenum'))
1983 )
1984
1985 table.create(engine) # will emit CREATE ENUM and CREATE TABLE
1986 table.drop(engine) # will emit DROP TABLE and DROP ENUM
1987
1988 To use a common enumerated type between multiple tables, the best
1989 practice is to declare the :class:`_types.Enum` or
1990 :class:`_postgresql.ENUM` independently, and associate it with the
1991 :class:`_schema.MetaData` object itself::
1992
1993 my_enum = ENUM('a', 'b', 'c', name='myenum', metadata=metadata)
1994
1995 t1 = Table('sometable_one', metadata,
1996 Column('some_enum', myenum)
1997 )
1998
1999 t2 = Table('sometable_two', metadata,
2000 Column('some_enum', myenum)
2001 )
2002
2003 When this pattern is used, care must still be taken at the level
2004 of individual table creates. Emitting CREATE TABLE without also
2005 specifying ``checkfirst=True`` will still cause issues::
2006
2007 t1.create(engine) # will fail: no such type 'myenum'
2008
2009 If we specify ``checkfirst=True``, the individual table-level create
2010 operation will check for the ``ENUM`` and create if not exists::
2011
2012 # will check if enum exists, and emit CREATE TYPE if not
2013 t1.create(engine, checkfirst=True)
2014
2015 When using a metadata-level ENUM type, the type will always be created
2016 and dropped if either the metadata-wide create/drop is called::
2017
2018 metadata.create_all(engine) # will emit CREATE TYPE
2019 metadata.drop_all(engine) # will emit DROP TYPE
2020
2021 The type can also be created and dropped directly::
2022
2023 my_enum.create(engine)
2024 my_enum.drop(engine)
2025
2026 .. versionchanged:: 1.0.0 The PostgreSQL :class:`_postgresql.ENUM` type
2027 now behaves more strictly with regards to CREATE/DROP. A metadata-level
2028 ENUM type will only be created and dropped at the metadata level,
2029 not the table level, with the exception of
2030 ``table.create(checkfirst=True)``.
2031 The ``table.drop()`` call will now emit a DROP TYPE for a table-level
2032 enumerated type.
2033
2034 """
2035
2036 native_enum = True
2037
2038 def __init__(self, *enums, **kw):
2039 """Construct an :class:`_postgresql.ENUM`.
2040
2041 Arguments are the same as that of
2042 :class:`_types.Enum`, but also including
2043 the following parameters.
2044
2045 :param create_type: Defaults to True.
2046 Indicates that ``CREATE TYPE`` should be
2047 emitted, after optionally checking for the
2048 presence of the type, when the parent
2049 table is being created; and additionally
2050 that ``DROP TYPE`` is called when the table
2051 is dropped. When ``False``, no check
2052 will be performed and no ``CREATE TYPE``
2053 or ``DROP TYPE`` is emitted, unless
2054 :meth:`~.postgresql.ENUM.create`
2055 or :meth:`~.postgresql.ENUM.drop`
2056 are called directly.
2057 Setting to ``False`` is helpful
2058 when invoking a creation scheme to a SQL file
2059 without access to the actual database -
2060 the :meth:`~.postgresql.ENUM.create` and
2061 :meth:`~.postgresql.ENUM.drop` methods can
2062 be used to emit SQL to a target bind.
2063
2064 """
2065 native_enum = kw.pop("native_enum", None)
2066 if native_enum is False:
2067 util.warn(
2068 "the native_enum flag does not apply to the "
2069 "sqlalchemy.dialects.postgresql.ENUM datatype; this type "
2070 "always refers to ENUM. Use sqlalchemy.types.Enum for "
2071 "non-native enum."
2072 )
2073 self.create_type = kw.pop("create_type", True)
2074 super(ENUM, self).__init__(*enums, **kw)
2075
2076 @classmethod
2077 def adapt_emulated_to_native(cls, impl, **kw):
2078 """Produce a PostgreSQL native :class:`_postgresql.ENUM` from plain
2079 :class:`.Enum`.
2080
2081 """
2082 kw.setdefault("validate_strings", impl.validate_strings)
2083 kw.setdefault("name", impl.name)
2084 kw.setdefault("schema", impl.schema)
2085 kw.setdefault("inherit_schema", impl.inherit_schema)
2086 kw.setdefault("metadata", impl.metadata)
2087 kw.setdefault("_create_events", False)
2088 kw.setdefault("values_callable", impl.values_callable)
2089 kw.setdefault("omit_aliases", impl._omit_aliases)
2090 return cls(**kw)
2091
2092 def create(self, bind=None, checkfirst=True):
2093 """Emit ``CREATE TYPE`` for this
2094 :class:`_postgresql.ENUM`.
2095
2096 If the underlying dialect does not support
2097 PostgreSQL CREATE TYPE, no action is taken.
2098
2099 :param bind: a connectable :class:`_engine.Engine`,
2100 :class:`_engine.Connection`, or similar object to emit
2101 SQL.
2102 :param checkfirst: if ``True``, a query against
2103 the PG catalog will be first performed to see
2104 if the type does not exist already before
2105 creating.
2106
2107 """
2108 if not bind.dialect.supports_native_enum:
2109 return
2110
2111 bind._run_ddl_visitor(self.EnumGenerator, self, checkfirst=checkfirst)
2112
2113 def drop(self, bind=None, checkfirst=True):
2114 """Emit ``DROP TYPE`` for this
2115 :class:`_postgresql.ENUM`.
2116
2117 If the underlying dialect does not support
2118 PostgreSQL DROP TYPE, no action is taken.
2119
2120 :param bind: a connectable :class:`_engine.Engine`,
2121 :class:`_engine.Connection`, or similar object to emit
2122 SQL.
2123 :param checkfirst: if ``True``, a query against
2124 the PG catalog will be first performed to see
2125 if the type actually exists before dropping.
2126
2127 """
2128 if not bind.dialect.supports_native_enum:
2129 return
2130
2131 bind._run_ddl_visitor(self.EnumDropper, self, checkfirst=checkfirst)
2132
2133 class EnumGenerator(DDLBase):
2134 def __init__(self, dialect, connection, checkfirst=False, **kwargs):
2135 super(ENUM.EnumGenerator, self).__init__(connection, **kwargs)
2136 self.checkfirst = checkfirst
2137
2138 def _can_create_enum(self, enum):
2139 if not self.checkfirst:
2140 return True
2141
2142 effective_schema = self.connection.schema_for_object(enum)
2143
2144 return not self.connection.dialect.has_type(
2145 self.connection, enum.name, schema=effective_schema
2146 )
2147
2148 def visit_enum(self, enum):
2149 if not self._can_create_enum(enum):
2150 return
2151
2152 self.connection.execute(CreateEnumType(enum))
2153
2154 class EnumDropper(DDLBase):
2155 def __init__(self, dialect, connection, checkfirst=False, **kwargs):
2156 super(ENUM.EnumDropper, self).__init__(connection, **kwargs)
2157 self.checkfirst = checkfirst
2158
2159 def _can_drop_enum(self, enum):
2160 if not self.checkfirst:
2161 return True
2162
2163 effective_schema = self.connection.schema_for_object(enum)
2164
2165 return self.connection.dialect.has_type(
2166 self.connection, enum.name, schema=effective_schema
2167 )
2168
2169 def visit_enum(self, enum):
2170 if not self._can_drop_enum(enum):
2171 return
2172
2173 self.connection.execute(DropEnumType(enum))
2174
2175 def _check_for_name_in_memos(self, checkfirst, kw):
2176 """Look in the 'ddl runner' for 'memos', then
2177 note our name in that collection.
2178
2179 This to ensure a particular named enum is operated
2180 upon only once within any kind of create/drop
2181 sequence without relying upon "checkfirst".
2182
2183 """
2184 if not self.create_type:
2185 return True
2186 if "_ddl_runner" in kw:
2187 ddl_runner = kw["_ddl_runner"]
2188 if "_pg_enums" in ddl_runner.memo:
2189 pg_enums = ddl_runner.memo["_pg_enums"]
2190 else:
2191 pg_enums = ddl_runner.memo["_pg_enums"] = set()
2192 present = (self.schema, self.name) in pg_enums
2193 pg_enums.add((self.schema, self.name))
2194 return present
2195 else:
2196 return False
2197
2198 def _on_table_create(self, target, bind, checkfirst=False, **kw):
2199 if (
2200 checkfirst
2201 or (
2202 not self.metadata
2203 and not kw.get("_is_metadata_operation", False)
2204 )
2205 ) and not self._check_for_name_in_memos(checkfirst, kw):
2206 self.create(bind=bind, checkfirst=checkfirst)
2207
2208 def _on_table_drop(self, target, bind, checkfirst=False, **kw):
2209 if (
2210 not self.metadata
2211 and not kw.get("_is_metadata_operation", False)
2212 and not self._check_for_name_in_memos(checkfirst, kw)
2213 ):
2214 self.drop(bind=bind, checkfirst=checkfirst)
2215
2216 def _on_metadata_create(self, target, bind, checkfirst=False, **kw):
2217 if not self._check_for_name_in_memos(checkfirst, kw):
2218 self.create(bind=bind, checkfirst=checkfirst)
2219
2220 def _on_metadata_drop(self, target, bind, checkfirst=False, **kw):
2221 if not self._check_for_name_in_memos(checkfirst, kw):
2222 self.drop(bind=bind, checkfirst=checkfirst)
2223
2224
2225class _ColonCast(elements.Cast):
2226 __visit_name__ = "colon_cast"
2227
2228 def __init__(self, expression, type_):
2229 self.type = type_
2230 self.clause = expression
2231 self.typeclause = elements.TypeClause(type_)
2232
2233
2234colspecs = {
2235 sqltypes.ARRAY: _array.ARRAY,
2236 sqltypes.Interval: INTERVAL,
2237 sqltypes.Enum: ENUM,
2238 sqltypes.JSON.JSONPathType: _json.JSONPathType,
2239 sqltypes.JSON: _json.JSON,
2240}
2241
2242
2243ischema_names = {
2244 "_array": _array.ARRAY,
2245 "hstore": _hstore.HSTORE,
2246 "json": _json.JSON,
2247 "jsonb": _json.JSONB,
2248 "int4range": _ranges.INT4RANGE,
2249 "int8range": _ranges.INT8RANGE,
2250 "numrange": _ranges.NUMRANGE,
2251 "daterange": _ranges.DATERANGE,
2252 "tsrange": _ranges.TSRANGE,
2253 "tstzrange": _ranges.TSTZRANGE,
2254 "integer": INTEGER,
2255 "bigint": BIGINT,
2256 "smallint": SMALLINT,
2257 "character varying": VARCHAR,
2258 "character": CHAR,
2259 '"char"': sqltypes.String,
2260 "name": sqltypes.String,
2261 "text": TEXT,
2262 "numeric": NUMERIC,
2263 "float": FLOAT,
2264 "real": REAL,
2265 "inet": INET,
2266 "cidr": CIDR,
2267 "uuid": UUID,
2268 "bit": BIT,
2269 "bit varying": BIT,
2270 "macaddr": MACADDR,
2271 "macaddr8": MACADDR8,
2272 "money": MONEY,
2273 "oid": OID,
2274 "regclass": REGCLASS,
2275 "double precision": DOUBLE_PRECISION,
2276 "timestamp": TIMESTAMP,
2277 "timestamp with time zone": TIMESTAMP,
2278 "timestamp without time zone": TIMESTAMP,
2279 "time with time zone": TIME,
2280 "time without time zone": TIME,
2281 "date": DATE,
2282 "time": TIME,
2283 "bytea": BYTEA,
2284 "boolean": BOOLEAN,
2285 "interval": INTERVAL,
2286 "tsvector": TSVECTOR,
2287}
2288
2289
2290class PGCompiler(compiler.SQLCompiler):
2291 def visit_colon_cast(self, element, **kw):
2292 return "%s::%s" % (
2293 element.clause._compiler_dispatch(self, **kw),
2294 element.typeclause._compiler_dispatch(self, **kw),
2295 )
2296
2297 def visit_array(self, element, **kw):
2298 return "ARRAY[%s]" % self.visit_clauselist(element, **kw)
2299
2300 def visit_slice(self, element, **kw):
2301 return "%s:%s" % (
2302 self.process(element.start, **kw),
2303 self.process(element.stop, **kw),
2304 )
2305
2306 def visit_json_getitem_op_binary(
2307 self, binary, operator, _cast_applied=False, **kw
2308 ):
2309 if (
2310 not _cast_applied
2311 and binary.type._type_affinity is not sqltypes.JSON
2312 ):
2313 kw["_cast_applied"] = True
2314 return self.process(sql.cast(binary, binary.type), **kw)
2315
2316 kw["eager_grouping"] = True
2317
2318 return self._generate_generic_binary(
2319 binary, " -> " if not _cast_applied else " ->> ", **kw
2320 )
2321
2322 def visit_json_path_getitem_op_binary(
2323 self, binary, operator, _cast_applied=False, **kw
2324 ):
2325 if (
2326 not _cast_applied
2327 and binary.type._type_affinity is not sqltypes.JSON
2328 ):
2329 kw["_cast_applied"] = True
2330 return self.process(sql.cast(binary, binary.type), **kw)
2331
2332 kw["eager_grouping"] = True
2333 return self._generate_generic_binary(
2334 binary, " #> " if not _cast_applied else " #>> ", **kw
2335 )
2336
2337 def visit_getitem_binary(self, binary, operator, **kw):
2338 return "%s[%s]" % (
2339 self.process(binary.left, **kw),
2340 self.process(binary.right, **kw),
2341 )
2342
2343 def visit_aggregate_order_by(self, element, **kw):
2344 return "%s ORDER BY %s" % (
2345 self.process(element.target, **kw),
2346 self.process(element.order_by, **kw),
2347 )
2348
2349 def visit_match_op_binary(self, binary, operator, **kw):
2350 if "postgresql_regconfig" in binary.modifiers:
2351 regconfig = self.render_literal_value(
2352 binary.modifiers["postgresql_regconfig"], sqltypes.STRINGTYPE
2353 )
2354 if regconfig:
2355 return "%s @@ to_tsquery(%s, %s)" % (
2356 self.process(binary.left, **kw),
2357 regconfig,
2358 self.process(binary.right, **kw),
2359 )
2360 return "%s @@ to_tsquery(%s)" % (
2361 self.process(binary.left, **kw),
2362 self.process(binary.right, **kw),
2363 )
2364
2365 def visit_ilike_op_binary(self, binary, operator, **kw):
2366 escape = binary.modifiers.get("escape", None)
2367
2368 return "%s ILIKE %s" % (
2369 self.process(binary.left, **kw),
2370 self.process(binary.right, **kw),
2371 ) + (
2372 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2373 if escape
2374 else ""
2375 )
2376
2377 def visit_not_ilike_op_binary(self, binary, operator, **kw):
2378 escape = binary.modifiers.get("escape", None)
2379 return "%s NOT ILIKE %s" % (
2380 self.process(binary.left, **kw),
2381 self.process(binary.right, **kw),
2382 ) + (
2383 " ESCAPE " + self.render_literal_value(escape, sqltypes.STRINGTYPE)
2384 if escape
2385 else ""
2386 )
2387
2388 def _regexp_match(self, base_op, binary, operator, kw):
2389 flags = binary.modifiers["flags"]
2390 if flags is None:
2391 return self._generate_generic_binary(
2392 binary, " %s " % base_op, **kw
2393 )
2394 if flags == "i":
2395 return self._generate_generic_binary(
2396 binary, " %s* " % base_op, **kw
2397 )
2398 return "%s %s CONCAT('(?', %s, ')', %s)" % (
2399 self.process(binary.left, **kw),
2400 base_op,
2401 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2402 self.process(binary.right, **kw),
2403 )
2404
2405 def visit_regexp_match_op_binary(self, binary, operator, **kw):
2406 return self._regexp_match("~", binary, operator, kw)
2407
2408 def visit_not_regexp_match_op_binary(self, binary, operator, **kw):
2409 return self._regexp_match("!~", binary, operator, kw)
2410
2411 def visit_regexp_replace_op_binary(self, binary, operator, **kw):
2412 string = self.process(binary.left, **kw)
2413 pattern_replace = self.process(binary.right, **kw)
2414 flags = binary.modifiers["flags"]
2415 if flags is None:
2416 return "REGEXP_REPLACE(%s, %s)" % (
2417 string,
2418 pattern_replace,
2419 )
2420 else:
2421 return "REGEXP_REPLACE(%s, %s, %s)" % (
2422 string,
2423 pattern_replace,
2424 self.render_literal_value(flags, sqltypes.STRINGTYPE),
2425 )
2426
2427 def visit_empty_set_expr(self, element_types):
2428 # cast the empty set to the type we are comparing against. if
2429 # we are comparing against the null type, pick an arbitrary
2430 # datatype for the empty set
2431 return "SELECT %s WHERE 1!=1" % (
2432 ", ".join(
2433 "CAST(NULL AS %s)"
2434 % self.dialect.type_compiler.process(
2435 INTEGER() if type_._isnull else type_
2436 )
2437 for type_ in element_types or [INTEGER()]
2438 ),
2439 )
2440
2441 def render_literal_value(self, value, type_):
2442 value = super(PGCompiler, self).render_literal_value(value, type_)
2443
2444 if self.dialect._backslash_escapes:
2445 value = value.replace("\\", "\\\\")
2446 return value
2447
2448 def visit_sequence(self, seq, **kw):
2449 return "nextval('%s')" % self.preparer.format_sequence(seq)
2450
2451 def limit_clause(self, select, **kw):
2452 text = ""
2453 if select._limit_clause is not None:
2454 text += " \n LIMIT " + self.process(select._limit_clause, **kw)
2455 if select._offset_clause is not None:
2456 if select._limit_clause is None:
2457 text += "\n LIMIT ALL"
2458 text += " OFFSET " + self.process(select._offset_clause, **kw)
2459 return text
2460
2461 def format_from_hint_text(self, sqltext, table, hint, iscrud):
2462 if hint.upper() != "ONLY":
2463 raise exc.CompileError("Unrecognized hint: %r" % hint)
2464 return "ONLY " + sqltext
2465
2466 def get_select_precolumns(self, select, **kw):
2467 # Do not call super().get_select_precolumns because
2468 # it will warn/raise when distinct on is present
2469 if select._distinct or select._distinct_on:
2470 if select._distinct_on:
2471 return (
2472 "DISTINCT ON ("
2473 + ", ".join(
2474 [
2475 self.process(col, **kw)
2476 for col in select._distinct_on
2477 ]
2478 )
2479 + ") "
2480 )
2481 else:
2482 return "DISTINCT "
2483 else:
2484 return ""
2485
2486 def for_update_clause(self, select, **kw):
2487
2488 if select._for_update_arg.read:
2489 if select._for_update_arg.key_share:
2490 tmp = " FOR KEY SHARE"
2491 else:
2492 tmp = " FOR SHARE"
2493 elif select._for_update_arg.key_share:
2494 tmp = " FOR NO KEY UPDATE"
2495 else:
2496 tmp = " FOR UPDATE"
2497
2498 if select._for_update_arg.of:
2499
2500 tables = util.OrderedSet()
2501 for c in select._for_update_arg.of:
2502 tables.update(sql_util.surface_selectables_only(c))
2503
2504 tmp += " OF " + ", ".join(
2505 self.process(table, ashint=True, use_schema=False, **kw)
2506 for table in tables
2507 )
2508
2509 if select._for_update_arg.nowait:
2510 tmp += " NOWAIT"
2511 if select._for_update_arg.skip_locked:
2512 tmp += " SKIP LOCKED"
2513
2514 return tmp
2515
2516 def returning_clause(self, stmt, returning_cols):
2517
2518 columns = [
2519 self._label_returning_column(
2520 stmt, c, fallback_label_name=c._non_anon_label
2521 )
2522 for c in expression._select_iterables(returning_cols)
2523 ]
2524
2525 return "RETURNING " + ", ".join(columns)
2526
2527 def visit_substring_func(self, func, **kw):
2528 s = self.process(func.clauses.clauses[0], **kw)
2529 start = self.process(func.clauses.clauses[1], **kw)
2530 if len(func.clauses.clauses) > 2:
2531 length = self.process(func.clauses.clauses[2], **kw)
2532 return "SUBSTRING(%s FROM %s FOR %s)" % (s, start, length)
2533 else:
2534 return "SUBSTRING(%s FROM %s)" % (s, start)
2535
2536 def _on_conflict_target(self, clause, **kw):
2537
2538 if clause.constraint_target is not None:
2539 # target may be a name of an Index, UniqueConstraint or
2540 # ExcludeConstraint. While there is a separate
2541 # "max_identifier_length" for indexes, PostgreSQL uses the same
2542 # length for all objects so we can use
2543 # truncate_and_render_constraint_name
2544 target_text = (
2545 "ON CONSTRAINT %s"
2546 % self.preparer.truncate_and_render_constraint_name(
2547 clause.constraint_target
2548 )
2549 )
2550 elif clause.inferred_target_elements is not None:
2551 target_text = "(%s)" % ", ".join(
2552 (
2553 self.preparer.quote(c)
2554 if isinstance(c, util.string_types)
2555 else self.process(c, include_table=False, use_schema=False)
2556 )
2557 for c in clause.inferred_target_elements
2558 )
2559 if clause.inferred_target_whereclause is not None:
2560 target_text += " WHERE %s" % self.process(
2561 clause.inferred_target_whereclause,
2562 include_table=False,
2563 use_schema=False,
2564 )
2565 else:
2566 target_text = ""
2567
2568 return target_text
2569
2570 @util.memoized_property
2571 def _is_safe_for_fast_insert_values_helper(self):
2572 # don't allow fast executemany if _post_values_clause is
2573 # present and is not an OnConflictDoNothing. what this means
2574 # concretely is that the
2575 # "fast insert executemany helper" won't be used, in other
2576 # words we won't convert "executemany()" of many parameter
2577 # sets into a single INSERT with many elements in VALUES.
2578 # We can't apply that optimization safely if for example the
2579 # statement includes a clause like "ON CONFLICT DO UPDATE"
2580
2581 return self.insert_single_values_expr is not None and (
2582 self.statement._post_values_clause is None
2583 or isinstance(
2584 self.statement._post_values_clause, dml.OnConflictDoNothing
2585 )
2586 )
2587
2588 def visit_on_conflict_do_nothing(self, on_conflict, **kw):
2589
2590 target_text = self._on_conflict_target(on_conflict, **kw)
2591
2592 if target_text:
2593 return "ON CONFLICT %s DO NOTHING" % target_text
2594 else:
2595 return "ON CONFLICT DO NOTHING"
2596
2597 def visit_on_conflict_do_update(self, on_conflict, **kw):
2598
2599 clause = on_conflict
2600
2601 target_text = self._on_conflict_target(on_conflict, **kw)
2602
2603 action_set_ops = []
2604
2605 set_parameters = dict(clause.update_values_to_set)
2606 # create a list of column assignment clauses as tuples
2607
2608 insert_statement = self.stack[-1]["selectable"]
2609 cols = insert_statement.table.c
2610 for c in cols:
2611 col_key = c.key
2612
2613 if col_key in set_parameters:
2614 value = set_parameters.pop(col_key)
2615 elif c in set_parameters:
2616 value = set_parameters.pop(c)
2617 else:
2618 continue
2619
2620 if coercions._is_literal(value):
2621 value = elements.BindParameter(None, value, type_=c.type)
2622
2623 else:
2624 if (
2625 isinstance(value, elements.BindParameter)
2626 and value.type._isnull
2627 ):
2628 value = value._clone()
2629 value.type = c.type
2630 value_text = self.process(value.self_group(), use_schema=False)
2631
2632 key_text = self.preparer.quote(c.name)
2633 action_set_ops.append("%s = %s" % (key_text, value_text))
2634
2635 # check for names that don't match columns
2636 if set_parameters:
2637 util.warn(
2638 "Additional column names not matching "
2639 "any column keys in table '%s': %s"
2640 % (
2641 self.current_executable.table.name,
2642 (", ".join("'%s'" % c for c in set_parameters)),
2643 )
2644 )
2645 for k, v in set_parameters.items():
2646 key_text = (
2647 self.preparer.quote(k)
2648 if isinstance(k, util.string_types)
2649 else self.process(k, use_schema=False)
2650 )
2651 value_text = self.process(
2652 coercions.expect(roles.ExpressionElementRole, v),
2653 use_schema=False,
2654 )
2655 action_set_ops.append("%s = %s" % (key_text, value_text))
2656
2657 action_text = ", ".join(action_set_ops)
2658 if clause.update_whereclause is not None:
2659 action_text += " WHERE %s" % self.process(
2660 clause.update_whereclause, include_table=True, use_schema=False
2661 )
2662
2663 return "ON CONFLICT %s DO UPDATE SET %s" % (target_text, action_text)
2664
2665 def update_from_clause(
2666 self, update_stmt, from_table, extra_froms, from_hints, **kw
2667 ):
2668 kw["asfrom"] = True
2669 return "FROM " + ", ".join(
2670 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2671 for t in extra_froms
2672 )
2673
2674 def delete_extra_from_clause(
2675 self, delete_stmt, from_table, extra_froms, from_hints, **kw
2676 ):
2677 """Render the DELETE .. USING clause specific to PostgreSQL."""
2678 kw["asfrom"] = True
2679 return "USING " + ", ".join(
2680 t._compiler_dispatch(self, fromhints=from_hints, **kw)
2681 for t in extra_froms
2682 )
2683
2684 def fetch_clause(self, select, **kw):
2685 # pg requires parens for non literal clauses. It's also required for
2686 # bind parameters if a ::type casts is used by the driver (asyncpg),
2687 # so it's easiest to just always add it
2688 text = ""
2689 if select._offset_clause is not None:
2690 text += "\n OFFSET (%s) ROWS" % self.process(
2691 select._offset_clause, **kw
2692 )
2693 if select._fetch_clause is not None:
2694 text += "\n FETCH FIRST (%s)%s ROWS %s" % (
2695 self.process(select._fetch_clause, **kw),
2696 " PERCENT" if select._fetch_clause_options["percent"] else "",
2697 "WITH TIES"
2698 if select._fetch_clause_options["with_ties"]
2699 else "ONLY",
2700 )
2701 return text
2702
2703
2704class PGDDLCompiler(compiler.DDLCompiler):
2705 def get_column_specification(self, column, **kwargs):
2706
2707 colspec = self.preparer.format_column(column)
2708 impl_type = column.type.dialect_impl(self.dialect)
2709 if isinstance(impl_type, sqltypes.TypeDecorator):
2710 impl_type = impl_type.impl
2711
2712 has_identity = (
2713 column.identity is not None
2714 and self.dialect.supports_identity_columns
2715 )
2716
2717 if (
2718 column.primary_key
2719 and column is column.table._autoincrement_column
2720 and (
2721 self.dialect.supports_smallserial
2722 or not isinstance(impl_type, sqltypes.SmallInteger)
2723 )
2724 and not has_identity
2725 and (
2726 column.default is None
2727 or (
2728 isinstance(column.default, schema.Sequence)
2729 and column.default.optional
2730 )
2731 )
2732 ):
2733 if isinstance(impl_type, sqltypes.BigInteger):
2734 colspec += " BIGSERIAL"
2735 elif isinstance(impl_type, sqltypes.SmallInteger):
2736 colspec += " SMALLSERIAL"
2737 else:
2738 colspec += " SERIAL"
2739 else:
2740 colspec += " " + self.dialect.type_compiler.process(
2741 column.type,
2742 type_expression=column,
2743 identifier_preparer=self.preparer,
2744 )
2745 default = self.get_column_default_string(column)
2746 if default is not None:
2747 colspec += " DEFAULT " + default
2748
2749 if column.computed is not None:
2750 colspec += " " + self.process(column.computed)
2751 if has_identity:
2752 colspec += " " + self.process(column.identity)
2753
2754 if not column.nullable and not has_identity:
2755 colspec += " NOT NULL"
2756 elif column.nullable and has_identity:
2757 colspec += " NULL"
2758 return colspec
2759
2760 def _define_constraint_validity(self, constraint):
2761 not_valid = constraint.dialect_options["postgresql"]["not_valid"]
2762 return " NOT VALID" if not_valid else ""
2763
2764 def visit_check_constraint(self, constraint):
2765 if constraint._type_bound:
2766 typ = list(constraint.columns)[0].type
2767 if (
2768 isinstance(typ, sqltypes.ARRAY)
2769 and isinstance(typ.item_type, sqltypes.Enum)
2770 and not typ.item_type.native_enum
2771 ):
2772 raise exc.CompileError(
2773 "PostgreSQL dialect cannot produce the CHECK constraint "
2774 "for ARRAY of non-native ENUM; please specify "
2775 "create_constraint=False on this Enum datatype."
2776 )
2777
2778 text = super(PGDDLCompiler, self).visit_check_constraint(constraint)
2779 text += self._define_constraint_validity(constraint)
2780 return text
2781
2782 def visit_foreign_key_constraint(self, constraint):
2783 text = super(PGDDLCompiler, self).visit_foreign_key_constraint(
2784 constraint
2785 )
2786 text += self._define_constraint_validity(constraint)
2787 return text
2788
2789 def visit_drop_table_comment(self, drop):
2790 return "COMMENT ON TABLE %s IS NULL" % self.preparer.format_table(
2791 drop.element
2792 )
2793
2794 def visit_create_enum_type(self, create):
2795 type_ = create.element
2796
2797 return "CREATE TYPE %s AS ENUM (%s)" % (
2798 self.preparer.format_type(type_),
2799 ", ".join(
2800 self.sql_compiler.process(sql.literal(e), literal_binds=True)
2801 for e in type_.enums
2802 ),
2803 )
2804
2805 def visit_drop_enum_type(self, drop):
2806 type_ = drop.element
2807
2808 return "DROP TYPE %s" % (self.preparer.format_type(type_))
2809
2810 def visit_create_index(self, create):
2811 preparer = self.preparer
2812 index = create.element
2813 self._verify_index_table(index)
2814 text = "CREATE "
2815 if index.unique:
2816 text += "UNIQUE "
2817 text += "INDEX "
2818
2819 if self.dialect._supports_create_index_concurrently:
2820 concurrently = index.dialect_options["postgresql"]["concurrently"]
2821 if concurrently:
2822 text += "CONCURRENTLY "
2823
2824 if create.if_not_exists:
2825 text += "IF NOT EXISTS "
2826
2827 text += "%s ON %s " % (
2828 self._prepared_index_name(index, include_schema=False),
2829 preparer.format_table(index.table),
2830 )
2831
2832 using = index.dialect_options["postgresql"]["using"]
2833 if using:
2834 text += (
2835 "USING %s "
2836 % self.preparer.validate_sql_phrase(using, IDX_USING).lower()
2837 )
2838
2839 ops = index.dialect_options["postgresql"]["ops"]
2840 text += "(%s)" % (
2841 ", ".join(
2842 [
2843 self.sql_compiler.process(
2844 expr.self_group()
2845 if not isinstance(expr, expression.ColumnClause)
2846 else expr,
2847 include_table=False,
2848 literal_binds=True,
2849 )
2850 + (
2851 (" " + ops[expr.key])
2852 if hasattr(expr, "key") and expr.key in ops
2853 else ""
2854 )
2855 for expr in index.expressions
2856 ]
2857 )
2858 )
2859
2860 includeclause = index.dialect_options["postgresql"]["include"]
2861 if includeclause:
2862 inclusions = [
2863 index.table.c[col]
2864 if isinstance(col, util.string_types)
2865 else col
2866 for col in includeclause
2867 ]
2868 text += " INCLUDE (%s)" % ", ".join(
2869 [preparer.quote(c.name) for c in inclusions]
2870 )
2871
2872 withclause = index.dialect_options["postgresql"]["with"]
2873 if withclause:
2874 text += " WITH (%s)" % (
2875 ", ".join(
2876 [
2877 "%s = %s" % storage_parameter
2878 for storage_parameter in withclause.items()
2879 ]
2880 )
2881 )
2882
2883 tablespace_name = index.dialect_options["postgresql"]["tablespace"]
2884 if tablespace_name:
2885 text += " TABLESPACE %s" % preparer.quote(tablespace_name)
2886
2887 whereclause = index.dialect_options["postgresql"]["where"]
2888 if whereclause is not None:
2889 whereclause = coercions.expect(
2890 roles.DDLExpressionRole, whereclause
2891 )
2892
2893 where_compiled = self.sql_compiler.process(
2894 whereclause, include_table=False, literal_binds=True
2895 )
2896 text += " WHERE " + where_compiled
2897
2898 return text
2899
2900 def visit_drop_index(self, drop):
2901 index = drop.element
2902
2903 text = "\nDROP INDEX "
2904
2905 if self.dialect._supports_drop_index_concurrently:
2906 concurrently = index.dialect_options["postgresql"]["concurrently"]
2907 if concurrently:
2908 text += "CONCURRENTLY "
2909
2910 if drop.if_exists:
2911 text += "IF EXISTS "
2912
2913 text += self._prepared_index_name(index, include_schema=True)
2914 return text
2915
2916 def visit_exclude_constraint(self, constraint, **kw):
2917 text = ""
2918 if constraint.name is not None:
2919 text += "CONSTRAINT %s " % self.preparer.format_constraint(
2920 constraint
2921 )
2922 elements = []
2923 for expr, name, op in constraint._render_exprs:
2924 kw["include_table"] = False
2925 exclude_element = self.sql_compiler.process(expr, **kw) + (
2926 (" " + constraint.ops[expr.key])
2927 if hasattr(expr, "key") and expr.key in constraint.ops
2928 else ""
2929 )
2930
2931 elements.append("%s WITH %s" % (exclude_element, op))
2932 text += "EXCLUDE USING %s (%s)" % (
2933 self.preparer.validate_sql_phrase(
2934 constraint.using, IDX_USING
2935 ).lower(),
2936 ", ".join(elements),
2937 )
2938 if constraint.where is not None:
2939 text += " WHERE (%s)" % self.sql_compiler.process(
2940 constraint.where, literal_binds=True
2941 )
2942 text += self.define_constraint_deferrability(constraint)
2943 return text
2944
2945 def post_create_table(self, table):
2946 table_opts = []
2947 pg_opts = table.dialect_options["postgresql"]
2948
2949 inherits = pg_opts.get("inherits")
2950 if inherits is not None:
2951 if not isinstance(inherits, (list, tuple)):
2952 inherits = (inherits,)
2953 table_opts.append(
2954 "\n INHERITS ( "
2955 + ", ".join(self.preparer.quote(name) for name in inherits)
2956 + " )"
2957 )
2958
2959 if pg_opts["partition_by"]:
2960 table_opts.append("\n PARTITION BY %s" % pg_opts["partition_by"])
2961
2962 if pg_opts["with_oids"] is True:
2963 table_opts.append("\n WITH OIDS")
2964 elif pg_opts["with_oids"] is False:
2965 table_opts.append("\n WITHOUT OIDS")
2966
2967 if pg_opts["on_commit"]:
2968 on_commit_options = pg_opts["on_commit"].replace("_", " ").upper()
2969 table_opts.append("\n ON COMMIT %s" % on_commit_options)
2970
2971 if pg_opts["tablespace"]:
2972 tablespace_name = pg_opts["tablespace"]
2973 table_opts.append(
2974 "\n TABLESPACE %s" % self.preparer.quote(tablespace_name)
2975 )
2976
2977 return "".join(table_opts)
2978
2979 def visit_computed_column(self, generated):
2980 if generated.persisted is False:
2981 raise exc.CompileError(
2982 "PostrgreSQL computed columns do not support 'virtual' "
2983 "persistence; set the 'persisted' flag to None or True for "
2984 "PostgreSQL support."
2985 )
2986
2987 return "GENERATED ALWAYS AS (%s) STORED" % self.sql_compiler.process(
2988 generated.sqltext, include_table=False, literal_binds=True
2989 )
2990
2991 def visit_create_sequence(self, create, **kw):
2992 prefix = None
2993 if create.element.data_type is not None:
2994 prefix = " AS %s" % self.type_compiler.process(
2995 create.element.data_type
2996 )
2997
2998 return super(PGDDLCompiler, self).visit_create_sequence(
2999 create, prefix=prefix, **kw
3000 )
3001
3002
3003class PGTypeCompiler(compiler.GenericTypeCompiler):
3004 def visit_TSVECTOR(self, type_, **kw):
3005 return "TSVECTOR"
3006
3007 def visit_INET(self, type_, **kw):
3008 return "INET"
3009
3010 def visit_CIDR(self, type_, **kw):
3011 return "CIDR"
3012
3013 def visit_MACADDR(self, type_, **kw):
3014 return "MACADDR"
3015
3016 def visit_MACADDR8(self, type_, **kw):
3017 return "MACADDR8"
3018
3019 def visit_MONEY(self, type_, **kw):
3020 return "MONEY"
3021
3022 def visit_OID(self, type_, **kw):
3023 return "OID"
3024
3025 def visit_REGCLASS(self, type_, **kw):
3026 return "REGCLASS"
3027
3028 def visit_FLOAT(self, type_, **kw):
3029 if not type_.precision:
3030 return "FLOAT"
3031 else:
3032 return "FLOAT(%(precision)s)" % {"precision": type_.precision}
3033
3034 def visit_DOUBLE_PRECISION(self, type_, **kw):
3035 return "DOUBLE PRECISION"
3036
3037 def visit_BIGINT(self, type_, **kw):
3038 return "BIGINT"
3039
3040 def visit_HSTORE(self, type_, **kw):
3041 return "HSTORE"
3042
3043 def visit_JSON(self, type_, **kw):
3044 return "JSON"
3045
3046 def visit_JSONB(self, type_, **kw):
3047 return "JSONB"
3048
3049 def visit_INT4RANGE(self, type_, **kw):
3050 return "INT4RANGE"
3051
3052 def visit_INT8RANGE(self, type_, **kw):
3053 return "INT8RANGE"
3054
3055 def visit_NUMRANGE(self, type_, **kw):
3056 return "NUMRANGE"
3057
3058 def visit_DATERANGE(self, type_, **kw):
3059 return "DATERANGE"
3060
3061 def visit_TSRANGE(self, type_, **kw):
3062 return "TSRANGE"
3063
3064 def visit_TSTZRANGE(self, type_, **kw):
3065 return "TSTZRANGE"
3066
3067 def visit_datetime(self, type_, **kw):
3068 return self.visit_TIMESTAMP(type_, **kw)
3069
3070 def visit_enum(self, type_, **kw):
3071 if not type_.native_enum or not self.dialect.supports_native_enum:
3072 return super(PGTypeCompiler, self).visit_enum(type_, **kw)
3073 else:
3074 return self.visit_ENUM(type_, **kw)
3075
3076 def visit_ENUM(self, type_, identifier_preparer=None, **kw):
3077 if identifier_preparer is None:
3078 identifier_preparer = self.dialect.identifier_preparer
3079 return identifier_preparer.format_type(type_)
3080
3081 def visit_TIMESTAMP(self, type_, **kw):
3082 return "TIMESTAMP%s %s" % (
3083 "(%d)" % type_.precision
3084 if getattr(type_, "precision", None) is not None
3085 else "",
3086 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
3087 )
3088
3089 def visit_TIME(self, type_, **kw):
3090 return "TIME%s %s" % (
3091 "(%d)" % type_.precision
3092 if getattr(type_, "precision", None) is not None
3093 else "",
3094 (type_.timezone and "WITH" or "WITHOUT") + " TIME ZONE",
3095 )
3096
3097 def visit_INTERVAL(self, type_, **kw):
3098 text = "INTERVAL"
3099 if type_.fields is not None:
3100 text += " " + type_.fields
3101 if type_.precision is not None:
3102 text += " (%d)" % type_.precision
3103 return text
3104
3105 def visit_BIT(self, type_, **kw):
3106 if type_.varying:
3107 compiled = "BIT VARYING"
3108 if type_.length is not None:
3109 compiled += "(%d)" % type_.length
3110 else:
3111 compiled = "BIT(%d)" % type_.length
3112 return compiled
3113
3114 def visit_UUID(self, type_, **kw):
3115 return "UUID"
3116
3117 def visit_large_binary(self, type_, **kw):
3118 return self.visit_BYTEA(type_, **kw)
3119
3120 def visit_BYTEA(self, type_, **kw):
3121 return "BYTEA"
3122
3123 def visit_ARRAY(self, type_, **kw):
3124
3125 inner = self.process(type_.item_type, **kw)
3126 return re.sub(
3127 r"((?: COLLATE.*)?)$",
3128 (
3129 r"%s\1"
3130 % (
3131 "[]"
3132 * (type_.dimensions if type_.dimensions is not None else 1)
3133 )
3134 ),
3135 inner,
3136 count=1,
3137 )
3138
3139
3140class PGIdentifierPreparer(compiler.IdentifierPreparer):
3141
3142 reserved_words = RESERVED_WORDS
3143
3144 def _unquote_identifier(self, value):
3145 if value[0] == self.initial_quote:
3146 value = value[1:-1].replace(
3147 self.escape_to_quote, self.escape_quote
3148 )
3149 return value
3150
3151 def format_type(self, type_, use_schema=True):
3152 if not type_.name:
3153 raise exc.CompileError("PostgreSQL ENUM type requires a name.")
3154
3155 name = self.quote(type_.name)
3156 effective_schema = self.schema_for_object(type_)
3157
3158 if (
3159 not self.omit_schema
3160 and use_schema
3161 and effective_schema is not None
3162 ):
3163 name = self.quote_schema(effective_schema) + "." + name
3164 return name
3165
3166
3167class PGInspector(reflection.Inspector):
3168 def get_table_oid(self, table_name, schema=None):
3169 """Return the OID for the given table name."""
3170
3171 with self._operation_context() as conn:
3172 return self.dialect.get_table_oid(
3173 conn, table_name, schema, info_cache=self.info_cache
3174 )
3175
3176 def get_enums(self, schema=None):
3177 """Return a list of ENUM objects.
3178
3179 Each member is a dictionary containing these fields:
3180
3181 * name - name of the enum
3182 * schema - the schema name for the enum.
3183 * visible - boolean, whether or not this enum is visible
3184 in the default search path.
3185 * labels - a list of string labels that apply to the enum.
3186
3187 :param schema: schema name. If None, the default schema
3188 (typically 'public') is used. May also be set to '*' to
3189 indicate load enums for all schemas.
3190
3191 .. versionadded:: 1.0.0
3192
3193 """
3194 schema = schema or self.default_schema_name
3195 with self._operation_context() as conn:
3196 return self.dialect._load_enums(conn, schema)
3197
3198 def get_foreign_table_names(self, schema=None):
3199 """Return a list of FOREIGN TABLE names.
3200
3201 Behavior is similar to that of
3202 :meth:`_reflection.Inspector.get_table_names`,
3203 except that the list is limited to those tables that report a
3204 ``relkind`` value of ``f``.
3205
3206 .. versionadded:: 1.0.0
3207
3208 """
3209 schema = schema or self.default_schema_name
3210 with self._operation_context() as conn:
3211 return self.dialect._get_foreign_table_names(conn, schema)
3212
3213 def get_view_names(self, schema=None, include=("plain", "materialized")):
3214 """Return all view names in `schema`.
3215
3216 :param schema: Optional, retrieve names from a non-default schema.
3217 For special quoting, use :class:`.quoted_name`.
3218
3219 :param include: specify which types of views to return. Passed
3220 as a string value (for a single type) or a tuple (for any number
3221 of types). Defaults to ``('plain', 'materialized')``.
3222
3223 .. versionadded:: 1.1
3224
3225 """
3226
3227 with self._operation_context() as conn:
3228 return self.dialect.get_view_names(
3229 conn, schema, info_cache=self.info_cache, include=include
3230 )
3231
3232
3233class CreateEnumType(schema._CreateDropBase):
3234 __visit_name__ = "create_enum_type"
3235
3236
3237class DropEnumType(schema._CreateDropBase):
3238 __visit_name__ = "drop_enum_type"
3239
3240
3241class PGExecutionContext(default.DefaultExecutionContext):
3242 def fire_sequence(self, seq, type_):
3243 return self._execute_scalar(
3244 (
3245 "select nextval('%s')"
3246 % self.identifier_preparer.format_sequence(seq)
3247 ),
3248 type_,
3249 )
3250
3251 def get_insert_default(self, column):
3252 if column.primary_key and column is column.table._autoincrement_column:
3253 if column.server_default and column.server_default.has_argument:
3254
3255 # pre-execute passive defaults on primary key columns
3256 return self._execute_scalar(
3257 "select %s" % column.server_default.arg, column.type
3258 )
3259
3260 elif column.default is None or (
3261 column.default.is_sequence and column.default.optional
3262 ):
3263 # execute the sequence associated with a SERIAL primary
3264 # key column. for non-primary-key SERIAL, the ID just
3265 # generates server side.
3266
3267 try:
3268 seq_name = column._postgresql_seq_name
3269 except AttributeError:
3270 tab = column.table.name
3271 col = column.name
3272 tab = tab[0 : 29 + max(0, (29 - len(col)))]
3273 col = col[0 : 29 + max(0, (29 - len(tab)))]
3274 name = "%s_%s_seq" % (tab, col)
3275 column._postgresql_seq_name = seq_name = name
3276
3277 if column.table is not None:
3278 effective_schema = self.connection.schema_for_object(
3279 column.table
3280 )
3281 else:
3282 effective_schema = None
3283
3284 if effective_schema is not None:
3285 exc = 'select nextval(\'"%s"."%s"\')' % (
3286 effective_schema,
3287 seq_name,
3288 )
3289 else:
3290 exc = "select nextval('\"%s\"')" % (seq_name,)
3291
3292 return self._execute_scalar(exc, column.type)
3293
3294 return super(PGExecutionContext, self).get_insert_default(column)
3295
3296 def should_autocommit_text(self, statement):
3297 return AUTOCOMMIT_REGEXP.match(statement)
3298
3299
3300class PGReadOnlyConnectionCharacteristic(
3301 characteristics.ConnectionCharacteristic
3302):
3303 transactional = True
3304
3305 def reset_characteristic(self, dialect, dbapi_conn):
3306 dialect.set_readonly(dbapi_conn, False)
3307
3308 def set_characteristic(self, dialect, dbapi_conn, value):
3309 dialect.set_readonly(dbapi_conn, value)
3310
3311 def get_characteristic(self, dialect, dbapi_conn):
3312 return dialect.get_readonly(dbapi_conn)
3313
3314
3315class PGDeferrableConnectionCharacteristic(
3316 characteristics.ConnectionCharacteristic
3317):
3318 transactional = True
3319
3320 def reset_characteristic(self, dialect, dbapi_conn):
3321 dialect.set_deferrable(dbapi_conn, False)
3322
3323 def set_characteristic(self, dialect, dbapi_conn, value):
3324 dialect.set_deferrable(dbapi_conn, value)
3325
3326 def get_characteristic(self, dialect, dbapi_conn):
3327 return dialect.get_deferrable(dbapi_conn)
3328
3329
3330class PGDialect(default.DefaultDialect):
3331 name = "postgresql"
3332 supports_statement_cache = True
3333 supports_alter = True
3334 max_identifier_length = 63
3335 supports_sane_rowcount = True
3336
3337 supports_native_enum = True
3338 supports_native_boolean = True
3339 supports_smallserial = True
3340
3341 supports_sequences = True
3342 sequences_optional = True
3343 preexecute_autoincrement_sequences = True
3344 postfetch_lastrowid = False
3345
3346 supports_comments = True
3347 supports_default_values = True
3348
3349 supports_default_metavalue = True
3350
3351 supports_empty_insert = False
3352 supports_multivalues_insert = True
3353 supports_identity_columns = True
3354
3355 default_paramstyle = "pyformat"
3356 ischema_names = ischema_names
3357 colspecs = colspecs
3358
3359 statement_compiler = PGCompiler
3360 ddl_compiler = PGDDLCompiler
3361 type_compiler = PGTypeCompiler
3362 preparer = PGIdentifierPreparer
3363 execution_ctx_cls = PGExecutionContext
3364 inspector = PGInspector
3365 isolation_level = None
3366
3367 implicit_returning = True
3368 full_returning = True
3369
3370 connection_characteristics = (
3371 default.DefaultDialect.connection_characteristics
3372 )
3373 connection_characteristics = connection_characteristics.union(
3374 {
3375 "postgresql_readonly": PGReadOnlyConnectionCharacteristic(),
3376 "postgresql_deferrable": PGDeferrableConnectionCharacteristic(),
3377 }
3378 )
3379
3380 construct_arguments = [
3381 (
3382 schema.Index,
3383 {
3384 "using": False,
3385 "include": None,
3386 "where": None,
3387 "ops": {},
3388 "concurrently": False,
3389 "with": {},
3390 "tablespace": None,
3391 },
3392 ),
3393 (
3394 schema.Table,
3395 {
3396 "ignore_search_path": False,
3397 "tablespace": None,
3398 "partition_by": None,
3399 "with_oids": None,
3400 "on_commit": None,
3401 "inherits": None,
3402 },
3403 ),
3404 (
3405 schema.CheckConstraint,
3406 {
3407 "not_valid": False,
3408 },
3409 ),
3410 (
3411 schema.ForeignKeyConstraint,
3412 {
3413 "not_valid": False,
3414 },
3415 ),
3416 ]
3417
3418 reflection_options = ("postgresql_ignore_search_path",)
3419
3420 _backslash_escapes = True
3421 _supports_create_index_concurrently = True
3422 _supports_drop_index_concurrently = True
3423
3424 def __init__(
3425 self,
3426 isolation_level=None,
3427 json_serializer=None,
3428 json_deserializer=None,
3429 **kwargs
3430 ):
3431 default.DefaultDialect.__init__(self, **kwargs)
3432
3433 # the isolation_level parameter to the PGDialect itself is legacy.
3434 # still works however the execution_options method is the one that
3435 # is documented.
3436 self.isolation_level = isolation_level
3437 self._json_deserializer = json_deserializer
3438 self._json_serializer = json_serializer
3439
3440 def initialize(self, connection):
3441 super(PGDialect, self).initialize(connection)
3442
3443 if self.server_version_info <= (8, 2):
3444 self.full_returning = self.implicit_returning = False
3445
3446 self.supports_native_enum = self.server_version_info >= (8, 3)
3447 if not self.supports_native_enum:
3448 self.colspecs = self.colspecs.copy()
3449 # pop base Enum type
3450 self.colspecs.pop(sqltypes.Enum, None)
3451 # psycopg2, others may have placed ENUM here as well
3452 self.colspecs.pop(ENUM, None)
3453
3454 # https://www.postgresql.org/docs/9.3/static/release-9-2.html#AEN116689
3455 self.supports_smallserial = self.server_version_info >= (9, 2)
3456
3457 if self.server_version_info < (8, 2):
3458 self._backslash_escapes = False
3459 else:
3460 # ensure this query is not emitted on server version < 8.2
3461 # as it will fail
3462 std_string = connection.exec_driver_sql(
3463 "show standard_conforming_strings"
3464 ).scalar()
3465 self._backslash_escapes = std_string == "off"
3466
3467 self._supports_create_index_concurrently = (
3468 self.server_version_info >= (8, 2)
3469 )
3470 self._supports_drop_index_concurrently = self.server_version_info >= (
3471 9,
3472 2,
3473 )
3474 self.supports_identity_columns = self.server_version_info >= (10,)
3475
3476 def on_connect(self):
3477 if self.isolation_level is not None:
3478
3479 def connect(conn):
3480 self.set_isolation_level(conn, self.isolation_level)
3481
3482 return connect
3483 else:
3484 return None
3485
3486 _isolation_lookup = set(
3487 [
3488 "SERIALIZABLE",
3489 "READ UNCOMMITTED",
3490 "READ COMMITTED",
3491 "REPEATABLE READ",
3492 ]
3493 )
3494
3495 def set_isolation_level(self, connection, level):
3496 level = level.replace("_", " ")
3497 if level not in self._isolation_lookup:
3498 raise exc.ArgumentError(
3499 "Invalid value '%s' for isolation_level. "
3500 "Valid isolation levels for %s are %s"
3501 % (level, self.name, ", ".join(self._isolation_lookup))
3502 )
3503 cursor = connection.cursor()
3504 cursor.execute(
3505 "SET SESSION CHARACTERISTICS AS TRANSACTION "
3506 "ISOLATION LEVEL %s" % level
3507 )
3508 cursor.execute("COMMIT")
3509 cursor.close()
3510
3511 def get_isolation_level(self, connection):
3512 cursor = connection.cursor()
3513 cursor.execute("show transaction isolation level")
3514 val = cursor.fetchone()[0]
3515 cursor.close()
3516 return val.upper()
3517
3518 def set_readonly(self, connection, value):
3519 raise NotImplementedError()
3520
3521 def get_readonly(self, connection):
3522 raise NotImplementedError()
3523
3524 def set_deferrable(self, connection, value):
3525 raise NotImplementedError()
3526
3527 def get_deferrable(self, connection):
3528 raise NotImplementedError()
3529
3530 def do_begin_twophase(self, connection, xid):
3531 self.do_begin(connection.connection)
3532
3533 def do_prepare_twophase(self, connection, xid):
3534 connection.exec_driver_sql("PREPARE TRANSACTION '%s'" % xid)
3535
3536 def do_rollback_twophase(
3537 self, connection, xid, is_prepared=True, recover=False
3538 ):
3539 if is_prepared:
3540 if recover:
3541 # FIXME: ugly hack to get out of transaction
3542 # context when committing recoverable transactions
3543 # Must find out a way how to make the dbapi not
3544 # open a transaction.
3545 connection.exec_driver_sql("ROLLBACK")
3546 connection.exec_driver_sql("ROLLBACK PREPARED '%s'" % xid)
3547 connection.exec_driver_sql("BEGIN")
3548 self.do_rollback(connection.connection)
3549 else:
3550 self.do_rollback(connection.connection)
3551
3552 def do_commit_twophase(
3553 self, connection, xid, is_prepared=True, recover=False
3554 ):
3555 if is_prepared:
3556 if recover:
3557 connection.exec_driver_sql("ROLLBACK")
3558 connection.exec_driver_sql("COMMIT PREPARED '%s'" % xid)
3559 connection.exec_driver_sql("BEGIN")
3560 self.do_rollback(connection.connection)
3561 else:
3562 self.do_commit(connection.connection)
3563
3564 def do_recover_twophase(self, connection):
3565 resultset = connection.execute(
3566 sql.text("SELECT gid FROM pg_prepared_xacts")
3567 )
3568 return [row[0] for row in resultset]
3569
3570 def _get_default_schema_name(self, connection):
3571 return connection.exec_driver_sql("select current_schema()").scalar()
3572
3573 def has_schema(self, connection, schema):
3574 query = (
3575 "select nspname from pg_namespace " "where lower(nspname)=:schema"
3576 )
3577 cursor = connection.execute(
3578 sql.text(query).bindparams(
3579 sql.bindparam(
3580 "schema",
3581 util.text_type(schema.lower()),
3582 type_=sqltypes.Unicode,
3583 )
3584 )
3585 )
3586
3587 return bool(cursor.first())
3588
3589 def has_table(self, connection, table_name, schema=None):
3590 self._ensure_has_table_connection(connection)
3591 # seems like case gets folded in pg_class...
3592 if schema is None:
3593 cursor = connection.execute(
3594 sql.text(
3595 "select relname from pg_class c join pg_namespace n on "
3596 "n.oid=c.relnamespace where "
3597 "pg_catalog.pg_table_is_visible(c.oid) "
3598 "and relname=:name"
3599 ).bindparams(
3600 sql.bindparam(
3601 "name",
3602 util.text_type(table_name),
3603 type_=sqltypes.Unicode,
3604 )
3605 )
3606 )
3607 else:
3608 cursor = connection.execute(
3609 sql.text(
3610 "select relname from pg_class c join pg_namespace n on "
3611 "n.oid=c.relnamespace where n.nspname=:schema and "
3612 "relname=:name"
3613 ).bindparams(
3614 sql.bindparam(
3615 "name",
3616 util.text_type(table_name),
3617 type_=sqltypes.Unicode,
3618 ),
3619 sql.bindparam(
3620 "schema",
3621 util.text_type(schema),
3622 type_=sqltypes.Unicode,
3623 ),
3624 )
3625 )
3626 return bool(cursor.first())
3627
3628 def has_sequence(self, connection, sequence_name, schema=None):
3629 if schema is None:
3630 schema = self.default_schema_name
3631 cursor = connection.execute(
3632 sql.text(
3633 "SELECT relname FROM pg_class c join pg_namespace n on "
3634 "n.oid=c.relnamespace where relkind='S' and "
3635 "n.nspname=:schema and relname=:name"
3636 ).bindparams(
3637 sql.bindparam(
3638 "name",
3639 util.text_type(sequence_name),
3640 type_=sqltypes.Unicode,
3641 ),
3642 sql.bindparam(
3643 "schema",
3644 util.text_type(schema),
3645 type_=sqltypes.Unicode,
3646 ),
3647 )
3648 )
3649
3650 return bool(cursor.first())
3651
3652 def has_type(self, connection, type_name, schema=None):
3653 if schema is not None:
3654 query = """
3655 SELECT EXISTS (
3656 SELECT * FROM pg_catalog.pg_type t, pg_catalog.pg_namespace n
3657 WHERE t.typnamespace = n.oid
3658 AND t.typname = :typname
3659 AND n.nspname = :nspname
3660 )
3661 """
3662 query = sql.text(query)
3663 else:
3664 query = """
3665 SELECT EXISTS (
3666 SELECT * FROM pg_catalog.pg_type t
3667 WHERE t.typname = :typname
3668 AND pg_type_is_visible(t.oid)
3669 )
3670 """
3671 query = sql.text(query)
3672 query = query.bindparams(
3673 sql.bindparam(
3674 "typname", util.text_type(type_name), type_=sqltypes.Unicode
3675 )
3676 )
3677 if schema is not None:
3678 query = query.bindparams(
3679 sql.bindparam(
3680 "nspname", util.text_type(schema), type_=sqltypes.Unicode
3681 )
3682 )
3683 cursor = connection.execute(query)
3684 return bool(cursor.scalar())
3685
3686 def _get_server_version_info(self, connection):
3687 v = connection.exec_driver_sql("select pg_catalog.version()").scalar()
3688 m = re.match(
3689 r".*(?:PostgreSQL|EnterpriseDB) "
3690 r"(\d+)\.?(\d+)?(?:\.(\d+))?(?:\.\d+)?(?:devel|beta)?",
3691 v,
3692 )
3693 if not m:
3694 raise AssertionError(
3695 "Could not determine version from string '%s'" % v
3696 )
3697 return tuple([int(x) for x in m.group(1, 2, 3) if x is not None])
3698
3699 @reflection.cache
3700 def get_table_oid(self, connection, table_name, schema=None, **kw):
3701 """Fetch the oid for schema.table_name.
3702
3703 Several reflection methods require the table oid. The idea for using
3704 this method is that it can be fetched one time and cached for
3705 subsequent calls.
3706
3707 """
3708 table_oid = None
3709 if schema is not None:
3710 schema_where_clause = "n.nspname = :schema"
3711 else:
3712 schema_where_clause = "pg_catalog.pg_table_is_visible(c.oid)"
3713 query = (
3714 """
3715 SELECT c.oid
3716 FROM pg_catalog.pg_class c
3717 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
3718 WHERE (%s)
3719 AND c.relname = :table_name AND c.relkind in
3720 ('r', 'v', 'm', 'f', 'p')
3721 """
3722 % schema_where_clause
3723 )
3724 # Since we're binding to unicode, table_name and schema_name must be
3725 # unicode.
3726 table_name = util.text_type(table_name)
3727 if schema is not None:
3728 schema = util.text_type(schema)
3729 s = sql.text(query).bindparams(table_name=sqltypes.Unicode)
3730 s = s.columns(oid=sqltypes.Integer)
3731 if schema:
3732 s = s.bindparams(sql.bindparam("schema", type_=sqltypes.Unicode))
3733 c = connection.execute(s, dict(table_name=table_name, schema=schema))
3734 table_oid = c.scalar()
3735 if table_oid is None:
3736 raise exc.NoSuchTableError(table_name)
3737 return table_oid
3738
3739 @reflection.cache
3740 def get_schema_names(self, connection, **kw):
3741 result = connection.execute(
3742 sql.text(
3743 "SELECT nspname FROM pg_namespace "
3744 "WHERE nspname NOT LIKE 'pg_%' "
3745 "ORDER BY nspname"
3746 ).columns(nspname=sqltypes.Unicode)
3747 )
3748 return [name for name, in result]
3749
3750 @reflection.cache
3751 def get_table_names(self, connection, schema=None, **kw):
3752 result = connection.execute(
3753 sql.text(
3754 "SELECT c.relname FROM pg_class c "
3755 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3756 "WHERE n.nspname = :schema AND c.relkind in ('r', 'p')"
3757 ).columns(relname=sqltypes.Unicode),
3758 dict(
3759 schema=schema
3760 if schema is not None
3761 else self.default_schema_name
3762 ),
3763 )
3764 return [name for name, in result]
3765
3766 @reflection.cache
3767 def _get_foreign_table_names(self, connection, schema=None, **kw):
3768 result = connection.execute(
3769 sql.text(
3770 "SELECT c.relname FROM pg_class c "
3771 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3772 "WHERE n.nspname = :schema AND c.relkind = 'f'"
3773 ).columns(relname=sqltypes.Unicode),
3774 dict(
3775 schema=schema
3776 if schema is not None
3777 else self.default_schema_name
3778 ),
3779 )
3780 return [name for name, in result]
3781
3782 @reflection.cache
3783 def get_view_names(
3784 self, connection, schema=None, include=("plain", "materialized"), **kw
3785 ):
3786
3787 include_kind = {"plain": "v", "materialized": "m"}
3788 try:
3789 kinds = [include_kind[i] for i in util.to_list(include)]
3790 except KeyError:
3791 raise ValueError(
3792 "include %r unknown, needs to be a sequence containing "
3793 "one or both of 'plain' and 'materialized'" % (include,)
3794 )
3795 if not kinds:
3796 raise ValueError(
3797 "empty include, needs to be a sequence containing "
3798 "one or both of 'plain' and 'materialized'"
3799 )
3800
3801 result = connection.execute(
3802 sql.text(
3803 "SELECT c.relname FROM pg_class c "
3804 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3805 "WHERE n.nspname = :schema AND c.relkind IN (%s)"
3806 % (", ".join("'%s'" % elem for elem in kinds))
3807 ).columns(relname=sqltypes.Unicode),
3808 dict(
3809 schema=schema
3810 if schema is not None
3811 else self.default_schema_name
3812 ),
3813 )
3814 return [name for name, in result]
3815
3816 @reflection.cache
3817 def get_sequence_names(self, connection, schema=None, **kw):
3818 if not schema:
3819 schema = self.default_schema_name
3820 cursor = connection.execute(
3821 sql.text(
3822 "SELECT relname FROM pg_class c join pg_namespace n on "
3823 "n.oid=c.relnamespace where relkind='S' and "
3824 "n.nspname=:schema"
3825 ).bindparams(
3826 sql.bindparam(
3827 "schema",
3828 util.text_type(schema),
3829 type_=sqltypes.Unicode,
3830 ),
3831 )
3832 )
3833 return [row[0] for row in cursor]
3834
3835 @reflection.cache
3836 def get_view_definition(self, connection, view_name, schema=None, **kw):
3837 view_def = connection.scalar(
3838 sql.text(
3839 "SELECT pg_get_viewdef(c.oid) view_def FROM pg_class c "
3840 "JOIN pg_namespace n ON n.oid = c.relnamespace "
3841 "WHERE n.nspname = :schema AND c.relname = :view_name "
3842 "AND c.relkind IN ('v', 'm')"
3843 ).columns(view_def=sqltypes.Unicode),
3844 dict(
3845 schema=schema
3846 if schema is not None
3847 else self.default_schema_name,
3848 view_name=view_name,
3849 ),
3850 )
3851 return view_def
3852
3853 @reflection.cache
3854 def get_columns(self, connection, table_name, schema=None, **kw):
3855
3856 table_oid = self.get_table_oid(
3857 connection, table_name, schema, info_cache=kw.get("info_cache")
3858 )
3859
3860 generated = (
3861 "a.attgenerated as generated"
3862 if self.server_version_info >= (12,)
3863 else "NULL as generated"
3864 )
3865 if self.server_version_info >= (10,):
3866 # a.attidentity != '' is required or it will reflect also
3867 # serial columns as identity.
3868 identity = """\
3869 (SELECT json_build_object(
3870 'always', a.attidentity = 'a',
3871 'start', s.seqstart,
3872 'increment', s.seqincrement,
3873 'minvalue', s.seqmin,
3874 'maxvalue', s.seqmax,
3875 'cache', s.seqcache,
3876 'cycle', s.seqcycle)
3877 FROM pg_catalog.pg_sequence s
3878 JOIN pg_catalog.pg_class c on s.seqrelid = c."oid"
3879 WHERE c.relkind = 'S'
3880 AND a.attidentity != ''
3881 AND s.seqrelid = pg_catalog.pg_get_serial_sequence(
3882 a.attrelid::regclass::text, a.attname
3883 )::regclass::oid
3884 ) as identity_options\
3885 """
3886 else:
3887 identity = "NULL as identity_options"
3888
3889 SQL_COLS = """
3890 SELECT a.attname,
3891 pg_catalog.format_type(a.atttypid, a.atttypmod),
3892 (
3893 SELECT pg_catalog.pg_get_expr(d.adbin, d.adrelid)
3894 FROM pg_catalog.pg_attrdef d
3895 WHERE d.adrelid = a.attrelid AND d.adnum = a.attnum
3896 AND a.atthasdef
3897 ) AS DEFAULT,
3898 a.attnotnull,
3899 a.attrelid as table_oid,
3900 pgd.description as comment,
3901 %s,
3902 %s
3903 FROM pg_catalog.pg_attribute a
3904 LEFT JOIN pg_catalog.pg_description pgd ON (
3905 pgd.objoid = a.attrelid AND pgd.objsubid = a.attnum)
3906 WHERE a.attrelid = :table_oid
3907 AND a.attnum > 0 AND NOT a.attisdropped
3908 ORDER BY a.attnum
3909 """ % (
3910 generated,
3911 identity,
3912 )
3913 s = (
3914 sql.text(SQL_COLS)
3915 .bindparams(sql.bindparam("table_oid", type_=sqltypes.Integer))
3916 .columns(attname=sqltypes.Unicode, default=sqltypes.Unicode)
3917 )
3918 c = connection.execute(s, dict(table_oid=table_oid))
3919 rows = c.fetchall()
3920
3921 # dictionary with (name, ) if default search path or (schema, name)
3922 # as keys
3923 domains = self._load_domains(connection)
3924
3925 # dictionary with (name, ) if default search path or (schema, name)
3926 # as keys
3927 enums = dict(
3928 ((rec["name"],), rec)
3929 if rec["visible"]
3930 else ((rec["schema"], rec["name"]), rec)
3931 for rec in self._load_enums(connection, schema="*")
3932 )
3933
3934 # format columns
3935 columns = []
3936
3937 for (
3938 name,
3939 format_type,
3940 default_,
3941 notnull,
3942 table_oid,
3943 comment,
3944 generated,
3945 identity,
3946 ) in rows:
3947 column_info = self._get_column_info(
3948 name,
3949 format_type,
3950 default_,
3951 notnull,
3952 domains,
3953 enums,
3954 schema,
3955 comment,
3956 generated,
3957 identity,
3958 )
3959 columns.append(column_info)
3960 return columns
3961
3962 def _get_column_info(
3963 self,
3964 name,
3965 format_type,
3966 default,
3967 notnull,
3968 domains,
3969 enums,
3970 schema,
3971 comment,
3972 generated,
3973 identity,
3974 ):
3975 def _handle_array_type(attype):
3976 return (
3977 # strip '[]' from integer[], etc.
3978 re.sub(r"\[\]$", "", attype),
3979 attype.endswith("[]"),
3980 )
3981
3982 if format_type is None:
3983 no_format_type = True
3984 attype = format_type = "no format_type()"
3985 is_array = False
3986 else:
3987 no_format_type = False
3988
3989 # strip (*) from character varying(5), timestamp(5)
3990 # with time zone, geometry(POLYGON), etc.
3991 attype = re.sub(r"\(.*\)", "", format_type)
3992
3993 # strip '[]' from integer[], etc. and check if an array
3994 attype, is_array = _handle_array_type(attype)
3995
3996 # strip quotes from case sensitive enum or domain names
3997 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
3998
3999 nullable = not notnull
4000
4001 charlen = re.search(r"\(([\d,]+)\)", format_type)
4002 if charlen:
4003 charlen = charlen.group(1)
4004 args = re.search(r"\((.*)\)", format_type)
4005 if args and args.group(1):
4006 args = tuple(re.split(r"\s*,\s*", args.group(1)))
4007 else:
4008 args = ()
4009 kwargs = {}
4010
4011 if attype == "numeric":
4012 if charlen:
4013 prec, scale = charlen.split(",")
4014 args = (int(prec), int(scale))
4015 else:
4016 args = ()
4017 elif attype == "double precision":
4018 args = (53,)
4019 elif attype == "integer":
4020 args = ()
4021 elif attype in ("timestamp with time zone", "time with time zone"):
4022 kwargs["timezone"] = True
4023 if charlen:
4024 kwargs["precision"] = int(charlen)
4025 args = ()
4026 elif attype in (
4027 "timestamp without time zone",
4028 "time without time zone",
4029 "time",
4030 ):
4031 kwargs["timezone"] = False
4032 if charlen:
4033 kwargs["precision"] = int(charlen)
4034 args = ()
4035 elif attype == "bit varying":
4036 kwargs["varying"] = True
4037 if charlen:
4038 args = (int(charlen),)
4039 else:
4040 args = ()
4041 elif attype.startswith("interval"):
4042 field_match = re.match(r"interval (.+)", attype, re.I)
4043 if charlen:
4044 kwargs["precision"] = int(charlen)
4045 if field_match:
4046 kwargs["fields"] = field_match.group(1)
4047 attype = "interval"
4048 args = ()
4049 elif charlen:
4050 args = (int(charlen),)
4051
4052 while True:
4053 # looping here to suit nested domains
4054 if attype in self.ischema_names:
4055 coltype = self.ischema_names[attype]
4056 break
4057 elif enum_or_domain_key in enums:
4058 enum = enums[enum_or_domain_key]
4059 coltype = ENUM
4060 kwargs["name"] = enum["name"]
4061 if not enum["visible"]:
4062 kwargs["schema"] = enum["schema"]
4063 args = tuple(enum["labels"])
4064 break
4065 elif enum_or_domain_key in domains:
4066 domain = domains[enum_or_domain_key]
4067 attype = domain["attype"]
4068 attype, is_array = _handle_array_type(attype)
4069 # strip quotes from case sensitive enum or domain names
4070 enum_or_domain_key = tuple(util.quoted_token_parser(attype))
4071 # A table can't override a not null on the domain,
4072 # but can override nullable
4073 nullable = nullable and domain["nullable"]
4074 if domain["default"] and not default:
4075 # It can, however, override the default
4076 # value, but can't set it to null.
4077 default = domain["default"]
4078 continue
4079 else:
4080 coltype = None
4081 break
4082
4083 if coltype:
4084 coltype = coltype(*args, **kwargs)
4085 if is_array:
4086 coltype = self.ischema_names["_array"](coltype)
4087 elif no_format_type:
4088 util.warn(
4089 "PostgreSQL format_type() returned NULL for column '%s'"
4090 % (name,)
4091 )
4092 coltype = sqltypes.NULLTYPE
4093 else:
4094 util.warn(
4095 "Did not recognize type '%s' of column '%s'" % (attype, name)
4096 )
4097 coltype = sqltypes.NULLTYPE
4098
4099 # If a zero byte or blank string depending on driver (is also absent
4100 # for older PG versions), then not a generated column. Otherwise, s =
4101 # stored. (Other values might be added in the future.)
4102 if generated not in (None, "", b"\x00"):
4103 computed = dict(
4104 sqltext=default, persisted=generated in ("s", b"s")
4105 )
4106 default = None
4107 else:
4108 computed = None
4109
4110 # adjust the default value
4111 autoincrement = False
4112 if default is not None:
4113 match = re.search(r"""(nextval\(')([^']+)('.*$)""", default)
4114 if match is not None:
4115 if issubclass(coltype._type_affinity, sqltypes.Integer):
4116 autoincrement = True
4117 # the default is related to a Sequence
4118 sch = schema
4119 if "." not in match.group(2) and sch is not None:
4120 # unconditionally quote the schema name. this could
4121 # later be enhanced to obey quoting rules /
4122 # "quote schema"
4123 default = (
4124 match.group(1)
4125 + ('"%s"' % sch)
4126 + "."
4127 + match.group(2)
4128 + match.group(3)
4129 )
4130
4131 column_info = dict(
4132 name=name,
4133 type=coltype,
4134 nullable=nullable,
4135 default=default,
4136 autoincrement=autoincrement or identity is not None,
4137 comment=comment,
4138 )
4139 if computed is not None:
4140 column_info["computed"] = computed
4141 if identity is not None:
4142 column_info["identity"] = identity
4143 return column_info
4144
4145 @reflection.cache
4146 def get_pk_constraint(self, connection, table_name, schema=None, **kw):
4147 table_oid = self.get_table_oid(
4148 connection, table_name, schema, info_cache=kw.get("info_cache")
4149 )
4150
4151 if self.server_version_info < (8, 4):
4152 PK_SQL = """
4153 SELECT a.attname
4154 FROM
4155 pg_class t
4156 join pg_index ix on t.oid = ix.indrelid
4157 join pg_attribute a
4158 on t.oid=a.attrelid AND %s
4159 WHERE
4160 t.oid = :table_oid and ix.indisprimary = 't'
4161 ORDER BY a.attnum
4162 """ % self._pg_index_any(
4163 "a.attnum", "ix.indkey"
4164 )
4165
4166 else:
4167 # unnest() and generate_subscripts() both introduced in
4168 # version 8.4
4169 PK_SQL = """
4170 SELECT a.attname
4171 FROM pg_attribute a JOIN (
4172 SELECT unnest(ix.indkey) attnum,
4173 generate_subscripts(ix.indkey, 1) ord
4174 FROM pg_index ix
4175 WHERE ix.indrelid = :table_oid AND ix.indisprimary
4176 ) k ON a.attnum=k.attnum
4177 WHERE a.attrelid = :table_oid
4178 ORDER BY k.ord
4179 """
4180 t = sql.text(PK_SQL).columns(attname=sqltypes.Unicode)
4181 c = connection.execute(t, dict(table_oid=table_oid))
4182 cols = [r[0] for r in c.fetchall()]
4183
4184 PK_CONS_SQL = """
4185 SELECT conname
4186 FROM pg_catalog.pg_constraint r
4187 WHERE r.conrelid = :table_oid AND r.contype = 'p'
4188 ORDER BY 1
4189 """
4190 t = sql.text(PK_CONS_SQL).columns(conname=sqltypes.Unicode)
4191 c = connection.execute(t, dict(table_oid=table_oid))
4192 name = c.scalar()
4193
4194 return {"constrained_columns": cols, "name": name}
4195
4196 @reflection.cache
4197 def get_foreign_keys(
4198 self,
4199 connection,
4200 table_name,
4201 schema=None,
4202 postgresql_ignore_search_path=False,
4203 **kw
4204 ):
4205 preparer = self.identifier_preparer
4206 table_oid = self.get_table_oid(
4207 connection, table_name, schema, info_cache=kw.get("info_cache")
4208 )
4209
4210 FK_SQL = """
4211 SELECT r.conname,
4212 pg_catalog.pg_get_constraintdef(r.oid, true) as condef,
4213 n.nspname as conschema
4214 FROM pg_catalog.pg_constraint r,
4215 pg_namespace n,
4216 pg_class c
4217
4218 WHERE r.conrelid = :table AND
4219 r.contype = 'f' AND
4220 c.oid = confrelid AND
4221 n.oid = c.relnamespace
4222 ORDER BY 1
4223 """
4224 # https://www.postgresql.org/docs/9.0/static/sql-createtable.html
4225 FK_REGEX = re.compile(
4226 r"FOREIGN KEY \((.*?)\) REFERENCES (?:(.*?)\.)?(.*?)\((.*?)\)"
4227 r"[\s]?(MATCH (FULL|PARTIAL|SIMPLE)+)?"
4228 r"[\s]?(ON UPDATE "
4229 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
4230 r"[\s]?(ON DELETE "
4231 r"(CASCADE|RESTRICT|NO ACTION|SET NULL|SET DEFAULT)+)?"
4232 r"[\s]?(DEFERRABLE|NOT DEFERRABLE)?"
4233 r"[\s]?(INITIALLY (DEFERRED|IMMEDIATE)+)?"
4234 )
4235
4236 t = sql.text(FK_SQL).columns(
4237 conname=sqltypes.Unicode, condef=sqltypes.Unicode
4238 )
4239 c = connection.execute(t, dict(table=table_oid))
4240 fkeys = []
4241 for conname, condef, conschema in c.fetchall():
4242 m = re.search(FK_REGEX, condef).groups()
4243
4244 (
4245 constrained_columns,
4246 referred_schema,
4247 referred_table,
4248 referred_columns,
4249 _,
4250 match,
4251 _,
4252 onupdate,
4253 _,
4254 ondelete,
4255 deferrable,
4256 _,
4257 initially,
4258 ) = m
4259
4260 if deferrable is not None:
4261 deferrable = True if deferrable == "DEFERRABLE" else False
4262 constrained_columns = [
4263 preparer._unquote_identifier(x)
4264 for x in re.split(r"\s*,\s*", constrained_columns)
4265 ]
4266
4267 if postgresql_ignore_search_path:
4268 # when ignoring search path, we use the actual schema
4269 # provided it isn't the "default" schema
4270 if conschema != self.default_schema_name:
4271 referred_schema = conschema
4272 else:
4273 referred_schema = schema
4274 elif referred_schema:
4275 # referred_schema is the schema that we regexp'ed from
4276 # pg_get_constraintdef(). If the schema is in the search
4277 # path, pg_get_constraintdef() will give us None.
4278 referred_schema = preparer._unquote_identifier(referred_schema)
4279 elif schema is not None and schema == conschema:
4280 # If the actual schema matches the schema of the table
4281 # we're reflecting, then we will use that.
4282 referred_schema = schema
4283
4284 referred_table = preparer._unquote_identifier(referred_table)
4285 referred_columns = [
4286 preparer._unquote_identifier(x)
4287 for x in re.split(r"\s*,\s", referred_columns)
4288 ]
4289 options = {
4290 k: v
4291 for k, v in [
4292 ("onupdate", onupdate),
4293 ("ondelete", ondelete),
4294 ("initially", initially),
4295 ("deferrable", deferrable),
4296 ("match", match),
4297 ]
4298 if v is not None and v != "NO ACTION"
4299 }
4300 fkey_d = {
4301 "name": conname,
4302 "constrained_columns": constrained_columns,
4303 "referred_schema": referred_schema,
4304 "referred_table": referred_table,
4305 "referred_columns": referred_columns,
4306 "options": options,
4307 }
4308 fkeys.append(fkey_d)
4309 return fkeys
4310
4311 def _pg_index_any(self, col, compare_to):
4312 if self.server_version_info < (8, 1):
4313 # https://www.postgresql.org/message-id/10279.1124395722@sss.pgh.pa.us
4314 # "In CVS tip you could replace this with "attnum = ANY (indkey)".
4315 # Unfortunately, most array support doesn't work on int2vector in
4316 # pre-8.1 releases, so I think you're kinda stuck with the above
4317 # for now.
4318 # regards, tom lane"
4319 return "(%s)" % " OR ".join(
4320 "%s[%d] = %s" % (compare_to, ind, col) for ind in range(0, 10)
4321 )
4322 else:
4323 return "%s = ANY(%s)" % (col, compare_to)
4324
4325 @reflection.cache
4326 def get_indexes(self, connection, table_name, schema, **kw):
4327 table_oid = self.get_table_oid(
4328 connection, table_name, schema, info_cache=kw.get("info_cache")
4329 )
4330
4331 # cast indkey as varchar since it's an int2vector,
4332 # returned as a list by some drivers such as pypostgresql
4333
4334 if self.server_version_info < (8, 5):
4335 IDX_SQL = """
4336 SELECT
4337 i.relname as relname,
4338 ix.indisunique, ix.indexprs, ix.indpred,
4339 a.attname, a.attnum, NULL, ix.indkey%s,
4340 %s, %s, am.amname,
4341 NULL as indnkeyatts
4342 FROM
4343 pg_class t
4344 join pg_index ix on t.oid = ix.indrelid
4345 join pg_class i on i.oid = ix.indexrelid
4346 left outer join
4347 pg_attribute a
4348 on t.oid = a.attrelid and %s
4349 left outer join
4350 pg_am am
4351 on i.relam = am.oid
4352 WHERE
4353 t.relkind IN ('r', 'v', 'f', 'm')
4354 and t.oid = :table_oid
4355 and ix.indisprimary = 'f'
4356 ORDER BY
4357 t.relname,
4358 i.relname
4359 """ % (
4360 # version 8.3 here was based on observing the
4361 # cast does not work in PG 8.2.4, does work in 8.3.0.
4362 # nothing in PG changelogs regarding this.
4363 "::varchar" if self.server_version_info >= (8, 3) else "",
4364 "ix.indoption::varchar"
4365 if self.server_version_info >= (8, 3)
4366 else "NULL",
4367 "i.reloptions"
4368 if self.server_version_info >= (8, 2)
4369 else "NULL",
4370 self._pg_index_any("a.attnum", "ix.indkey"),
4371 )
4372 else:
4373 IDX_SQL = """
4374 SELECT
4375 i.relname as relname,
4376 ix.indisunique, ix.indexprs,
4377 a.attname, a.attnum, c.conrelid, ix.indkey::varchar,
4378 ix.indoption::varchar, i.reloptions, am.amname,
4379 pg_get_expr(ix.indpred, ix.indrelid),
4380 %s as indnkeyatts
4381 FROM
4382 pg_class t
4383 join pg_index ix on t.oid = ix.indrelid
4384 join pg_class i on i.oid = ix.indexrelid
4385 left outer join
4386 pg_attribute a
4387 on t.oid = a.attrelid and a.attnum = ANY(ix.indkey)
4388 left outer join
4389 pg_constraint c
4390 on (ix.indrelid = c.conrelid and
4391 ix.indexrelid = c.conindid and
4392 c.contype in ('p', 'u', 'x'))
4393 left outer join
4394 pg_am am
4395 on i.relam = am.oid
4396 WHERE
4397 t.relkind IN ('r', 'v', 'f', 'm', 'p')
4398 and t.oid = :table_oid
4399 and ix.indisprimary = 'f'
4400 ORDER BY
4401 t.relname,
4402 i.relname
4403 """ % (
4404 "ix.indnkeyatts"
4405 if self.server_version_info >= (11, 0)
4406 else "NULL",
4407 )
4408
4409 t = sql.text(IDX_SQL).columns(
4410 relname=sqltypes.Unicode, attname=sqltypes.Unicode
4411 )
4412 c = connection.execute(t, dict(table_oid=table_oid))
4413
4414 indexes = defaultdict(lambda: defaultdict(dict))
4415
4416 sv_idx_name = None
4417 for row in c.fetchall():
4418 (
4419 idx_name,
4420 unique,
4421 expr,
4422 col,
4423 col_num,
4424 conrelid,
4425 idx_key,
4426 idx_option,
4427 options,
4428 amname,
4429 filter_definition,
4430 indnkeyatts,
4431 ) = row
4432
4433 if expr:
4434 if idx_name != sv_idx_name:
4435 util.warn(
4436 "Skipped unsupported reflection of "
4437 "expression-based index %s" % idx_name
4438 )
4439 sv_idx_name = idx_name
4440 continue
4441
4442 has_idx = idx_name in indexes
4443 index = indexes[idx_name]
4444 if col is not None:
4445 index["cols"][col_num] = col
4446 if not has_idx:
4447 idx_keys = idx_key.split()
4448 # "The number of key columns in the index, not counting any
4449 # included columns, which are merely stored and do not
4450 # participate in the index semantics"
4451 if indnkeyatts and idx_keys[indnkeyatts:]:
4452 # this is a "covering index" which has INCLUDE columns
4453 # as well as regular index columns
4454 inc_keys = idx_keys[indnkeyatts:]
4455 idx_keys = idx_keys[:indnkeyatts]
4456 else:
4457 inc_keys = []
4458
4459 index["key"] = [int(k.strip()) for k in idx_keys]
4460 index["inc"] = [int(k.strip()) for k in inc_keys]
4461
4462 # (new in pg 8.3)
4463 # "pg_index.indoption" is list of ints, one per column/expr.
4464 # int acts as bitmask: 0x01=DESC, 0x02=NULLSFIRST
4465 sorting = {}
4466 for col_idx, col_flags in enumerate(
4467 (idx_option or "").split()
4468 ):
4469 col_flags = int(col_flags.strip())
4470 col_sorting = ()
4471 # try to set flags only if they differ from PG defaults...
4472 if col_flags & 0x01:
4473 col_sorting += ("desc",)
4474 if not (col_flags & 0x02):
4475 col_sorting += ("nulls_last",)
4476 else:
4477 if col_flags & 0x02:
4478 col_sorting += ("nulls_first",)
4479 if col_sorting:
4480 sorting[col_idx] = col_sorting
4481 if sorting:
4482 index["sorting"] = sorting
4483
4484 index["unique"] = unique
4485 if conrelid is not None:
4486 index["duplicates_constraint"] = idx_name
4487 if options:
4488 index["options"] = dict(
4489 [option.split("=") for option in options]
4490 )
4491
4492 # it *might* be nice to include that this is 'btree' in the
4493 # reflection info. But we don't want an Index object
4494 # to have a ``postgresql_using`` in it that is just the
4495 # default, so for the moment leaving this out.
4496 if amname and amname != "btree":
4497 index["amname"] = amname
4498
4499 if filter_definition:
4500 index["postgresql_where"] = filter_definition
4501
4502 result = []
4503 for name, idx in indexes.items():
4504 entry = {
4505 "name": name,
4506 "unique": idx["unique"],
4507 "column_names": [idx["cols"][i] for i in idx["key"]],
4508 }
4509 if self.server_version_info >= (11, 0):
4510 # NOTE: this is legacy, this is part of dialect_options now
4511 # as of #7382
4512 entry["include_columns"] = [idx["cols"][i] for i in idx["inc"]]
4513 if "duplicates_constraint" in idx:
4514 entry["duplicates_constraint"] = idx["duplicates_constraint"]
4515 if "sorting" in idx:
4516 entry["column_sorting"] = dict(
4517 (idx["cols"][idx["key"][i]], value)
4518 for i, value in idx["sorting"].items()
4519 )
4520 if "include_columns" in entry:
4521 entry.setdefault("dialect_options", {})[
4522 "postgresql_include"
4523 ] = entry["include_columns"]
4524 if "options" in idx:
4525 entry.setdefault("dialect_options", {})[
4526 "postgresql_with"
4527 ] = idx["options"]
4528 if "amname" in idx:
4529 entry.setdefault("dialect_options", {})[
4530 "postgresql_using"
4531 ] = idx["amname"]
4532 if "postgresql_where" in idx:
4533 entry.setdefault("dialect_options", {})[
4534 "postgresql_where"
4535 ] = idx["postgresql_where"]
4536 result.append(entry)
4537 return result
4538
4539 @reflection.cache
4540 def get_unique_constraints(
4541 self, connection, table_name, schema=None, **kw
4542 ):
4543 table_oid = self.get_table_oid(
4544 connection, table_name, schema, info_cache=kw.get("info_cache")
4545 )
4546
4547 UNIQUE_SQL = """
4548 SELECT
4549 cons.conname as name,
4550 cons.conkey as key,
4551 a.attnum as col_num,
4552 a.attname as col_name
4553 FROM
4554 pg_catalog.pg_constraint cons
4555 join pg_attribute a
4556 on cons.conrelid = a.attrelid AND
4557 a.attnum = ANY(cons.conkey)
4558 WHERE
4559 cons.conrelid = :table_oid AND
4560 cons.contype = 'u'
4561 """
4562
4563 t = sql.text(UNIQUE_SQL).columns(col_name=sqltypes.Unicode)
4564 c = connection.execute(t, dict(table_oid=table_oid))
4565
4566 uniques = defaultdict(lambda: defaultdict(dict))
4567 for row in c.fetchall():
4568 uc = uniques[row.name]
4569 uc["key"] = row.key
4570 uc["cols"][row.col_num] = row.col_name
4571
4572 return [
4573 {"name": name, "column_names": [uc["cols"][i] for i in uc["key"]]}
4574 for name, uc in uniques.items()
4575 ]
4576
4577 @reflection.cache
4578 def get_table_comment(self, connection, table_name, schema=None, **kw):
4579 table_oid = self.get_table_oid(
4580 connection, table_name, schema, info_cache=kw.get("info_cache")
4581 )
4582
4583 COMMENT_SQL = """
4584 SELECT
4585 pgd.description as table_comment
4586 FROM
4587 pg_catalog.pg_description pgd
4588 WHERE
4589 pgd.objsubid = 0 AND
4590 pgd.objoid = :table_oid
4591 """
4592
4593 c = connection.execute(
4594 sql.text(COMMENT_SQL), dict(table_oid=table_oid)
4595 )
4596 return {"text": c.scalar()}
4597
4598 @reflection.cache
4599 def get_check_constraints(self, connection, table_name, schema=None, **kw):
4600 table_oid = self.get_table_oid(
4601 connection, table_name, schema, info_cache=kw.get("info_cache")
4602 )
4603
4604 CHECK_SQL = """
4605 SELECT
4606 cons.conname as name,
4607 pg_get_constraintdef(cons.oid) as src
4608 FROM
4609 pg_catalog.pg_constraint cons
4610 WHERE
4611 cons.conrelid = :table_oid AND
4612 cons.contype = 'c'
4613 """
4614
4615 c = connection.execute(sql.text(CHECK_SQL), dict(table_oid=table_oid))
4616
4617 ret = []
4618 for name, src in c:
4619 # samples:
4620 # "CHECK (((a > 1) AND (a < 5)))"
4621 # "CHECK (((a = 1) OR ((a > 2) AND (a < 5))))"
4622 # "CHECK (((a > 1) AND (a < 5))) NOT VALID"
4623 # "CHECK (some_boolean_function(a))"
4624 # "CHECK (((a\n < 1)\n OR\n (a\n >= 5))\n)"
4625
4626 m = re.match(
4627 r"^CHECK *\((.+)\)( NOT VALID)?$", src, flags=re.DOTALL
4628 )
4629 if not m:
4630 util.warn("Could not parse CHECK constraint text: %r" % src)
4631 sqltext = ""
4632 else:
4633 sqltext = re.compile(
4634 r"^[\s\n]*\((.+)\)[\s\n]*$", flags=re.DOTALL
4635 ).sub(r"\1", m.group(1))
4636 entry = {"name": name, "sqltext": sqltext}
4637 if m and m.group(2):
4638 entry["dialect_options"] = {"not_valid": True}
4639
4640 ret.append(entry)
4641 return ret
4642
4643 def _load_enums(self, connection, schema=None):
4644 schema = schema or self.default_schema_name
4645 if not self.supports_native_enum:
4646 return {}
4647
4648 # Load data types for enums:
4649 SQL_ENUMS = """
4650 SELECT t.typname as "name",
4651 -- no enum defaults in 8.4 at least
4652 -- t.typdefault as "default",
4653 pg_catalog.pg_type_is_visible(t.oid) as "visible",
4654 n.nspname as "schema",
4655 e.enumlabel as "label"
4656 FROM pg_catalog.pg_type t
4657 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
4658 LEFT JOIN pg_catalog.pg_enum e ON t.oid = e.enumtypid
4659 WHERE t.typtype = 'e'
4660 """
4661
4662 if schema != "*":
4663 SQL_ENUMS += "AND n.nspname = :schema "
4664
4665 # e.oid gives us label order within an enum
4666 SQL_ENUMS += 'ORDER BY "schema", "name", e.oid'
4667
4668 s = sql.text(SQL_ENUMS).columns(
4669 attname=sqltypes.Unicode, label=sqltypes.Unicode
4670 )
4671
4672 if schema != "*":
4673 s = s.bindparams(schema=schema)
4674
4675 c = connection.execute(s)
4676
4677 enums = []
4678 enum_by_name = {}
4679 for enum in c.fetchall():
4680 key = (enum.schema, enum.name)
4681 if key in enum_by_name:
4682 enum_by_name[key]["labels"].append(enum.label)
4683 else:
4684 enum_by_name[key] = enum_rec = {
4685 "name": enum.name,
4686 "schema": enum.schema,
4687 "visible": enum.visible,
4688 "labels": [],
4689 }
4690 if enum.label is not None:
4691 enum_rec["labels"].append(enum.label)
4692 enums.append(enum_rec)
4693 return enums
4694
4695 def _load_domains(self, connection):
4696 # Load data types for domains:
4697 SQL_DOMAINS = """
4698 SELECT t.typname as "name",
4699 pg_catalog.format_type(t.typbasetype, t.typtypmod) as "attype",
4700 not t.typnotnull as "nullable",
4701 t.typdefault as "default",
4702 pg_catalog.pg_type_is_visible(t.oid) as "visible",
4703 n.nspname as "schema"
4704 FROM pg_catalog.pg_type t
4705 LEFT JOIN pg_catalog.pg_namespace n ON n.oid = t.typnamespace
4706 WHERE t.typtype = 'd'
4707 """
4708
4709 s = sql.text(SQL_DOMAINS)
4710 c = connection.execution_options(future_result=True).execute(s)
4711
4712 domains = {}
4713 for domain in c.mappings():
4714 domain = domain
4715 # strip (30) from character varying(30)
4716 attype = re.search(r"([^\(]+)", domain["attype"]).group(1)
4717 # 'visible' just means whether or not the domain is in a
4718 # schema that's on the search path -- or not overridden by
4719 # a schema with higher precedence. If it's not visible,
4720 # it will be prefixed with the schema-name when it's used.
4721 if domain["visible"]:
4722 key = (domain["name"],)
4723 else:
4724 key = (domain["schema"], domain["name"])
4725
4726 domains[key] = {
4727 "attype": attype,
4728 "nullable": domain["nullable"],
4729 "default": domain["default"],
4730 }
4731
4732 return domains