1# dialects/postgresql/psycopg.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: postgresql+psycopg
11 :name: psycopg (a.k.a. psycopg 3)
12 :dbapi: psycopg
13 :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...]
14 :url: https://pypi.org/project/psycopg/
15
16``psycopg`` is the package and module name for version 3 of the ``psycopg``
17database driver, formerly known as ``psycopg2``. This driver is different
18enough from its ``psycopg2`` predecessor that SQLAlchemy supports it
19via a totally separate dialect; support for ``psycopg2`` is expected to remain
20for as long as that package continues to function for modern Python versions,
21and also remains the default dialect for the ``postgresql://`` dialect
22series.
23
24The SQLAlchemy ``psycopg`` dialect provides both a sync and an async
25implementation under the same dialect name. The proper version is
26selected depending on how the engine is created:
27
28* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will
29 automatically select the sync version, e.g.::
30
31 from sqlalchemy import create_engine
32
33 sync_engine = create_engine(
34 "postgresql+psycopg://scott:tiger@localhost/test"
35 )
36
37* calling :func:`_asyncio.create_async_engine` with
38 ``postgresql+psycopg://...`` will automatically select the async version,
39 e.g.::
40
41 from sqlalchemy.ext.asyncio import create_async_engine
42
43 asyncio_engine = create_async_engine(
44 "postgresql+psycopg://scott:tiger@localhost/test"
45 )
46
47The asyncio version of the dialect may also be specified explicitly using the
48``psycopg_async`` suffix, as::
49
50 from sqlalchemy.ext.asyncio import create_async_engine
51
52 asyncio_engine = create_async_engine(
53 "postgresql+psycopg_async://scott:tiger@localhost/test"
54 )
55
56.. seealso::
57
58 :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg``
59 dialect shares most of its behavior with the ``psycopg2`` dialect.
60 Further documentation is available there.
61
62Using psycopg Connection Pooling
63--------------------------------
64
65The ``psycopg`` driver provides its own connection pool implementation that
66may be used in place of SQLAlchemy's pooling functionality.
67This pool implementation provides support for fixed and dynamic pool sizes
68(including automatic downsizing for unused connections), connection health
69pre-checks, and support for both synchronous and asynchronous code
70environments.
71
72Here is an example that uses the sync version of the pool, using
73``psycopg_pool >= 3.3`` that introduces support for ``close_returns=True``::
74
75 import psycopg_pool
76 from sqlalchemy import create_engine
77 from sqlalchemy.pool import NullPool
78
79 # Create a psycopg_pool connection pool
80 my_pool = psycopg_pool.ConnectionPool(
81 conninfo="postgresql://scott:tiger@localhost/test",
82 close_returns=True, # Return "closed" active connections to the pool
83 # ... other pool parameters as desired ...
84 )
85
86 # Create an engine that uses the connection pool to get a connection
87 engine = create_engine(
88 url="postgresql+psycopg://", # Only need the dialect now
89 poolclass=NullPool, # Disable SQLAlchemy's default connection pool
90 creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections
91 )
92
Similarly, an async example::
94
95 import psycopg_pool
96 from sqlalchemy.ext.asyncio import create_async_engine
97 from sqlalchemy.pool import NullPool
98
99
100 async def define_engine():
101 # Create a psycopg_pool connection pool
102 my_pool = psycopg_pool.AsyncConnectionPool(
103 conninfo="postgresql://scott:tiger@localhost/test",
104 open=False, # See comment below
105 close_returns=True, # Return "closed" active connections to the pool
106 # ... other pool parameters as desired ...
107 )
108
109 # Must explicitly open AsyncConnectionPool outside constructor
110 # https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.AsyncConnectionPool
111 await my_pool.open()
112
113 # Create an engine that uses the connection pool to get a connection
114 engine = create_async_engine(
115 url="postgresql+psycopg://", # Only need the dialect now
116 poolclass=NullPool, # Disable SQLAlchemy's default connection pool
117 async_creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections
118 )
119
120 return engine, my_pool
121
122The resulting engine may then be used normally. Internally, Psycopg 3 handles
123connection pooling::
124
125 with engine.connect() as conn:
126 print(conn.scalar(text("select 42")))
127
128.. seealso::
129
130 `Connection pools <https://www.psycopg.org/psycopg3/docs/advanced/pool.html>`_ -
131 the Psycopg 3 documentation for ``psycopg_pool.ConnectionPool``.
132
133 `Example for older version of psycopg_pool
134 <https://github.com/sqlalchemy/sqlalchemy/discussions/12522#discussioncomment-13024666>`_ -
135 An example about using the ``psycopg_pool<3.3`` that did not have the
    ``close_returns`` parameter.
