Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py: 46%


396 statements  

# dialects/postgresql/psycopg.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors

r"""
.. dialect:: postgresql+psycopg
    :name: psycopg (a.k.a. psycopg 3)
    :dbapi: psycopg
    :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://pypi.org/project/psycopg/

``psycopg`` is the package and module name for version 3 of the ``psycopg``
database driver, formerly known as ``psycopg2``. This driver is different
enough from its ``psycopg2`` predecessor that SQLAlchemy supports it
via a totally separate dialect; support for ``psycopg2`` is expected to remain
for as long as that package continues to function for modern Python versions,
and also remains the default dialect for the ``postgresql://`` dialect
series.

The SQLAlchemy ``psycopg`` dialect provides both a sync and an async
implementation under the same dialect name. The proper version is
selected depending on how the engine is created:

* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will
  automatically select the sync version, e.g.::

    from sqlalchemy import create_engine

    sync_engine = create_engine(
        "postgresql+psycopg://scott:tiger@localhost/test"
    )

* calling :func:`_asyncio.create_async_engine` with
  ``postgresql+psycopg://...`` will automatically select the async version,
  e.g.::

    from sqlalchemy.ext.asyncio import create_async_engine

    asyncio_engine = create_async_engine(
        "postgresql+psycopg://scott:tiger@localhost/test"
    )

The asyncio version of the dialect may also be specified explicitly using the
``psycopg_async`` suffix, as::

    from sqlalchemy.ext.asyncio import create_async_engine

    asyncio_engine = create_async_engine(
        "postgresql+psycopg_async://scott:tiger@localhost/test"
    )

.. seealso::

    :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg``
    dialect shares most of its behavior with the ``psycopg2`` dialect.
    Further documentation is available there.

Using a different Cursor class
------------------------------

One of the differences between ``psycopg`` and the older ``psycopg2``
is how bound parameters are handled: ``psycopg2`` would bind them
client side, while ``psycopg`` by default will bind them server side.

It's possible to configure ``psycopg`` to do client side binding by
specifying the ``cursor_factory`` to be ``ClientCursor`` when creating
the engine::

    from psycopg import ClientCursor

    client_side_engine = create_engine(
        "postgresql+psycopg://...",
        connect_args={"cursor_factory": ClientCursor},
    )

Similarly when using an async engine the ``AsyncClientCursor`` can be
specified::

    from psycopg import AsyncClientCursor

    client_side_engine = create_async_engine(
        "postgresql+psycopg://...",
        connect_args={"cursor_factory": AsyncClientCursor},
    )

.. seealso::

    `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_

"""  # noqa
from __future__ import annotations

from collections import deque
import logging
import re
from typing import cast
from typing import TYPE_CHECKING

from . import ranges
from ._psycopg_common import _PGDialect_common_psycopg
from ._psycopg_common import _PGExecutionContext_common_psycopg
from .base import INTERVAL
from .base import PGCompiler
from .base import PGIdentifierPreparer
from .base import REGCONFIG
from .json import JSON
from .json import JSONB
from .json import JSONPathType
from .types import CITEXT
from ... import pool
from ... import util
from ...engine import AdaptedConnection
from ...sql import sqltypes
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only

if TYPE_CHECKING:
    from typing import Iterable

    from psycopg import AsyncConnection

logger = logging.getLogger("sqlalchemy.dialects.postgresql")
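

# The _PG* type subclasses below set "render_bind_cast = True" so that the
# compiler renders an explicit PostgreSQL cast (e.g. "%(x)s::INTEGER") for
# bound parameters; with psycopg 3's server-side parameter binding the
# server otherwise receives no client-side type hint.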
class _PGString(sqltypes.String):
    render_bind_cast = True


class _PGREGCONFIG(REGCONFIG):
    render_bind_cast = True


class _PGJSON(JSON):
    def bind_processor(self, dialect):
        return self._make_bind_processor(None, dialect._psycopg_Json)

    def result_processor(self, dialect, coltype):
        return None


class _PGJSONB(JSONB):
    def bind_processor(self, dialect):
        return self._make_bind_processor(None, dialect._psycopg_Jsonb)

    def result_processor(self, dialect, coltype):
        return None


class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    __visit_name__ = "json_int_index"

    render_bind_cast = True


class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    __visit_name__ = "json_str_index"

    render_bind_cast = True


class _PGJSONPathType(JSONPathType):
    pass


class _PGInterval(INTERVAL):
    render_bind_cast = True


class _PGTimeStamp(sqltypes.DateTime):
    render_bind_cast = True


class _PGDate(sqltypes.Date):
    render_bind_cast = True


class _PGTime(sqltypes.Time):
    render_bind_cast = True


class _PGInteger(sqltypes.Integer):
    render_bind_cast = True


class _PGSmallInteger(sqltypes.SmallInteger):
    render_bind_cast = True


class _PGNullType(sqltypes.NullType):
    render_bind_cast = True


class _PGBigInteger(sqltypes.BigInteger):
    render_bind_cast = True


class _PGBoolean(sqltypes.Boolean):
    render_bind_cast = True
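

# The two adapters below translate between SQLAlchemy's Range / MultiRange
# value objects and psycopg's native Range / Multirange types on the way in
# (bind_processor) and back out (result_processor).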
class _PsycopgRange(ranges.AbstractSingleRangeImpl):
    def bind_processor(self, dialect):
        psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range

        def to_range(value):
            if isinstance(value, ranges.Range):
                value = psycopg_Range(
                    value.lower, value.upper, value.bounds, value.empty
                )
            return value

        return to_range

    def result_processor(self, dialect, coltype):
        def to_range(value):
            if value is not None:
                value = ranges.Range(
                    value._lower,
                    value._upper,
                    bounds=value._bounds if value._bounds else "[)",
                    empty=not value._bounds,
                )
            return value

        return to_range


class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl):
    def bind_processor(self, dialect):
        psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range
        psycopg_Multirange = cast(
            PGDialect_psycopg, dialect
        )._psycopg_Multirange

        NoneType = type(None)

        def to_range(value):
            if isinstance(value, (str, NoneType, psycopg_Multirange)):
                return value

            return psycopg_Multirange(
                [
                    psycopg_Range(
                        element.lower,
                        element.upper,
                        element.bounds,
                        element.empty,
                    )
                    for element in cast("Iterable[ranges.Range]", value)
                ]
            )

        return to_range

    def result_processor(self, dialect, coltype):
        def to_range(value):
            if value is None:
                return None
            else:
                return ranges.MultiRange(
                    ranges.Range(
                        elem._lower,
                        elem._upper,
                        bounds=elem._bounds if elem._bounds else "[)",
                        empty=not elem._bounds,
                    )
                    for elem in value
                )

        return to_range


class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg):
    pass


class PGCompiler_psycopg(PGCompiler):
    pass


class PGIdentifierPreparer_psycopg(PGIdentifierPreparer):
    pass
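

# installed as a psycopg notice handler in on_connect() below; routes
# server-side diagnostics (e.g. NOTICE, WARNING) to the dialect logger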
def _log_notices(diagnostic):
    logger.info("%s: %s", diagnostic.severity, diagnostic.message_primary)


class PGDialect_psycopg(_PGDialect_common_psycopg):
    driver = "psycopg"

    supports_statement_cache = True
    supports_server_side_cursors = True
    default_paramstyle = "pyformat"
    supports_sane_multi_rowcount = True

    execution_ctx_cls = PGExecutionContext_psycopg
    statement_compiler = PGCompiler_psycopg
    preparer = PGIdentifierPreparer_psycopg
    psycopg_version = (0, 0)

    _has_native_hstore = True
    _psycopg_adapters_map = None

    colspecs = util.update_copy(
        _PGDialect_common_psycopg.colspecs,
        {
            sqltypes.String: _PGString,
            REGCONFIG: _PGREGCONFIG,
            JSON: _PGJSON,
            CITEXT: CITEXT,
            sqltypes.JSON: _PGJSON,
            JSONB: _PGJSONB,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.Date: _PGDate,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            ranges.AbstractSingleRange: _PsycopgRange,
            ranges.AbstractMultiRange: _PsycopgMultiRange,
        },
    )

    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        if self.dbapi:
            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
            if m:
                self.psycopg_version = tuple(
                    int(x) for x in m.group(1, 2, 3) if x is not None
                )

            if self.psycopg_version < (3, 0, 2):
                raise ImportError(
                    "psycopg version 3.0.2 or higher is required."
                )

            from psycopg.adapt import AdaptersMap

            self._psycopg_adapters_map = adapters_map = AdaptersMap(
                self.dbapi.adapters
            )

            if self._native_inet_types is False:
                import psycopg.types.string

                adapters_map.register_loader(
                    "inet", psycopg.types.string.TextLoader
                )
                adapters_map.register_loader(
                    "cidr", psycopg.types.string.TextLoader
                )

            if self._json_deserializer:
                from psycopg.types.json import set_json_loads

                set_json_loads(self._json_deserializer, adapters_map)

            if self._json_serializer:
                from psycopg.types.json import set_json_dumps

                set_json_dumps(self._json_serializer, adapters_map)

    def create_connect_args(self, url):
        # see https://github.com/psycopg/psycopg/issues/83
        cargs, cparams = super().create_connect_args(url)

        if self._psycopg_adapters_map:
            cparams["context"] = self._psycopg_adapters_map
        if self.client_encoding is not None:
            cparams["client_encoding"] = self.client_encoding
        return cargs, cparams

    def _type_info_fetch(self, connection, name):
        from psycopg.types import TypeInfo

        return TypeInfo.fetch(connection.connection.driver_connection, name)

    def initialize(self, connection):
        super().initialize(connection)

        # PGDialect.initialize() checks server version for <= 8.2 and sets
        # this flag to False if so
        if not self.insert_returning:
            self.insert_executemany_returning = False

        # HSTORE can't be registered until we have a connection so that
        # we can look up its OID, so we set up this adapter in
        # initialize()
        if self.use_native_hstore:
            info = self._type_info_fetch(connection, "hstore")
            self._has_native_hstore = info is not None
            if self._has_native_hstore:
                from psycopg.types.hstore import register_hstore

                # register the adapter for connections made subsequent to
                # this one
                assert self._psycopg_adapters_map
                register_hstore(info, self._psycopg_adapters_map)

                # register the adapter for this connection
                assert connection.connection
                register_hstore(info, connection.connection.driver_connection)

    @classmethod
    def import_dbapi(cls):
        import psycopg

        return psycopg

    @classmethod
    def get_async_dialect_cls(cls, url):
        return PGDialectAsync_psycopg

    @util.memoized_property
    def _isolation_lookup(self):
        return {
            "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED,
            "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED,
            "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ,
            "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE,
        }

    @util.memoized_property
    def _psycopg_Json(self):
        from psycopg.types import json

        return json.Json

    @util.memoized_property
    def _psycopg_Jsonb(self):
        from psycopg.types import json

        return json.Jsonb

    @util.memoized_property
    def _psycopg_TransactionStatus(self):
        from psycopg.pq import TransactionStatus

        return TransactionStatus

    @util.memoized_property
    def _psycopg_Range(self):
        from psycopg.types.range import Range

        return Range

    @util.memoized_property
    def _psycopg_Multirange(self):
        from psycopg.types.multirange import Multirange

        return Multirange

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        connection.autocommit = autocommit
        connection.isolation_level = isolation_level

    def get_isolation_level(self, dbapi_connection):
        status_before = dbapi_connection.info.transaction_status
        value = super().get_isolation_level(dbapi_connection)

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if status_before == self._psycopg_TransactionStatus.IDLE:
            dbapi_connection.rollback()
        return value

    def set_isolation_level(self, dbapi_connection, level):
        if level == "AUTOCOMMIT":
            self._do_isolation_level(
                dbapi_connection, autocommit=True, isolation_level=None
            )
        else:
            self._do_isolation_level(
                dbapi_connection,
                autocommit=False,
                isolation_level=self._isolation_lookup[level],
            )

    def set_readonly(self, connection, value):
        connection.read_only = value

    def get_readonly(self, connection):
        return connection.read_only

    def on_connect(self):
        def notices(conn):
            conn.add_notice_handler(_log_notices)

        fns = [notices]

        if self.isolation_level is not None:

            def on_connect(conn):
                self.set_isolation_level(conn, self.isolation_level)

            fns.append(on_connect)

        # fns always has the notices function
        def on_connect(conn):
            for fn in fns:
                fn(conn)

        return on_connect

    def is_disconnect(self, e, connection, cursor):
        if isinstance(e, self.dbapi.Error) and connection is not None:
            if connection.closed or connection.broken:
                return True
        return False

    def _do_prepared_twophase(self, connection, command, recover=False):
        dbapi_conn = connection.connection.dbapi_connection
        if (
            recover
            # don't rely on psycopg providing enum symbols, compare with
            # eq/ne
            or dbapi_conn.info.transaction_status
            != self._psycopg_TransactionStatus.IDLE
        ):
            dbapi_conn.rollback()
        before_autocommit = dbapi_conn.autocommit
        try:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, True)
            dbapi_conn.execute(command)
        finally:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, before_autocommit)

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        if is_prepared:
            self._do_prepared_twophase(
                connection, f"ROLLBACK PREPARED '{xid}'", recover=recover
            )
        else:
            self.do_rollback(connection.connection)

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        if is_prepared:
            self._do_prepared_twophase(
                connection, f"COMMIT PREPARED '{xid}'", recover=recover
            )
        else:
            self.do_commit(connection.connection)
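
    # statement used for connection liveness checks ("pre ping"); a bare
    # ";" keeps that round trip as cheap as possible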
    @util.memoized_property
    def _dialect_specific_select_one(self):
        return ";"
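

# The AsyncAdapt_* classes below wrap psycopg's asyncio cursor and
# connection objects behind the blocking DBAPI-style interface that
# SQLAlchemy's core expects; "await_" resolves the underlying coroutines
# (await_only normally, await_fallback in async_fallback mode).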
class AsyncAdapt_psycopg_cursor:
    __slots__ = ("_cursor", "await_", "_rows")

    _psycopg_ExecStatus = None

    def __init__(self, cursor, await_) -> None:
        self._cursor = cursor
        self.await_ = await_
        self._rows = deque()

    def __getattr__(self, name):
        return getattr(self._cursor, name)

    @property
    def arraysize(self):
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    async def _async_soft_close(self) -> None:
        return

    def close(self):
        self._rows.clear()
        # the normal cursor can just call _close(), which does not need
        # to be awaited
        self._cursor._close()

    def execute(self, query, params=None, **kw):
        result = self.await_(self._cursor.execute(query, params, **kw))
        # sqlalchemy result is not async, so need to pull all rows here
        res = self._cursor.pgresult

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
            rows = self.await_(self._cursor.fetchall())
            self._rows = deque(rows)
        return result

    def executemany(self, query, params_seq):
        return self.await_(self._cursor.executemany(query, params_seq))

    def __iter__(self):
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        if size is None:
            size = self._cursor.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self):
        retval = list(self._rows)
        self._rows.clear()
        return retval
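

# server-side cursor variant: rows are fetched from the cursor on demand
# instead of being buffered up front by execute()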
class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor):
    def execute(self, query, params=None, **kw):
        self.await_(self._cursor.execute(query, params, **kw))
        return self

    def close(self):
        self.await_(self._cursor.close())

    def fetchone(self):
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size=0):
        return self.await_(self._cursor.fetchmany(size))

    def fetchall(self):
        return self.await_(self._cursor.fetchall())

    def __iter__(self):
        iterator = self._cursor.__aiter__()
        while True:
            try:
                yield self.await_(iterator.__anext__())
            except StopAsyncIteration:
                break
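

# adapts psycopg's AsyncConnection to the synchronous connection interface
# used by the rest of the dialect; unknown attributes proxy straight
# through to the underlying driver connection via __getattr__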
class AsyncAdapt_psycopg_connection(AdaptedConnection):
    _connection: AsyncConnection
    __slots__ = ()
    await_ = staticmethod(await_only)

    def __init__(self, connection) -> None:
        self._connection = connection

    def __getattr__(self, name):
        return getattr(self._connection, name)

    def execute(self, query, params=None, **kw):
        cursor = self.await_(self._connection.execute(query, params, **kw))
        return AsyncAdapt_psycopg_cursor(cursor, self.await_)

    def cursor(self, *args, **kw):
        cursor = self._connection.cursor(*args, **kw)
        if hasattr(cursor, "name"):
            return AsyncAdapt_psycopg_ss_cursor(cursor, self.await_)
        else:
            return AsyncAdapt_psycopg_cursor(cursor, self.await_)

    def commit(self):
        self.await_(self._connection.commit())

    def rollback(self):
        self.await_(self._connection.rollback())

    def close(self):
        self.await_(self._connection.close())

    @property
    def autocommit(self):
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        self.set_autocommit(value)

    def set_autocommit(self, value):
        self.await_(self._connection.set_autocommit(value))

    def set_isolation_level(self, value):
        self.await_(self._connection.set_isolation_level(value))

    def set_read_only(self, value):
        self.await_(self._connection.set_read_only(value))

    def set_deferrable(self, value):
        self.await_(self._connection.set_deferrable(value))


class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection):
    __slots__ = ()
    await_ = staticmethod(await_fallback)
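

# stand-in for the "psycopg" module when the async dialect is in use: it
# re-exports the module's attributes but replaces connect() so that an
# AsyncConnection is created and wrapped in the adapters above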
class PsycopgAdaptDBAPI:
    def __init__(self, psycopg) -> None:
        self.psycopg = psycopg

        for k, v in self.psycopg.__dict__.items():
            if k != "connect":
                self.__dict__[k] = v

    def connect(self, *arg, **kw):
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop(
            "async_creator_fn", self.psycopg.AsyncConnection.connect
        )
        if util.asbool(async_fallback):
            return AsyncAdaptFallback_psycopg_connection(
                await_fallback(creator_fn(*arg, **kw))
            )
        else:
            return AsyncAdapt_psycopg_connection(
                await_only(creator_fn(*arg, **kw))
            )
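

# asyncio flavor of the dialect, selected automatically by
# create_async_engine() or explicitly via "postgresql+psycopg_async://"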
class PGDialectAsync_psycopg(PGDialect_psycopg):
    is_async = True
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        import psycopg
        from psycopg.pq import ExecStatus

        AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus

        return PsycopgAdaptDBAPI(psycopg)

    @classmethod
    def get_pool_class(cls, url):
        async_fallback = url.query.get("async_fallback", False)

        if util.asbool(async_fallback):
            return pool.FallbackAsyncAdaptedQueuePool
        else:
            return pool.AsyncAdaptedQueuePool

    def _type_info_fetch(self, connection, name):
        from psycopg.types import TypeInfo

        adapted = connection.connection
        return adapted.await_(TypeInfo.fetch(adapted.driver_connection, name))

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        connection.set_autocommit(autocommit)
        connection.set_isolation_level(isolation_level)

    def _do_autocommit(self, connection, value):
        connection.set_autocommit(value)

    def set_readonly(self, connection, value):
        connection.set_read_only(value)

    def set_deferrable(self, connection, value):
        connection.set_deferrable(value)

    def get_driver_connection(self, connection):
        return connection._connection


dialect = PGDialect_psycopg
dialect_async = PGDialectAsync_psycopg