1# dialects/postgresql/psycopg.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
3# <see AUTHORS file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: postgresql+psycopg
11 :name: psycopg (a.k.a. psycopg 3)
12 :dbapi: psycopg
13 :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...]
14 :url: https://pypi.org/project/psycopg/
15
16``psycopg`` is the package and module name for version 3 of the ``psycopg``
17database driver, formerly known as ``psycopg2``. This driver is different
18enough from its ``psycopg2`` predecessor that SQLAlchemy supports it
19via a totally separate dialect; support for ``psycopg2`` is expected to remain
20for as long as that package continues to function for modern Python versions,
21and also remains the default dialect for the ``postgresql://`` dialect
22series.
23
24The SQLAlchemy ``psycopg`` dialect provides both a sync and an async
25implementation under the same dialect name. The proper version is
26selected depending on how the engine is created:
27
28* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will
29 automatically select the sync version, e.g.::
30
31 from sqlalchemy import create_engine
32
33 sync_engine = create_engine(
34 "postgresql+psycopg://scott:tiger@localhost/test"
35 )
36
37* calling :func:`_asyncio.create_async_engine` with
38 ``postgresql+psycopg://...`` will automatically select the async version,
39 e.g.::
40
41 from sqlalchemy.ext.asyncio import create_async_engine
42
43 asyncio_engine = create_async_engine(
44 "postgresql+psycopg://scott:tiger@localhost/test"
45 )
46
47The asyncio version of the dialect may also be specified explicitly using the
48``psycopg_async`` suffix, as::
49
50 from sqlalchemy.ext.asyncio import create_async_engine
51
52 asyncio_engine = create_async_engine(
53 "postgresql+psycopg_async://scott:tiger@localhost/test"
54 )
55
56.. seealso::
57
58 :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg``
59 dialect shares most of its behavior with the ``psycopg2`` dialect.
60 Further documentation is available there.
61
62Using psycopg Connection Pooling
63--------------------------------
64
65The ``psycopg`` driver provides its own connection pool implementation that
66may be used in place of SQLAlchemy's pooling functionality.
67This pool implementation provides support for fixed and dynamic pool sizes
68(including automatic downsizing for unused connections), connection health
69pre-checks, and support for both synchronous and asynchronous code
70environments.
71
72Here is an example that uses the sync version of the pool, using
73``psycopg_pool >= 3.3`` that introduces support for ``close_returns=True``::
74
75 import psycopg_pool
76 from sqlalchemy import create_engine
77 from sqlalchemy.pool import NullPool
78
79 # Create a psycopg_pool connection pool
80 my_pool = psycopg_pool.ConnectionPool(
81 conninfo="postgresql://scott:tiger@localhost/test",
82 close_returns=True, # Return "closed" active connections to the pool
83 # ... other pool parameters as desired ...
84 )
85
86 # Create an engine that uses the connection pool to get a connection
87 engine = create_engine(
88 url="postgresql+psycopg://", # Only need the dialect now
89 poolclass=NullPool, # Disable SQLAlchemy's default connection pool
90 creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections
91 )
92
Similarly, an example that uses the async version of the pool::
94
95 import psycopg_pool
96 from sqlalchemy.ext.asyncio import create_async_engine
97 from sqlalchemy.pool import NullPool
98
99
100 async def define_engine():
101 # Create a psycopg_pool connection pool
102 my_pool = psycopg_pool.AsyncConnectionPool(
103 conninfo="postgresql://scott:tiger@localhost/test",
104 open=False, # See comment below
105 close_returns=True, # Return "closed" active connections to the pool
106 # ... other pool parameters as desired ...
107 )
108
109 # Must explicitly open AsyncConnectionPool outside constructor
110 # https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.AsyncConnectionPool
111 await my_pool.open()
112
113 # Create an engine that uses the connection pool to get a connection
114 engine = create_async_engine(
115 url="postgresql+psycopg://", # Only need the dialect now
116 poolclass=NullPool, # Disable SQLAlchemy's default connection pool
117 async_creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections
118 )
119
120 return engine, my_pool
121
122The resulting engine may then be used normally. Internally, Psycopg 3 handles
123connection pooling::
124
125 with engine.connect() as conn:
126 print(conn.scalar(text("select 42")))
127
128.. seealso::
129
130 `Connection pools <https://www.psycopg.org/psycopg3/docs/advanced/pool.html>`_ -
131 the Psycopg 3 documentation for ``psycopg_pool.ConnectionPool``.
132
133 `Example for older version of psycopg_pool
134 <https://github.com/sqlalchemy/sqlalchemy/discussions/12522#discussioncomment-13024666>`_ -
135 An example about using the ``psycopg_pool<3.3`` that did not have the
    ``close_returns`` parameter.