137
138Using a different Cursor class
139------------------------------
140
141One of the differences between ``psycopg`` and the older ``psycopg2``
142is how bound parameters are handled: ``psycopg2`` would bind them
143client side, while ``psycopg`` by default will bind them server side.
144
145It's possible to configure ``psycopg`` to do client side binding by
146specifying the ``cursor_factory`` to be ``ClientCursor`` when creating
147the engine::
148
149 from psycopg import ClientCursor
150
151 client_side_engine = create_engine(
152 "postgresql+psycopg://...",
153 connect_args={"cursor_factory": ClientCursor},
154 )
155
156Similarly when using an async engine the ``AsyncClientCursor`` can be
157specified::
158
159 from psycopg import AsyncClientCursor
160
161 client_side_engine = create_async_engine(
162 "postgresql+psycopg://...",
163 connect_args={"cursor_factory": AsyncClientCursor},
164 )
165
166.. seealso::
167
168 `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_
169
170""" # noqa
171
172from __future__ import annotations
173
174from collections import deque
175import logging
176import re
177from typing import cast
178from typing import TYPE_CHECKING
179
180from . import ranges
181from ._psycopg_common import _PGDialect_common_psycopg
182from ._psycopg_common import _PGExecutionContext_common_psycopg
183from .base import INTERVAL
184from .base import PGCompiler
185from .base import PGIdentifierPreparer
186from .base import REGCONFIG
187from .json import JSON
188from .json import JSONB
189from .json import JSONPathType
190from .types import CITEXT
191from ... import pool
192from ... import util
193from ...engine import AdaptedConnection
194from ...sql import sqltypes
195from ...util.concurrency import await_fallback
196from ...util.concurrency import await_only
197
198if TYPE_CHECKING:
199 from typing import Iterable
200
201 from psycopg import AsyncConnection
202
# Module-level logger; ``_log_notices()`` in this module writes server
# notices to it at INFO level.
logger = logging.getLogger("sqlalchemy.dialects.postgresql")
204
205
class _PGString(sqltypes.String):
    """``String`` variant for psycopg with ``render_bind_cast`` enabled."""

    # NOTE(review): presumably makes the compiler wrap bound parameters of
    # this type in an explicit cast -- confirm against the TypeEngine
    # documentation for ``render_bind_cast``.
    render_bind_cast = True
208
209
class _PGREGCONFIG(REGCONFIG):
    """``REGCONFIG`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
212
213
class _PGJSON(JSON):
    """``JSON`` type as used by the psycopg dialect."""

    def bind_processor(self, dialect):
        """Delegate to ``_make_bind_processor`` with the dialect's
        ``_psycopg_Json`` wrapper class.
        """
        json_wrapper = dialect._psycopg_Json
        return self._make_bind_processor(None, json_wrapper)

    def result_processor(self, dialect, coltype):
        """Return ``None``; this type supplies no result processor."""
        return None
220
221
class _PGJSONB(JSONB):
    """``JSONB`` type as used by the psycopg dialect."""

    def bind_processor(self, dialect):
        """Delegate to ``_make_bind_processor`` with the dialect's
        ``_psycopg_Jsonb`` wrapper class.
        """
        jsonb_wrapper = dialect._psycopg_Jsonb
        return self._make_bind_processor(None, jsonb_wrapper)

    def result_processor(self, dialect, coltype):
        """Return ``None``; this type supplies no result processor."""
        return None
228
229
class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    """JSON integer-index type for psycopg.

    Declares the visit name ``json_int_index`` and enables
    ``render_bind_cast``.
    """

    __visit_name__ = "json_int_index"

    render_bind_cast = True
