Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

389 statements  

1# dialects/postgresql/psycopg.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql+psycopg 

11 :name: psycopg (a.k.a. psycopg 3) 

12 :dbapi: psycopg 

13 :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...] 

14 :url: https://pypi.org/project/psycopg/ 

15 

16``psycopg`` is the package and module name for version 3 of the ``psycopg`` 

17database driver, formerly known as ``psycopg2``. This driver is different 

18enough from its ``psycopg2`` predecessor that SQLAlchemy supports it 

19via a totally separate dialect; support for ``psycopg2`` is expected to remain 

20for as long as that package continues to function for modern Python versions, 

21and also remains the default dialect for the ``postgresql://`` dialect 

22series. 

23 

24The SQLAlchemy ``psycopg`` dialect provides both a sync and an async 

25implementation under the same dialect name. The proper version is 

26selected depending on how the engine is created: 

27 

28* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will 

29 automatically select the sync version, e.g.:: 

30 

31 from sqlalchemy import create_engine 

32 

33 sync_engine = create_engine( 

34 "postgresql+psycopg://scott:tiger@localhost/test" 

35 ) 

36 

37* calling :func:`_asyncio.create_async_engine` with 

38 ``postgresql+psycopg://...`` will automatically select the async version, 

39 e.g.:: 

40 

41 from sqlalchemy.ext.asyncio import create_async_engine 

42 

43 asyncio_engine = create_async_engine( 

44 "postgresql+psycopg://scott:tiger@localhost/test" 

45 ) 

46 

47The asyncio version of the dialect may also be specified explicitly using the 

48``psycopg_async`` suffix, as:: 

49 

50 from sqlalchemy.ext.asyncio import create_async_engine 

51 

52 asyncio_engine = create_async_engine( 

53 "postgresql+psycopg_async://scott:tiger@localhost/test" 

54 ) 

55 

56.. seealso:: 

57 

58 :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg`` 

59 dialect shares most of its behavior with the ``psycopg2`` dialect. 

60 Further documentation is available there. 

61 

62Using psycopg Connection Pooling 

63-------------------------------- 

64 

65The ``psycopg`` driver provides its own connection pool implementation that 

66may be used in place of SQLAlchemy's pooling functionality. 

67This pool implementation provides support for fixed and dynamic pool sizes 

68(including automatic downsizing for unused connections), connection health 

69pre-checks, and support for both synchronous and asynchronous code 

70environments. 

71 

72Here is an example that uses the sync version of the pool, using 

73``psycopg_pool >= 3.3`` that introduces support for ``close_returns=True``:: 

74 

75 import psycopg_pool 

76 from sqlalchemy import create_engine 

77 from sqlalchemy.pool import NullPool 

78 

79 # Create a psycopg_pool connection pool 

80 my_pool = psycopg_pool.ConnectionPool( 

81 conninfo="postgresql://scott:tiger@localhost/test", 

82 close_returns=True, # Return "closed" active connections to the pool 

83 # ... other pool parameters as desired ... 

84 ) 

85 

86 # Create an engine that uses the connection pool to get a connection 

87 engine = create_engine( 

88 url="postgresql+psycopg://", # Only need the dialect now 

89 poolclass=NullPool, # Disable SQLAlchemy's default connection pool 

90 creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections 

91 ) 

92 

93Similarly an the async example:: 

94 

95 import psycopg_pool 

96 from sqlalchemy.ext.asyncio import create_async_engine 

97 from sqlalchemy.pool import NullPool 

98 

99 

100 async def define_engine(): 

101 # Create a psycopg_pool connection pool 

102 my_pool = psycopg_pool.AsyncConnectionPool( 

103 conninfo="postgresql://scott:tiger@localhost/test", 

104 open=False, # See comment below 

105 close_returns=True, # Return "closed" active connections to the pool 

106 # ... other pool parameters as desired ... 

107 ) 

108 

