1# dialects/postgresql/psycopg2.py
2# Copyright (C) 2005-2024 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7r"""
8.. dialect:: postgresql+psycopg2
9 :name: psycopg2
10 :dbapi: psycopg2
11 :connectstring: postgresql+psycopg2://user:password@host:port/dbname[?key=value&key=value...]
12 :url: https://pypi.org/project/psycopg2/
13
14psycopg2 Connect Arguments
15--------------------------
16
17Keyword arguments that are specific to the SQLAlchemy psycopg2 dialect
18may be passed to :func:`_sa.create_engine()`, and include the following:
19
20
21* ``isolation_level``: This option, available for all PostgreSQL dialects,
22 includes the ``AUTOCOMMIT`` isolation level when using the psycopg2
23 dialect. This option sets the **default** isolation level for the
24 connection that is set immediately upon connection to the database before
25 the connection is pooled. This option is generally superseded by the more
26 modern :paramref:`_engine.Connection.execution_options.isolation_level`
27 execution option, detailed at :ref:`dbapi_autocommit`.
28
29 .. seealso::
30
31 :ref:`psycopg2_isolation_level`
32
33 :ref:`dbapi_autocommit`
34
35
36* ``client_encoding``: sets the client encoding in a libpq-agnostic way,
37 using psycopg2's ``set_client_encoding()`` method.
38
39 .. seealso::
40
41 :ref:`psycopg2_unicode`
42
43* ``use_native_unicode``: Under Python 2 only, this can be set to False to
44 disable the use of psycopg2's native Unicode support.
45
46 .. seealso::
47
48 :ref:`psycopg2_disable_native_unicode`
49
50
51* ``executemany_mode``, ``executemany_batch_page_size``,
52 ``executemany_values_page_size``: Allows use of psycopg2
53 extensions for optimizing "executemany"-style queries. See the referenced
54 section below for details.
55
56 .. seealso::
57
58 :ref:`psycopg2_executemany_mode`
59
60.. tip::
61
62 The above keyword arguments are **dialect** keyword arguments, meaning
63 that they are passed as explicit keyword arguments to :func:`_sa.create_engine()`::
64
65 engine = create_engine(
66 "postgresql+psycopg2://scott:tiger@localhost/test",
67 isolation_level="SERIALIZABLE",
68 )
69
70 These should not be confused with **DBAPI** connect arguments, which
71 are passed as part of the :paramref:`_sa.create_engine.connect_args`
72 dictionary and/or are passed in the URL query string, as detailed in
73 the section :ref:`custom_dbapi_args`.
74
75.. _psycopg2_ssl:
76
77SSL Connections
78---------------
79
80The psycopg2 module has a connection argument named ``sslmode`` for
81controlling its behavior regarding secure (SSL) connections. The default is
82``sslmode=prefer``; it will attempt an SSL connection and if that fails it
83will fall back to an unencrypted connection. ``sslmode=require`` may be used
84to ensure that only secure connections are established. Consult the
85psycopg2 / libpq documentation for further options that are available.
86
87Note that ``sslmode`` is specific to psycopg2 so it is included in the
88connection URI::
89
90 engine = sa.create_engine(
91 "postgresql+psycopg2://scott:tiger@192.168.0.199:5432/test?sslmode=require"
92 )
93
94Unix Domain Connections
95------------------------
96
97psycopg2 supports connecting via Unix domain connections. When the ``host``
98portion of the URL is omitted, SQLAlchemy passes ``None`` to psycopg2,
99which specifies Unix-domain communication rather than TCP/IP communication::
100
101 create_engine("postgresql+psycopg2://user:password@/dbname")
102
By default, the socket file used to connect is a Unix-domain socket
in ``/tmp``, or whatever socket directory was specified when PostgreSQL
was built. This value can be overridden by passing a pathname to psycopg2,
106using ``host`` as an additional keyword argument::
107
108 create_engine("postgresql+psycopg2://user:password@/dbname?host=/var/lib/postgresql")
109
110.. seealso::
111
112 `PQconnectdbParams \
113 <https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-PQCONNECTDBPARAMS>`_
114
115.. _psycopg2_multi_host:
116
117Specifying multiple fallback hosts
118-----------------------------------
119
120psycopg2 supports multiple connection points in the connection string.
121When the ``host`` parameter is used multiple times in the query section of
122the URL, SQLAlchemy will create a single string of the host and port
123information provided to make the connections. Tokens may consist of
``host:port`` or just ``host``; in the latter case, the default port
is selected by libpq. In the example below, three host connections
are specified, for ``HostA:PortA``, ``HostB`` connecting to the default port,
and ``HostC:PortC``::
128
129 create_engine(
130 "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC"
131 )
132
133As an alternative, libpq query string format also may be used; this specifies
134``host`` and ``port`` as single query string arguments with comma-separated
135lists - the default port can be chosen by indicating an empty value
136in the comma separated list::
137
138 create_engine(
139 "postgresql+psycopg2://user:password@/dbname?host=HostA,HostB,HostC&port=PortA,,PortC"
140 )
141
With either URL style, connections to each host are attempted based on a
143configurable strategy, which may be configured using the libpq
144``target_session_attrs`` parameter. Per libpq this defaults to ``any``
145which indicates a connection to each host is then attempted until a connection is successful.
146Other strategies include ``primary``, ``prefer-standby``, etc. The complete
147list is documented by PostgreSQL at
148`libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_.
149
150For example, to indicate two hosts using the ``primary`` strategy::
151
152 create_engine(
153 "postgresql+psycopg2://user:password@/dbname?host=HostA:PortA&host=HostB&host=HostC:PortC&target_session_attrs=primary"
154 )
155
156.. versionchanged:: 1.4.40 Port specification in psycopg2 multiple host format
157 is repaired, previously ports were not correctly interpreted in this context.
158 libpq comma-separated format is also now supported.
159
160.. versionadded:: 1.3.20 Support for multiple hosts in PostgreSQL connection
161 string.
162
163.. seealso::
164
165 `libpq connection strings <https://www.postgresql.org/docs/current/libpq-connect.html#LIBPQ-CONNSTRING>`_ - please refer
166 to this section in the libpq documentation for complete background on multiple host support.
167
168
169Empty DSN Connections / Environment Variable Connections
170---------------------------------------------------------
171
172The psycopg2 DBAPI can connect to PostgreSQL by passing an empty DSN to the
173libpq client library, which by default indicates to connect to a localhost
174PostgreSQL database that is open for "trust" connections. This behavior can be
175further tailored using a particular set of environment variables which are
prefixed with ``PG...`` (e.g. ``PGHOST``), which are consumed by ``libpq``
to take the place of
177any or all elements of the connection string.
178
179For this form, the URL can be passed without any elements other than the
180initial scheme::
181
182 engine = create_engine('postgresql+psycopg2://')
183
184In the above form, a blank "dsn" string is passed to the ``psycopg2.connect()``
185function which in turn represents an empty DSN passed to libpq.
186
187.. versionadded:: 1.3.2 support for parameter-less connections with psycopg2.
188
189.. seealso::
190
191 `Environment Variables\
192 <https://www.postgresql.org/docs/current/libpq-envars.html>`_ -
 PostgreSQL documentation on how to use ``PG...``