234
235
class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    """JSON string-index type for psycopg.

    Declares the visit name ``json_str_index`` and enables
    ``render_bind_cast``.
    """

    __visit_name__ = "json_str_index"

    render_bind_cast = True
240
241
class _PGJSONPathType(JSONPathType):
    """psycopg-specific ``JSONPathType`` subclass; adds nothing of its own."""

    pass
244
245
class _PGInterval(INTERVAL):
    """``INTERVAL`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
248
249
class _PGTimeStamp(sqltypes.DateTime):
    """``DateTime`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
252
253
class _PGDate(sqltypes.Date):
    """``Date`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
256
257
class _PGTime(sqltypes.Time):
    """``Time`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
260
261
class _PGInteger(sqltypes.Integer):
    """``Integer`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
264
265
class _PGSmallInteger(sqltypes.SmallInteger):
    """``SmallInteger`` variant for psycopg with ``render_bind_cast`` on."""

    render_bind_cast = True
268
269
class _PGNullType(sqltypes.NullType):
    """``NullType`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
272
273
class _PGBigInteger(sqltypes.BigInteger):
    """``BigInteger`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
276
277
class _PGBoolean(sqltypes.Boolean):
    """``Boolean`` variant for psycopg with ``render_bind_cast`` enabled."""

    render_bind_cast = True
280
281
class _PsycopgRange(ranges.AbstractSingleRangeImpl):
    """Single-range type converting between ``ranges.Range`` and the
    psycopg ``Range`` class.
    """

    def bind_processor(self, dialect):
        """Return a callable that turns ``ranges.Range`` values into the
        driver's ``Range``; any other value is passed through unchanged.
        """
        driver_range_cls = cast(PGDialect_psycopg, dialect)._psycopg_Range

        def _adapt_outgoing(value):
            if not isinstance(value, ranges.Range):
                return value
            return driver_range_cls(
                value.lower, value.upper, value.bounds, value.empty
            )

        return _adapt_outgoing

    def result_processor(self, dialect, coltype):
        """Return a callable that turns a driver range into ``ranges.Range``.

        ``None`` is passed through.  A falsy ``_bounds`` on the driver value
        yields a range flagged as empty, using ``"[)"`` for its bounds.
        """

        def _adapt_incoming(value):
            if value is None:
                return value
            lower = value._lower
            upper = value._upper
            driver_bounds = value._bounds
            return ranges.Range(
                lower,
                upper,
                bounds=driver_bounds or "[)",
                empty=not driver_bounds,
            )

        return _adapt_incoming
307
308
class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl):
    """Multirange type converting between ``ranges.MultiRange`` and the
    psycopg ``Multirange`` class.
    """

    def bind_processor(self, dialect):
        """Return a callable building a driver ``Multirange`` from an
        iterable of ranges.

        Strings, ``None`` and values that already are a driver
        ``Multirange`` are passed through unchanged.
        """
        pg_dialect = cast(PGDialect_psycopg, dialect)
        driver_range_cls = pg_dialect._psycopg_Range
        driver_multirange_cls = pg_dialect._psycopg_Multirange

        passthrough_types = (str, type(None), driver_multirange_cls)

        def _adapt_outgoing(value):
            if isinstance(value, passthrough_types):
                return value

            converted = [
                driver_range_cls(
                    item.lower, item.upper, item.bounds, item.empty
                )
                for item in cast("Iterable[ranges.Range]", value)
            ]
            return driver_multirange_cls(converted)

        return _adapt_outgoing

    def result_processor(self, dialect, coltype):
        """Return a callable turning a driver multirange into
        ``ranges.MultiRange``; ``None`` is passed through.

        Each member with a falsy ``_bounds`` becomes a range flagged as
        empty, using ``"[)"`` for its bounds.
        """

        def _adapt_incoming(value):
            if value is None:
                return None

            return ranges.MultiRange(
                ranges.Range(
                    member._lower,
                    member._upper,
                    bounds=member._bounds or "[)",
                    empty=not member._bounds,
                )
                for member in value
            )

        return _adapt_incoming
352
353
class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg):
    """Execution context of the psycopg dialect.

    Adds nothing to ``_PGExecutionContext_common_psycopg``.
    """

    pass
356
357
class PGCompiler_psycopg(PGCompiler):
    """Statement compiler of the psycopg dialect; adds nothing to
    ``PGCompiler``.
    """

    pass
360
361
class PGIdentifierPreparer_psycopg(PGIdentifierPreparer):
    """Identifier preparer of the psycopg dialect; adds nothing to
    ``PGIdentifierPreparer``.
    """

    pass
364
365
def _log_notices(diagnostic):
    """Log a notice's severity and primary message at INFO level.

    Registered on each new connection as a notice handler by
    ``PGDialect_psycopg.on_connect()``.
    """
    severity = diagnostic.severity
    primary_message = diagnostic.message_primary
    logger.info("%s: %s", severity, primary_message)
368
369
class PGDialect_psycopg(_PGDialect_common_psycopg):
    """PostgreSQL dialect for the psycopg (version 3) driver.

    This class talks to the driver connection directly;
    ``get_async_dialect_cls()`` hands out the asyncio-adapted subclass.
    """

    driver = "psycopg"

    supports_statement_cache = True
    supports_server_side_cursors = True
    default_paramstyle = "pyformat"
    supports_sane_multi_rowcount = True

    execution_ctx_cls = PGExecutionContext_psycopg
    statement_compiler = PGCompiler_psycopg
    preparer = PGIdentifierPreparer_psycopg

    # placeholder; replaced in __init__ once the driver version is parsed
    psycopg_version = (0, 0)

    _has_native_hstore = True

    # per-dialect copy of the driver's adapters, built in __init__ when a
    # DBAPI module is available
    _psycopg_adapters_map = None

    colspecs = util.update_copy(
        _PGDialect_common_psycopg.colspecs,
        {
            sqltypes.String: _PGString,
            REGCONFIG: _PGREGCONFIG,
            JSON: _PGJSON,
            CITEXT: CITEXT,
            sqltypes.JSON: _PGJSON,
            JSONB: _PGJSONB,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.Date: _PGDate,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            ranges.AbstractSingleRange: _PsycopgRange,
            ranges.AbstractMultiRange: _PsycopgMultiRange,
        },
    )

    def __init__(self, **kwargs):
        """Initialise the dialect and, when a DBAPI module is present,
        build the private adapters map.

        :raises ImportError: if the parsed driver version is lower than
         3.0.2 (an unparseable version string leaves the ``(0, 0)``
         placeholder in place and therefore also raises).
        """
        super().__init__(**kwargs)

        if not self.dbapi:
            return

        version_match = re.match(
            r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__
        )
        if version_match:
            self.psycopg_version = tuple(
                int(part)
                for part in version_match.group(1, 2, 3)
                if part is not None
            )

        if self.psycopg_version < (3, 0, 2):
            message = "psycopg version 3.0.2 or higher is required."
            raise ImportError(message)

        from psycopg.adapt import AdaptersMap

        adapters_map = AdaptersMap(self.dbapi.adapters)
        self._psycopg_adapters_map = adapters_map

        if self._native_inet_types is False:
            from psycopg.types.string import TextLoader

            # load inet / cidr values with the driver's text loader
            for pg_type_name in ("inet", "cidr"):
                adapters_map.register_loader(pg_type_name, TextLoader)

        if self._json_deserializer:
            from psycopg.types.json import set_json_loads

            set_json_loads(self._json_deserializer, adapters_map)

        if self._json_serializer:
            from psycopg.types.json import set_json_dumps

            set_json_dumps(self._json_serializer, adapters_map)

    def create_connect_args(self, url):
        """Extend the inherited connect arguments with the adapters map
        (as ``context``) and, when set, ``client_encoding``.
        """
        # see https://github.com/psycopg/psycopg/issues/83
        cargs, cparams = super().create_connect_args(url)

        adapters_map = self._psycopg_adapters_map
        if adapters_map:
            cparams["context"] = adapters_map

        encoding = self.client_encoding
        if encoding is not None:
            cparams["client_encoding"] = encoding

        return cargs, cparams

    def _type_info_fetch(self, connection, name):
        """Look up the driver's ``TypeInfo`` for the type called ``name``."""
        from psycopg.types import TypeInfo

        driver_conn = connection.connection.driver_connection
        return TypeInfo.fetch(driver_conn, name)

    def initialize(self, connection):
        """Run the inherited initialisation, then register the hstore
        adapter if native hstore is requested and the type is found.
        """
        super().initialize(connection)

        # PGDialect.initialize() checks server version for <= 8.2 and sets
        # this flag to False if so
        if not self.insert_returning:
            self.insert_executemany_returning = False

        # HSTORE can't be registered until we have a connection so that
        # we can look up its OID, so we set up this adapter in
        # initialize()
        if not self.use_native_hstore:
            return

        hstore_info = self._type_info_fetch(connection, "hstore")
        self._has_native_hstore = hstore_info is not None
        if not self._has_native_hstore:
            return

        from psycopg.types.hstore import register_hstore

        # register the adapter for connections made subsequent to
        # this one
        assert self._psycopg_adapters_map
        register_hstore(hstore_info, self._psycopg_adapters_map)

        # register the adapter for this connection
        assert connection.connection
        register_hstore(hstore_info, connection.connection.driver_connection)

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``psycopg`` module."""
        import psycopg

        return psycopg

    @classmethod
    def get_async_dialect_cls(cls, url):
        """Return the asyncio variant of this dialect."""
        return PGDialectAsync_psycopg

    @util.memoized_property
    def _isolation_lookup(self):
        """Map isolation level names to the driver's ``IsolationLevel``
        members.
        """
        levels = self.dbapi.IsolationLevel
        return {
            "READ COMMITTED": levels.READ_COMMITTED,
            "READ UNCOMMITTED": levels.READ_UNCOMMITTED,
            "REPEATABLE READ": levels.REPEATABLE_READ,
            "SERIALIZABLE": levels.SERIALIZABLE,
        }

    @util.memoized_property
    def _psycopg_Json(self):
        """The driver's ``Json`` wrapper class."""
        from psycopg.types.json import Json

        return Json

    @util.memoized_property
    def _psycopg_Jsonb(self):
        """The driver's ``Jsonb`` wrapper class."""
        from psycopg.types.json import Jsonb

        return Jsonb

    @util.memoized_property
    def _psycopg_TransactionStatus(self):
        """The driver's ``TransactionStatus`` enumeration."""
        from psycopg.pq import TransactionStatus

        return TransactionStatus

    @util.memoized_property
    def _psycopg_Range(self):
        """The driver's ``Range`` class."""
        from psycopg.types.range import Range

        return Range

    @util.memoized_property
    def _psycopg_Multirange(self):
        """The driver's ``Multirange`` class."""
        from psycopg.types.multirange import Multirange

        return Multirange

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        """Assign ``autocommit`` and then ``isolation_level`` on the
        driver connection.
        """
        connection.autocommit = autocommit
        connection.isolation_level = isolation_level

    def get_isolation_level(self, dbapi_connection):
        """Return the inherited isolation level lookup result.

        If the connection was idle before the lookup, it is rolled back
        afterwards.
        """
        prior_status = dbapi_connection.info.transaction_status
        level = super().get_isolation_level(dbapi_connection)

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if prior_status == self._psycopg_TransactionStatus.IDLE:
            dbapi_connection.rollback()
        return level

    def set_isolation_level(self, dbapi_connection, level):
        """Apply ``level``; ``"AUTOCOMMIT"`` switches autocommit on and
        clears the driver-level isolation level.
        """
        use_autocommit = level == "AUTOCOMMIT"
        driver_level = None if use_autocommit else self._isolation_lookup[level]
        self._do_isolation_level(
            dbapi_connection,
            autocommit=use_autocommit,
            isolation_level=driver_level,
        )

    def set_readonly(self, connection, value):
        """Assign ``read_only`` on the driver connection."""
        connection.read_only = value

    def get_readonly(self, connection):
        """Return the driver connection's ``read_only`` value."""
        return connection.read_only

    def on_connect(self):
        """Return the callable to run against each new driver connection.

        It installs the notice handler and, when an isolation level is
        configured on the dialect, applies it.
        """

        def _attach_notice_handler(conn):
            conn.add_notice_handler(_log_notices)

        hooks = [_attach_notice_handler]

        if self.isolation_level is not None:

            def _apply_isolation_level(conn):
                self.set_isolation_level(conn, self.isolation_level)

            hooks.append(_apply_isolation_level)

        # hooks always holds at least the notice handler
        def on_connect(conn):
            for hook in hooks:
                hook(conn)

        return on_connect

    def is_disconnect(self, e, connection, cursor):
        """Return True when ``e`` is a driver ``Error`` and the given
        connection reports itself closed or broken.
        """
        if not isinstance(e, self.dbapi.Error) or connection is None:
            return False
        return bool(connection.closed or connection.broken)

    def _twophase_idle_check(self, dbapi_conn):
        """Return whether the connection's transaction status is IDLE."""
        # don't rely on psycopg providing enum symbols, compare with eq/ne
        idle = self._psycopg_TransactionStatus.IDLE
        return dbapi_conn.info.transaction_status == idle

    @util.memoized_property
    def _dialect_specific_select_one(self):
        """Statement text used for this hook: a bare ``";"``."""
        return ";"