109 # Must explicitly open AsyncConnectionPool outside constructor 

110 # https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.AsyncConnectionPool 

111 await my_pool.open() 

112 

113 # Create an engine that uses the connection pool to get a connection 

114 engine = create_async_engine( 

115 url="postgresql+psycopg://", # Only need the dialect now 

116 poolclass=NullPool, # Disable SQLAlchemy's default connection pool 

117 async_creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections 

118 ) 

119 

120 return engine, my_pool 

121 

122The resulting engine may then be used normally. Internally, Psycopg 3 handles 

123connection pooling:: 

124 

125 with engine.connect() as conn: 

126 print(conn.scalar(text("select 42"))) 

127 

128.. seealso:: 

129 

130 `Connection pools <https://www.psycopg.org/psycopg3/docs/advanced/pool.html>`_ - 

131 the Psycopg 3 documentation for ``psycopg_pool.ConnectionPool``. 

132 

133 `Example for older version of psycopg_pool 

134 <https://github.com/sqlalchemy/sqlalchemy/discussions/12522#discussioncomment-13024666>`_ - 

135 An example about using the ``psycopg_pool<3.3`` that did not have the 

136 ``close_returns``` parameter. 

137 

138Using a different Cursor class 

139------------------------------ 

140 

141One of the differences between ``psycopg`` and the older ``psycopg2`` 

142is how bound parameters are handled: ``psycopg2`` would bind them 

143client side, while ``psycopg`` by default will bind them server side. 

144 

145It's possible to configure ``psycopg`` to do client side binding by 

146specifying the ``cursor_factory`` to be ``ClientCursor`` when creating 

147the engine:: 

148 

149 from psycopg import ClientCursor 

150 

151 client_side_engine = create_engine( 

152 "postgresql+psycopg://...", 

153 connect_args={"cursor_factory": ClientCursor}, 

154 ) 

155 

156Similarly when using an async engine the ``AsyncClientCursor`` can be 

157specified:: 

158 

159 from psycopg import AsyncClientCursor 

160 

161 client_side_engine = create_async_engine( 

162 "postgresql+psycopg://...", 

163 connect_args={"cursor_factory": AsyncClientCursor}, 

164 ) 

165 

166.. seealso:: 

167 

168 `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_ 

169 