194 environment variables for connections.
195
196.. _psycopg2_execution_options:
197
198Per-Statement/Connection Execution Options
199-------------------------------------------
200
201The following DBAPI-specific options are respected when used with
202:meth:`_engine.Connection.execution_options`,
203:meth:`.Executable.execution_options`,
204:meth:`_query.Query.execution_options`,
205in addition to those not specific to DBAPIs:
206
207* ``isolation_level`` - Set the transaction isolation level for the lifespan
208 of a :class:`_engine.Connection` (can only be set on a connection,
209 not a statement
210 or query). See :ref:`psycopg2_isolation_level`.
211
212* ``stream_results`` - Enable or disable usage of psycopg2 server side
213 cursors - this feature makes use of "named" cursors in combination with
214 special result handling methods so that result rows are not fully buffered.
215 Defaults to False, meaning cursors are buffered by default.
216
217* ``max_row_buffer`` - when using ``stream_results``, an integer value that
218 specifies the maximum number of rows to buffer at a time. This is
219 interpreted by the :class:`.BufferedRowCursorResult`, and if omitted the
220 buffer will grow to ultimately store 1000 rows at a time.
221
222 .. versionchanged:: 1.4 The ``max_row_buffer`` size can now be greater than
223 1000, and the buffer will grow to that size.
224
225.. _psycopg2_batch_mode:
226
227.. _psycopg2_executemany_mode:
228
229Psycopg2 Fast Execution Helpers
230-------------------------------
231
232Modern versions of psycopg2 include a feature known as
233`Fast Execution Helpers \
234<https://initd.org/psycopg/docs/extras.html#fast-execution-helpers>`_, which
235have been shown in benchmarking to improve psycopg2's executemany()
236performance, primarily with INSERT statements, by multiple orders of magnitude.
237SQLAlchemy internally makes use of these extensions for ``executemany()`` style
238calls, which correspond to lists of parameters being passed to
239:meth:`_engine.Connection.execute` as detailed in :ref:`multiple parameter
240sets <tutorial_multiple_parameters>`. The ORM also uses this mode internally whenever
241possible.
242
243The two available extensions on the psycopg2 side are the ``execute_values()``
244and ``execute_batch()`` functions. The psycopg2 dialect defaults to using the
245``execute_values()`` extension for all qualifying INSERT statements.
246
247.. versionchanged:: 1.4 The psycopg2 dialect now defaults to a new mode
248 ``"values_only"`` for ``executemany_mode``, which allows an order of
249 magnitude performance improvement for INSERT statements, but does not
250 include "batch" mode for UPDATE and DELETE statements which removes the
251 ability of ``cursor.rowcount`` to function correctly.
252
253The use of these extensions is controlled by the ``executemany_mode`` flag
254which may be passed to :func:`_sa.create_engine`::
255
256 engine = create_engine(
257 "postgresql+psycopg2://scott:tiger@host/dbname",
258 executemany_mode='values_plus_batch')
259
260
261Possible options for ``executemany_mode`` include:
262
263* ``values_only`` - this is the default value. the psycopg2 execute_values()
264 extension is used for qualifying INSERT statements, which rewrites the INSERT
265 to include multiple VALUES clauses so that many parameter sets can be
266 inserted with one statement.
267
268 .. versionadded:: 1.4 Added ``"values_only"`` setting for ``executemany_mode``
269 which is also now the default.
270
* ``None`` - No psycopg2 extensions are used, and the usual
272 ``cursor.executemany()`` method is used when invoking statements with
273 multiple parameter sets.
274
275* ``'batch'`` - Uses ``psycopg2.extras.execute_batch`` for all qualifying
276 INSERT, UPDATE and DELETE statements, so that multiple copies
277 of a SQL query, each one corresponding to a parameter set passed to
278 ``executemany()``, are joined into a single SQL string separated by a
279 semicolon. When using this mode, the :attr:`_engine.CursorResult.rowcount`
280 attribute will not contain a value for executemany-style executions.
281
282* ``'values_plus_batch'``- ``execute_values`` is used for qualifying INSERT
283 statements, ``execute_batch`` is used for UPDATE and DELETE.
284 When using this mode, the :attr:`_engine.CursorResult.rowcount`
285 attribute will not contain a value for executemany-style executions against
286 UPDATE and DELETE statements.
287
288By "qualifying statements", we mean that the statement being executed
289must be a Core :func:`_expression.insert`, :func:`_expression.update`
290or :func:`_expression.delete` construct, and not a plain textual SQL
291string or one constructed using :func:`_expression.text`. When using the
292ORM, all insert/update/delete statements used by the ORM flush process
293are qualifying.
294
295The "page size" for the "values" and "batch" strategies can be affected
296by using the ``executemany_batch_page_size`` and
297``executemany_values_page_size`` engine parameters. These
298control how many parameter sets
299should be represented in each execution. The "values" page size defaults
to 1000, which is different from psycopg2's default. The "batch" page
301size defaults to 100. These can be affected by passing new values to
302:func:`_engine.create_engine`::
303
304 engine = create_engine(
305 "postgresql+psycopg2://scott:tiger@host/dbname",
306 executemany_mode='values',
307 executemany_values_page_size=10000, executemany_batch_page_size=500)
308
309.. versionchanged:: 1.4
310
311 The default for ``executemany_values_page_size`` is now 1000, up from
312 100.
313
314.. seealso::
315
316 :ref:`tutorial_multiple_parameters` - General information on using the
317 :class:`_engine.Connection`
318 object to execute statements in such a way as to make
319 use of the DBAPI ``.executemany()`` method.
320
321
322.. _psycopg2_unicode:
323
324Unicode with Psycopg2
325----------------------
326
327The psycopg2 DBAPI driver supports Unicode data transparently. Under Python 2
328only, the SQLAlchemy psycopg2 dialect will enable the
329``psycopg2.extensions.UNICODE`` extension by default to ensure Unicode is
330handled properly; under Python 3, this is psycopg2's default behavior.
331
332The client character encoding can be controlled for the psycopg2 dialect
333in the following ways:
334
335* For PostgreSQL 9.1 and above, the ``client_encoding`` parameter may be
336 passed in the database URL; this parameter is consumed by the underlying
337 ``libpq`` PostgreSQL client library::
338
339 engine = create_engine("postgresql+psycopg2://user:pass@host/dbname?client_encoding=utf8")
340
341 Alternatively, the above ``client_encoding`` value may be passed using
342 :paramref:`_sa.create_engine.connect_args` for programmatic establishment with
343 ``libpq``::
344
345 engine = create_engine(
346 "postgresql+psycopg2://user:pass@host/dbname",
347 connect_args={'client_encoding': 'utf8'}
348 )
349
350* For all PostgreSQL versions, psycopg2 supports a client-side encoding
351 value that will be passed to database connections when they are first
352 established. The SQLAlchemy psycopg2 dialect supports this using the
353 ``client_encoding`` parameter passed to :func:`_sa.create_engine`::
354
355 engine = create_engine(
356 "postgresql+psycopg2://user:pass@host/dbname",
357 client_encoding="utf8"
358 )
359
360 .. tip:: The above ``client_encoding`` parameter admittedly is very similar
361 in appearance to usage of the parameter within the
362 :paramref:`_sa.create_engine.connect_args` dictionary; the difference
363 above is that the parameter is consumed by psycopg2 and is
364 passed to the database connection using ``SET client_encoding TO
365 'utf8'``; in the previously mentioned style, the parameter is instead
366 passed through psycopg2 and consumed by the ``libpq`` library.
367
368* A common way to set up client encoding with PostgreSQL databases is to
369 ensure it is configured within the server-side postgresql.conf file;
370 this is the recommended way to set encoding for a server that is
371 consistently of one encoding in all databases::
372
373 # postgresql.conf file
374
375 # client_encoding = sql_ascii # actually, defaults to database
376 # encoding
377 client_encoding = utf8
378
379.. _psycopg2_disable_native_unicode:
380
381Disabling Native Unicode
382^^^^^^^^^^^^^^^^^^^^^^^^
383
384Under Python 2 only, SQLAlchemy can also be instructed to skip the usage of the
385psycopg2 ``UNICODE`` extension and to instead utilize its own unicode
386encode/decode services, which are normally reserved only for those DBAPIs that
387don't fully support unicode directly. Passing ``use_native_unicode=False`` to
388:func:`_sa.create_engine` will disable usage of ``psycopg2.extensions.
389UNICODE``. SQLAlchemy will instead encode data itself into Python bytestrings
390on the way in and coerce from bytes on the way back, using the value of the
391:func:`_sa.create_engine` ``encoding`` parameter, which defaults to ``utf-8``.
392SQLAlchemy's own unicode encode/decode functionality is steadily becoming
393obsolete as most DBAPIs now support unicode fully.
394
395
396Transactions
397------------
398
399The psycopg2 dialect fully supports SAVEPOINT and two-phase commit operations.
400
401.. _psycopg2_isolation_level:
402
403Psycopg2 Transaction Isolation Level
404-------------------------------------
405
406As discussed in :ref:`postgresql_isolation_level`,
407all PostgreSQL dialects support setting of transaction isolation level
408both via the ``isolation_level`` parameter passed to :func:`_sa.create_engine`
409,
410as well as the ``isolation_level`` argument used by
411:meth:`_engine.Connection.execution_options`. When using the psycopg2 dialect
412, these
413options make use of psycopg2's ``set_isolation_level()`` connection method,
414rather than emitting a PostgreSQL directive; this is because psycopg2's
415API-level setting is always emitted at the start of each transaction in any
416case.
417
418The psycopg2 dialect supports these constants for isolation level:
419
420* ``READ COMMITTED``
421* ``READ UNCOMMITTED``
422* ``REPEATABLE READ``
423* ``SERIALIZABLE``
424* ``AUTOCOMMIT``
425
426.. seealso::
427
428 :ref:`postgresql_isolation_level`
429
430 :ref:`pg8000_isolation_level`
431
432
433NOTICE logging
434---------------
435
436The psycopg2 dialect will log PostgreSQL NOTICE messages
437via the ``sqlalchemy.dialects.postgresql`` logger. When this logger
438is set to the ``logging.INFO`` level, notice messages will be logged::
439
440 import logging
441
442 logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
443
444Above, it is assumed that logging is configured externally. If this is not
445the case, configuration such as ``logging.basicConfig()`` must be utilized::
446
447 import logging
448
449 logging.basicConfig() # log messages to stdout
450 logging.getLogger('sqlalchemy.dialects.postgresql').setLevel(logging.INFO)
451
452.. seealso::
453
454 `Logging HOWTO <https://docs.python.org/3/howto/logging.html>`_ - on the python.org website
455
456.. _psycopg2_hstore:
457
458HSTORE type
459------------
460
461The ``psycopg2`` DBAPI includes an extension to natively handle marshalling of
462the HSTORE type. The SQLAlchemy psycopg2 dialect will enable this extension
463by default when psycopg2 version 2.4 or greater is used, and
464it is detected that the target database has the HSTORE type set up for use.
465In other words, when the dialect makes the first
466connection, a sequence like the following is performed:
467
4681. Request the available HSTORE oids using
469 ``psycopg2.extras.HstoreAdapter.get_oids()``.
470 If this function returns a list of HSTORE identifiers, we then determine
471 that the ``HSTORE`` extension is present.
472 This function is **skipped** if the version of psycopg2 installed is
473 less than version 2.4.
474
4752. If the ``use_native_hstore`` flag is at its default of ``True``, and
476 we've detected that ``HSTORE`` oids are available, the
477 ``psycopg2.extensions.register_hstore()`` extension is invoked for all
478 connections.
479
480The ``register_hstore()`` extension has the effect of **all Python
481dictionaries being accepted as parameters regardless of the type of target
482column in SQL**. The dictionaries are converted by this extension into a
483textual HSTORE expression. If this behavior is not desired, disable the
484use of the hstore extension by setting ``use_native_hstore`` to ``False`` as
485follows::
486
487 engine = create_engine("postgresql+psycopg2://scott:tiger@localhost/test",
488 use_native_hstore=False)
489
490The ``HSTORE`` type is **still supported** when the
491``psycopg2.extensions.register_hstore()`` extension is not used. It merely
492means that the coercion between Python dictionaries and the HSTORE
493string format, on both the parameter side and the result side, will take
494place within SQLAlchemy's own marshalling logic, and not that of ``psycopg2``
495which may be more performant.
496
497""" # noqa
498from __future__ import absolute_import
499
500import decimal
501import logging
502import re
503from uuid import UUID as _python_UUID
504
505from .array import ARRAY as PGARRAY
506from .base import _ColonCast
507from .base import _DECIMAL_TYPES
508from .base import _FLOAT_TYPES
509from .base import _INT_TYPES
510from .base import ENUM
511from .base import PGCompiler
512from .base import PGDialect
513from .base import PGExecutionContext
514from .base import PGIdentifierPreparer
515from .base import UUID
516from .hstore import HSTORE
517from .json import JSON
518from .json import JSONB
519from ... import exc
520from ... import processors
521from ... import types as sqltypes
522from ... import util
523from ...engine import cursor as _cursor
524from ...util import collections_abc
525
526
527logger = logging.getLogger("sqlalchemy.dialects.postgresql")
528
529
class _PGNumeric(sqltypes.Numeric):
    """Numeric type override for psycopg2.

    psycopg2 already returns ``Decimal`` for NUMERIC result columns and
    accepts numeric values directly as bind parameters, so conversions
    are only needed to bridge float and ``Decimal`` according to the
    ``asdecimal`` setting and the wire-level type oid.
    """

    def bind_processor(self, dialect):
        # psycopg2 adapts Decimal / float bind values itself.
        return None

    def result_processor(self, dialect, coltype):
        is_float_oid = coltype in _FLOAT_TYPES
        if not is_float_oid and not (
            coltype in _DECIMAL_TYPES or coltype in _INT_TYPES
        ):
            raise exc.InvalidRequestError(
                "Unknown PG numeric type: %d" % coltype
            )

        if self.asdecimal:
            if is_float_oid:
                # float coming back, user wants Decimal: convert with the
                # effective scale of this type
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale
                )
            # DBAPI returns Decimal natively for NUMERIC / integer oids
            return None
        else:
            if is_float_oid:
                # DBAPI returns float natively for float oids
                return None
            # Decimal/int coming back, user wants float
            return processors.to_float
557
558
class _PGEnum(ENUM):
    """ENUM override ensuring unicode conversion under Python 2."""

    def result_processor(self, dialect, coltype):
        # Under py2k, if this Enum was built from unicode values, psycopg2's
        # converters do not apply to ENUM results, so SQLAlchemy's own
        # converters must be used — a rare exception to the "modern DBAPIs
        # support unicode everywhere" theme of deprecating
        # convert_unicode=True.  The special "force_nocheck" directive forces
        # the Python-side unicode conversion without an isinstance() check.
        # Under py3k psycopg2 handles this automatically.
        if self._expect_unicode is True and util.py2k:
            self._expect_unicode = "force_nocheck"
        return super(_PGEnum, self).result_processor(dialect, coltype)
573
574
class _PGHStore(HSTORE):
    """HSTORE override deferring to psycopg2's native hstore support.

    When the dialect has detected native hstore handling, psycopg2
    marshals dicts to/from the HSTORE text format itself, so no
    SQLAlchemy-side processing is required in either direction.
    """

    def bind_processor(self, dialect):
        if not dialect._has_native_hstore:
            return super(_PGHStore, self).bind_processor(dialect)
        return None

    def result_processor(self, dialect, coltype):
        if not dialect._has_native_hstore:
            return super(_PGHStore, self).result_processor(dialect, coltype)
        return None
587
588
class _PGARRAY(PGARRAY):
    # render the bind parameter with an explicit ::type cast so the
    # server can infer the array's element type
    def bind_expression(self, bindvalue):
        return _ColonCast(bindvalue, self)
592
593
class _PGJSON(JSON):
    # psycopg2 deserializes JSON result values itself (the dialect's
    # on_connect registers the deserializer), so no result processing here
    def result_processor(self, dialect, coltype):
        return None
597
598
class _PGJSONB(JSONB):
    # psycopg2 deserializes JSONB result values itself (the dialect's
    # on_connect registers the deserializer), so no result processing here
    def result_processor(self, dialect, coltype):
        return None
602
603
class _PGUUID(UUID):
    """UUID override bridging string values with psycopg2's native UUID
    handling when ``use_native_uuid`` is enabled and ``as_uuid`` is off."""

    def bind_processor(self, dialect):
        if self.as_uuid or not dialect.use_native_uuid:
            return None

        def to_uuid(value):
            # strings in; the native codec wants uuid.UUID objects
            if value is None:
                return None
            return _python_UUID(value)

        return to_uuid

    def result_processor(self, dialect, coltype):
        if self.as_uuid or not dialect.use_native_uuid:
            return None

        def to_string(value):
            # native codec yields uuid.UUID; caller expects strings
            if value is None:
                return None
            return str(value)

        return to_string
624
625
626_server_side_id = util.counter()
627
628
class PGExecutionContext_psycopg2(PGExecutionContext):
    """Execution context adding psycopg2-specific server-side cursor
    naming, execute_values() RETURNING support, and NOTICE logging."""

    # set by the dialect when execute_values() ran with RETURNING;
    # holds the rows aggregated across all pages
    _psycopg2_fetched_rows = None

    def create_server_side_cursor(self):
        # use server-side cursors:
        # https://lists.initd.org/pipermail/psycopg/2007-January/005251.html
        name = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
        return self._dbapi_connection.cursor(name)

    def post_exec(self):
        rows = self._psycopg2_fetched_rows
        if rows and self.compiled and self.compiled.returning:
            # psycopg2 execute_values will provide for a real cursor where
            # cursor.description works correctly. however, it executes the
            # INSERT statement multiple times for multiple pages of rows, so
            # while this cursor also supports calling .fetchall() directly, in
            # order to get the list of all rows inserted across multiple
            # pages, we have to retrieve the aggregated list from the
            # execute_values() function directly.
            self.cursor_fetch_strategy = (
                _cursor.FullyBufferedCursorFetchStrategy(
                    self.cursor, initial_buffer=rows
                )
            )
        self._log_notices(self.cursor)

    def _log_notices(self, cursor):
        # check also that notices is an iterable, after it's already
        # established that we will be iterating through it. This is to get
        # around test suites such as SQLAlchemy's using a Mock object for
        # cursor
        notices = cursor.connection.notices
        if not notices or not isinstance(
            notices, collections_abc.Iterable
        ):
            return

        for notice in notices:
            # NOTICE messages have a newline character at the end
            logger.info(notice.rstrip())

        # clear in place so psycopg2's list is emptied, not rebound
        cursor.connection.notices[:] = []
673
674
class PGCompiler_psycopg2(PGCompiler):
    # no psycopg2-specific compilation behavior; present as an extension
    # point and for symmetry with other PostgreSQL DBAPI dialects
    pass
677
678
class PGIdentifierPreparer_psycopg2(PGIdentifierPreparer):
    # no psycopg2-specific quoting behavior; present as an extension
    # point and for symmetry with other PostgreSQL DBAPI dialects
    pass
681
682
# executemany_mode strategies, expressed as bit flags so that
# "values_plus_batch" can be tested against either EXECUTEMANY_BATCH
# or EXECUTEMANY_VALUES with the & operator
EXECUTEMANY_PLAIN = util.symbol("executemany_plain", canonical=0)
EXECUTEMANY_BATCH = util.symbol("executemany_batch", canonical=1)
EXECUTEMANY_VALUES = util.symbol("executemany_values", canonical=2)
EXECUTEMANY_VALUES_PLUS_BATCH = util.symbol(
    "executemany_values_plus_batch",
    canonical=EXECUTEMANY_BATCH | EXECUTEMANY_VALUES,
)
690
691
class PGDialect_psycopg2(PGDialect):
    driver = "psycopg2"

    supports_statement_cache = True

    if util.py2k:
        # turn off supports_unicode_statements for Python 2. psycopg2 supports
        # unicode statements in Py2K. But! it does not support unicode *bound
        # parameter names* because it uses the Python "%" operator to
        # interpolate these into the string, and this fails. So for Py2K, we
        # have to use full-on encoding for statements and parameters before
        # passing to cursor.execute().
        supports_unicode_statements = False

    supports_server_side_cursors = True

    default_paramstyle = "pyformat"
    # set to true based on psycopg2 version
    supports_sane_multi_rowcount = False
    execution_ctx_cls = PGExecutionContext_psycopg2
    statement_compiler = PGCompiler_psycopg2
    preparer = PGIdentifierPreparer_psycopg2
    # parsed (major, minor, [micro]) tuple of the installed psycopg2;
    # remains (0, 0) until __init__ inspects the DBAPI module
    psycopg2_version = (0, 0)

    # assumed True until initialize() probes the database for HSTORE oids
    _has_native_hstore = True

    # allow use_native_unicode to be coerced from string engine config
    engine_config_types = PGDialect.engine_config_types.union(
        {"use_native_unicode": util.asbool}
    )

    # psycopg2-specific type overrides layered over the base PG colspecs
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PGNumeric,
            ENUM: _PGEnum,  # needs force_unicode
            sqltypes.Enum: _PGEnum,  # needs force_unicode
            HSTORE: _PGHStore,
            JSON: _PGJSON,
            sqltypes.JSON: _PGJSON,
            JSONB: _PGJSONB,
            UUID: _PGUUID,
            sqltypes.ARRAY: _PGARRAY,
        },
    )
736
    def __init__(
        self,
        use_native_unicode=True,
        client_encoding=None,
        use_native_hstore=True,
        use_native_uuid=True,
        executemany_mode="values_only",
        executemany_batch_page_size=100,
        executemany_values_page_size=1000,
        **kwargs
    ):
        """Construct the psycopg2 dialect.

        :param use_native_unicode: Python 2 only; under Python 3 this must
         remain True or :class:`.ArgumentError` is raised.
        :param client_encoding: if not None, applied on connect via
         psycopg2's ``set_client_encoding()``.
        :param use_native_hstore: enable psycopg2's native HSTORE
         marshalling when the extension is available.
        :param use_native_uuid: enable psycopg2's native UUID handling.
        :param executemany_mode: one of ``None``, ``"batch"``,
         ``"values_only"`` (the default), or ``"values_plus_batch"``
         (``"values"`` is accepted as a legacy alias for the latter).
        :param executemany_batch_page_size: page size for execute_batch().
        :param executemany_values_page_size: page size for execute_values().

        Raises ``ImportError`` if the installed psycopg2 is older than 2.7.
        """
        PGDialect.__init__(self, **kwargs)
        self.use_native_unicode = use_native_unicode
        if not use_native_unicode and not util.py2k:
            raise exc.ArgumentError(
                "psycopg2 native_unicode mode is required under Python 3"
            )
        if not use_native_hstore:
            self._has_native_hstore = False
        self.use_native_hstore = use_native_hstore
        self.use_native_uuid = use_native_uuid
        self.supports_unicode_binds = use_native_unicode
        self.client_encoding = client_encoding

        # Parse executemany_mode argument, allowing it to be only one of the
        # symbol names
        self.executemany_mode = util.symbol.parse_user_argument(
            executemany_mode,
            {
                EXECUTEMANY_PLAIN: [None],
                EXECUTEMANY_BATCH: ["batch"],
                EXECUTEMANY_VALUES: ["values_only"],
                EXECUTEMANY_VALUES_PLUS_BATCH: ["values_plus_batch", "values"],
            },
            "executemany_mode",
        )

        # execute_values() provides a real cursor.description, allowing
        # RETURNING to work with executemany (see post_exec())
        if self.executemany_mode & EXECUTEMANY_VALUES:
            self.insert_executemany_returning = True

        self.executemany_batch_page_size = executemany_batch_page_size
        self.executemany_values_page_size = executemany_values_page_size

        # detect the installed psycopg2 version; micro component optional
        if self.dbapi and hasattr(self.dbapi, "__version__"):
            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
            if m:
                self.psycopg2_version = tuple(
                    int(x) for x in m.group(1, 2, 3) if x is not None
                )

            if self.psycopg2_version < (2, 7):
                raise ImportError(
                    "psycopg2 version 2.7 or higher is required."
                )
791
    def initialize(self, connection):
        """First-connect initialization: probe for HSTORE support and
        downgrade executemany features on old servers."""
        super(PGDialect_psycopg2, self).initialize(connection)
        # native hstore requires both the flag and that the target
        # database actually has the HSTORE type installed
        self._has_native_hstore = (
            self.use_native_hstore
            and self._hstore_oids(connection.connection) is not None
        )

        # PGDialect.initialize() checks server version for <= 8.2 and sets
        # this flag to False if so
        if not self.full_returning:
            self.insert_executemany_returning = False
            self.executemany_mode = EXECUTEMANY_PLAIN

        # execute_batch() removes the ability of cursor.rowcount to
        # function correctly for executemany-style executions
        self.supports_sane_multi_rowcount = not (
            self.executemany_mode & EXECUTEMANY_BATCH
        )
808
    @classmethod
    def dbapi(cls):
        # hook used by create_engine() to import the DBAPI module lazily
        import psycopg2

        return psycopg2
814
    @classmethod
    def _psycopg2_extensions(cls):
        # deferred import so this module can be imported without
        # psycopg2 installed
        from psycopg2 import extensions

        return extensions
820
    @classmethod
    def _psycopg2_extras(cls):
        # deferred import so this module can be imported without
        # psycopg2 installed
        from psycopg2 import extras

        return extras
826
    @util.memoized_property
    def _isolation_lookup(self):
        # map SQLAlchemy-style isolation level names to psycopg2's
        # API-level constants; computed once per dialect instance
        extensions = self._psycopg2_extensions()
        return {
            "AUTOCOMMIT": extensions.ISOLATION_LEVEL_AUTOCOMMIT,
            "READ COMMITTED": extensions.ISOLATION_LEVEL_READ_COMMITTED,
            "READ UNCOMMITTED": extensions.ISOLATION_LEVEL_READ_UNCOMMITTED,
            "REPEATABLE READ": extensions.ISOLATION_LEVEL_REPEATABLE_READ,
            "SERIALIZABLE": extensions.ISOLATION_LEVEL_SERIALIZABLE,
        }
837
838 def set_isolation_level(self, connection, level):
839 try:
840 level = self._isolation_lookup[level.replace("_", " ")]
841 except KeyError as err:
842 util.raise_(
843 exc.ArgumentError(
844 "Invalid value '%s' for isolation_level. "
845 "Valid isolation levels for %s are %s"
846 % (level, self.name, ", ".join(self._isolation_lookup))
847 ),
848 replace_context=err,
849 )
850
851 connection.set_isolation_level(level)
852
    def set_readonly(self, connection, value):
        # psycopg2 exposes READ ONLY as a connection-level attribute
        connection.readonly = value
855
    def get_readonly(self, connection):
        # psycopg2 exposes READ ONLY as a connection-level attribute
        return connection.readonly
858
    def set_deferrable(self, connection, value):
        # psycopg2 exposes DEFERRABLE as a connection-level attribute
        connection.deferrable = value
861
    def get_deferrable(self, connection):
        # psycopg2 exposes DEFERRABLE as a connection-level attribute
        return connection.deferrable
864
865 def do_ping(self, dbapi_connection):
866 cursor = None
867 before_autocommit = dbapi_connection.autocommit
868 try:
869 if not before_autocommit:
870 dbapi_connection.autocommit = True
871 cursor = dbapi_connection.cursor()
872 try:
873 cursor.execute(self._dialect_specific_select_one)
874 finally:
875 cursor.close()
876 if not before_autocommit and not dbapi_connection.closed:
877 dbapi_connection.autocommit = before_autocommit
878 except self.dbapi.Error as err:
879 if self.is_disconnect(err, dbapi_connection, cursor):
880 return False
881 else:
882 raise
883 else:
884 return True
885
886 def on_connect(self):
887 extras = self._psycopg2_extras()
888 extensions = self._psycopg2_extensions()
889
890 fns = []
891 if self.client_encoding is not None:
892
893 def on_connect(conn):
894 conn.set_client_encoding(self.client_encoding)
895
896 fns.append(on_connect)
897
898 if self.isolation_level is not None:
899
900 def on_connect(conn):
901 self.set_isolation_level(conn, self.isolation_level)
902
903 fns.append(on_connect)
904
905 if self.dbapi and self.use_native_uuid:
906
907 def on_connect(conn):
908 extras.register_uuid(None, conn)
909
910 fns.append(on_connect)
911
912 if util.py2k and self.dbapi and self.use_native_unicode:
913
914 def on_connect(conn):
915 extensions.register_type(extensions.UNICODE, conn)
916 extensions.register_type(extensions.UNICODEARRAY, conn)
917
918 fns.append(on_connect)
919
920 if self.dbapi and self.use_native_hstore:
921
922 def on_connect(conn):
923 hstore_oids = self._hstore_oids(conn)
924 if hstore_oids is not None:
925 oid, array_oid = hstore_oids
926 kw = {"oid": oid}
927 if util.py2k:
928 kw["unicode"] = True
929 kw["array_oid"] = array_oid
930 extras.register_hstore(conn, **kw)
931
932 fns.append(on_connect)
933
934 if self.dbapi and self._json_deserializer:
935
936 def on_connect(conn):
937 extras.register_default_json(
938 conn, loads=self._json_deserializer
939 )
940 extras.register_default_jsonb(
941 conn, loads=self._json_deserializer
942 )
943
944 fns.append(on_connect)
945
946 if fns:
947
948 def on_connect(conn):
949 for fn in fns:
950 fn(conn)
951
952 return on_connect
953 else:
954 return None
955
    def do_executemany(self, cursor, statement, parameters, context=None):
        """Run an executemany, dispatching on ``executemany_mode``.

        Depending on the configured mode this uses psycopg2's
        ``execute_values()`` fast helper (rewriting the single VALUES
        clause into a ``%s`` placeholder), ``execute_batch()``, or the
        plain DBAPI ``cursor.executemany()``.
        """
        if (
            self.executemany_mode & EXECUTEMANY_VALUES
            and context
            and context.isinsert
            and context.compiled._is_safe_for_fast_insert_values_helper
        ):
            # the exact VALUES-clause text as rendered by the compiler;
            # used both to locate the clause within the statement and as
            # the execute_values() template
            executemany_values = (
                "(%s)" % context.compiled.insert_single_values_expr
            )
            # match the statement's type: under Python 2 with
            # non-unicode statements, the statement is a bytestring
            if not self.supports_unicode_statements:
                executemany_values = executemany_values.encode(self.encoding)

            # guard for statement that was altered via event hook or similar
            if executemany_values not in statement:
                executemany_values = None
        else:
            executemany_values = None

        if executemany_values:
            # carve the VALUES clause out of the statement so
            # execute_values() can splice in batched value groups
            statement = statement.replace(executemany_values, "%s")
            if self.executemany_values_page_size:
                kwargs = {"page_size": self.executemany_values_page_size}
            else:
                kwargs = {}
            xtras = self._psycopg2_extras()
            # execute_values() returns rows when fetch=True; stash them
            # on the context so the result object can serve RETURNING rows
            context._psycopg2_fetched_rows = xtras.execute_values(
                cursor,
                statement,
                parameters,
                template=executemany_values,
                fetch=bool(context.compiled.returning),
                **kwargs
            )

        elif self.executemany_mode & EXECUTEMANY_BATCH:
            if self.executemany_batch_page_size:
                kwargs = {"page_size": self.executemany_batch_page_size}
            else:
                kwargs = {}
            self._psycopg2_extras().execute_batch(
                cursor, statement, parameters, **kwargs
            )
        else:
            # plain DBAPI executemany, no psycopg2 extensions
            cursor.executemany(statement, parameters)
1001
1002 @util.memoized_instancemethod
1003 def _hstore_oids(self, conn):
1004 extras = self._psycopg2_extras()
1005 if hasattr(conn, "dbapi_connection"):
1006 conn = conn.dbapi_connection
1007 oids = extras.HstoreAdapter.get_oids(conn)
1008 if oids is not None and oids[0]:
1009 return oids[0:2]
1010 else:
1011 return None
1012
1013 def create_connect_args(self, url):
1014 opts = url.translate_connect_args(username="user")
1015
1016 is_multihost = False
1017 if "host" in url.query:
1018 is_multihost = isinstance(url.query["host"], (list, tuple))
1019
1020 if opts or url.query:
1021 if not opts:
1022 opts = {}
1023 if "port" in opts:
1024 opts["port"] = int(opts["port"])
1025 opts.update(url.query)
1026 if is_multihost:
1027 hosts, ports = zip(
1028 *[
1029 token.split(":") if ":" in token else (token, "")
1030 for token in url.query["host"]
1031 ]
1032 )
1033 opts["host"] = ",".join(hosts)
1034 if "port" in opts:
1035 raise exc.ArgumentError(
1036 "Can't mix 'multihost' formats together; use "
1037 '"host=h1,h2,h3&port=p1,p2,p3" or '
1038 '"host=h1:p1&host=h2:p2&host=h3:p3" separately'
1039 )
1040 opts["port"] = ",".join(ports)
1041 return ([], opts)
1042 else:
1043 # no connection arguments whatsoever; psycopg2.connect()
1044 # requires that "dsn" be present as a blank string.
1045 return ([""], opts)
1046
1047 def is_disconnect(self, e, connection, cursor):
1048 if isinstance(e, self.dbapi.Error):
1049 # check the "closed" flag. this might not be
1050 # present on old psycopg2 versions. Also,
1051 # this flag doesn't actually help in a lot of disconnect
1052 # situations, so don't rely on it.
1053 if getattr(connection, "closed", False):
1054 return True
1055
1056 # checks based on strings. in the case that .closed
1057 # didn't cut it, fall back onto these.
1058 str_e = str(e).partition("\n")[0]
1059 for msg in [
1060 # these error messages from libpq: interfaces/libpq/fe-misc.c
1061 # and interfaces/libpq/fe-secure.c.
1062 "terminating connection",
1063 "closed the connection",
1064 "connection not open",
1065 "could not receive data from server",
1066 "could not send data to server",
1067 # psycopg2 client errors, psycopg2/connection.h,
1068 # psycopg2/cursor.h
1069 "connection already closed",
1070 "cursor already closed",
1071 # not sure where this path is originally from, it may
1072 # be obsolete. It really says "losed", not "closed".
1073 "losed the connection unexpectedly",
1074 # these can occur in newer SSL
1075 "connection has been closed unexpectedly",
1076 "SSL error: decryption failed or bad record mac",
1077 "SSL SYSCALL error: Bad file descriptor",
1078 "SSL SYSCALL error: EOF detected",
1079 "SSL SYSCALL error: Operation timed out",
1080 "SSL SYSCALL error: Bad address",
1081 ]:
1082 idx = str_e.find(msg)
1083 if idx >= 0 and '"' not in str_e[:idx]:
1084 return True
1085 return False
1086
1087
1088dialect = PGDialect_psycopg2