610
611
class AsyncAdapt_psycopg_cursor:
    """Blocking-style facade over a psycopg async cursor.

    ``execute()`` pulls every row of a row-returning statement into an
    in-memory deque, so the ``fetch*`` methods and iteration are served
    from memory without awaiting.  Attributes not defined here are
    forwarded to the wrapped cursor.
    """

    __slots__ = ("_cursor", "await_", "_rows")

    # Set to the driver's ``ExecStatus`` enumeration by
    # ``PGDialectAsync_psycopg.import_dbapi()``.
    _psycopg_ExecStatus = None

    def __init__(self, cursor, await_) -> None:
        """Wrap ``cursor``; ``await_`` is the callable used to run the
        cursor's coroutines to completion.
        """
        self._cursor = cursor
        self.await_ = await_
        self._rows = deque()

    def __getattr__(self, name):
        # only reached for names not found on the adapter itself
        return getattr(self._cursor, name)

    @property
    def arraysize(self):
        """Proxy for the wrapped cursor's ``arraysize``."""
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    async def _async_soft_close(self) -> None:
        """No-op coroutine."""
        return

    def close(self):
        """Drop any buffered rows and close the wrapped cursor."""
        self._rows.clear()
        # the wrapped cursor's _close() is called directly, without
        # going through await_
        self._cursor._close()

    def execute(self, query, params=None, **kw):
        """Execute ``query`` and buffer its rows, if it produced any.

        Rows still buffered from an earlier statement are discarded
        first, so a statement that returns no rows never exposes the
        previous statement's leftovers through ``fetch*``.

        :return: whatever the wrapped cursor's ``execute()`` returned.
        """
        self._rows.clear()

        result = self.await_(self._cursor.execute(query, params, **kw))
        # sqlalchemy result is not async, so need to pull all rows here
        res = self._cursor.pgresult

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
            self._rows = deque(self.await_(self._cursor.fetchall()))
        return result

    def executemany(self, query, params_seq):
        """Run ``executemany()`` on the wrapped cursor.

        Rows buffered from an earlier statement are discarded first; no
        rows are buffered for this call.
        """
        self._rows.clear()
        return self.await_(self._cursor.executemany(query, params_seq))

    def __iter__(self):
        """Yield buffered rows, consuming them from the buffer."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return the next buffered row, or ``None`` when none are left."""
        if self._rows:
            return self._rows.popleft()
        return None

    def fetchmany(self, size=None):
        """Return up to ``size`` buffered rows as a list.

        ``size`` defaults to the wrapped cursor's ``arraysize``.
        """
        if size is None:
            size = self._cursor.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self):
        """Return all remaining buffered rows and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
677
678
class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor):
    """Cursor adapter that does not buffer rows.

    Every fetch call and every iteration step is forwarded to the wrapped
    cursor and run through ``await_``.
    """

    def execute(self, query, params=None, **kw):
        """Execute on the wrapped cursor and return this adapter."""
        pending = self._cursor.execute(query, params, **kw)
        self.await_(pending)
        return self

    def close(self):
        """Await the wrapped cursor's ``close()``."""
        pending = self._cursor.close()
        self.await_(pending)

    def fetchone(self):
        """Await and return the wrapped cursor's ``fetchone()``."""
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size=0):
        """Await and return the wrapped cursor's ``fetchmany(size)``."""
        pending = self._cursor.fetchmany(size)
        return self.await_(pending)

    def fetchall(self):
        """Await and return the wrapped cursor's ``fetchall()``."""
        pending = self._cursor.fetchall()
        return self.await_(pending)

    def __iter__(self):
        """Drive the wrapped cursor's async iterator one row at a time."""
        async_rows = self._cursor.__aiter__()
        while True:
            try:
                yield self.await_(async_rows.__anext__())
            except StopAsyncIteration:
                return