170""" # noqa 

171 

172from __future__ import annotations 

173 

174from collections import deque 

175import logging 

176import re 

177from typing import cast 

178from typing import TYPE_CHECKING 

179 

180from . import ranges 

181from ._psycopg_common import _PGDialect_common_psycopg 

182from ._psycopg_common import _PGExecutionContext_common_psycopg 

183from .base import INTERVAL 

184from .base import PGCompiler 

185from .base import PGIdentifierPreparer 

186from .base import REGCONFIG 

187from .json import JSON 

188from .json import JSONB 

189from .json import JSONPathType 

190from .types import CITEXT 

191from ... import pool 

192from ... import util 

193from ...engine import AdaptedConnection 

194from ...sql import sqltypes 

195from ...util.concurrency import await_fallback 

196from ...util.concurrency import await_only 

197 

198if TYPE_CHECKING: 

199 from typing import Iterable 

200 

201 from psycopg import AsyncConnection 

202 

203logger = logging.getLogger("sqlalchemy.dialects.postgresql") 

204 

205 

206class _PGString(sqltypes.String): 

207 render_bind_cast = True 

208 

209 

210class _PGREGCONFIG(REGCONFIG): 

211 render_bind_cast = True 

212 

213 

214class _PGJSON(JSON): 

215 def bind_processor(self, dialect): 

216 return self._make_bind_processor(None, dialect._psycopg_Json) 

217 

218 def result_processor(self, dialect, coltype): 

219 return None 

220 

221 

222class _PGJSONB(JSONB): 

223 def bind_processor(self, dialect): 

224 return self._make_bind_processor(None, dialect._psycopg_Jsonb) 

225 

226 def result_processor(self, dialect, coltype): 

227 return None 

228 

229 

230class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType): 

231 __visit_name__ = "json_int_index" 

232 

233 render_bind_cast = True 

234 

235 

236class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType): 

237 __visit_name__ = "json_str_index" 

238 

239 render_bind_cast = True 

240 

241 

242class _PGJSONPathType(JSONPathType): 

243 pass 

244 

245 

246class _PGInterval(INTERVAL): 

247 render_bind_cast = True 

248 

249 

250class _PGTimeStamp(sqltypes.DateTime): 

251 render_bind_cast = True 

252 

253 

254class _PGDate(sqltypes.Date): 

255 render_bind_cast = True 

256 

257 

258class _PGTime(sqltypes.Time): 

259 render_bind_cast = True 

260 

261 

262class _PGInteger(sqltypes.Integer): 

263 render_bind_cast = True 

264 

265 

266class _PGSmallInteger(sqltypes.SmallInteger): 

267 render_bind_cast = True 

268 

269 

270class _PGNullType(sqltypes.NullType): 

271 render_bind_cast = True 

272 

273 

274class _PGBigInteger(sqltypes.BigInteger): 

275 render_bind_cast = True 

276 

277 

278class _PGBoolean(sqltypes.Boolean): 

279 render_bind_cast = True 

280 

281 

282class _PsycopgRange(ranges.AbstractSingleRangeImpl): 

283 def bind_processor(self, dialect): 

284 psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range 

285 

286 def to_range(value): 

287 if isinstance(value, ranges.Range): 

288 value = psycopg_Range( 

289 value.lower, value.upper, value.bounds, value.empty 

290 ) 

291 return value 

292 

293 return to_range 

294 

295 def result_processor(self, dialect, coltype): 

296 def to_range(value): 

297 if value is not None: 

298 value = ranges.Range( 

299 value._lower, 

300 value._upper, 

301 bounds=value._bounds if value._bounds else "[)", 

302 empty=not value._bounds, 

303 ) 

304 return value 

305 

306 return to_range 

307 

308 

309class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl): 

310 def bind_processor(self, dialect): 

311 psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range 

312 psycopg_Multirange = cast( 

313 PGDialect_psycopg, dialect 

314 )._psycopg_Multirange 

315 

316 NoneType = type(None) 

317 

318 def to_range(value): 

319 if isinstance(value, (str, NoneType, psycopg_Multirange)): 

320 return value 

321 

322 return psycopg_Multirange( 

323 [ 

324 psycopg_Range( 

325 element.lower, 

326 element.upper, 

327 element.bounds, 

328 element.empty, 

329 ) 

330 for element in cast("Iterable[ranges.Range]", value) 

331 ] 

332 ) 

333 

334 return to_range 

335 

336 def result_processor(self, dialect, coltype): 

337 def to_range(value): 

338 if value is None: 

339 return None 

340 else: 

341 return ranges.MultiRange( 

342 ranges.Range( 

343 elem._lower, 

344 elem._upper, 

345 bounds=elem._bounds if elem._bounds else "[)", 

346 empty=not elem._bounds, 

347 ) 

348 for elem in value 

349 ) 

350 

351 return to_range 

352 

353 

354class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg): 

355 pass 

356 

357 

358class PGCompiler_psycopg(PGCompiler): 

359 pass 

360 

361 

362class PGIdentifierPreparer_psycopg(PGIdentifierPreparer): 

363 pass 

364 

365 

366def _log_notices(diagnostic): 

367 logger.info("%s: %s", diagnostic.severity, diagnostic.message_primary) 

368 

369 

370class PGDialect_psycopg(_PGDialect_common_psycopg): 

371 driver = "psycopg" 

372 

373 supports_statement_cache = True 

374 supports_server_side_cursors = True 

375 default_paramstyle = "pyformat" 

376 supports_sane_multi_rowcount = True 

377 

378 execution_ctx_cls = PGExecutionContext_psycopg 

379 statement_compiler = PGCompiler_psycopg 

380 preparer = PGIdentifierPreparer_psycopg 

381 psycopg_version = (0, 0) 

382 

383 _has_native_hstore = True 

384 _psycopg_adapters_map = None 

385 

386 colspecs = util.update_copy( 

387 _PGDialect_common_psycopg.colspecs, 

388 { 

389 sqltypes.String: _PGString, 

390 REGCONFIG: _PGREGCONFIG, 

391 JSON: _PGJSON, 

392 CITEXT: CITEXT, 

393 sqltypes.JSON: _PGJSON, 

394 JSONB: _PGJSONB, 

395 sqltypes.JSON.JSONPathType: _PGJSONPathType, 

396 sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType, 

397 sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType, 

398 sqltypes.Interval: _PGInterval, 

399 INTERVAL: _PGInterval, 

400 sqltypes.Date: _PGDate, 

401 sqltypes.DateTime: _PGTimeStamp, 

402 sqltypes.Time: _PGTime, 

403 sqltypes.Integer: _PGInteger, 

404 sqltypes.SmallInteger: _PGSmallInteger, 

405 sqltypes.BigInteger: _PGBigInteger, 

406 ranges.AbstractSingleRange: _PsycopgRange, 

407 ranges.AbstractMultiRange: _PsycopgMultiRange, 

408 }, 

409 ) 

410 

411 def __init__(self, **kwargs): 

412 super().__init__(**kwargs) 

413 

414 if self.dbapi: 

415 m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__) 

416 if m: 

417 self.psycopg_version = tuple( 

418 int(x) for x in m.group(1, 2, 3) if x is not None 

419 ) 

420 

421 if self.psycopg_version < (3, 0, 2): 

422 raise ImportError( 

423 "psycopg version 3.0.2 or higher is required." 

424 ) 

425 

426 from psycopg.adapt import AdaptersMap 

427 

428 self._psycopg_adapters_map = adapters_map = AdaptersMap( 

429 self.dbapi.adapters 

430 ) 

431 

432 if self._native_inet_types is False: 

433 import psycopg.types.string 

434 

435 adapters_map.register_loader( 

436 "inet", psycopg.types.string.TextLoader 

437 ) 

438 adapters_map.register_loader( 

439 "cidr", psycopg.types.string.TextLoader 

440 ) 

441 

442 if self._json_deserializer: 

443 from psycopg.types.json import set_json_loads 

444 

445 set_json_loads(self._json_deserializer, adapters_map) 

446 

447 if self._json_serializer: 

448 from psycopg.types.json import set_json_dumps 

449 

450 set_json_dumps(self._json_serializer, adapters_map) 

451 

452 def create_connect_args(self, url): 

453 # see https://github.com/psycopg/psycopg/issues/83 

454 cargs, cparams = super().create_connect_args(url) 

455 

456 if self._psycopg_adapters_map: 

457 cparams["context"] = self._psycopg_adapters_map 

458 if self.client_encoding is not None: 

459 cparams["client_encoding"] = self.client_encoding 

460 return cargs, cparams 

461 

462 def _type_info_fetch(self, connection, name): 

463 from psycopg.types import TypeInfo 

464 

465 return TypeInfo.fetch(connection.connection.driver_connection, name) 

466 

467 def initialize(self, connection): 

468 super().initialize(connection) 

469 

470 # PGDialect.initialize() checks server version for <= 8.2 and sets 

471 # this flag to False if so 

472 if not self.insert_returning: 

473 self.insert_executemany_returning = False 

474 

475 # HSTORE can't be registered until we have a connection so that 

476 # we can look up its OID, so we set up this adapter in 

477 # initialize() 

478 if self.use_native_hstore: 

479 info = self._type_info_fetch(connection, "hstore") 

480 self._has_native_hstore = info is not None 

481 if self._has_native_hstore: 

482 from psycopg.types.hstore import register_hstore 

483 

484 # register the adapter for connections made subsequent to 

485 # this one 

486 assert self._psycopg_adapters_map 

487 register_hstore(info, self._psycopg_adapters_map) 

488 

489 # register the adapter for this connection 

490 assert connection.connection 

491 register_hstore(info, connection.connection.driver_connection) 

492 

493 @classmethod 

494 def import_dbapi(cls): 

495 import psycopg 

496 

497 return psycopg 

498 

499 @classmethod 

500 def get_async_dialect_cls(cls, url): 

501 return PGDialectAsync_psycopg 

502 

503 @util.memoized_property 

504 def _isolation_lookup(self): 

505 return { 

506 "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED, 

507 "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED, 

508 "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ, 

509 "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE, 

510 } 

511 

512 @util.memoized_property 

513 def _psycopg_Json(self): 

514 from psycopg.types import json 

515 

516 return json.Json 

517 

518 @util.memoized_property 

519 def _psycopg_Jsonb(self): 

520 from psycopg.types import json 

521 

522 return json.Jsonb 

523 

524 @util.memoized_property 

525 def _psycopg_TransactionStatus(self): 

526 from psycopg.pq import TransactionStatus 

527 

528 return TransactionStatus 

529 

530 @util.memoized_property 

531 def _psycopg_Range(self): 

532 from psycopg.types.range import Range 

533 

534 return Range 

535 

536 @util.memoized_property 

537 def _psycopg_Multirange(self): 

538 from psycopg.types.multirange import Multirange 

539 

540 return Multirange 

541 

542 def _do_isolation_level(self, connection, autocommit, isolation_level): 

543 connection.autocommit = autocommit 

544 connection.isolation_level = isolation_level 

545 

546 def get_isolation_level(self, dbapi_connection): 

547 status_before = dbapi_connection.info.transaction_status 

548 value = super().get_isolation_level(dbapi_connection) 

549 

550 # don't rely on psycopg providing enum symbols, compare with 

551 # eq/ne 

552 if status_before == self._psycopg_TransactionStatus.IDLE: 

553 dbapi_connection.rollback() 

554 return value 

555 

556 def set_isolation_level(self, dbapi_connection, level): 

557 if level == "AUTOCOMMIT": 

558 self._do_isolation_level( 

559 dbapi_connection, autocommit=True, isolation_level=None 

560 ) 

561 else: 

562 self._do_isolation_level( 

563 dbapi_connection, 

564 autocommit=False, 

565 isolation_level=self._isolation_lookup[level], 

566 ) 

567 

568 def set_readonly(self, connection, value): 

569 connection.read_only = value 

570 

571 def get_readonly(self, connection): 

572 return connection.read_only 

573 

574 def on_connect(self): 

575 def notices(conn): 

576 conn.add_notice_handler(_log_notices) 

577 

578 fns = [notices] 

579 

580 if self.isolation_level is not None: 

581 

582 def on_connect(conn): 

583 self.set_isolation_level(conn, self.isolation_level) 

584 

585 fns.append(on_connect) 

586 

587 # fns always has the notices function 

588 def on_connect(conn): 

589 for fn in fns: 

590 fn(conn) 

591 

592 return on_connect 

593 

594 def is_disconnect(self, e, connection, cursor): 

595 if isinstance(e, self.dbapi.Error) and connection is not None: 

596 if connection.closed or connection.broken: 

597 return True 

598 return False 

599 

600 def _twophase_idle_check(self, dbapi_conn): 

601 # don't rely on psycopg providing enum symbols, compare with eq/ne 

602 return ( 

603 dbapi_conn.info.transaction_status 

604 == self._psycopg_TransactionStatus.IDLE 

605 ) 

606 

607 @util.memoized_property 

608 def _dialect_specific_select_one(self): 

609 return ";" 

610 

611 

612class AsyncAdapt_psycopg_cursor: 

613 __slots__ = ("_cursor", "await_", "_rows") 

614 

615 _psycopg_ExecStatus = None 

616 

617 def __init__(self, cursor, await_) -> None: 

618 self._cursor = cursor 

619 self.await_ = await_ 

620 self._rows = deque() 

621 

622 def __getattr__(self, name): 

623 return getattr(self._cursor, name) 

624 

625 @property 

626 def arraysize(self): 

627 return self._cursor.arraysize 

628 

629 @arraysize.setter 

630 def arraysize(self, value): 

631 self._cursor.arraysize = value 

632 

633 async def _async_soft_close(self) -> None: 

634 return 

635 

636 def close(self): 

637 self._rows.clear() 

638 # Normal cursor just call _close() in a non-sync way. 

639 self._cursor._close() 

640 

641 def execute(self, query, params=None, **kw): 

642 result = self.await_(self._cursor.execute(query, params, **kw)) 

643 # sqlalchemy result is not async, so need to pull all rows here 

644 res = self._cursor.pgresult 

645 

646 # don't rely on psycopg providing enum symbols, compare with 

647 # eq/ne 

648 if res and res.status == self._psycopg_ExecStatus.TUPLES_OK: 

649 rows = self.await_(self._cursor.fetchall()) 

650 self._rows = deque(rows) 

651 return result 

652 

653 def executemany(self, query, params_seq): 

654 return self.await_(self._cursor.executemany(query, params_seq)) 

655 

656 def __iter__(self): 

657 while self._rows: 

658 yield self._rows.popleft() 

659 

660 def fetchone(self): 

661 if self._rows: 

662 return self._rows.popleft() 

663 else: 

664 return None 

665 

666 def fetchmany(self, size=None): 

667 if size is None: 

668 size = self._cursor.arraysize 

669 

670 rr = self._rows 

671 return [rr.popleft() for _ in range(min(size, len(rr)))] 

672 

673 def fetchall(self): 

674 retval = list(self._rows) 

675 self._rows.clear() 

676 return retval 

677 

678 

679class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor): 

680 def execute(self, query, params=None, **kw): 

681 self.await_(self._cursor.execute(query, params, **kw)) 

682 return self 

683 

684 def close(self): 

685 self.await_(self._cursor.close()) 

686 

687 def fetchone(self): 

688 return self.await_(self._cursor.fetchone()) 

689 

690 def fetchmany(self, size=0): 

691 return self.await_(self._cursor.fetchmany(size)) 

692 

693 def fetchall(self): 

694 return self.await_(self._cursor.fetchall()) 

695 

696 def __iter__(self): 

697 iterator = self._cursor.__aiter__() 

698 while True: 

699 try: 

700 yield self.await_(iterator.__anext__()) 

701 except StopAsyncIteration: 

702 break 

703 

704 

705class AsyncAdapt_psycopg_connection(AdaptedConnection): 

706 _connection: AsyncConnection 

707 __slots__ = () 

708 await_ = staticmethod(await_only) 

709 

710 def __init__(self, connection) -> None: 

711 self._connection = connection 

712 

713 def __getattr__(self, name): 

714 return getattr(self._connection, name) 

715 

716 def execute(self, query, params=None, **kw): 

717 cursor = self.await_(self._connection.execute(query, params, **kw)) 

718 return AsyncAdapt_psycopg_cursor(cursor, self.await_) 

719 

720 def cursor(self, *args, **kw): 

721 cursor = self._connection.cursor(*args, **kw) 

722 if hasattr(cursor, "name"): 

723 return AsyncAdapt_psycopg_ss_cursor(cursor, self.await_) 

724 else: 

725 return AsyncAdapt_psycopg_cursor(cursor, self.await_) 

726 

727 def commit(self): 

728 self.await_(self._connection.commit()) 

729 

730 def rollback(self): 

731 self.await_(self._connection.rollback()) 

732 

733 def close(self): 

734 self.await_(self._connection.close()) 

735 

736 @property 

737 def autocommit(self): 

738 return self._connection.autocommit 

739 

740 @autocommit.setter 

741 def autocommit(self, value): 

742 self.set_autocommit(value) 

743 

744 def set_autocommit(self, value): 

745 self.await_(self._connection.set_autocommit(value)) 

746 

747 def set_isolation_level(self, value): 

748 self.await_(self._connection.set_isolation_level(value)) 

749 

750 def set_read_only(self, value): 

751 self.await_(self._connection.set_read_only(value)) 

752 

753 def set_deferrable(self, value): 

754 self.await_(self._connection.set_deferrable(value)) 

755 

756 def tpc_begin(self, xid): 

757 return self.await_(self._connection.tpc_begin(xid)) 

758 

759 def tpc_prepare(self): 

760 return self.await_(self._connection.tpc_prepare()) 

761 

762 def tpc_commit(self, xid=None): 

763 return self.await_(self._connection.tpc_commit(xid)) 

764 

765 def tpc_rollback(self, xid=None): 

766 return self.await_(self._connection.tpc_rollback(xid)) 

767 

768 def tpc_recover(self): 

769 return self.await_(self._connection.tpc_recover()) 

770 

771 

772class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection): 

773 __slots__ = () 

774 await_ = staticmethod(await_fallback) 

775 

776 

777class PsycopgAdaptDBAPI: 

778 def __init__(self, psycopg) -> None: 

779 self.psycopg = psycopg 

780 

781 for k, v in self.psycopg.__dict__.items(): 

782 if k != "connect": 

783 self.__dict__[k] = v 

784 

785 def connect(self, *arg, **kw): 

786 async_fallback = kw.pop("async_fallback", False) 

787 creator_fn = kw.pop( 

788 "async_creator_fn", self.psycopg.AsyncConnection.connect 

789 ) 

790 if util.asbool(async_fallback): 

791 return AsyncAdaptFallback_psycopg_connection( 

792 await_fallback(creator_fn(*arg, **kw)) 

793 ) 

794 else: 

795 return AsyncAdapt_psycopg_connection( 

796 await_only(creator_fn(*arg, **kw)) 

797 ) 

798 

799 

800class PGDialectAsync_psycopg(PGDialect_psycopg): 

801 is_async = True 

802 supports_statement_cache = True 

803 

804 @classmethod 

805 def import_dbapi(cls): 

806 import psycopg 

807 from psycopg.pq import ExecStatus 

808 

809 AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus 

810 

811 return PsycopgAdaptDBAPI(psycopg) 

812 

813 @classmethod 

814 def get_pool_class(cls, url): 

815 async_fallback = url.query.get("async_fallback", False) 

816 

817 if util.asbool(async_fallback): 

818 return pool.FallbackAsyncAdaptedQueuePool 

819 else: 

820 return pool.AsyncAdaptedQueuePool 

821 

822 def _type_info_fetch(self, connection, name): 

823 from psycopg.types import TypeInfo 

824 

825 adapted = connection.connection 

826 return adapted.await_(TypeInfo.fetch(adapted.driver_connection, name)) 

827 

828 def _do_isolation_level(self, connection, autocommit, isolation_level): 

829 connection.set_autocommit(autocommit) 

830 connection.set_isolation_level(isolation_level) 

831 

832 def _do_autocommit(self, connection, value): 

833 connection.set_autocommit(value) 

834 

835 def set_readonly(self, connection, value): 

836 connection.set_read_only(value) 

837 

838 def set_deferrable(self, connection, value): 

839 connection.set_deferrable(value) 

840 

841 def get_driver_connection(self, connection): 

842 return connection._connection 

843 

844 

845dialect = PGDialect_psycopg 

846dialect_async = PGDialectAsync_psycopg