1# dialects/postgresql/psycopg.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: postgresql+psycopg
11 :name: psycopg (a.k.a. psycopg 3)
12 :dbapi: psycopg
13 :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...]
14 :url: https://pypi.org/project/psycopg/
15
16``psycopg`` is the package and module name for version 3 of the ``psycopg``
database driver, formerly known as ``psycopg2``. This driver is different
enough from its ``psycopg2`` predecessor that SQLAlchemy supports it
via a totally separate dialect. Support for ``psycopg2`` is expected to remain
for as long as that package continues to function for modern Python versions,
and ``psycopg2`` also remains the default dialect for the ``postgresql://``
dialect series.
23
24The SQLAlchemy ``psycopg`` dialect provides both a sync and an async
25implementation under the same dialect name. The proper version is
26selected depending on how the engine is created:
27
28* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will
29 automatically select the sync version, e.g.::
30
31 from sqlalchemy import create_engine
32
33 sync_engine = create_engine(
34 "postgresql+psycopg://scott:tiger@localhost/test"
35 )
36
37* calling :func:`_asyncio.create_async_engine` with
38 ``postgresql+psycopg://...`` will automatically select the async version,
39 e.g.::
40
41 from sqlalchemy.ext.asyncio import create_async_engine
42
43 asyncio_engine = create_async_engine(
44 "postgresql+psycopg://scott:tiger@localhost/test"
45 )
46
47The asyncio version of the dialect may also be specified explicitly using the
48``psycopg_async`` suffix, as::
49
50 from sqlalchemy.ext.asyncio import create_async_engine
51
52 asyncio_engine = create_async_engine(
53 "postgresql+psycopg_async://scott:tiger@localhost/test"
54 )
55
56.. seealso::
57
58 :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg``
59 dialect shares most of its behavior with the ``psycopg2`` dialect.
60 Further documentation is available there.
61
62Using a different Cursor class
63------------------------------
64
65One of the differences between ``psycopg`` and the older ``psycopg2``
66is how bound parameters are handled: ``psycopg2`` would bind them
67client side, while ``psycopg`` by default will bind them server side.
68
69It's possible to configure ``psycopg`` to do client side binding by
70specifying the ``cursor_factory`` to be ``ClientCursor`` when creating
71the engine::
72
73 from psycopg import ClientCursor
74
75 client_side_engine = create_engine(
76 "postgresql+psycopg://...",
77 connect_args={"cursor_factory": ClientCursor},
78 )
79
80Similarly when using an async engine the ``AsyncClientCursor`` can be
81specified::
82
83 from psycopg import AsyncClientCursor
84
85 client_side_engine = create_async_engine(
86 "postgresql+psycopg://...",
87 connect_args={"cursor_factory": AsyncClientCursor},
88 )
89
90.. seealso::
91
92 `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_
93
94""" # noqa
95from __future__ import annotations
96
97from collections import deque
98import logging
99import re
100from typing import cast
101from typing import TYPE_CHECKING
102
103from . import ranges
104from ._psycopg_common import _PGDialect_common_psycopg
105from ._psycopg_common import _PGExecutionContext_common_psycopg
106from .base import INTERVAL
107from .base import PGCompiler
108from .base import PGIdentifierPreparer
109from .base import REGCONFIG
110from .json import JSON
111from .json import JSONB
112from .json import JSONPathType
113from .types import CITEXT
114from ... import pool
115from ... import util
116from ...engine import AdaptedConnection
117from ...sql import sqltypes
118from ...util.concurrency import await_fallback
119from ...util.concurrency import await_only
120
121if TYPE_CHECKING:
122 from typing import Iterable
123
124 from psycopg import AsyncConnection
125
# Module-level logger; _log_notices() writes server notices to it at INFO
# level.
logger = logging.getLogger("sqlalchemy.dialects.postgresql")
127
128
class _PGString(sqltypes.String):
    """``String`` implementation mapped in the psycopg dialect's colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
131
132
class _PGREGCONFIG(REGCONFIG):
    """``REGCONFIG`` implementation mapped in the psycopg dialect's colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
135
136
class _PGJSON(JSON):
    """``JSON`` implementation mapped in the psycopg dialect's colspecs."""

    def bind_processor(self, dialect):
        """Build the bind processor around the dialect's ``Json`` wrapper.

        :param dialect: dialect exposing ``_psycopg_Json``.
        :return: whatever ``_make_bind_processor()`` produces when given
         ``None`` as its first argument and the wrapper as its second.
        """
        json_wrapper = dialect._psycopg_Json
        return self._make_bind_processor(None, json_wrapper)

    def result_processor(self, dialect, coltype):
        """Return ``None``; this type installs no result processor."""
        return None
143
144
class _PGJSONB(JSONB):
    """``JSONB`` implementation mapped in the psycopg dialect's colspecs."""

    def bind_processor(self, dialect):
        """Build the bind processor around the dialect's ``Jsonb`` wrapper.

        :param dialect: dialect exposing ``_psycopg_Jsonb``.
        :return: whatever ``_make_bind_processor()`` produces when given
         ``None`` as its first argument and the wrapper as its second.
        """
        jsonb_wrapper = dialect._psycopg_Jsonb
        return self._make_bind_processor(None, jsonb_wrapper)

    def result_processor(self, dialect, coltype):
        """Return ``None``; this type installs no result processor."""
        return None
151
152
class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    """Integer JSON index type mapped in the psycopg dialect's colspecs.

    Declares the ``json_int_index`` visit name and enables
    ``render_bind_cast``, the type-level flag that SQLAlchemy documents as
    requesting a cast around bound parameters of this type.
    """

    __visit_name__ = "json_int_index"

    render_bind_cast = True
157
158
class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    """String JSON index type mapped in the psycopg dialect's colspecs.

    Declares the ``json_str_index`` visit name and enables
    ``render_bind_cast``, the type-level flag that SQLAlchemy documents as
    requesting a cast around bound parameters of this type.
    """

    __visit_name__ = "json_str_index"

    render_bind_cast = True
163
164
class _PGJSONPathType(JSONPathType):
    """JSON path type mapped in the psycopg dialect's colspecs.

    Adds nothing to ``JSONPathType``; it only provides a psycopg-specific
    class for ``sqltypes.JSON.JSONPathType`` to resolve to.
    """

    pass
167
168
class _PGInterval(INTERVAL):
    """``INTERVAL`` implementation mapped in the psycopg dialect's colspecs.

    Used for both ``sqltypes.Interval`` and the PostgreSQL ``INTERVAL``.
    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
171
172
class _PGTimeStamp(sqltypes.DateTime):
    """``DateTime`` implementation mapped in the psycopg dialect's colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
175
176
class _PGDate(sqltypes.Date):
    """``Date`` implementation mapped in the psycopg dialect's colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
179
180
class _PGTime(sqltypes.Time):
    """``Time`` implementation mapped in the psycopg dialect's colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
183
184
class _PGInteger(sqltypes.Integer):
    """``Integer`` implementation mapped in the psycopg dialect's colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
187
188
class _PGSmallInteger(sqltypes.SmallInteger):
    """``SmallInteger`` implementation mapped in the psycopg colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
191
192
class _PGNullType(sqltypes.NullType):
    """``NullType`` variant with ``render_bind_cast`` enabled.

    NOTE(review): this class is not referenced by the ``colspecs`` mapping
    visible in this module; confirm where (or whether) it is used.
    """

    render_bind_cast = True
195
196
class _PGBigInteger(sqltypes.BigInteger):
    """``BigInteger`` implementation mapped in the psycopg colspecs.

    Enables ``render_bind_cast``, the type-level flag that SQLAlchemy
    documents as requesting a cast around bound parameters of this type.
    """

    render_bind_cast = True
199
200
class _PGBoolean(sqltypes.Boolean):
    """``Boolean`` variant with ``render_bind_cast`` enabled.

    NOTE(review): this class is not referenced by the ``colspecs`` mapping
    visible in this module; confirm where (or whether) it is used.
    """

    render_bind_cast = True
203
204
class _PsycopgRange(ranges.AbstractSingleRangeImpl):
    """Single-range implementation mapped in the psycopg dialect's colspecs.

    Translates between SQLAlchemy ``ranges.Range`` values and the range
    class supplied by the driver (``dialect._psycopg_Range``).
    """

    def bind_processor(self, dialect):
        """Return a callable converting ``ranges.Range`` to a driver range.

        Any value that is not a ``ranges.Range`` instance is handed back
        untouched.
        """
        driver_range_cls = cast(PGDialect_psycopg, dialect)._psycopg_Range

        def to_driver_range(value):
            if not isinstance(value, ranges.Range):
                return value
            return driver_range_cls(
                value.lower, value.upper, value.bounds, value.empty
            )

        return to_driver_range

    def result_processor(self, dialect, coltype):
        """Return a callable converting a driver range to ``ranges.Range``.

        ``None`` is passed through.  A driver value whose ``_bounds`` is
        falsy becomes an empty ``ranges.Range`` with the ``"[)"`` bounds.
        """

        def to_sa_range(value):
            if value is None:
                return value
            bounds = value._bounds
            return ranges.Range(
                value._lower,
                value._upper,
                bounds=bounds or "[)",
                empty=not bounds,
            )

        return to_sa_range
230
231
class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl):
    """Multirange implementation mapped in the psycopg dialect's colspecs.

    Translates between iterables of SQLAlchemy ``ranges.Range`` values and
    the driver's ``Multirange`` / ``Range`` classes.
    """

    def bind_processor(self, dialect):
        """Return a callable converting range iterables to a ``Multirange``.

        Strings, ``None`` and values that already are driver ``Multirange``
        objects are handed back untouched.
        """
        psycopg_dialect = cast(PGDialect_psycopg, dialect)
        driver_range_cls = psycopg_dialect._psycopg_Range
        driver_multirange_cls = psycopg_dialect._psycopg_Multirange

        passthrough_types = (str, type(None), driver_multirange_cls)

        def to_driver_multirange(value):
            if isinstance(value, passthrough_types):
                return value

            converted = []
            for element in cast("Iterable[ranges.Range]", value):
                converted.append(
                    driver_range_cls(
                        element.lower,
                        element.upper,
                        element.bounds,
                        element.empty,
                    )
                )
            return driver_multirange_cls(converted)

        return to_driver_multirange

    def result_processor(self, dialect, coltype):
        """Return a callable converting a driver value to ``MultiRange``.

        ``None`` is passed through.  Each element whose ``_bounds`` is falsy
        becomes an empty ``ranges.Range`` with the ``"[)"`` bounds.
        """

        def to_sa_multirange(value):
            if value is None:
                return None
            return ranges.MultiRange(
                ranges.Range(
                    elem._lower,
                    elem._upper,
                    bounds=elem._bounds or "[)",
                    empty=not elem._bounds,
                )
                for elem in value
            )

        return to_sa_multirange
275
276
class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg):
    """Execution context class assigned to ``PGDialect_psycopg``.

    Adds nothing to the shared psycopg execution context it derives from.
    """

    pass
279
280
class PGCompiler_psycopg(PGCompiler):
    """Statement compiler class assigned to ``PGDialect_psycopg``.

    Adds nothing to ``PGCompiler``.
    """

    pass
283
284
class PGIdentifierPreparer_psycopg(PGIdentifierPreparer):
    """Identifier preparer class assigned to ``PGDialect_psycopg``.

    Adds nothing to ``PGIdentifierPreparer``.
    """

    pass
287
288
def _log_notices(diagnostic):
    """Log a server notice at INFO level as ``<severity>: <message>``.

    :param diagnostic: object exposing ``severity`` and ``message_primary``;
     the dialect installs this function as a connection notice handler.
    """
    severity = diagnostic.severity
    message = diagnostic.message_primary
    logger.info("%s: %s", severity, message)
291
292
class PGDialect_psycopg(_PGDialect_common_psycopg):
    """Synchronous PostgreSQL dialect for the ``psycopg`` (version 3) driver.

    At construction time the dialect derives its own ``AdaptersMap`` from
    ``psycopg.adapters`` and hands it to each new connection through the
    ``context`` connect argument.  JSON (de)serializers, the optional text
    loaders for ``inet`` / ``cidr`` and the hstore adapter are registered
    on that map.
    """

    driver = "psycopg"

    supports_statement_cache = True
    supports_server_side_cursors = True
    default_paramstyle = "pyformat"
    supports_sane_multi_rowcount = True

    execution_ctx_cls = PGExecutionContext_psycopg
    statement_compiler = PGCompiler_psycopg
    preparer = PGIdentifierPreparer_psycopg

    # placeholder; __init__ replaces it with the version parsed from
    # ``psycopg.__version__`` when a DBAPI is present
    psycopg_version = (0, 0)

    # switched to False by initialize() when the hstore type lookup
    # returns nothing
    _has_native_hstore = True

    # populated by __init__ when a DBAPI is present
    _psycopg_adapters_map = None

    colspecs = util.update_copy(
        _PGDialect_common_psycopg.colspecs,
        {
            sqltypes.String: _PGString,
            REGCONFIG: _PGREGCONFIG,
            JSON: _PGJSON,
            CITEXT: CITEXT,
            sqltypes.JSON: _PGJSON,
            JSONB: _PGJSONB,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.Date: _PGDate,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            ranges.AbstractSingleRange: _PsycopgRange,
            ranges.AbstractMultiRange: _PsycopgMultiRange,
        },
    )

    def __init__(self, **kwargs):
        """Check the driver version and build the dialect's adapters map.

        :param kwargs: forwarded unchanged to the parent dialect.
        :raises ImportError: when a DBAPI is present and its version is
         lower than 3.0.2.
        """
        super().__init__(**kwargs)

        if self.dbapi:
            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
            if m:
                self.psycopg_version = tuple(
                    int(x) for x in m.group(1, 2, 3) if x is not None
                )

            # a version string that does not match leaves the (0, 0) class
            # default in place, which is rejected here as well
            if self.psycopg_version < (3, 0, 2):
                raise ImportError(
                    "psycopg version 3.0.2 or higher is required."
                )

            from psycopg.adapt import AdaptersMap

            self._psycopg_adapters_map = adapters_map = AdaptersMap(
                self.dbapi.adapters
            )

            # with native inet types disabled, inet / cidr values are
            # loaded with the driver's text loader
            if self._native_inet_types is False:
                import psycopg.types.string

                adapters_map.register_loader(
                    "inet", psycopg.types.string.TextLoader
                )
                adapters_map.register_loader(
                    "cidr", psycopg.types.string.TextLoader
                )

            if self._json_deserializer:
                from psycopg.types.json import set_json_loads

                set_json_loads(self._json_deserializer, adapters_map)

            if self._json_serializer:
                from psycopg.types.json import set_json_dumps

                set_json_dumps(self._json_serializer, adapters_map)

    def create_connect_args(self, url):
        """Return ``(cargs, cparams)`` for the driver's ``connect()``.

        Starts from the parent's result and adds the adapters map as
        ``context`` and, when set, ``client_encoding``.
        """
        # see https://github.com/psycopg/psycopg/issues/83
        cargs, cparams = super().create_connect_args(url)

        if self._psycopg_adapters_map:
            cparams["context"] = self._psycopg_adapters_map
        if self.client_encoding is not None:
            cparams["client_encoding"] = self.client_encoding
        return cargs, cparams

    def _type_info_fetch(self, connection, name):
        """Look up the driver's ``TypeInfo`` for the type called ``name``."""
        from psycopg.types import TypeInfo

        return TypeInfo.fetch(connection.connection.driver_connection, name)

    def initialize(self, connection):
        """Finish dialect setup using a live connection.

        Runs the parent initialization, aligns
        ``insert_executemany_returning`` with ``insert_returning`` and
        registers the hstore adapter when native hstore is requested and
        the type can be found.
        """
        super().initialize(connection)

        # PGDialect.initialize() checks server version for <= 8.2 and sets
        # this flag to False if so
        if not self.insert_returning:
            self.insert_executemany_returning = False

        # HSTORE can't be registered until we have a connection so that
        # we can look up its OID, so we set up this adapter in
        # initialize()
        if self.use_native_hstore:
            info = self._type_info_fetch(connection, "hstore")
            self._has_native_hstore = info is not None
            if self._has_native_hstore:
                from psycopg.types.hstore import register_hstore

                # register the adapter for connections made subsequent to
                # this one
                assert self._psycopg_adapters_map
                register_hstore(info, self._psycopg_adapters_map)

                # register the adapter for this connection
                assert connection.connection
                register_hstore(info, connection.connection.driver_connection)

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``psycopg`` module."""
        import psycopg

        return psycopg

    @classmethod
    def get_async_dialect_cls(cls, url):
        """Return the asyncio variant of this dialect."""
        return PGDialectAsync_psycopg

    @util.memoized_property
    def _isolation_lookup(self):
        """Map isolation level names to the driver's ``IsolationLevel``."""
        return {
            "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED,
            "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED,
            "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ,
            "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE,
        }

    @util.memoized_property
    def _psycopg_Json(self):
        """The driver's ``Json`` wrapper class, imported on first use."""
        from psycopg.types import json

        return json.Json

    @util.memoized_property
    def _psycopg_Jsonb(self):
        """The driver's ``Jsonb`` wrapper class, imported on first use."""
        from psycopg.types import json

        return json.Jsonb

    @util.memoized_property
    def _psycopg_TransactionStatus(self):
        """The driver's ``TransactionStatus`` enum, imported on first use."""
        from psycopg.pq import TransactionStatus

        return TransactionStatus

    @util.memoized_property
    def _psycopg_Range(self):
        """The driver's ``Range`` class, imported on first use."""
        from psycopg.types.range import Range

        return Range

    @util.memoized_property
    def _psycopg_Multirange(self):
        """The driver's ``Multirange`` class, imported on first use."""
        from psycopg.types.multirange import Multirange

        return Multirange

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        """Assign autocommit and isolation level on a driver connection."""
        connection.autocommit = autocommit
        connection.isolation_level = isolation_level

    def get_isolation_level(self, dbapi_connection):
        """Return the isolation level as reported by the parent dialect.

        When the connection was idle before the lookup, it is rolled back
        afterwards (presumably because the lookup itself leaves a
        transaction open; confirm against the parent implementation).
        """
        status_before = dbapi_connection.info.transaction_status
        value = super().get_isolation_level(dbapi_connection)

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if status_before == self._psycopg_TransactionStatus.IDLE:
            dbapi_connection.rollback()
        return value

    def set_isolation_level(self, dbapi_connection, level):
        """Apply ``level`` to the connection.

        ``"AUTOCOMMIT"`` enables driver autocommit with no explicit
        isolation level; any other name disables autocommit and is
        translated through ``_isolation_lookup`` (an unknown name raises
        ``KeyError`` from that lookup).
        """
        if level == "AUTOCOMMIT":
            self._do_isolation_level(
                dbapi_connection, autocommit=True, isolation_level=None
            )
        else:
            self._do_isolation_level(
                dbapi_connection,
                autocommit=False,
                isolation_level=self._isolation_lookup[level],
            )

    def set_readonly(self, connection, value):
        """Assign the connection's ``read_only`` attribute."""
        connection.read_only = value

    def get_readonly(self, connection):
        """Return the connection's ``read_only`` attribute."""
        return connection.read_only

    def on_connect(self):
        """Return the callable to run against each new driver connection.

        The callable installs ``_log_notices`` as notice handler and, when
        the dialect has an ``isolation_level`` configured, applies it.
        """

        def notices(conn):
            conn.add_notice_handler(_log_notices)

        fns = [notices]

        if self.isolation_level is not None:

            def set_isolation(conn):
                self.set_isolation_level(conn, self.isolation_level)

            fns.append(set_isolation)

        # fns always has the notices function
        def on_connect(conn):
            for fn in fns:
                fn(conn)

        return on_connect

    def is_disconnect(self, e, connection, cursor):
        """Report whether ``e`` corresponds to a lost connection.

        True only for a driver ``Error`` raised while ``connection`` is
        given and flagged ``closed`` or ``broken``.
        """
        if isinstance(e, self.dbapi.Error) and connection is not None:
            if connection.closed or connection.broken:
                return True
        return False

    def _do_prepared_twophase(self, connection, command, recover=False):
        """Run a ``... PREPARED`` command with autocommit switched on.

        The driver connection is rolled back first when ``recover`` is set
        or when it is not idle.  Autocommit is enabled for the duration of
        the command and restored afterwards, also on error.
        """
        dbapi_conn = connection.connection.dbapi_connection
        if (
            recover
            # don't rely on psycopg providing enum symbols, compare with
            # eq/ne
            or dbapi_conn.info.transaction_status
            != self._psycopg_TransactionStatus.IDLE
        ):
            dbapi_conn.rollback()
        before_autocommit = dbapi_conn.autocommit
        try:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, True)
            dbapi_conn.execute(command)
        finally:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, before_autocommit)

    @staticmethod
    def _twophase_xid_literal(xid):
        """Render ``xid`` as a single-quoted SQL string literal.

        Embedded single quotes are doubled so that an identifier containing
        a quote (e.g. one prepared by another client and found during
        recovery) cannot terminate the literal early.
        """
        escaped = str(xid).replace("'", "''")
        return f"'{escaped}'"

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Roll back a two-phase transaction.

        Issues ``ROLLBACK PREPARED`` for a prepared transaction, otherwise
        performs a plain rollback of the underlying connection.
        """
        if is_prepared:
            self._do_prepared_twophase(
                connection,
                f"ROLLBACK PREPARED {self._twophase_xid_literal(xid)}",
                recover=recover,
            )
        else:
            self.do_rollback(connection.connection)

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Commit a two-phase transaction.

        Issues ``COMMIT PREPARED`` for a prepared transaction, otherwise
        performs a plain commit of the underlying connection.
        """
        if is_prepared:
            self._do_prepared_twophase(
                connection,
                f"COMMIT PREPARED {self._twophase_xid_literal(xid)}",
                recover=recover,
            )
        else:
            self.do_commit(connection.connection)

    @util.memoized_property
    def _dialect_specific_select_one(self):
        """Statement text ``";"`` (presumably the minimal statement used
        by the parent dialect for connection checks; confirm there)."""
        return ";"
565
566
class AsyncAdapt_psycopg_cursor:
    """Blocking, DBAPI-style facade over a psycopg async cursor.

    Every awaitable produced by the wrapped cursor is driven to completion
    through ``await_``.  Rows of a row-returning statement are fetched
    eagerly by :meth:`execute` and buffered in a ``deque``; the ``fetch*``
    methods and iteration consume that buffer.  Attributes not defined
    here are proxied to the wrapped cursor.
    """

    __slots__ = ("_cursor", "await_", "_rows")

    # assigned ``psycopg.pq.ExecStatus`` by
    # PGDialectAsync_psycopg.import_dbapi()
    _psycopg_ExecStatus = None

    def __init__(self, cursor, await_) -> None:
        """Wrap ``cursor``, using ``await_`` to run its awaitables."""
        self._cursor = cursor
        self.await_ = await_
        self._rows = deque()

    def __getattr__(self, name):
        # anything not defined on the adapter comes from the driver cursor
        return getattr(self._cursor, name)

    @property
    def arraysize(self):
        """The wrapped cursor's ``arraysize``."""
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    def close(self):
        """Drop buffered rows and close the wrapped cursor."""
        self._rows.clear()
        # the regular cursor is closed through the driver's private
        # _close(), which is called here without awaiting
        self._cursor._close()

    def execute(self, query, params=None, **kw):
        """Execute ``query`` and buffer all rows it returns.

        Rows still buffered from an earlier statement are discarded first,
        so that a statement returning no rows cannot expose stale data
        through the ``fetch*`` methods.

        :return: the value produced by the wrapped cursor's ``execute()``.
        """
        self._rows.clear()
        result = self.await_(self._cursor.execute(query, params, **kw))
        # sqlalchemy result is not async, so need to pull all rows here
        res = self._cursor.pgresult

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
            rows = self.await_(self._cursor.fetchall())
            self._rows = deque(rows)
        return result

    def executemany(self, query, params_seq):
        """Execute ``query`` once per entry of ``params_seq``.

        Rows buffered from an earlier statement are discarded; no rows are
        buffered for the new execution.
        """
        self._rows.clear()
        return self.await_(self._cursor.executemany(query, params_seq))

    def __iter__(self):
        """Yield buffered rows, removing each one as it is produced."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return the next buffered row, or ``None`` when exhausted."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Return up to ``size`` buffered rows as a list.

        ``size`` defaults to the wrapped cursor's ``arraysize``.
        """
        if size is None:
            size = self._cursor.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self):
        """Return all remaining buffered rows and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
629
630
class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor):
    """Adapter variant that fetches from the wrapped cursor on demand.

    ``AsyncAdapt_psycopg_connection.cursor()`` picks this class when the
    driver cursor has a ``name`` attribute.  Nothing is buffered here:
    every fetch and each iteration step is forwarded to the driver cursor
    and awaited.
    """

    def execute(self, query, params=None, **kw):
        """Execute ``query`` on the wrapped cursor and return ``self``."""
        pending = self._cursor.execute(query, params, **kw)
        self.await_(pending)
        return self

    def close(self):
        """Await the wrapped cursor's ``close()``."""
        pending = self._cursor.close()
        self.await_(pending)

    def fetchone(self):
        """Await and return the wrapped cursor's ``fetchone()``."""
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size=0):
        """Await and return the wrapped cursor's ``fetchmany(size)``."""
        pending = self._cursor.fetchmany(size)
        return self.await_(pending)

    def fetchall(self):
        """Await and return the wrapped cursor's ``fetchall()``."""
        pending = self._cursor.fetchall()
        return self.await_(pending)

    def __iter__(self):
        """Yield rows by stepping the wrapped cursor's async iterator."""
        row_source = self._cursor.__aiter__()
        while True:
            try:
                yield self.await_(row_source.__anext__())
            except StopAsyncIteration:
                # async iterator exhausted: finish the generator
                return
655
656
class AsyncAdapt_psycopg_connection(AdaptedConnection):
    """Blocking facade over a psycopg ``AsyncConnection``.

    Awaitables returned by the wrapped connection are run through
    ``await_`` (``await_only`` for this class).  Attributes not defined
    here are proxied to the wrapped connection.
    """

    _connection: AsyncConnection
    __slots__ = ()
    await_ = staticmethod(await_only)

    def __init__(self, connection) -> None:
        """Wrap the given driver connection."""
        self._connection = connection

    def __getattr__(self, name):
        # anything not defined on the adapter comes from the driver
        # connection
        return getattr(self._connection, name)

    def execute(self, query, params=None, **kw):
        """Execute on the connection and return an adapted cursor."""
        pending = self._connection.execute(query, params, **kw)
        driver_cursor = self.await_(pending)
        return AsyncAdapt_psycopg_cursor(driver_cursor, self.await_)

    def cursor(self, *args, **kw):
        """Create a driver cursor and wrap it in the matching adapter.

        A driver cursor exposing a ``name`` attribute gets the
        non-buffering ``AsyncAdapt_psycopg_ss_cursor``; any other cursor
        gets ``AsyncAdapt_psycopg_cursor``.
        """
        driver_cursor = self._connection.cursor(*args, **kw)
        adapter_cls = (
            AsyncAdapt_psycopg_ss_cursor
            if hasattr(driver_cursor, "name")
            else AsyncAdapt_psycopg_cursor
        )
        return adapter_cls(driver_cursor, self.await_)

    def commit(self):
        """Await the wrapped connection's ``commit()``."""
        pending = self._connection.commit()
        self.await_(pending)

    def rollback(self):
        """Await the wrapped connection's ``rollback()``."""
        pending = self._connection.rollback()
        self.await_(pending)

    def close(self):
        """Await the wrapped connection's ``close()``."""
        pending = self._connection.close()
        self.await_(pending)

    @property
    def autocommit(self):
        """The wrapped connection's ``autocommit`` flag."""
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        # assignment is routed through the awaited setter below
        self.set_autocommit(value)

    def set_autocommit(self, value):
        """Await the wrapped connection's ``set_autocommit(value)``."""
        pending = self._connection.set_autocommit(value)
        self.await_(pending)

    def set_isolation_level(self, value):
        """Await the wrapped connection's ``set_isolation_level(value)``."""
        pending = self._connection.set_isolation_level(value)
        self.await_(pending)

    def set_read_only(self, value):
        """Await the wrapped connection's ``set_read_only(value)``."""
        pending = self._connection.set_read_only(value)
        self.await_(pending)

    def set_deferrable(self, value):
        """Await the wrapped connection's ``set_deferrable(value)``."""
        pending = self._connection.set_deferrable(value)
        self.await_(pending)
707
708
class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection):
    """Connection adapter that runs awaitables with ``await_fallback``.

    Identical to ``AsyncAdapt_psycopg_connection`` apart from the
    ``await_`` implementation; ``PsycopgAdaptDBAPI.connect()`` returns it
    when ``async_fallback`` is enabled.
    """

    __slots__ = ()
    await_ = staticmethod(await_fallback)
712
713
class PsycopgAdaptDBAPI:
    """Stand-in for the ``psycopg`` module used by the asyncio dialect.

    Mirrors every attribute of the real module except ``connect``, which
    is replaced by a method returning an adapted async connection.
    """

    def __init__(self, psycopg) -> None:
        """Copy the module namespace, leaving out its ``connect``."""
        self.psycopg = psycopg

        mirrored = {
            name: value
            for name, value in psycopg.__dict__.items()
            if name != "connect"
        }
        self.__dict__.update(mirrored)

    def connect(self, *arg, **kw):
        """Open an async connection and return it wrapped in an adapter.

        Two keyword arguments are consumed here and not passed on:
        ``async_fallback`` selects the fallback adapter and waiter, and
        ``async_creator_fn`` replaces ``AsyncConnection.connect`` as the
        coroutine function that creates the connection.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop(
            "async_creator_fn", self.psycopg.AsyncConnection.connect
        )

        if util.asbool(async_fallback):
            adapter_cls = AsyncAdaptFallback_psycopg_connection
            waiter = await_fallback
        else:
            adapter_cls = AsyncAdapt_psycopg_connection
            waiter = await_only

        return adapter_cls(waiter(creator_fn(*arg, **kw)))
735
736
class PGDialectAsync_psycopg(PGDialect_psycopg):
    """Asyncio variant of the psycopg dialect.

    Uses ``PsycopgAdaptDBAPI`` as its DBAPI and talks to the adapted
    connection through its ``set_*`` methods instead of plain attribute
    assignment.
    """

    is_async = True
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        """Return the adapted DBAPI wrapping the ``psycopg`` module.

        Also stores the driver's ``ExecStatus`` enum on
        ``AsyncAdapt_psycopg_cursor`` for its result-status check.
        """
        import psycopg
        from psycopg.pq import ExecStatus as exec_status_enum

        AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = exec_status_enum

        return PsycopgAdaptDBAPI(psycopg)

    @classmethod
    def get_pool_class(cls, url):
        """Choose the pool class from the URL's ``async_fallback`` query."""
        wants_fallback = util.asbool(url.query.get("async_fallback", False))
        return (
            pool.FallbackAsyncAdaptedQueuePool
            if wants_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def _type_info_fetch(self, connection, name):
        """Await the driver's ``TypeInfo.fetch`` for the type ``name``."""
        from psycopg.types import TypeInfo

        adapted = connection.connection
        pending = TypeInfo.fetch(adapted.driver_connection, name)
        return adapted.await_(pending)

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        """Set autocommit, then isolation level, via the adapter."""
        connection.set_autocommit(autocommit)
        connection.set_isolation_level(isolation_level)

    def _do_autocommit(self, connection, value):
        """Set autocommit through the adapter's ``set_autocommit()``."""
        connection.set_autocommit(value)

    def set_readonly(self, connection, value):
        """Set read-only mode through the adapter's ``set_read_only()``."""
        connection.set_read_only(value)

    def set_deferrable(self, connection, value):
        """Set deferrable mode through the adapter's ``set_deferrable()``."""
        connection.set_deferrable(value)

    def get_driver_connection(self, connection):
        """Return the driver connection held by the adapter."""
        return connection._connection
780
781
# Module-level aliases for the sync and asyncio dialect classes
# (presumably the names looked up when the dialect is loaded; confirm
# against the dialect registry).
dialect = PGDialect_psycopg
dialect_async = PGDialectAsync_psycopg