703
704
class AsyncAdapt_psycopg_connection(AdaptedConnection):
    """Blocking-style facade over a psycopg ``AsyncConnection``.

    Each coroutine of the wrapped connection is run through ``await_``;
    attributes not defined here are forwarded to the wrapped connection.
    """

    _connection: AsyncConnection
    __slots__ = ()
    await_ = staticmethod(await_only)

    def __init__(self, connection) -> None:
        self._connection = connection

    def __getattr__(self, name):
        # only reached for names not found on the adapter itself
        return getattr(self._connection, name)

    def execute(self, query, params=None, **kw):
        """Execute directly on the connection and wrap the cursor it
        returns.
        """
        # NOTE(review): the adapter returned here starts with an empty row
        # buffer and nothing in this method fills it -- confirm no caller
        # expects fetch*() to return rows from this cursor.
        pending = self._connection.execute(query, params, **kw)
        raw_cursor = self.await_(pending)
        return AsyncAdapt_psycopg_cursor(raw_cursor, self.await_)

    def cursor(self, *args, **kw):
        """Create a cursor and wrap it in the matching adapter.

        A cursor exposing a ``name`` attribute gets the non-buffering
        adapter; any other cursor gets the buffering one.
        """
        raw_cursor = self._connection.cursor(*args, **kw)
        adapter_cls = (
            AsyncAdapt_psycopg_ss_cursor
            if hasattr(raw_cursor, "name")
            else AsyncAdapt_psycopg_cursor
        )
        return adapter_cls(raw_cursor, self.await_)

    def commit(self):
        """Await ``commit()`` on the wrapped connection."""
        pending = self._connection.commit()
        self.await_(pending)

    def rollback(self):
        """Await ``rollback()`` on the wrapped connection."""
        pending = self._connection.rollback()
        self.await_(pending)

    def close(self):
        """Await ``close()`` on the wrapped connection."""
        pending = self._connection.close()
        self.await_(pending)

    @property
    def autocommit(self):
        """The wrapped connection's ``autocommit`` flag; assignment is
        routed through ``set_autocommit()``.
        """
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        self.set_autocommit(value)

    def set_autocommit(self, value):
        """Await ``set_autocommit(value)`` on the wrapped connection."""
        pending = self._connection.set_autocommit(value)
        self.await_(pending)

    def set_isolation_level(self, value):
        """Await ``set_isolation_level(value)`` on the wrapped connection."""
        pending = self._connection.set_isolation_level(value)
        self.await_(pending)

    def set_read_only(self, value):
        """Await ``set_read_only(value)`` on the wrapped connection."""
        pending = self._connection.set_read_only(value)
        self.await_(pending)

    def set_deferrable(self, value):
        """Await ``set_deferrable(value)`` on the wrapped connection."""
        pending = self._connection.set_deferrable(value)
        self.await_(pending)

    def tpc_begin(self, xid):
        """Await and return ``tpc_begin(xid)``."""
        pending = self._connection.tpc_begin(xid)
        return self.await_(pending)

    def tpc_prepare(self):
        """Await and return ``tpc_prepare()``."""
        pending = self._connection.tpc_prepare()
        return self.await_(pending)

    def tpc_commit(self, xid=None):
        """Await and return ``tpc_commit(xid)``."""
        pending = self._connection.tpc_commit(xid)
        return self.await_(pending)

    def tpc_rollback(self, xid=None):
        """Await and return ``tpc_rollback(xid)``."""
        pending = self._connection.tpc_rollback(xid)
        return self.await_(pending)

    def tpc_recover(self):
        """Await and return ``tpc_recover()``."""
        pending = self._connection.tpc_recover()
        return self.await_(pending)