137
138Using a different Cursor class
139------------------------------
140
141One of the differences between ``psycopg`` and the older ``psycopg2``
142is how bound parameters are handled: ``psycopg2`` would bind them
143client side, while ``psycopg`` by default will bind them server side.
144
145It's possible to configure ``psycopg`` to do client side binding by
146specifying the ``cursor_factory`` to be ``ClientCursor`` when creating
147the engine::
148
149 from psycopg import ClientCursor
150
151 client_side_engine = create_engine(
152 "postgresql+psycopg://...",
153 connect_args={"cursor_factory": ClientCursor},
154 )
155
156Similarly when using an async engine the ``AsyncClientCursor`` can be
157specified::
158
159 from psycopg import AsyncClientCursor
160
161 client_side_engine = create_async_engine(
162 "postgresql+psycopg://...",
163 connect_args={"cursor_factory": AsyncClientCursor},
164 )
165
166.. seealso::
167
168 `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_
169
170""" # noqa
171from __future__ import annotations
172
173from collections import deque
174import logging
175import re
176from typing import cast
177from typing import TYPE_CHECKING
178
179from . import ranges
180from ._psycopg_common import _PGDialect_common_psycopg
181from ._psycopg_common import _PGExecutionContext_common_psycopg
182from .base import INTERVAL
183from .base import PGCompiler
184from .base import PGIdentifierPreparer
185from .base import REGCONFIG
186from .json import JSON
187from .json import JSONB
188from .json import JSONPathType
189from .types import CITEXT
190from ... import pool
191from ... import util
192from ...engine import AdaptedConnection
193from ...sql import sqltypes
194from ...util.concurrency import await_fallback
195from ...util.concurrency import await_only
196
197if TYPE_CHECKING:
198 from typing import Iterable
199
200 from psycopg import AsyncConnection
201
# Module logger; _log_notices() writes to it at INFO level.
logger = logging.getLogger("sqlalchemy.dialects.postgresql")
203
204
class _PGString(sqltypes.String):
    """String implementation used by the psycopg dialect (see ``colspecs``)."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
207
208
class _PGREGCONFIG(REGCONFIG):
    """REGCONFIG implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
211
212
class _PGJSON(JSON):
    """JSON implementation used by the psycopg dialect."""

    def bind_processor(self, dialect):
        """Build the bind processor around the dialect's ``Json`` wrapper.

        The wrapper class is read from ``dialect._psycopg_Json`` and handed
        to ``_make_bind_processor`` together with ``None`` as the first
        argument.
        """
        json_wrapper = dialect._psycopg_Json
        return self._make_bind_processor(None, json_wrapper)

    def result_processor(self, dialect, coltype):
        """Return ``None``: no result processor is installed for this type.

        NOTE(review): presumably the driver already returns decoded JSON.
        """
        return None
219
220
class _PGJSONB(JSONB):
    """JSONB implementation used by the psycopg dialect."""

    def bind_processor(self, dialect):
        """Build the bind processor around the dialect's ``Jsonb`` wrapper.

        The wrapper class is read from ``dialect._psycopg_Jsonb`` and handed
        to ``_make_bind_processor`` together with ``None`` as the first
        argument.
        """
        jsonb_wrapper = dialect._psycopg_Jsonb
        return self._make_bind_processor(None, jsonb_wrapper)

    def result_processor(self, dialect, coltype):
        """Return ``None``: no result processor is installed for this type.

        NOTE(review): presumably the driver already returns decoded JSON.
        """
        return None
227
228
class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    """Integer JSON index type used by the psycopg dialect."""

    # NOTE(review): presumably the name used for compiler visitor dispatch
    __visit_name__ = "json_int_index"

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
233
234
class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    """String JSON index type used by the psycopg dialect."""

    # NOTE(review): presumably the name used for compiler visitor dispatch
    __visit_name__ = "json_str_index"

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
239
240
class _PGJSONPathType(JSONPathType):
    """JSON path type used by the psycopg dialect; adds no behavior."""

    pass
243
244
class _PGInterval(INTERVAL):
    """INTERVAL implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
247
248
class _PGTimeStamp(sqltypes.DateTime):
    """DateTime implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
251
252
class _PGDate(sqltypes.Date):
    """Date implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
255
256
class _PGTime(sqltypes.Time):
    """Time implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
259
260
class _PGInteger(sqltypes.Integer):
    """Integer implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
263
264
class _PGSmallInteger(sqltypes.SmallInteger):
    """SmallInteger implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
267
268
class _PGNullType(sqltypes.NullType):
    """NullType variant with ``render_bind_cast`` enabled."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
271
272
class _PGBigInteger(sqltypes.BigInteger):
    """BigInteger implementation used by the psycopg dialect."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
275
276
class _PGBoolean(sqltypes.Boolean):
    """Boolean variant with ``render_bind_cast`` enabled."""

    # NOTE(review): presumably makes the compiler render an explicit cast
    # around bound parameters -- confirm with TypeEngine.render_bind_cast
    render_bind_cast = True
279
280
class _PsycopgRange(ranges.AbstractSingleRangeImpl):
    """Converts between SQLAlchemy ``Range`` and the psycopg range object."""

    def bind_processor(self, dialect):
        """Return a callable turning ``ranges.Range`` into a psycopg range.

        Values that are not ``ranges.Range`` instances are passed through
        untouched.
        """
        range_cls = cast(PGDialect_psycopg, dialect)._psycopg_Range

        def to_range(value):
            if not isinstance(value, ranges.Range):
                return value
            return range_cls(
                value.lower, value.upper, value.bounds, value.empty
            )

        return to_range

    def result_processor(self, dialect, coltype):
        """Return a callable turning a driver range into ``ranges.Range``.

        ``None`` is passed through.  A falsy ``_bounds`` on the driver
        object is reported as an empty range with the bounds ``"[)"``.
        """

        def to_range(value):
            if value is None:
                return value

            lower = value._lower
            upper = value._upper
            bounds = value._bounds
            return ranges.Range(
                lower,
                upper,
                bounds=bounds if bounds else "[)",
                empty=not bounds,
            )

        return to_range
306
307
class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl):
    """Converts between SQLAlchemy multiranges and psycopg ``Multirange``."""

    def bind_processor(self, dialect):
        """Return a callable building a psycopg ``Multirange`` from ranges.

        Strings, ``None`` and values that already are a psycopg
        ``Multirange`` are passed through untouched; anything else is
        iterated and each element converted to a psycopg range.
        """
        psycopg_dialect = cast(PGDialect_psycopg, dialect)
        range_cls = psycopg_dialect._psycopg_Range
        multirange_cls = psycopg_dialect._psycopg_Multirange

        passthrough_types = (str, type(None), multirange_cls)

        def to_range(value):
            if isinstance(value, passthrough_types):
                return value

            converted = [
                range_cls(item.lower, item.upper, item.bounds, item.empty)
                for item in cast("Iterable[ranges.Range]", value)
            ]
            return multirange_cls(converted)

        return to_range

    def result_processor(self, dialect, coltype):
        """Return a callable building ``ranges.MultiRange`` from a row value.

        ``None`` is passed through.  An element with a falsy ``_bounds`` is
        reported as an empty range with the bounds ``"[)"``.
        """

        def _as_sqla_range(elem):
            lower = elem._lower
            upper = elem._upper
            bounds = elem._bounds
            return ranges.Range(
                lower,
                upper,
                bounds=bounds if bounds else "[)",
                empty=not bounds,
            )

        def to_range(value):
            if value is None:
                return None
            return ranges.MultiRange(_as_sqla_range(elem) for elem in value)

        return to_range
351
352
class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg):
    """Execution context for the psycopg dialect; adds no behavior."""

    pass
355
356
class PGCompiler_psycopg(PGCompiler):
    """Statement compiler for the psycopg dialect; adds no behavior."""

    pass
359
360
class PGIdentifierPreparer_psycopg(PGIdentifierPreparer):
    """Identifier preparer for the psycopg dialect; adds no behavior."""

    pass
363
364
def _log_notices(diagnostic):
    """Log a notice's severity and primary message at INFO level.

    Installed as a notice handler on new connections by
    ``PGDialect_psycopg.on_connect()``.
    """
    severity = diagnostic.severity
    message = diagnostic.message_primary
    logger.info("%s: %s", severity, message)
367
368
class PGDialect_psycopg(_PGDialect_common_psycopg):
    """Synchronous PostgreSQL dialect for the ``psycopg`` (version 3) driver.

    The asyncio variant of this dialect is the class returned by
    :meth:`get_async_dialect_cls`.
    """

    driver = "psycopg"

    supports_statement_cache = True
    supports_server_side_cursors = True
    default_paramstyle = "pyformat"
    supports_sane_multi_rowcount = True

    execution_ctx_cls = PGExecutionContext_psycopg
    statement_compiler = PGCompiler_psycopg
    preparer = PGIdentifierPreparer_psycopg
    # placeholder; replaced in __init__ by the parsed ``dbapi.__version__``
    psycopg_version = (0, 0)

    # re-evaluated in initialize() when ``use_native_hstore`` is set
    _has_native_hstore = True
    # AdaptersMap created in __init__; stays None when there is no dbapi
    _psycopg_adapters_map = None

    # the common psycopg colspecs combined (via ``util.update_copy``) with
    # the psycopg-specific type implementations defined in this module
    colspecs = util.update_copy(
        _PGDialect_common_psycopg.colspecs,
        {
            sqltypes.String: _PGString,
            REGCONFIG: _PGREGCONFIG,
            JSON: _PGJSON,
            CITEXT: CITEXT,
            sqltypes.JSON: _PGJSON,
            JSONB: _PGJSONB,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.Date: _PGDate,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            ranges.AbstractSingleRange: _PsycopgRange,
            ranges.AbstractMultiRange: _PsycopgMultiRange,
        },
    )

    def __init__(self, **kwargs):
        """Construct the dialect and, if a DBAPI is present, configure it.

        When ``self.dbapi`` is set, the driver version is parsed and
        checked, and a dialect-owned ``AdaptersMap`` is created from the
        driver's ``adapters`` and customized for inet/cidr and JSON
        handling.

        :raises ImportError: if the parsed psycopg version is lower than
         3.0.2 (this includes a version string that could not be parsed,
         since ``psycopg_version`` then stays at ``(0, 0)``).
        """
        super().__init__(**kwargs)

        if self.dbapi:
            # major.minor with an optional patch component
            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
            if m:
                self.psycopg_version = tuple(
                    int(x) for x in m.group(1, 2, 3) if x is not None
                )

            if self.psycopg_version < (3, 0, 2):
                raise ImportError(
                    "psycopg version 3.0.2 or higher is required."
                )

            from psycopg.adapt import AdaptersMap

            self._psycopg_adapters_map = adapters_map = AdaptersMap(
                self.dbapi.adapters
            )

            # with native inet types disabled, load inet/cidr using the
            # driver's text loader
            if self._native_inet_types is False:
                import psycopg.types.string

                adapters_map.register_loader(
                    "inet", psycopg.types.string.TextLoader
                )
                adapters_map.register_loader(
                    "cidr", psycopg.types.string.TextLoader
                )

            # install custom JSON (de)serializers on the dialect's map only
            if self._json_deserializer:
                from psycopg.types.json import set_json_loads

                set_json_loads(self._json_deserializer, adapters_map)

            if self._json_serializer:
                from psycopg.types.json import set_json_dumps

                set_json_dumps(self._json_serializer, adapters_map)

    def create_connect_args(self, url):
        """Return ``(cargs, cparams)`` for the driver's connect call.

        Extends the base result with ``context`` (the dialect's adapters
        map, when one exists) and ``client_encoding`` (when configured).
        """
        # see https://github.com/psycopg/psycopg/issues/83
        cargs, cparams = super().create_connect_args(url)

        if self._psycopg_adapters_map:
            cparams["context"] = self._psycopg_adapters_map
        if self.client_encoding is not None:
            cparams["client_encoding"] = self.client_encoding
        return cargs, cparams

    def _type_info_fetch(self, connection, name):
        """Return psycopg ``TypeInfo.fetch()`` for the type called *name*.

        The lookup runs on the driver connection underlying *connection*.
        """
        from psycopg.types import TypeInfo

        return TypeInfo.fetch(connection.connection.driver_connection, name)

    def initialize(self, connection):
        """Run first-connection setup, including hstore registration."""
        super().initialize(connection)

        # PGDialect.initialize() checks server version for <= 8.2 and sets
        # this flag to False if so
        if not self.insert_returning:
            self.insert_executemany_returning = False

        # HSTORE can't be registered until we have a connection so that
        # we can look up its OID, so we set up this adapter in
        # initialize()
        if self.use_native_hstore:
            info = self._type_info_fetch(connection, "hstore")
            self._has_native_hstore = info is not None
            if self._has_native_hstore:
                from psycopg.types.hstore import register_hstore

                # register the adapter for connections made subsequent to
                # this one
                assert self._psycopg_adapters_map
                register_hstore(info, self._psycopg_adapters_map)

                # register the adapter for this connection
                assert connection.connection
                register_hstore(info, connection.connection.driver_connection)

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``psycopg`` module."""
        import psycopg

        return psycopg

    @classmethod
    def get_async_dialect_cls(cls, url):
        """Return the asyncio dialect class, ``PGDialectAsync_psycopg``."""
        return PGDialectAsync_psycopg

    @util.memoized_property
    def _isolation_lookup(self):
        """Map isolation level names to the driver's ``IsolationLevel``."""
        return {
            "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED,
            "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED,
            "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ,
            "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE,
        }

    # The properties below import driver classes lazily, on first access.

    @util.memoized_property
    def _psycopg_Json(self):
        """The ``psycopg.types.json.Json`` class."""
        from psycopg.types import json

        return json.Json

    @util.memoized_property
    def _psycopg_Jsonb(self):
        """The ``psycopg.types.json.Jsonb`` class."""
        from psycopg.types import json

        return json.Jsonb

    @util.memoized_property
    def _psycopg_TransactionStatus(self):
        """The ``psycopg.pq.TransactionStatus`` enumeration."""
        from psycopg.pq import TransactionStatus

        return TransactionStatus

    @util.memoized_property
    def _psycopg_Range(self):
        """The ``psycopg.types.range.Range`` class."""
        from psycopg.types.range import Range

        return Range

    @util.memoized_property
    def _psycopg_Multirange(self):
        """The ``psycopg.types.multirange.Multirange`` class."""
        from psycopg.types.multirange import Multirange

        return Multirange

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        """Assign ``autocommit`` and ``isolation_level`` on *connection*."""
        connection.autocommit = autocommit
        connection.isolation_level = isolation_level

    def get_isolation_level(self, dbapi_connection):
        """Return the isolation level reported by the base implementation.

        If the connection was idle before the lookup, it is rolled back
        afterwards.
        """
        status_before = dbapi_connection.info.transaction_status
        value = super().get_isolation_level(dbapi_connection)

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        # NOTE(review): presumably the lookup itself begins a transaction,
        # which this rollback ends again -- confirm against the base class
        if status_before == self._psycopg_TransactionStatus.IDLE:
            dbapi_connection.rollback()
        return value

    def set_isolation_level(self, dbapi_connection, level):
        """Apply *level* to *dbapi_connection*.

        ``"AUTOCOMMIT"`` turns driver autocommit on with no isolation
        level; any other name turns autocommit off and is looked up in
        ``_isolation_lookup`` (an unknown name raises ``KeyError`` here).
        """
        if level == "AUTOCOMMIT":
            self._do_isolation_level(
                dbapi_connection, autocommit=True, isolation_level=None
            )
        else:
            self._do_isolation_level(
                dbapi_connection,
                autocommit=False,
                isolation_level=self._isolation_lookup[level],
            )

    def set_readonly(self, connection, value):
        """Assign *value* to the connection's ``read_only`` attribute."""
        connection.read_only = value

    def get_readonly(self, connection):
        """Return the connection's ``read_only`` attribute."""
        return connection.read_only

    def on_connect(self):
        """Return a callable to be run against each new driver connection.

        The callable always installs ``_log_notices`` as a notice handler
        and, if the dialect has an ``isolation_level`` configured, applies
        it as well.
        """

        def notices(conn):
            conn.add_notice_handler(_log_notices)

        fns = [notices]

        if self.isolation_level is not None:

            def on_connect(conn):
                self.set_isolation_level(conn, self.isolation_level)

            fns.append(on_connect)

        # fns always has the notices function
        # (this definition rebinds the local name ``on_connect``; the
        # function appended above remains referenced from ``fns``)
        def on_connect(conn):
            for fn in fns:
                fn(conn)

        return on_connect

    def is_disconnect(self, e, connection, cursor):
        """Return True if *e* is a driver error on a closed/broken connection.

        Returns False when *connection* is None or *e* is not an instance
        of the driver's ``Error`` class.
        """
        if isinstance(e, self.dbapi.Error) and connection is not None:
            if connection.closed or connection.broken:
                return True
        return False

    def _do_prepared_twophase(self, connection, command, recover=False):
        """Execute a prepared-transaction *command* with autocommit on.

        The driver connection is rolled back first when *recover* is set
        or when it is not idle.  Autocommit is switched on for the command
        if it was off, and restored in the ``finally`` clause.
        """
        dbapi_conn = connection.connection.dbapi_connection
        if (
            recover
            # don't rely on psycopg providing enum symbols, compare with
            # eq/ne
            or dbapi_conn.info.transaction_status
            != self._psycopg_TransactionStatus.IDLE
        ):
            dbapi_conn.rollback()
        before_autocommit = dbapi_conn.autocommit
        try:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, True)
            dbapi_conn.execute(command)
        finally:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, before_autocommit)

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Roll back a two-phase transaction.

        Issues ``ROLLBACK PREPARED`` for *xid* when *is_prepared* is true,
        otherwise performs a plain rollback.
        """
        if is_prepared:
            # NOTE(review): xid is interpolated into the statement as-is
            self._do_prepared_twophase(
                connection, f"ROLLBACK PREPARED '{xid}'", recover=recover
            )
        else:
            self.do_rollback(connection.connection)

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Commit a two-phase transaction.

        Issues ``COMMIT PREPARED`` for *xid* when *is_prepared* is true,
        otherwise performs a plain commit.
        """
        if is_prepared:
            # NOTE(review): xid is interpolated into the statement as-is
            self._do_prepared_twophase(
                connection, f"COMMIT PREPARED '{xid}'", recover=recover
            )
        else:
            self.do_commit(connection.connection)

    @util.memoized_property
    def _dialect_specific_select_one(self):
        """Return ``";"``.

        NOTE(review): presumably the cheapest statement the base dialect
        can send as a connection check -- confirm against its callers.
        """
        return ";"
641
642
class AsyncAdapt_psycopg_cursor:
    """Blocking-style facade over a psycopg async cursor.

    Awaitables produced by the wrapped cursor are run through the
    ``await_`` callable supplied at construction.  When a statement
    produces rows, :meth:`execute` fetches all of them immediately and
    buffers them in a deque, so the ``fetch*`` methods and iteration only
    consume that buffer.
    """

    __slots__ = ("_cursor", "await_", "_rows")

    # assigned by PGDialectAsync_psycopg.import_dbapi()
    _psycopg_ExecStatus = None

    def __init__(self, cursor, await_) -> None:
        """Wrap *cursor*; *await_* is called on every awaitable it returns."""
        self._cursor = cursor
        self.await_ = await_
        self._rows = deque()

    def __getattr__(self, name):
        # anything not defined on the adapter is read from the real cursor
        return getattr(self._cursor, name)

    @property
    def arraysize(self):
        """The wrapped cursor's ``arraysize``."""
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    async def _async_soft_close(self) -> None:
        """Do nothing; there is nothing to release asynchronously here."""
        return

    def close(self):
        """Drop buffered rows and close the wrapped cursor."""
        self._rows.clear()
        # Normal cursor just call _close() in a non-sync way.
        self._cursor._close()

    def execute(self, query, params=None, **kw):
        """Execute *query* and buffer its rows, if it produced any.

        Rows buffered by an earlier statement are discarded first, so that
        a statement without a result set can never expose rows that belong
        to a previous one.

        :return: whatever the wrapped cursor's ``execute()`` resolves to.
        """
        self._rows.clear()
        result = self.await_(self._cursor.execute(query, params, **kw))
        # sqlalchemy result is not async, so need to pull all rows here
        res = self._cursor.pgresult

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
            rows = self.await_(self._cursor.fetchall())
            self._rows = deque(rows)
        return result

    def executemany(self, query, params_seq):
        """Run *query* once per entry of *params_seq*.

        No rows are buffered; rows left over from an earlier statement are
        discarded.
        """
        self._rows.clear()
        return self.await_(self._cursor.executemany(query, params_seq))

    def __iter__(self):
        """Yield buffered rows, removing each from the buffer."""
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        """Return the next buffered row, or ``None`` if none is left."""
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        """Return up to *size* buffered rows as a list.

        *size* defaults to the wrapped cursor's ``arraysize``.
        """
        if size is None:
            size = self._cursor.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self):
        """Return all remaining buffered rows and empty the buffer."""
        retval = list(self._rows)
        self._rows.clear()
        return retval
708
709
class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor):
    """Cursor adapter that fetches from the wrapped cursor on demand.

    Unlike the parent class, nothing is buffered: every fetch and every
    iteration step awaits the wrapped cursor.
    """

    def execute(self, query, params=None, **kw):
        """Execute *query* without fetching rows and return this adapter."""
        pending = self._cursor.execute(query, params, **kw)
        self.await_(pending)
        return self

    def close(self):
        """Await the wrapped cursor's ``close()``."""
        pending = self._cursor.close()
        self.await_(pending)

    def fetchone(self):
        """Return the wrapped cursor's next row."""
        pending = self._cursor.fetchone()
        return self.await_(pending)

    def fetchmany(self, size=0):
        """Return the wrapped cursor's ``fetchmany(size)`` result."""
        pending = self._cursor.fetchmany(size)
        return self.await_(pending)

    def fetchall(self):
        """Return all rows the wrapped cursor still has."""
        pending = self._cursor.fetchall()
        return self.await_(pending)

    def __iter__(self):
        """Drive the wrapped cursor's async iterator, one row per step."""
        async_rows = self._cursor.__aiter__()
        while True:
            try:
                yield self.await_(async_rows.__anext__())
            except StopAsyncIteration:
                return
734
735
class AsyncAdapt_psycopg_connection(AdaptedConnection):
    """Blocking-style facade over a psycopg ``AsyncConnection``.

    Awaitables returned by the wrapped connection are run through
    ``await_`` (``await_only`` for this class).  Attributes not defined
    here are read from the wrapped connection.
    """

    _connection: AsyncConnection
    __slots__ = ()
    await_ = staticmethod(await_only)

    def __init__(self, connection) -> None:
        """Store the driver connection being adapted."""
        self._connection = connection

    def __getattr__(self, name):
        # fall through to the driver connection for everything else
        return getattr(self._connection, name)

    def execute(self, query, params=None, **kw):
        """Execute *query* on the connection; return an adapted cursor."""
        pending = self._connection.execute(query, params, **kw)
        return AsyncAdapt_psycopg_cursor(self.await_(pending), self.await_)

    def cursor(self, *args, **kw):
        """Create a driver cursor and wrap it in the matching adapter.

        A driver cursor exposing a ``name`` attribute gets the
        fetch-on-demand adapter, any other cursor the buffering one.
        """
        raw_cursor = self._connection.cursor(*args, **kw)
        adapter_cls = (
            AsyncAdapt_psycopg_ss_cursor
            if hasattr(raw_cursor, "name")
            else AsyncAdapt_psycopg_cursor
        )
        return adapter_cls(raw_cursor, self.await_)

    def commit(self):
        """Await the driver connection's ``commit()``."""
        pending = self._connection.commit()
        self.await_(pending)

    def rollback(self):
        """Await the driver connection's ``rollback()``."""
        pending = self._connection.rollback()
        self.await_(pending)

    def close(self):
        """Await the driver connection's ``close()``."""
        pending = self._connection.close()
        self.await_(pending)

    @property
    def autocommit(self):
        """The driver connection's ``autocommit`` flag."""
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        # assignment is routed through the awaited setter method
        self.set_autocommit(value)

    def set_autocommit(self, value):
        """Await the driver connection's ``set_autocommit(value)``."""
        pending = self._connection.set_autocommit(value)
        self.await_(pending)

    def set_isolation_level(self, value):
        """Await the driver connection's ``set_isolation_level(value)``."""
        pending = self._connection.set_isolation_level(value)
        self.await_(pending)

    def set_read_only(self, value):
        """Await the driver connection's ``set_read_only(value)``."""
        pending = self._connection.set_read_only(value)
        self.await_(pending)

    def set_deferrable(self, value):
        """Await the driver connection's ``set_deferrable(value)``."""
        pending = self._connection.set_deferrable(value)
        self.await_(pending)
786
787
class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection):
    """Connection adapter that runs awaitables with ``await_fallback``.

    Returned by ``PsycopgAdaptDBAPI.connect()`` when ``async_fallback`` is
    requested.
    """

    __slots__ = ()
    # the only difference from the parent, which uses ``await_only``
    await_ = staticmethod(await_fallback)
791
792
class PsycopgAdaptDBAPI:
    """Stand-in for the ``psycopg`` module with an adapted ``connect()``.

    Every attribute of the real module except ``connect`` is copied onto
    the instance, so it can be used where the module itself is expected.
    """

    def __init__(self, psycopg) -> None:
        """Keep a reference to *psycopg* and mirror its namespace."""
        self.psycopg = psycopg

        self.__dict__.update(
            (name, member)
            for name, member in self.psycopg.__dict__.items()
            if name != "connect"
        )

    def connect(self, *arg, **kw):
        """Open an async driver connection and return it adapted.

        Two keywords are consumed here and not forwarded:
        ``async_fallback`` selects the fallback adapter and runner, and
        ``async_creator_fn`` replaces ``AsyncConnection.connect`` as the
        coroutine function that creates the connection.
        """
        fallback_flag = kw.pop("async_fallback", False)
        creator_fn = kw.pop(
            "async_creator_fn", self.psycopg.AsyncConnection.connect
        )

        if util.asbool(fallback_flag):
            adapter_cls = AsyncAdaptFallback_psycopg_connection
            run = await_fallback
        else:
            adapter_cls = AsyncAdapt_psycopg_connection
            run = await_only

        return adapter_cls(run(creator_fn(*arg, **kw)))
814
815
class PGDialectAsync_psycopg(PGDialect_psycopg):
    """Asyncio variant of the psycopg dialect.

    Driver connections are reached through the adapter classes defined in
    this module, so settings are changed by calling the adapters' awaited
    ``set_*`` methods.
    """

    is_async = True
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        """Return ``psycopg`` wrapped in :class:`PsycopgAdaptDBAPI`.

        Also publishes the driver's ``ExecStatus`` on the cursor adapter
        class, which needs it to recognize row-returning statements.
        """
        import psycopg
        from psycopg import pq

        AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = pq.ExecStatus

        return PsycopgAdaptDBAPI(psycopg)

    @classmethod
    def get_pool_class(cls, url):
        """Choose the pool class from the URL's ``async_fallback`` flag."""
        wants_fallback = util.asbool(url.query.get("async_fallback", False))
        return (
            pool.FallbackAsyncAdaptedQueuePool
            if wants_fallback
            else pool.AsyncAdaptedQueuePool
        )

    def _type_info_fetch(self, connection, name):
        """Return the awaited ``TypeInfo.fetch()`` for the type *name*."""
        from psycopg.types import TypeInfo

        wrapper = connection.connection
        lookup = TypeInfo.fetch(wrapper.driver_connection, name)
        return wrapper.await_(lookup)

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        """Apply autocommit, then isolation level, via the adapter."""
        connection.set_autocommit(autocommit)
        connection.set_isolation_level(isolation_level)

    def _do_autocommit(self, connection, value):
        """Apply the autocommit flag via the adapter."""
        connection.set_autocommit(value)

    def set_readonly(self, connection, value):
        """Apply the read-only flag via the adapter."""
        connection.set_read_only(value)

    def set_deferrable(self, connection, value):
        """Apply the deferrable flag via the adapter."""
        connection.set_deferrable(value)

    def get_driver_connection(self, connection):
        """Return the driver connection held by the adapter."""
        return connection._connection
859
860
# Module-level aliases for the sync and asyncio dialect classes.
# NOTE(review): presumably the names the dialect registry resolves for
# ``postgresql+psycopg`` and ``postgresql+psycopg_async`` -- confirm.
dialect = PGDialect_psycopg
dialect_async = PGDialectAsync_psycopg