Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

396 statements  

1# dialects/postgresql/psycopg.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql+psycopg 

11 :name: psycopg (a.k.a. psycopg 3) 

12 :dbapi: psycopg 

13 :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...] 

14 :url: https://pypi.org/project/psycopg/ 

15 

16``psycopg`` is the package and module name for version 3 of the ``psycopg`` 

17database driver, formerly known as ``psycopg2``. This driver is different 

18enough from its ``psycopg2`` predecessor that SQLAlchemy supports it 

19via a totally separate dialect; support for ``psycopg2`` is expected to remain 

20for as long as that package continues to function for modern Python versions, 

21and also remains the default dialect for the ``postgresql://`` dialect 

22series. 

23 

24The SQLAlchemy ``psycopg`` dialect provides both a sync and an async 

25implementation under the same dialect name. The proper version is 

26selected depending on how the engine is created: 

27 

28* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will 

29 automatically select the sync version, e.g.:: 

30 

31 from sqlalchemy import create_engine 

32 

33 sync_engine = create_engine( 

34 "postgresql+psycopg://scott:tiger@localhost/test" 

35 ) 

36 

37* calling :func:`_asyncio.create_async_engine` with 

38 ``postgresql+psycopg://...`` will automatically select the async version, 

39 e.g.:: 

40 

41 from sqlalchemy.ext.asyncio import create_async_engine 

42 

43 asyncio_engine = create_async_engine( 

44 "postgresql+psycopg://scott:tiger@localhost/test" 

45 ) 

46 

47The asyncio version of the dialect may also be specified explicitly using the 

48``psycopg_async`` suffix, as:: 

49 

50 from sqlalchemy.ext.asyncio import create_async_engine 

51 

52 asyncio_engine = create_async_engine( 

53 "postgresql+psycopg_async://scott:tiger@localhost/test" 

54 ) 

55 

56.. seealso:: 

57 

58 :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg`` 

59 dialect shares most of its behavior with the ``psycopg2`` dialect. 

60 Further documentation is available there. 

61 

62Using psycopg Connection Pooling 

63-------------------------------- 

64 

65The ``psycopg`` driver provides its own connection pool implementation that 

66may be used in place of SQLAlchemy's pooling functionality. 

67This pool implementation provides support for fixed and dynamic pool sizes 

68(including automatic downsizing for unused connections), connection health 

69pre-checks, and support for both synchronous and asynchronous code 

70environments. 

71 

72Here is an example that uses the sync version of the pool, using 

73``psycopg_pool >= 3.3`` that introduces support for ``close_returns=True``:: 

74 

75 import psycopg_pool 

76 from sqlalchemy import create_engine 

77 from sqlalchemy.pool import NullPool 

78 

79 # Create a psycopg_pool connection pool 

80 my_pool = psycopg_pool.ConnectionPool( 

81 conninfo="postgresql://scott:tiger@localhost/test", 

82 close_returns=True, # Return "closed" active connections to the pool 

83 # ... other pool parameters as desired ... 

84 ) 

85 

86 # Create an engine that uses the connection pool to get a connection 

87 engine = create_engine( 

88 url="postgresql+psycopg://", # Only need the dialect now 

89 poolclass=NullPool, # Disable SQLAlchemy's default connection pool 

90 creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections 

91 ) 

92 

93Similarly an the async example:: 

94 

95 import psycopg_pool 

96 from sqlalchemy.ext.asyncio import create_async_engine 

97 from sqlalchemy.pool import NullPool 

98 

99 

100 async def define_engine(): 

101 # Create a psycopg_pool connection pool 

102 my_pool = psycopg_pool.AsyncConnectionPool( 

103 conninfo="postgresql://scott:tiger@localhost/test", 

104 open=False, # See comment below 

105 close_returns=True, # Return "closed" active connections to the pool 

106 # ... other pool parameters as desired ... 

107 ) 

108 

109 # Must explicitly open AsyncConnectionPool outside constructor 