770
771
class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection):
    """Connection adapter that uses ``await_fallback`` instead of
    ``await_only`` to run coroutines; otherwise identical to its parent.
    """

    __slots__ = ()
    await_ = staticmethod(await_fallback)
775
776
class PsycopgAdaptDBAPI:
    """Stand-in for the ``psycopg`` module used by the async dialect.

    Mirrors every module-level name of ``psycopg`` except ``connect``,
    which is replaced by a method returning an adapted connection.
    """

    def __init__(self, psycopg) -> None:
        self.psycopg = psycopg

        # copy the module namespace onto this object, leaving ``connect``
        # to resolve to the method defined on the class
        self.__dict__.update(
            (name, obj)
            for name, obj in self.psycopg.__dict__.items()
            if name != "connect"
        )

    def connect(self, *arg, **kw):
        """Open an async connection and return it wrapped in an adapter.

        ``async_fallback`` and ``async_creator_fn`` are removed from the
        keyword arguments before the rest are passed to the creator; the
        creator defaults to ``psycopg.AsyncConnection.connect``.
        """
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop(
            "async_creator_fn", self.psycopg.AsyncConnection.connect
        )

        if util.asbool(async_fallback):
            adapter_cls = AsyncAdaptFallback_psycopg_connection
            waiter = await_fallback
        else:
            adapter_cls = AsyncAdapt_psycopg_connection
            waiter = await_only

        return adapter_cls(waiter(creator_fn(*arg, **kw)))
