Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/psycopg2.py: 32%
299 statements
coverage.py v7.0.1, created at 2022-12-25 06:11 +0000
# postgresql/psycopg2.py
# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
r"""
.. dialect:: postgresql+psycopg2
    :name: psycopg2
    :dbapi: psycopg2
    :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://pypi.org/project/psycopg2/

psycopg2 Connect Arguments
--------------------------

Keyword arguments that are specific to the SQLAlchemy psycopg2 dialect
may be passed to :func:`_sa.create_engine()`, and include the following:


* ``isolation_level``: This option, available for all PostgreSQL dialects,
  includes the ``AUTOCOMMIT`` isolation level when using the psycopg2
  dialect. This option sets the **default** isolation level for the
  connection that is set immediately upon connection to the database before
  the connection is pooled. This option is generally superseded by the more
  modern :paramref:`_engine.Connection.execution_options.isolation_level`
  execution option, detailed at :ref:`dbapi_autocommit`.

  .. seealso::

    :ref:`psycopg2_isolation_level`

    :ref:`dbapi_autocommit`


* ``client_encoding``: sets the client encoding in a libpq-agnostic way,
  using psycopg2's ``set_client_encoding()`` method.

  .. seealso::

    :ref:`psycopg2_unicode`

* ``use_native_unicode``: Under Python 2 only, this can be set to False to
  disable the use of psycopg2's native Unicode support.

  .. seealso::

    :ref:`psycopg2_disable_native_unicode`


* ``executemany_mode``, ``executemany_batch_page_size``,
  ``executemany_values_page_size``: Allows use of psycopg2
  extensions for optimizing "executemany"-style queries. See the referenced
  section below for details.

  .. seealso::

    :ref:`psycopg2_executemany_mode`

.. tip::

    The above keyword arguments are **dialect** keyword arguments, meaning
    that they are passed as explicit keyword arguments to :func:`_sa.create_engine()`::

        engine = create_engine(
            "postgresql+psycopg2://scott:tiger@localhost/test",
            isolation_level="SERIALIZABLE",
        )

    These should not be confused with **DBAPI** connect arguments, which
    are passed as part of the :paramref:`_sa.create_engine.connect_args`
    dictionary and/or are passed in the URL query string, as detailed in
    the section :ref:`custom_dbapi_args`.

.. _psycopg2_ssl:

SSL Connections
---------------

The psycopg2 module has a connection argument named ``sslmode`` for
controlling its behavior regarding secure (SSL) connections. The default is
``sslmode=prefer``; it will attempt an SSL connection and if that fails it
will fall back to an unencrypted connection. ``sslmode=require`` may be used
to ensure that only secure connections are established. Consult the
psycopg2 / libpq documentation for further options that are available.

Note that ``sslmode`` is specific to psycopg2 so it is included in the
connection URI::

    engine = sa.create_engine(
        "postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test?sslmode=require"
    )

Unix Domain Connections
------------------------

psycopg2 supports connecting via Unix domain connections. When the ``host``
portion of the URL is omitted, SQLAlchemy passes ``None`` to psycopg2,
which specifies Unix-domain communication rather than TCP/IP communication::

    create_engine("postgresql+psycopg2://user:password@/dbname")

By default, the connection is made to a Unix-domain socket
in ``/tmp``, or whatever socket directory was specified when PostgreSQL
was built. This value can be overridden by passing a pathname to psycopg2,
using ``host`` as an additional keyword argument::

    create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql")

.. seealso::

    `PQconnectdbParams \
    <https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_

.. _psycopg2_multi_host:

Specifying multiple fallback hosts
-----------------------------------

psycopg2 supports multiple connection points in the connection string.
When the ``host`` parameter is used multiple times in the query section of
the URL, SQLAlchemy will create a single string of the host and port
information provided to make the connections. Tokens may consist of
``host:port`` or just ``host``; in the latter case, the default port
is selected by libpq. In the example below, three host connections
are specified, for ``HostA:PortA``, ``HostB`` connecting to the default port,
and ``HostC:PortC``::

    create_engine(
        "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC"
    )

As an alternative, libpq query string format also may be used; this specifies
``host`` and ``port`` as single query string arguments with comma-separated
lists. The default port can be chosen by indicating an empty value
in the comma-separated list::

    create_engine(
        "postgresql+psycopg2://user:password@/dbname?host=HostA,HostB,HostC&port=PortA,,PortC"
    )
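
The relationship between the two URL styles above can be sketched in plain
Python. This is a simplified illustration using only the standard library,
not SQLAlchemy's own implementation; the helper name ``combine_hosts`` is
hypothetical, and the sketch does not handle IPv6 addresses.

```python
from urllib.parse import parse_qs


def combine_hosts(query_string):
    # Collapse repeated host=name[:port] tokens into libpq-style
    # comma-separated lists. Hypothetical helper for illustration only.
    hosts, ports = [], []
    for token in parse_qs(query_string).get("host", []):
        name, _, port = token.partition(":")
        hosts.append(name)
        ports.append(port)  # an empty string means "use the default port"
    return ",".join(hosts), ",".join(ports)


host, port = combine_hosts("host=HostA:PortA&host=HostB&host=HostC:PortC")
print(host)
print(port)
```

The empty entry in the port list is what tells libpq to fall back to its
default port for that host.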

With either URL style, a connection to each host is attempted based on a
configurable strategy, which may be set using the libpq
``target_session_attrs`` parameter. Per libpq this defaults to ``any``,
which indicates that each host is tried in turn until a connection succeeds.
Other strategies include ``primary``, ``prefer-standby``, etc. The complete
list is documented by PostgreSQL at
`libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_.

For example, to indicate three hosts using the ``primary`` strategy::

    create_engine(
        "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC&target_session_attrs=primary"
    )

.. versionchanged:: 1.4.40 Port specification in psycopg2 multiple host format
   is repaired, previously ports were not correctly interpreted in this context.
   libpq comma-separated format is also now supported.

.. versionadded:: 1.3.20 Support for multiple hosts in PostgreSQL connection
   string.

.. seealso::

    `libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_ - please refer
    to this section in the libpq documentation for complete background on multiple host support.


Empty DSN Connections / Environment Variable Connections
---------------------------------------------------------

The psycopg2 DBAPI can connect to PostgreSQL by passing an empty DSN to the
libpq client library, which by default indicates to connect to a localhost
PostgreSQL database that is open for "trust" connections. This behavior can be
further tailored using a particular set of environment variables which are
prefixed with ``PG`` (for example ``PGHOST`` or ``PGUSER``), which are
consumed by ``libpq`` to take the place of any or all elements of the
connection string.

For this form, the URL can be passed without any elements other than the
initial scheme::

    engine = create_engine('postgresql+psycopg2://')

In the above form, a blank "dsn" string is passed to the ``psycopg2.connect()``
function which in turn represents an empty DSN passed to libpq.

.. versionadded:: 1.3.2 support for parameter-less connections with psycopg2.

.. seealso::

    `Environment Variables\
    <https://www.postgresql.org/docs/current/libpq-envars.html>`_ -
    PostgreSQL documentation on how to use ``PG``-prefixed
    environment variables for connections.
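
As a minimal sketch of the environment-variable approach, the snippet below
sets a few standard libpq variables from Python before an engine would be
created. The host, port, user and database values are placeholders chosen
for illustration, and the name ``EMPTY_DSN_URL`` is hypothetical.

```python
import os

# Standard libpq variable names; the values are illustrative placeholders.
libpq_settings = {
    "PGHOST": "localhost",
    "PGPORT": "5432",
    "PGUSER": "scott",
    "PGDATABASE": "test",
}
os.environ.update(libpq_settings)

# With these set, an engine created from the bare URL below would leave
# every connection detail to libpq.
EMPTY_DSN_URL = "postgresql+psycopg2://"
```

In practice these variables are more commonly set in the shell or in the
service configuration than from within the application itself.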

.. _psycopg2_execution_options:

Per-Statement/Connection Execution Options
-------------------------------------------

The following DBAPI-specific options are respected when used with
:meth:`_engine.Connection.execution_options`,
:meth:`.Executable.execution_options`,
:meth:`_query.Query.execution_options`,
in addition to those not specific to DBAPIs:

* ``isolation_level`` - Set the transaction isolation level for the lifespan
  of a :class:`_engine.Connection` (can only be set on a connection,
  not a statement or query). See :ref:`psycopg2_isolation_level`.

* ``stream_results`` - Enable or disable usage of psycopg2 server side
  cursors - this feature makes use of "named" cursors in combination with
  special result handling methods so that result rows are not fully buffered.
  Defaults to False, meaning cursors are buffered by default.

* ``max_row_buffer`` - when using ``stream_results``, an integer value that
  specifies the maximum number of rows to buffer at a time. This is
  interpreted by the :class:`.BufferedRowCursorResult`, and if omitted the
  buffer will grow to ultimately store 1000 rows at a time.

  .. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than
     1000, and the buffer will grow to that size.
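
The idea of a row buffer that grows up to ``max_row_buffer`` can be sketched
in plain Python. This is only an illustration of the concept; the starting
size and growth factor used here are made-up values and are not claimed to
match SQLAlchemy's internal schedule, and ``buffered_batches`` is a
hypothetical helper.

```python
from itertools import islice


def buffered_batches(rows, max_row_buffer=1000, start=1, growth=5):
    # Yield lists of rows, growing the batch size up to max_row_buffer.
    # ``start`` and ``growth`` are assumptions made for illustration.
    rows = iter(rows)
    size = start
    while True:
        batch = list(islice(rows, size))
        if not batch:
            return
        yield batch
        size = min(max_row_buffer, size * growth)


sizes = [len(b) for b in buffered_batches(range(500), max_row_buffer=100)]
print(sizes)
```

No batch ever exceeds ``max_row_buffer``, so memory use stays bounded
however many rows the statement returns.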

.. _psycopg2_batch_mode:

.. _psycopg2_executemany_mode:

Psycopg2 Fast Execution Helpers
-------------------------------

Modern versions of psycopg2 include a feature known as
`Fast Execution Helpers \
<https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
have been shown in benchmarking to improve psycopg2's executemany()
performance, primarily with INSERT statements, by multiple orders of magnitude.
SQLAlchemy internally makes use of these extensions for ``executemany()`` style
calls, which correspond to lists of parameters being passed to
:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
sets <tutorial_multiple_parameters>`. The ORM also uses this mode internally whenever
possible.

The two available extensions on the psycopg2 side are the ``execute_values()``
and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
``execute_values()`` extension for all qualifying INSERT statements.

.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
   ``"values_only"`` for ``executemany_mode``, which allows an order of
   magnitude performance improvement for INSERT statements, but does not
   include "batch" mode for UPDATE and DELETE statements which removes the
   ability of ``cursor.rowcount`` to function correctly.

The use of these extensions is controlled by the ``executemany_mode`` flag
which may be passed to :func:`_sa.create_engine`::

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@host/dbname",
        executemany_mode='values_plus_batch')


Possible options for ``executemany_mode`` include:

* ``values_only`` - this is the default value. The psycopg2
  ``execute_values()`` extension is used for qualifying INSERT statements,
  which rewrites the INSERT to include multiple VALUES clauses so that many
  parameter sets can be inserted with one statement.

  .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
     which is also now the default.

* ``None`` - No psycopg2 extensions are used, and the usual
  ``cursor.executemany()`` method is used when invoking statements with
  multiple parameter sets.

* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
  INSERT, UPDATE and DELETE statements, so that multiple copies
  of a SQL query, each one corresponding to a parameter set passed to
  ``executemany()``, are joined into a single SQL string separated by a
  semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
  attribute will not contain a value for executemany-style executions.

* ``'values_plus_batch'`` - ``execute_values`` is used for qualifying INSERT
  statements, ``execute_batch`` is used for UPDATE and DELETE.
  When using this mode, the :attr:`_engine.CursorResult.rowcount`
  attribute will not contain a value for executemany-style executions against
  UPDATE and DELETE statements.

By "qualifying statements", we mean that the statement being executed
must be a Core :func:`_expression.insert`, :func:`_expression.update`
or :func:`_expression.delete` construct, and not a plain textual SQL
string or one constructed using :func:`_expression.text`. When using the
ORM, all insert/update/delete statements used by the ORM flush process
are qualifying.

The "page size" for the "values" and "batch" strategies can be affected
by using the ``executemany_batch_page_size`` and
``executemany_values_page_size`` engine parameters. These
control how many parameter sets should be represented in each execution.
The "values" page size defaults to 1000, which is different from psycopg2's
default. The "batch" page size defaults to 100. These can be affected by
passing new values to :func:`_engine.create_engine`::

    engine = create_engine(
        "postgresql+psycopg2://scott:tiger@host/dbname",
        executemany_mode='values_plus_batch',
        executemany_values_page_size=10000, executemany_batch_page_size=500)

.. versionchanged:: 1.4

    The default for ``executemany_values_page_size`` is now 1000, up from
    100.
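
To make the effect of the "values" page size concrete, the sketch below
builds one multi-row INSERT per page of parameter sets. It is a simplified
illustration of the rewriting idea only; the real work is done by psycopg2's
``execute_values()``, and the helper name ``paged_inserts`` along with the
table and column names are hypothetical.

```python
def paged_inserts(table, columns, params, page_size=1000):
    # Yield (sql, flat_args) pairs, one multi-row INSERT per page.
    # Hypothetical helper; not the code psycopg2 or SQLAlchemy runs.
    row = "(" + ", ".join(["%s"] * len(columns)) + ")"
    for start in range(0, len(params), page_size):
        page = params[start:start + page_size]
        sql = "INSERT INTO %s (%s) VALUES %s" % (
            table,
            ", ".join(columns),
            ", ".join([row] * len(page)),
        )
        yield sql, [value for record in page for value in record]


pages = list(
    paged_inserts("t", ["x", "y"], [(1, 2), (3, 4), (5, 6)], page_size=2)
)
for sql, args in pages:
    print(sql, args)
```

Three parameter sets with a page size of two produce two statements, which is
why a larger page size means fewer round trips for a large batch.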

.. seealso::

    :ref:`tutorial_multiple_parameters` - General information on using the
    :class:`_engine.Connection`
    object to execute statements in such a way as to make
    use of the DBAPI ``.executemany()`` method.


.. _psycopg2_unicode:

Unicode with Psycopg2
----------------------

The psycopg2 DBAPI driver supports Unicode data transparently. Under Python 2
only, the SQLAlchemy psycopg2 dialect will enable the
``psycopg2.extensions.UNICODE`` extension by default to ensure Unicode is
handled properly; under Python 3, this is psycopg2's default behavior.

The client character encoding can be controlled for the psycopg2 dialect
in the following ways:

* For PostgreSQL 9.1 and above, the ``client_encoding`` parameter may be
  passed in the database URL; this parameter is consumed by the underlying
  ``libpq`` PostgreSQL client library::

    engine = create_engine("postgresql+psycopg2://user:pass@host/dbname?client_encoding=utf8")

  Alternatively, the above ``client_encoding`` value may be passed using
  :paramref:`_sa.create_engine.connect_args` for programmatic establishment with
  ``libpq``::

    engine = create_engine(
        "postgresql+psycopg2://user:pass@host/dbname",
        connect_args={'client_encoding': 'utf8'}
    )

* For all PostgreSQL versions, psycopg2 supports a client-side encoding
  value that will be passed to database connections when they are first
  established. The SQLAlchemy psycopg2 dialect supports this using the
  ``client_encoding`` parameter passed to :func:`_sa.create_engine`::

      engine = create_engine(
          "postgresql+psycopg2://user:pass@host/dbname",
          client_encoding="utf8"
      )

  .. tip:: The above ``client_encoding`` parameter admittedly is very similar
      in appearance to usage of the parameter within the
      :paramref:`_sa.create_engine.connect_args` dictionary; the difference
      above is that the parameter is consumed by psycopg2 and is
      passed to the database connection using ``SET client_encoding TO
      'utf8'``; in the previously mentioned style, the parameter is instead
      passed through psycopg2 and consumed by the ``libpq`` library.

* A common way to set up client encoding with PostgreSQL databases is to
  ensure it is configured within the server-side postgresql.conf file;
  this is the recommended way to set encoding for a server that is
  consistently of one encoding in all databases::

    # postgresql.conf file

    # client_encoding = sql_ascii # actually, defaults to database
                                  # encoding
    client_encoding = utf8

.. _psycopg2_disable_native_unicode:

Disabling Native Unicode
^^^^^^^^^^^^^^^^^^^^^^^^

Under Python 2 only, SQLAlchemy can also be instructed to skip the usage of the
psycopg2 ``UNICODE`` extension and to instead utilize its own unicode
encode/decode services, which are normally reserved only for those DBAPIs that
don't fully support unicode directly. Passing ``use_native_unicode=False`` to
:func:`_sa.create_engine` will disable usage of
``psycopg2.extensions.UNICODE``. SQLAlchemy will instead encode data itself
into Python bytestrings on the way in and coerce from bytes on the way back,
using the value of the :func:`_sa.create_engine` ``encoding`` parameter, which
defaults to ``utf-8``.
SQLAlchemy's own unicode encode/decode functionality is steadily becoming
obsolete as most DBAPIs now support unicode fully.


Transactions
------------

The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.

.. _psycopg2_isolation_level:

Psycopg2 Transaction Isolation Level
-------------------------------------

As discussed in :ref:`postgresql_isolation_level`,
all PostgreSQL dialects support setting of transaction isolation level
both via the ``isolation_level`` parameter passed to :func:`_sa.create_engine`,
as well as the ``isolation_level`` argument used by
:meth:`_engine.Connection.execution_options`. When using the psycopg2 dialect,
these options make use of psycopg2's ``set_isolation_level()`` connection
method, rather than emitting a PostgreSQL directive; this is because psycopg2's
API-level setting is always emitted at the start of each transaction in any
case.

The psycopg2 dialect supports these constants for isolation level:

* ``READ COMMITTED``
* ``READ UNCOMMITTED``
* ``REPEATABLE READ``
* ``SERIALIZABLE``
* ``AUTOCOMMIT``

.. seealso::

    :ref:`postgresql_isolation_level`

    :ref:`pg8000_isolation_level`
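
The dialect code in this module resolves an isolation level name by replacing
underscores with spaces before looking it up. A standalone sketch of that
lookup follows; the dictionary values are placeholder strings standing in for
the psycopg2 constants, and ``resolve_isolation_level`` is a hypothetical
name.

```python
# Placeholder values; the dialect maps these names to psycopg2 constants.
ISOLATION_LOOKUP = {
    "AUTOCOMMIT": "autocommit",
    "READ COMMITTED": "read-committed",
    "READ UNCOMMITTED": "read-uncommitted",
    "REPEATABLE READ": "repeatable-read",
    "SERIALIZABLE": "serializable",
}


def resolve_isolation_level(level):
    # Accept "REPEATABLE_READ" as well as "REPEATABLE READ".
    try:
        return ISOLATION_LOOKUP[level.replace("_", " ")]
    except KeyError:
        raise ValueError(
            "Invalid value %r for isolation_level. Valid levels are %s"
            % (level, ", ".join(ISOLATION_LOOKUP))
        ) from None


print(resolve_isolation_level("REPEATABLE_READ"))
```

Both spellings resolve to the same entry, and an unknown name raises an error
that lists the accepted values.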


NOTICE logging
---------------

The psycopg2 dialect will log PostgreSQL NOTICE messages
via the ``sqlalchemy.dialects.postgresql`` logger. When this logger
is set to the ``logging.INFO`` level, notice messages will be logged::

    import logging

    logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)

Above, it is assumed that logging is configured externally. If this is not
the case, configuration such as ``logging.basicConfig()`` must be utilized::

    import logging

    logging.basicConfig()   # log messages to stderr, the default stream
    logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)

.. seealso::

    `Logging HOWTO <https://docs.python.org/3/howto/logging.html>`_ - on the python.org website
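
As a sketch of how an application might collect these messages rather than
print them, the snippet below attaches a small list-backed handler to the
same logger. The ``ListHandler`` class and the sample notice text are
hypothetical; the loop stands in for what the dialect does with each notice
string, so no database is needed to run it.

```python
import logging


class ListHandler(logging.Handler):
    # Hypothetical handler that keeps formatted messages in memory.
    def __init__(self):
        super().__init__()
        self.messages = []

    def emit(self, record):
        self.messages.append(record.getMessage())


pg_logger = logging.getLogger("sqlalchemy.dialects.postgresql")
pg_logger.setLevel(logging.INFO)
handler = ListHandler()
pg_logger.addHandler(handler)

# Stand-in for the dialect: each notice is logged at INFO level with its
# trailing newline stripped.
for notice in ['NOTICE:  table "t" does not exist, skipping\n']:
    pg_logger.info(notice.rstrip())

print(handler.messages)
```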

.. _psycopg2_hstore:

HSTORE type
------------

The ``psycopg2`` DBAPI includes an extension to natively handle marshalling of
the HSTORE type. The SQLAlchemy psycopg2 dialect will enable this extension
by default when psycopg2 version 2.4 or greater is used, and
it is detected that the target database has the HSTORE type set up for use.
In other words, when the dialect makes the first
connection, a sequence like the following is performed:

1. Request the available HSTORE oids using
   ``psycopg2.extras.HstoreAdapter.get_oids()``.
   If this function returns a list of HSTORE identifiers, we then determine
   that the ``HSTORE`` extension is present.
   This function is **skipped** if the version of psycopg2 installed is
   less than version 2.4.

2. If the ``use_native_hstore`` flag is at its default of ``True``, and
   we've detected that ``HSTORE`` oids are available, the
   ``psycopg2.extras.register_hstore()`` extension is invoked for all
   connections.

The ``register_hstore()`` extension has the effect of **all Python
dictionaries being accepted as parameters regardless of the type of target
column in SQL**. The dictionaries are converted by this extension into a
textual HSTORE expression. If this behavior is not desired, disable the
use of the hstore extension by setting ``use_native_hstore`` to ``False`` as
follows::

    engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test",
                use_native_hstore=False)

The ``HSTORE`` type is **still supported** when the
``psycopg2.extras.register_hstore()`` extension is not used. It merely
means that the coercion between Python dictionaries and the HSTORE
string format, on both the parameter side and the result side, will take
place within SQLAlchemy's own marshalling logic rather than in that of
``psycopg2``, which may be more performant.
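
For readers unfamiliar with the HSTORE string format mentioned above, the
following is a minimal sketch of turning a Python dictionary into HSTORE
text. It is an illustration only and is not the marshalling code used by
SQLAlchemy or psycopg2; ``dict_to_hstore`` is a hypothetical name.

```python
def dict_to_hstore(data):
    # Render a dict of strings as HSTORE text, e.g. "a"=>"1", "b"=>NULL.
    def quote(text):
        # Escape backslashes first, then double quotes.
        return '"%s"' % text.replace("\\", "\\\\").replace('"', '\\"')

    return ", ".join(
        "%s=>%s" % (quote(key), "NULL" if value is None else quote(value))
        for key, value in data.items()
    )


print(dict_to_hstore({"a": "1", "b": None}))
```

A ``None`` value becomes an unquoted ``NULL``, while keys and non-null values
are always double-quoted.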

"""  # noqa
from __future__ import absolute_import

import decimal
import logging
import re
from uuid import UUID as _python_UUID

from .array import ARRAY as PGARRAY
from .base import _ColonCast
from .base import _DECIMAL_TYPES
from .base import _FLOAT_TYPES
from .base import _INT_TYPES
from .base import ENUM
from .base import PGCompiler
from .base import PGDialect
from .base import PGExecutionContext
from .base import PGIdentifierPreparer
from .base import UUID
from .hstore import HSTORE
from .json import JSON
from .json import JSONB
from ... import exc
from ... import processors
from ... import types as sqltypes
from ... import util
from ...engine import cursor as _cursor
from ...util import collections_abc


logger = logging.getLogger("sqlalchemy.dialects.postgresql")


class _PGNumeric(sqltypes.Numeric):
    def bind_processor(self, dialect):
        return None

    def result_processor(self, dialect, coltype):
        if self.asdecimal:
            if coltype in _FLOAT_TYPES:
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale
                )
            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
                # pg8000 returns Decimal natively for 1700
                return None
            else:
                raise exc.InvalidRequestError(
                    "Unknown PG numeric type: %d" % coltype
                )
        else:
            if coltype in _FLOAT_TYPES:
                # pg8000 returns float natively for 701
                return None
            elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
                return processors.to_float
            else:
                raise exc.InvalidRequestError(
                    "Unknown PG numeric type: %d" % coltype
                )


class _PGEnum(ENUM):
    def result_processor(self, dialect, coltype):
        if util.py2k and self._expect_unicode is True:
            # for py2k, if the enum type needs unicode data (which is set up as
            # part of the Enum() constructor based on values passed as py2k
            # unicode objects) we have to use our own converters since
            # psycopg2's don't work, a rare exception to the "modern DBAPIs
            # support unicode everywhere" theme of deprecating
            # convert_unicode=True. Use the special "force_nocheck" directive
            # which forces unicode conversion to happen on the Python side
            # without an isinstance() check. in py3k psycopg2 does the right
            # thing automatically.
            self._expect_unicode = "force_nocheck"
        return super(_PGEnum, self).result_processor(dialect, coltype)


class _PGHStore(HSTORE):
    def bind_processor(self, dialect):
        if dialect._has_native_hstore:
            return None
        else:
            return super(_PGHStore, self).bind_processor(dialect)

    def result_processor(self, dialect, coltype):
        if dialect._has_native_hstore:
            return None
        else:
            return super(_PGHStore, self).result_processor(dialect, coltype)


class _PGARRAY(PGARRAY):
    def bind_expression(self, bindvalue):
        return _ColonCast(bindvalue, self)


class _PGJSON(JSON):
    def result_processor(self, dialect, coltype):
        return None


class _PGJSONB(JSONB):
    def result_processor(self, dialect, coltype):
        return None


class _PGUUID(UUID):
    def bind_processor(self, dialect):
        if not self.as_uuid and dialect.use_native_uuid:

            def process(value):
                if value is not None:
                    value = _python_UUID(value)
                return value

            return process

    def result_processor(self, dialect, coltype):
        if not self.as_uuid and dialect.use_native_uuid:

            def process(value):
                if value is not None:
                    value = str(value)
                return value

            return process


_server_side_id = util.counter()


class PGExecutionContext_psycopg2(PGExecutionContext):
    _psycopg2_fetched_rows = None

    def create_server_side_cursor(self):
        # use server-side cursors:
        # https://lists.initd.org/pipermail/psycopg/2007-January/005251.html
        ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
        return self._dbapi_connection.cursor(ident)

    def post_exec(self):
        if (
            self._psycopg2_fetched_rows
            and self.compiled
            and self.compiled.returning
        ):
            # psycopg2 execute_values will provide for a real cursor where
            # cursor.description works correctly. however, it executes the
            # INSERT statement multiple times for multiple pages of rows, so
            # while this cursor also supports calling .fetchall() directly, in
            # order to get the list of all rows inserted across multiple pages,
            # we have to retrieve the aggregated list from the execute_values()
            # function directly.
            strat_cls = _cursor.FullyBufferedCursorFetchStrategy
            self.cursor_fetch_strategy = strat_cls(
                self.cursor, initial_buffer=self._psycopg2_fetched_rows
            )
        self._log_notices(self.cursor)

    def _log_notices(self, cursor):
        # check also that notices is an iterable, after it's already
        # established that we will be iterating through it. This is to get
        # around test suites such as SQLAlchemy's using a Mock object for
        # cursor
        if not cursor.connection.notices or not isinstance(
            cursor.connection.notices, collections_abc.Iterable
        ):
            return

        for notice in cursor.connection.notices:
            # NOTICE messages have a
            # newline character at the end
            logger.info(notice.rstrip())

        cursor.connection.notices[:] = []


class PGCompiler_psycopg2(PGCompiler):
    pass


class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
    pass


EXECUTEMANY_PLAIN = util.symbol("executemany_plain", canonical=0)
EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1)
EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2)
EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol(
    "executemany_values_plus_batch",
    canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES,
)


class PGDialect_psycopg2(PGDialect):
    driver = "psycopg2"

    supports_statement_cache = True

    if util.py2k:
        # turn off supports_unicode_statements for Python 2. psycopg2 supports
        # unicode statements in Py2K. But! it does not support unicode *bound
        # parameter names* because it uses the Python "%" operator to
        # interpolate these into the string, and this fails. So for Py2K, we
        # have to use full-on encoding for statements and parameters before
        # passing to cursor.execute().
        supports_unicode_statements = False

    supports_server_side_cursors = True

    default_paramstyle = "pyformat"
    # set to true based on psycopg2 version
    supports_sane_multi_rowcount = False
    execution_ctx_cls = PGExecutionContext_psycopg2
    statement_compiler = PGCompiler_psycopg2
    preparer = PGIdentifierPreparer_psycopg2
    psycopg2_version = (0, 0)

    _has_native_hstore = True

    engine_config_types = PGDialect.engine_config_types.union(
        {"use_native_unicode": util.asbool}
    )

    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PGNumeric,
            ENUM: _PGEnum,  # needs force_unicode
            sqltypes.Enum: _PGEnum,  # needs force_unicode
            HSTORE: _PGHStore,
            JSON: _PGJSON,
            sqltypes.JSON: _PGJSON,
            JSONB: _PGJSONB,
            UUID: _PGUUID,
            sqltypes.ARRAY: _PGARRAY,
        },
    )

    def __init__(
        self,
        use_native_unicode=True,
        client_encoding=None,
        use_native_hstore=True,
        use_native_uuid=True,
        executemany_mode="values_only",
        executemany_batch_page_size=100,
        executemany_values_page_size=1000,
        **kwargs
    ):
        PGDialect.__init__(self, **kwargs)
        self.use_native_unicode = use_native_unicode
        if not use_native_unicode and not util.py2k:
            raise exc.ArgumentError(
                "psycopg2 native_unicode mode is required under Python 3"
            )
        if not use_native_hstore:
            self._has_native_hstore = False
        self.use_native_hstore = use_native_hstore
        self.use_native_uuid = use_native_uuid
        self.supports_unicode_binds = use_native_unicode
        self.client_encoding = client_encoding

        # Parse executemany_mode argument, allowing it to be only one of the
        # symbol names
        self.executemany_mode = util.symbol.parse_user_argument(
            executemany_mode,
            {
                EXECUTEMANY_PLAIN: [None],
                EXECUTEMANY_BATCH: ["batch"],
                EXECUTEMANY_VALUES: ["values_only"],
                EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
            },
            "executemany_mode",
        )

        if self.executemany_mode & EXECUTEMANY_VALUES:
            self.insert_executemany_returning = True

        self.executemany_batch_page_size = executemany_batch_page_size
        self.executemany_values_page_size = executemany_values_page_size

        if self.dbapi and hasattr(self.dbapi, "__version__"):
            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
            if m:
                self.psycopg2_version = tuple(
                    int(x) for x in m.group(1, 2, 3) if x is not None
                )

            if self.psycopg2_version < (2, 7):
                raise ImportError(
                    "psycopg2 version 2.7 or higher is required."
                )

    def initialize(self, connection):
        super(PGDialect_psycopg2, self).initialize(connection)
        self._has_native_hstore = (
            self.use_native_hstore
            and self._hstore_oids(connection.connection) is not None
        )

        # PGDialect.initialize() checks server version for <= 8.2 and sets
        # this flag to False if so
        if not self.full_returning:
            self.insert_executemany_returning = False
            self.executemany_mode = EXECUTEMANY_PLAIN

        self.supports_sane_multi_rowcount = not (
            self.executemany_mode & EXECUTEMANY_BATCH
        )

    @classmethod
    def dbapi(cls):
        import psycopg2

        return psycopg2

    @classmethod
    def _psycopg2_extensions(cls):
        from psycopg2 import extensions

        return extensions

    @classmethod
    def _psycopg2_extras(cls):
        from psycopg2 import extras

        return extras

    @util.memoized_property
    def _isolation_lookup(self):
        extensions = self._psycopg2_extensions()
        return {
            "AUTOCOMMIT": extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            "READ COMMITTED": extensions.ISOLATION_LEVEL_READ_COMMITTED,
            "READ UNCOMMITTED": extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            "REPEATABLE READ": extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            "SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE,
        }

    def set_isolation_level(self, connection, level):
        try:
            level = self._isolation_lookup[level.replace("_", " ")]
        except KeyError as err:
            util.raise_(
                exc.ArgumentError(
                    "Invalid value '%s' for isolation_level. "
                    "Valid isolation levels for %s are %s"
                    % (level, self.name, ", ".join(self._isolation_lookup))
                ),
                replace_context=err,
            )

        connection.set_isolation_level(level)

    def set_readonly(self, connection, value):
        connection.readonly = value

    def get_readonly(self, connection):
        return connection.readonly

    def set_deferrable(self, connection, value):
        connection.deferrable = value

    def get_deferrable(self, connection):
        return connection.deferrable

    def do_ping(self, dbapi_connection):
        cursor = None
        before_autocommit = dbapi_connection.autocommit
        try:
            if not before_autocommit:
                dbapi_connection.autocommit = True
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute(self._dialect_specific_select_one)
            finally:
                cursor.close()
                if not before_autocommit and not dbapi_connection.closed:
                    dbapi_connection.autocommit = before_autocommit
        except self.dbapi.Error as err:
            if self.is_disconnect(err, dbapi_connection, cursor):
                return False
            else:
                raise
        else:
            return True

    def on_connect(self):
        extras = self._psycopg2_extras()
        extensions = self._psycopg2_extensions()

        fns = []
        if self.client_encoding is not None:

            def on_connect(conn):
                conn.set_client_encoding(self.client_encoding)

            fns.append(on_connect)

        if self.isolation_level is not None:

            def on_connect(conn):
                self.set_isolation_level(conn, self.isolation_level)

            fns.append(on_connect)

        if self.dbapi and self.use_native_uuid:

            def on_connect(conn):
                extras.register_uuid(None, conn)

            fns.append(on_connect)

        if util.py2k and self.dbapi and self.use_native_unicode:

            def on_connect(conn):
                extensions.register_type(extensions.UNICODE, conn)
                extensions.register_type(extensions.UNICODEARRAY, conn)

            fns.append(on_connect)

        if self.dbapi and self.use_native_hstore:

            def on_connect(conn):
                hstore_oids = self._hstore_oids(conn)
                if hstore_oids is not None:
                    oid, array_oid = hstore_oids
                    kw = {"oid": oid}
                    if util.py2k:
                        kw["unicode"] = True
                    kw["array_oid"] = array_oid
                    extras.register_hstore(conn, **kw)

            fns.append(on_connect)

        if self.dbapi and self._json_deserializer:

            def on_connect(conn):
                extras.register_default_json(
                    conn, loads=self._json_deserializer
                )
                extras.register_default_jsonb(
                    conn, loads=self._json_deserializer
                )

            fns.append(on_connect)

        if fns:

            def on_connect(conn):
                for fn in fns:
                    fn(conn)

            return on_connect
        else:
            return None
956 def do_executemany(self, cursor, statement, parameters, context=None):
957 if (
958 self.executemany_mode & EXECUTEMANY_VALUES
959 and context
960 and context.isinsert
961 and context.compiled._is_safe_for_fast_insert_values_helper
962 ):
963 executemany_values = (
964 "(%s)" % context.compiled.insert_single_values_expr
965 )
966 if not self.supports_unicode_statements:
967 executemany_values = executemany_values.encode(self.encoding)
969 # guard against a statement that was altered via an event hook or similar
970 if executemany_values not in statement:
971 executemany_values = None
972 else:
973 executemany_values = None
975 if executemany_values:
976 statement = statement.replace(executemany_values, "%s")
977 if self.executemany_values_page_size:
978 kwargs = {"page_size": self.executemany_values_page_size}
979 else:
980 kwargs = {}
981 xtras = self._psycopg2_extras()
982 context._psycopg2_fetched_rows = xtras.execute_values(
983 cursor,
984 statement,
985 parameters,
986 template=executemany_values,
987 fetch=bool(context.compiled.returning),
988 **kwargs
989 )
991 elif self.executemany_mode & EXECUTEMANY_BATCH:
992 if self.executemany_batch_page_size:
993 kwargs = {"page_size": self.executemany_batch_page_size}
994 else:
995 kwargs = {}
996 self._psycopg2_extras().execute_batch(
997 cursor, statement, parameters, **kwargs
998 )
999 else:
1000 cursor.executemany(statement, parameters)
1002 @util.memoized_instancemethod
1003 def _hstore_oids(self, conn):
1004 extras = self._psycopg2_extras()
1005 if hasattr(conn, "dbapi_connection"):
1006 conn = conn.dbapi_connection
1007 oids = extras.HstoreAdapter.get_oids(conn)
1008 if oids is not None and oids[0]:
1009 return oids[0:2]
1010 else:
1011 return None
1013 def create_connect_args(self, url):
1014 opts = url.translate_connect_args(username="user")
1016 is_multihost = False
1017 if "host" in url.query:
1018 is_multihost = isinstance(url.query["host"], (list, tuple))
1020 if opts or url.query:
1021 if not opts:
1022 opts = {}
1023 if "port" in opts:
1024 opts["port"] = int(opts["port"])
1025 opts.update(url.query)
1026 if is_multihost:
1027 hosts, ports = zip(
1028 *[
1029 token.split(":") if ":" in token else (token, "")
1030 for token in url.query["host"]
1031 ]
1032 )
1033 opts["host"] = ",".join(hosts)
1034 if "port" in opts:
1035 raise exc.ArgumentError(
1036 "Can't mix 'multihost' formats together; use "
1037 '"host=h1,h2,h3&port=p1,p2,p3" or '
1038 '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
1039 )
1040 opts["port"] = ",".join(ports)
1041 return ([], opts)
1042 else:
1043 # no connection arguments whatsoever; psycopg2.connect()
1044 # requires that "dsn" be present as a blank string.
1045 return ([""], opts)
1047 def is_disconnect(self, e, connection, cursor):
1048 if isinstance(e, self.dbapi.Error):
1049 # Check the "closed" flag. It may not be
1050 # present on old psycopg2 versions. Also,
1051 # this flag does not help in many disconnect
1052 # situations, so don't rely on it.
1053 if getattr(connection, "closed", False):
1054 return True
1056 # String-based checks. If .closed did not
1057 # detect the disconnect, fall back on these.
1058 str_e = str(e).partition("\n")[0]
1059 for msg in [
1060 # these error messages come from libpq: interfaces/libpq/fe-misc.c
1061 # and interfaces/libpq/fe-secure.c.
1062 "terminating connection",
1063 "closed the connection",
1064 "connection not open",
1065 "could not receive data from server",
1066 "could not send data to server",
1067 # psycopg2 client errors, psycopg2/connection.h,
1068 # psycopg2/cursor.h
1069 "connection already closed",
1070 "cursor already closed",
1071 # The origin of this message is unclear, and it may
1072 # be obsolete. It really says "losed", not "closed".
1073 "losed the connection unexpectedly",
1074 # these can occur in newer SSL
1075 "connection has been closed unexpectedly",
1076 "SSL error: decryption failed or bad record mac",
1077 "SSL SYSCALL error: Bad file descriptor",
1078 "SSL SYSCALL error: EOF detected",
1079 "SSL SYSCALL error: Operation timed out",
1080 "SSL SYSCALL error: Bad address",
1081 ]:
1082 idx = str_e.find(msg)
1083 if idx >= 0 and '"' not in str_e[:idx]:
1084 return True
1085 return False
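The string-matching fallback in is_disconnect() can be tried without a database. The sketch below is a standalone approximation, not the dialect's own code: the function name looks_like_disconnect and the shortened message list are hypothetical, and it takes the error text directly instead of a psycopg2 exception. It keeps the two details from the listing: only the first line of the message is inspected, and a match is ignored when a double quote appears before it, presumably so that a fragment inside a quoted identifier or value is not mistaken for a driver message.

```python
# A subset of the message fragments from the listing, for illustration only.
DISCONNECT_MESSAGES = [
    "terminating connection",
    "connection already closed",
    "losed the connection unexpectedly",
    "SSL SYSCALL error: EOF detected",
]


def looks_like_disconnect(error_text):
    """Return True if the first line of error_text contains a known fragment.

    Hypothetical helper; mirrors the loop in is_disconnect() but works on a
    plain string instead of a DBAPI exception.
    """
    # Only the first line of the error message is examined.
    first_line = error_text.partition("\n")[0]
    for msg in DISCONNECT_MESSAGES:
        idx = first_line.find(msg)
        # Skip a match that is preceded by a double quote on the same line.
        if idx >= 0 and '"' not in first_line[:idx]:
            return True
    return False
```

With this sketch, "server closed the connection unexpectedly" is reported as a disconnect, while 'column "terminating connection" does not exist' is not, because the fragment follows a double quote.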
1088 dialect = PGDialect_psycopg2
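The multihost branch of create_connect_args() turns repeated "host=h1:p1&host=h2:p2" query values into the comma-separated host and port strings that are passed on to the driver. The following is a minimal sketch of just that step, assuming a non-empty list of tokens. The function name split_multihost is hypothetical and not part of SQLAlchemy; the zip() expression is copied from the listing.

```python
def split_multihost(host_tokens):
    """Combine "host:port" tokens into comma-separated host and port strings.

    Hypothetical helper; host_tokens is assumed to be a non-empty list such
    as the value of url.query["host"] when several hosts are given.
    """
    hosts, ports = zip(
        *[
            # A token without a port contributes an empty port entry.
            token.split(":") if ":" in token else (token, "")
            for token in host_tokens
        ]
    )
    return ",".join(hosts), ",".join(ports)


hosts, ports = split_multihost(["h1:5432", "h2", "h3:5433"])
print(hosts)  # h1,h2,h3
print(ports)  # 5432,,5433
```

A host given without a port leaves an empty slot in the port string, so the positions of hosts and ports stay aligned.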