110 # https://www.psycopg.org/psycopg3/docs/api/pool.html#psycopg_pool.AsyncConnectionPool 

111 await my_pool.open() 

112 

113 # Create an engine that uses the connection pool to get a connection 

114 engine = create_async_engine( 

115 url="postgresql+psycopg://", # Only need the dialect now 

116 poolclass=NullPool, # Disable SQLAlchemy's default connection pool 

117 async_creator=my_pool.getconn, # Use Psycopg 3 connection pool to obtain connections 

118 ) 

119 

120 return engine, my_pool 

121 

122The resulting engine may then be used normally. Internally, Psycopg 3 handles 

123connection pooling:: 

124 

125 with engine.connect() as conn: 

126 print(conn.scalar(text("select 42"))) 

127 

128.. seealso:: 

129 

130 `Connection pools <https://www.psycopg.org/psycopg3/docs/advanced/pool.html>`_ - 

131 the Psycopg 3 documentation for ``psycopg_pool.ConnectionPool``. 

132 

133 `Example for older version of psycopg_pool 

134 <https://github.com/sqlalchemy/sqlalchemy/discussions/12522#discussioncomment-13024666>`_ - 

135 An example about using the ``psycopg_pool<3.3`` that did not have the 

136 ``close_returns``` parameter. 

137 

138Using a different Cursor class 

139------------------------------ 

140 

141One of the differences between ``psycopg`` and the older ``psycopg2`` 

142is how bound parameters are handled: ``psycopg2`` would bind them 

143client side, while ``psycopg`` by default will bind them server side. 

144 

145It's possible to configure ``psycopg`` to do client side binding by 

146specifying the ``cursor_factory`` to be ``ClientCursor`` when creating 

147the engine:: 

148 

149 from psycopg import ClientCursor 

150 

151 client_side_engine = create_engine( 

152 "postgresql+psycopg://...", 

153 connect_args={"cursor_factory": ClientCursor}, 

154 ) 

155 

156Similarly when using an async engine the ``AsyncClientCursor`` can be 

157specified:: 

158 

159 from psycopg import AsyncClientCursor 

160 

161 client_side_engine = create_async_engine( 

162 "postgresql+psycopg://...", 

163 connect_args={"cursor_factory": AsyncClientCursor}, 

164 ) 

165 

166.. seealso:: 

167 

168 `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_ 

169 