798
799
class PGDialectAsync_psycopg(PGDialect_psycopg):
    """asyncio variant of the psycopg dialect.

    Connections seen by this dialect are adapter objects, so state changes
    go through their ``set_*`` methods rather than attribute assignment.
    """

    is_async = True
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        """Return the adapted DBAPI wrapper around ``psycopg``.

        Also publishes the driver's ``ExecStatus`` enumeration on
        ``AsyncAdapt_psycopg_cursor``.
        """
        import psycopg
        from psycopg.pq import ExecStatus

        AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus

        return PsycopgAdaptDBAPI(psycopg)

    @classmethod
    def get_pool_class(cls, url):
        """Pick the pool class from the URL's ``async_fallback`` query
        value.
        """
        if util.asbool(url.query.get("async_fallback", False)):
            return pool.FallbackAsyncAdaptedQueuePool
        return pool.AsyncAdaptedQueuePool

    def _type_info_fetch(self, connection, name):
        """Look up the driver's ``TypeInfo`` for ``name``, awaiting the
        fetch through the adapted connection.
        """
        from psycopg.types import TypeInfo

        adapted = connection.connection
        pending = TypeInfo.fetch(adapted.driver_connection, name)
        return adapted.await_(pending)

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        """Apply autocommit and then the isolation level via the
        adapter's setter methods.
        """
        connection.set_autocommit(autocommit)
        connection.set_isolation_level(isolation_level)

    def _do_autocommit(self, connection, value):
        """Apply autocommit via the adapter's setter method."""
        connection.set_autocommit(value)

    def set_readonly(self, connection, value):
        """Apply read-only via the adapter's setter method."""
        connection.set_read_only(value)

    def set_deferrable(self, connection, value):
        """Apply deferrable via the adapter's setter method."""
        connection.set_deferrable(value)

    def get_driver_connection(self, connection):
        """Return the connection wrapped by the adapter."""
        return connection._connection
843
844
# Module-level aliases for the sync and async dialect classes.
# NOTE(review): presumably these are the names SQLAlchemy's dialect loader
# looks up in this module -- confirm.
dialect = PGDialect_psycopg
dialect_async = PGDialectAsync_psycopg