170""" # noqa 

171from __future__ import annotations 

172 

173from collections import deque 

174import logging 

175import re 

176from typing import cast 

177from typing import TYPE_CHECKING 

178 

179from . import ranges 

180from ._psycopg_common import _PGDialect_common_psycopg 

181from ._psycopg_common import _PGExecutionContext_common_psycopg 

182from .base import INTERVAL 

183from .base import PGCompiler 

184from .base import PGIdentifierPreparer 

185from .base import REGCONFIG 

186from .json import JSON 

187from .json import JSONB 

188from .json import JSONPathType 

189from .types import CITEXT 

190from ... import pool 

191from ... import util 

192from ...engine import AdaptedConnection 

193from ...sql import sqltypes 

194from ...util.concurrency import await_fallback 

195from ...util.concurrency import await_only 

196 

197if TYPE_CHECKING: 

198 from typing import Iterable 

199 

200 from psycopg import AsyncConnection 

201 

202logger = logging.getLogger("sqlalchemy.dialects.postgresql") 

203 

204 

205class _PGString(sqltypes.String): 

206 render_bind_cast = True 

207 

208 

209class _PGREGCONFIG(REGCONFIG): 

210 render_bind_cast = True 

211 

212 

213class _PGJSON(JSON): 

214 def bind_processor(self, dialect): 

215 return self._make_bind_processor(None, dialect._psycopg_Json) 

216 

217 def result_processor(self, dialect, coltype): 

218 return None 

219 

220 

221class _PGJSONB(JSONB): 

222 def bind_processor(self, dialect): 

223 return self._make_bind_processor(None, dialect._psycopg_Jsonb) 

224 

225 def result_processor(self, dialect, coltype): 

226 return None 

227 

228 

229class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType): 

230 __visit_name__ = "json_int_index" 

231 

232 render_bind_cast = True 

233 

234 

235class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType): 

236 __visit_name__ = "json_str_index" 

237 

238 render_bind_cast = True 

239 

240 

241class _PGJSONPathType(JSONPathType): 

242 pass 

243 

244 

245class _PGInterval(INTERVAL): 

246 render_bind_cast = True 

247 

248 

249class _PGTimeStamp(sqltypes.DateTime): 

250 render_bind_cast = True 

251 

252 

253class _PGDate(sqltypes.Date): 

254 render_bind_cast = True 

255 

256 

257class _PGTime(sqltypes.Time): 

258 render_bind_cast = True 

259 

260 

261class _PGInteger(sqltypes.Integer): 

262 render_bind_cast = True 

263 

264 

265class _PGSmallInteger(sqltypes.SmallInteger): 

266 render_bind_cast = True 

267 

268 

269class _PGNullType(sqltypes.NullType): 

270 render_bind_cast = True 

271 

272 

273class _PGBigInteger(sqltypes.BigInteger): 

274 render_bind_cast = True 

275 

276 

277class _PGBoolean(sqltypes.Boolean): 

278 render_bind_cast = True 

279 

280 

281class _PsycopgRange(ranges.AbstractSingleRangeImpl): 

282 def bind_processor(self, dialect): 

283 psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range 

284 

285 def to_range(value): 

286 if isinstance(value, ranges.Range): 

287 value = psycopg_Range( 

288 value.lower, value.upper, value.bounds, value.empty 

289 ) 

290 return value 

291 

292 return to_range 

293 

294 def result_processor(self, dialect, coltype): 

295 def to_range(value): 

296 if value is not None: 

297 value = ranges.Range( 

298 value._lower, 

299 value._upper, 

300 bounds=value._bounds if value._bounds else "[)", 

301 empty=not value._bounds, 

302 ) 

303 return value 

304 

305 return to_range 

306 

307 

308class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl): 

309 def bind_processor(self, dialect): 

310 psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range 

311 psycopg_Multirange = cast( 

312 PGDialect_psycopg, dialect 

313 )._psycopg_Multirange 

314 

315 NoneType = type(None) 

316 

317 def to_range(value): 

318 if isinstance(value, (str, NoneType, psycopg_Multirange)): 

319 return value 

320 

321 return psycopg_Multirange( 

322 [ 

323 psycopg_Range( 

324 element.lower, 

325 element.upper, 

326 element.bounds, 

327 element.empty, 

328 ) 

329 for element in cast("Iterable[ranges.Range]", value) 

330 ] 

331 ) 

332 

333 return to_range 

334 

335 def result_processor(self, dialect, coltype): 

336 def to_range(value): 

337 if value is None: 

338 return None 

339 else: 

340 return ranges.MultiRange( 

341 ranges.Range( 

342 elem._lower, 

343 elem._upper, 

344 bounds=elem._bounds if elem._bounds else "[)", 

345 empty=not elem._bounds, 

346 ) 

347 for elem in value 

348 ) 

349 

350 return to_range 

351 

352 

353class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg): 

354 pass 

355 

356 

357class PGCompiler_psycopg(PGCompiler): 

358 pass 

359 

360 

361class PGIdentifierPreparer_psycopg(PGIdentifierPreparer): 

362 pass 

363 

364 

365def _log_notices(diagnostic): 

366 logger.info("%s: %s", diagnostic.severity, diagnostic.message_primary) 

367 

368 

369class PGDialect_psycopg(_PGDialect_common_psycopg): 

370 driver = "psycopg" 

371 

372 supports_statement_cache = True 

373 supports_server_side_cursors = True 

374 default_paramstyle = "pyformat" 

375 supports_sane_multi_rowcount = True 

376 

377 execution_ctx_cls = PGExecutionContext_psycopg 

378 statement_compiler = PGCompiler_psycopg 

379 preparer = PGIdentifierPreparer_psycopg 

380 psycopg_version = (0, 0) 

381 

382 _has_native_hstore = True 

383 _psycopg_adapters_map = None 

384 

385 colspecs = util.update_copy( 

386 _PGDialect_common_psycopg.colspecs, 

387 { 

388 sqltypes.String: _PGString, 

389 REGCONFIG: _PGREGCONFIG, 

390 JSON: _PGJSON, 

391 CITEXT: CITEXT, 

392 sqltypes.JSON: _PGJSON, 

393 JSONB: _PGJSONB, 

394 sqltypes.JSON.JSONPathType: _PGJSONPathType, 

395 sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType, 

396 sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType, 

397 sqltypes.Interval: _PGInterval, 

398 INTERVAL: _PGInterval, 

399 sqltypes.Date: _PGDate, 

400 sqltypes.DateTime: _PGTimeStamp, 

401 sqltypes.Time: _PGTime, 

402 sqltypes.Integer: _PGInteger, 

403 sqltypes.SmallInteger: _PGSmallInteger, 

404 sqltypes.BigInteger: _PGBigInteger, 

405 ranges.AbstractSingleRange: _PsycopgRange, 

406 ranges.AbstractMultiRange: _PsycopgMultiRange, 

407 }, 

408 ) 

409 

410 def __init__(self, **kwargs): 

411 super().__init__(**kwargs) 

412 

413 if self.dbapi: 

414 m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__) 

415 if m: 

416 self.psycopg_version = tuple( 

417 int(x) for x in m.group(1, 2, 3) if x is not None 

418 ) 

419 

420 if self.psycopg_version < (3, 0, 2): 

421 raise ImportError( 

422 "psycopg version 3.0.2 or higher is required." 

423 ) 

424 

425 from psycopg.adapt import AdaptersMap 

426 

427 self._psycopg_adapters_map = adapters_map = AdaptersMap( 

428 self.dbapi.adapters 

429 ) 

430 

431 if self._native_inet_types is False: 

432 import psycopg.types.string 

433 

434 adapters_map.register_loader( 

435 "inet", psycopg.types.string.TextLoader 

436 ) 

437 adapters_map.register_loader( 

438 "cidr", psycopg.types.string.TextLoader 

439 ) 

440 

441 if self._json_deserializer: 

442 from psycopg.types.json import set_json_loads 

443 

444 set_json_loads(self._json_deserializer, adapters_map) 

445 

446 if self._json_serializer: 

447 from psycopg.types.json import set_json_dumps 

448 

449 set_json_dumps(self._json_serializer, adapters_map) 

450 

451 def create_connect_args(self, url): 

452 # see https://github.com/psycopg/psycopg/issues/83 

453 cargs, cparams = super().create_connect_args(url) 

454 

455 if self._psycopg_adapters_map: 

456 cparams["context"] = self._psycopg_adapters_map 

457 if self.client_encoding is not None: 

458 cparams["client_encoding"] = self.client_encoding 

459 return cargs, cparams 

460 

461 def _type_info_fetch(self, connection, name): 

462 from psycopg.types import TypeInfo 

463 

464 return TypeInfo.fetch(connection.connection.driver_connection, name) 

465 

466 def initialize(self, connection): 

467 super().initialize(connection) 

468 

469 # PGDialect.initialize() checks server version for <= 8.2 and sets 

470 # this flag to False if so 

471 if not self.insert_returning: 

472 self.insert_executemany_returning = False 

473 

474 # HSTORE can't be registered until we have a connection so that 

475 # we can look up its OID, so we set up this adapter in 

476 # initialize() 

477 if self.use_native_hstore: 

478 info = self._type_info_fetch(connection, "hstore") 

479 self._has_native_hstore = info is not None 

480 if self._has_native_hstore: 

481 from psycopg.types.hstore import register_hstore 

482 

483 # register the adapter for connections made subsequent to 

484 # this one 

485 assert self._psycopg_adapters_map 

486 register_hstore(info, self._psycopg_adapters_map) 

487 

488 # register the adapter for this connection 

489 assert connection.connection 

490 register_hstore(info, connection.connection.driver_connection) 

491 

492 @classmethod 

493 def import_dbapi(cls): 

494 import psycopg 

495 

496 return psycopg 

497 

498 @classmethod 

499 def get_async_dialect_cls(cls, url): 

500 return PGDialectAsync_psycopg 

501 

502 @util.memoized_property 

503 def _isolation_lookup(self): 

504 return { 

505 "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED, 

506 "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED, 

507 "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ, 

508 "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE, 

509 } 

510 

511 @util.memoized_property 

512 def _psycopg_Json(self): 

513 from psycopg.types import json 

514 

515 return json.Json 

516 

517 @util.memoized_property 

518 def _psycopg_Jsonb(self): 

519 from psycopg.types import json 

520 

521 return json.Jsonb 

522 

523 @util.memoized_property 

524 def _psycopg_TransactionStatus(self): 

525 from psycopg.pq import TransactionStatus 

526 

527 return TransactionStatus 

528 

529 @util.memoized_property 

530 def _psycopg_Range(self): 

531 from psycopg.types.range import Range 

532 

533 return Range 

534 

535 @util.memoized_property 

536 def _psycopg_Multirange(self): 

537 from psycopg.types.multirange import Multirange 

538 

539 return Multirange 

540 

541 def _do_isolation_level(self, connection, autocommit, isolation_level): 

542 connection.autocommit = autocommit 

543 connection.isolation_level = isolation_level 

544 

545 def get_isolation_level(self, dbapi_connection): 

546 status_before = dbapi_connection.info.transaction_status 

547 value = super().get_isolation_level(dbapi_connection) 

548 

549 # don't rely on psycopg providing enum symbols, compare with 

550 # eq/ne 

551 if status_before == self._psycopg_TransactionStatus.IDLE: 

552 dbapi_connection.rollback() 

553 return value 

554 

555 def set_isolation_level(self, dbapi_connection, level): 

556 if level == "AUTOCOMMIT": 

557 self._do_isolation_level( 

558 dbapi_connection, autocommit=True, isolation_level=None 

559 ) 

560 else: 

561 self._do_isolation_level( 

562 dbapi_connection, 

563 autocommit=False, 

564 isolation_level=self._isolation_lookup[level], 

565 ) 

566 

567 def set_readonly(self, connection, value): 

568 connection.read_only = value 

569 

570 def get_readonly(self, connection): 

571 return connection.read_only 

572 

573 def on_connect(self): 

574 def notices(conn): 

575 conn.add_notice_handler(_log_notices) 

576 

577 fns = [notices] 

578 

579 if self.isolation_level is not None: 

580 

581 def on_connect(conn): 

582 self.set_isolation_level(conn, self.isolation_level) 

583 

584 fns.append(on_connect) 

585 

586 # fns always has the notices function 

587 def on_connect(conn): 

588 for fn in fns: 

589 fn(conn) 

590 

591 return on_connect 

592 

593 def is_disconnect(self, e, connection, cursor): 

594 if isinstance(e, self.dbapi.Error) and connection is not None: 

595 if connection.closed or connection.broken: 

596 return True 

597 return False 

598 

599 def _do_prepared_twophase(self, connection, command, recover=False): 

600 dbapi_conn = connection.connection.dbapi_connection 

601 if ( 

602 recover 

603 # don't rely on psycopg providing enum symbols, compare with 

604 # eq/ne 

605 or dbapi_conn.info.transaction_status 

606 != self._psycopg_TransactionStatus.IDLE 

607 ): 

608 dbapi_conn.rollback() 

609 before_autocommit = dbapi_conn.autocommit 

610 try: 

611 if not before_autocommit: 

612 self._do_autocommit(dbapi_conn, True) 

613 dbapi_conn.execute(command) 

614 finally: 

615 if not before_autocommit: 

616 self._do_autocommit(dbapi_conn, before_autocommit) 

617 

618 def do_rollback_twophase( 

619 self, connection, xid, is_prepared=True, recover=False 

620 ): 

621 if is_prepared: 

622 self._do_prepared_twophase( 

623 connection, f"ROLLBACK PREPARED '{xid}'", recover=recover 

624 ) 

625 else: 

626 self.do_rollback(connection.connection) 

627 

628 def do_commit_twophase( 

629 self, connection, xid, is_prepared=True, recover=False 

630 ): 

631 if is_prepared: 

632 self._do_prepared_twophase( 

633 connection, f"COMMIT PREPARED '{xid}'", recover=recover 

634 ) 

635 else: 

636 self.do_commit(connection.connection) 

637 

638 @util.memoized_property 

639 def _dialect_specific_select_one(self): 

640 return ";" 

641 

642 

643class AsyncAdapt_psycopg_cursor: 

644 __slots__ = ("_cursor", "await_", "_rows") 

645 

646 _psycopg_ExecStatus = None 

647 

648 def __init__(self, cursor, await_) -> None: 

649 self._cursor = cursor 

650 self.await_ = await_ 

651 self._rows = deque() 

652 

653 def __getattr__(self, name): 

654 return getattr(self._cursor, name) 

655 

656 @property 

657 def arraysize(self): 

658 return self._cursor.arraysize 

659 

660 @arraysize.setter 

661 def arraysize(self, value): 

662 self._cursor.arraysize = value 

663 

664 async def _async_soft_close(self) -> None: 

665 return 

666 

667 def close(self): 

668 self._rows.clear() 

669 # Normal cursor just call _close() in a non-sync way. 

670 self._cursor._close() 

671 

672 def execute(self, query, params=None, **kw): 

673 result = self.await_(self._cursor.execute(query, params, **kw)) 

674 # sqlalchemy result is not async, so need to pull all rows here 

675 res = self._cursor.pgresult 

676 

677 # don't rely on psycopg providing enum symbols, compare with 

678 # eq/ne 

679 if res and res.status == self._psycopg_ExecStatus.TUPLES_OK: 

680 rows = self.await_(self._cursor.fetchall()) 

681 self._rows = deque(rows) 

682 return result 

683 

684 def executemany(self, query, params_seq): 

685 return self.await_(self._cursor.executemany(query, params_seq)) 

686 

687 def __iter__(self): 

688 while self._rows: 

689 yield self._rows.popleft() 

690 

691 def fetchone(self): 

692 if self._rows: 

693 return self._rows.popleft() 

694 else: 

695 return None 

696 

697 def fetchmany(self, size=None): 

698 if size is None: 

699 size = self._cursor.arraysize 

700 

701 rr = self._rows 

702 return [rr.popleft() for _ in range(min(size, len(rr)))] 

703 

704 def fetchall(self): 

705 retval = list(self._rows) 

706 self._rows.clear() 

707 return retval 

708 

709 

710class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor): 

711 def execute(self, query, params=None, **kw): 

712 self.await_(self._cursor.execute(query, params, **kw)) 

713 return self 

714 

715 def close(self): 

716 self.await_(self._cursor.close()) 

717 

718 def fetchone(self): 

719 return self.await_(self._cursor.fetchone()) 

720 

721 def fetchmany(self, size=0): 

722 return self.await_(self._cursor.fetchmany(size)) 

723 

724 def fetchall(self): 

725 return self.await_(self._cursor.fetchall()) 

726 

727 def __iter__(self): 

728 iterator = self._cursor.__aiter__() 

729 while True: 

730 try: 

731 yield self.await_(iterator.__anext__()) 

732 except StopAsyncIteration: 

733 break 

734 

735 

736class AsyncAdapt_psycopg_connection(AdaptedConnection): 

737 _connection: AsyncConnection 

738 __slots__ = () 

739 await_ = staticmethod(await_only) 

740 

741 def __init__(self, connection) -> None: 

742 self._connection = connection 

743 

744 def __getattr__(self, name): 

745 return getattr(self._connection, name) 

746 

747 def execute(self, query, params=None, **kw): 

748 cursor = self.await_(self._connection.execute(query, params, **kw)) 

749 return AsyncAdapt_psycopg_cursor(cursor, self.await_) 

750 

751 def cursor(self, *args, **kw): 

752 cursor = self._connection.cursor(*args, **kw) 

753 if hasattr(cursor, "name"): 

754 return AsyncAdapt_psycopg_ss_cursor(cursor, self.await_) 

755 else: 

756 return AsyncAdapt_psycopg_cursor(cursor, self.await_) 

757 

758 def commit(self): 

759 self.await_(self._connection.commit()) 

760 

761 def rollback(self): 

762 self.await_(self._connection.rollback()) 

763 

764 def close(self): 

765 self.await_(self._connection.close()) 

766 

767 @property 

768 def autocommit(self): 

769 return self._connection.autocommit 

770 

771 @autocommit.setter 

772 def autocommit(self, value): 

773 self.set_autocommit(value) 

774 

775 def set_autocommit(self, value): 

776 self.await_(self._connection.set_autocommit(value)) 

777 

778 def set_isolation_level(self, value): 

779 self.await_(self._connection.set_isolation_level(value)) 

780 

781 def set_read_only(self, value): 

782 self.await_(self._connection.set_read_only(value)) 

783 

784 def set_deferrable(self, value): 

785 self.await_(self._connection.set_deferrable(value)) 

786 

787 

788class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection): 

789 __slots__ = () 

790 await_ = staticmethod(await_fallback) 

791 

792 

793class PsycopgAdaptDBAPI: 

794 def __init__(self, psycopg) -> None: 

795 self.psycopg = psycopg 

796 

797 for k, v in self.psycopg.__dict__.items(): 

798 if k != "connect": 

799 self.__dict__[k] = v 

800 

801 def connect(self, *arg, **kw): 

802 async_fallback = kw.pop("async_fallback", False) 

803 creator_fn = kw.pop( 

804 "async_creator_fn", self.psycopg.AsyncConnection.connect 

805 ) 

806 if util.asbool(async_fallback): 

807 return AsyncAdaptFallback_psycopg_connection( 

808 await_fallback(creator_fn(*arg, **kw)) 

809 ) 

810 else: 

811 return AsyncAdapt_psycopg_connection( 

812 await_only(creator_fn(*arg, **kw)) 

813 ) 

814 

815 

816class PGDialectAsync_psycopg(PGDialect_psycopg): 

817 is_async = True 

818 supports_statement_cache = True 

819 

820 @classmethod 

821 def import_dbapi(cls): 

822 import psycopg 

823 from psycopg.pq import ExecStatus 

824 

825 AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus 

826 

827 return PsycopgAdaptDBAPI(psycopg) 

828 

829 @classmethod 

830 def get_pool_class(cls, url): 

831 async_fallback = url.query.get("async_fallback", False) 

832 

833 if util.asbool(async_fallback): 

834 return pool.FallbackAsyncAdaptedQueuePool 

835 else: 

836 return pool.AsyncAdaptedQueuePool 

837 

838 def _type_info_fetch(self, connection, name): 

839 from psycopg.types import TypeInfo 

840 

841 adapted = connection.connection 

842 return adapted.await_(TypeInfo.fetch(adapted.driver_connection, name)) 

843 

844 def _do_isolation_level(self, connection, autocommit, isolation_level): 

845 connection.set_autocommit(autocommit) 

846 connection.set_isolation_level(isolation_level) 

847 

848 def _do_autocommit(self, connection, value): 

849 connection.set_autocommit(value) 

850 

851 def set_readonly(self, connection, value): 

852 connection.set_read_only(value) 

853 

854 def set_deferrable(self, connection, value): 

855 connection.set_deferrable(value) 

856 

857 def get_driver_connection(self, connection): 

858 return connection._connection 

859 

860 

861dialect = PGDialect_psycopg 

862dialect_async = PGDialectAsync_psycopg