Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/psycopg.py: 46% (394 statements)

# dialects/postgresql/psycopg.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors

r"""
.. dialect:: postgresql+psycopg
    :name: psycopg (a.k.a. psycopg 3)
    :dbapi: psycopg
    :connectstring: postgresql+psycopg://user:password@host:port/dbname[?key=value&key=value...]
    :url: https://pypi.org/project/psycopg/

``psycopg`` is the package and module name for version 3 of the ``psycopg``
database driver, formerly known as ``psycopg2``. This driver is different
enough from its ``psycopg2`` predecessor that SQLAlchemy supports it
via a totally separate dialect; support for ``psycopg2`` is expected to remain
for as long as that package continues to function for modern Python versions,
and also remains the default dialect for the ``postgresql://`` dialect
series.

The SQLAlchemy ``psycopg`` dialect provides both a sync and an async
implementation under the same dialect name. The proper version is
selected depending on how the engine is created:

* calling :func:`_sa.create_engine` with ``postgresql+psycopg://...`` will
  automatically select the sync version, e.g.::

    from sqlalchemy import create_engine

    sync_engine = create_engine(
        "postgresql+psycopg://scott:tiger@localhost/test"
    )

* calling :func:`_asyncio.create_async_engine` with
  ``postgresql+psycopg://...`` will automatically select the async version,
  e.g.::

    from sqlalchemy.ext.asyncio import create_async_engine

    asyncio_engine = create_async_engine(
        "postgresql+psycopg://scott:tiger@localhost/test"
    )

The asyncio version of the dialect may also be specified explicitly using the
``psycopg_async`` suffix, as::

    from sqlalchemy.ext.asyncio import create_async_engine

    asyncio_engine = create_async_engine(
        "postgresql+psycopg_async://scott:tiger@localhost/test"
    )

.. seealso::

    :ref:`postgresql_psycopg2` - The SQLAlchemy ``psycopg``
    dialect shares most of its behavior with the ``psycopg2`` dialect.
    Further documentation is available there.

Using a different Cursor class
------------------------------

One of the differences between ``psycopg`` and the older ``psycopg2``
is how bound parameters are handled: ``psycopg2`` would bind them
client side, while ``psycopg`` by default will bind them server side.

It's possible to configure ``psycopg`` to do client side binding by
specifying the ``cursor_factory`` to be ``ClientCursor`` when creating
the engine::

    from psycopg import ClientCursor

    client_side_engine = create_engine(
        "postgresql+psycopg://...",
        connect_args={"cursor_factory": ClientCursor},
    )

Similarly when using an async engine the ``AsyncClientCursor`` can be
specified::

    from psycopg import AsyncClientCursor

    client_side_engine = create_async_engine(
        "postgresql+psycopg://...",
        connect_args={"cursor_factory": AsyncClientCursor},
    )

.. seealso::

    `Client-side-binding cursors <https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#client-side-binding-cursors>`_

"""  # noqa
from __future__ import annotations

from collections import deque
import logging
import re
from typing import cast
from typing import TYPE_CHECKING

from . import ranges
from ._psycopg_common import _PGDialect_common_psycopg
from ._psycopg_common import _PGExecutionContext_common_psycopg
from .base import INTERVAL
from .base import PGCompiler
from .base import PGIdentifierPreparer
from .base import REGCONFIG
from .json import JSON
from .json import JSONB
from .json import JSONPathType
from .types import CITEXT
from ... import pool
from ... import util
from ...engine import AdaptedConnection
from ...sql import sqltypes
from ...util.concurrency import await_fallback
from ...util.concurrency import await_only

if TYPE_CHECKING:
    from typing import Iterable

    from psycopg import AsyncConnection

logger = logging.getLogger("sqlalchemy.dialects.postgresql")

class _PGString(sqltypes.String):
    render_bind_cast = True


class _PGREGCONFIG(REGCONFIG):
    render_bind_cast = True


class _PGJSON(JSON):
    def bind_processor(self, dialect):
        return self._make_bind_processor(None, dialect._psycopg_Json)

    def result_processor(self, dialect, coltype):
        return None


class _PGJSONB(JSONB):
    def bind_processor(self, dialect):
        return self._make_bind_processor(None, dialect._psycopg_Jsonb)

    def result_processor(self, dialect, coltype):
        return None


class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    __visit_name__ = "json_int_index"

    render_bind_cast = True


class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    __visit_name__ = "json_str_index"

    render_bind_cast = True


class _PGJSONPathType(JSONPathType):
    pass


class _PGInterval(INTERVAL):
    render_bind_cast = True


class _PGTimeStamp(sqltypes.DateTime):
    render_bind_cast = True


class _PGDate(sqltypes.Date):
    render_bind_cast = True


class _PGTime(sqltypes.Time):
    render_bind_cast = True


class _PGInteger(sqltypes.Integer):
    render_bind_cast = True


class _PGSmallInteger(sqltypes.SmallInteger):
    render_bind_cast = True


class _PGNullType(sqltypes.NullType):
    render_bind_cast = True


class _PGBigInteger(sqltypes.BigInteger):
    render_bind_cast = True


class _PGBoolean(sqltypes.Boolean):
    render_bind_cast = True

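# The two implementation classes below translate between SQLAlchemy's
# ranges.Range / ranges.MultiRange values and psycopg's native Range /
# Multirange objects: the bind processors convert outgoing parameters, the
# result processors convert values coming back from the driver.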
class _PsycopgRange(ranges.AbstractSingleRangeImpl):
    def bind_processor(self, dialect):
        psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range

        def to_range(value):
            if isinstance(value, ranges.Range):
                value = psycopg_Range(
                    value.lower, value.upper, value.bounds, value.empty
                )
            return value

        return to_range

    def result_processor(self, dialect, coltype):
        def to_range(value):
            if value is not None:
                value = ranges.Range(
                    value._lower,
                    value._upper,
                    bounds=value._bounds if value._bounds else "[)",
                    empty=not value._bounds,
                )
            return value

        return to_range

class _PsycopgMultiRange(ranges.AbstractMultiRangeImpl):
    def bind_processor(self, dialect):
        psycopg_Range = cast(PGDialect_psycopg, dialect)._psycopg_Range
        psycopg_Multirange = cast(
            PGDialect_psycopg, dialect
        )._psycopg_Multirange

        NoneType = type(None)

        def to_range(value):
            if isinstance(value, (str, NoneType, psycopg_Multirange)):
                return value

            return psycopg_Multirange(
                [
                    psycopg_Range(
                        element.lower,
                        element.upper,
                        element.bounds,
                        element.empty,
                    )
                    for element in cast("Iterable[ranges.Range]", value)
                ]
            )

        return to_range

    def result_processor(self, dialect, coltype):
        def to_range(value):
            if value is None:
                return None
            else:
                return ranges.MultiRange(
                    ranges.Range(
                        elem._lower,
                        elem._upper,
                        bounds=elem._bounds if elem._bounds else "[)",
                        empty=not elem._bounds,
                    )
                    for elem in value
                )

        return to_range

class PGExecutionContext_psycopg(_PGExecutionContext_common_psycopg):
    pass


class PGCompiler_psycopg(PGCompiler):
    pass


class PGIdentifierPreparer_psycopg(PGIdentifierPreparer):
    pass


def _log_notices(diagnostic):
    logger.info("%s: %s", diagnostic.severity, diagnostic.message_primary)

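# Most of the wrapper types above exist only to set render_bind_cast, so that
# bound parameters are rendered with an explicit SQL cast (e.g. roughly
# %(x)s::INTEGER instead of plain %(x)s); because psycopg binds parameters
# server side, the cast tells the server which type is intended. The colspecs
# mapping below wires these wrappers to the generic SQLAlchemy types.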
class PGDialect_psycopg(_PGDialect_common_psycopg):
    driver = "psycopg"

    supports_statement_cache = True
    supports_server_side_cursors = True
    default_paramstyle = "pyformat"
    supports_sane_multi_rowcount = True

    execution_ctx_cls = PGExecutionContext_psycopg
    statement_compiler = PGCompiler_psycopg
    preparer = PGIdentifierPreparer_psycopg
    psycopg_version = (0, 0)

    _has_native_hstore = True
    _psycopg_adapters_map = None

    colspecs = util.update_copy(
        _PGDialect_common_psycopg.colspecs,
        {
            sqltypes.String: _PGString,
            REGCONFIG: _PGREGCONFIG,
            JSON: _PGJSON,
            CITEXT: CITEXT,
            sqltypes.JSON: _PGJSON,
            JSONB: _PGJSONB,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.Date: _PGDate,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            ranges.AbstractSingleRange: _PsycopgRange,
            ranges.AbstractMultiRange: _PsycopgMultiRange,
        },
    )

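    # The AdaptersMap assembled in __init__ below is handed to each new
    # connection as the "context" connect argument (see create_connect_args),
    # so loader/dumper customizations apply per-engine rather than mutating
    # psycopg's global adapter state.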
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        if self.dbapi:
            m = re.match(r"(\d+)\.(\d+)(?:\.(\d+))?", self.dbapi.__version__)
            if m:
                self.psycopg_version = tuple(
                    int(x) for x in m.group(1, 2, 3) if x is not None
                )

            if self.psycopg_version < (3, 0, 2):
                raise ImportError(
                    "psycopg version 3.0.2 or higher is required."
                )

            from psycopg.adapt import AdaptersMap

            self._psycopg_adapters_map = adapters_map = AdaptersMap(
                self.dbapi.adapters
            )

            if self._native_inet_types is False:
                import psycopg.types.string

                adapters_map.register_loader(
                    "inet", psycopg.types.string.TextLoader
                )
                adapters_map.register_loader(
                    "cidr", psycopg.types.string.TextLoader
                )

            if self._json_deserializer:
                from psycopg.types.json import set_json_loads

                set_json_loads(self._json_deserializer, adapters_map)

            if self._json_serializer:
                from psycopg.types.json import set_json_dumps

                set_json_dumps(self._json_serializer, adapters_map)

    def create_connect_args(self, url):
        # see https://github.com/psycopg/psycopg/issues/83
        cargs, cparams = super().create_connect_args(url)

        if self._psycopg_adapters_map:
            cparams["context"] = self._psycopg_adapters_map
        if self.client_encoding is not None:
            cparams["client_encoding"] = self.client_encoding
        return cargs, cparams

    def _type_info_fetch(self, connection, name):
        from psycopg.types import TypeInfo

        return TypeInfo.fetch(connection.connection.driver_connection, name)

    def initialize(self, connection):
        super().initialize(connection)

        # PGDialect.initialize() checks server version for <= 8.2 and sets
        # this flag to False if so
        if not self.insert_returning:
            self.insert_executemany_returning = False

        # HSTORE can't be registered until we have a connection so that
        # we can look up its OID, so we set up this adapter in
        # initialize()
        if self.use_native_hstore:
            info = self._type_info_fetch(connection, "hstore")
            self._has_native_hstore = info is not None
            if self._has_native_hstore:
                from psycopg.types.hstore import register_hstore

                # register the adapter for connections made subsequent to
                # this one
                assert self._psycopg_adapters_map
                register_hstore(info, self._psycopg_adapters_map)

                # register the adapter for this connection
                assert connection.connection
                register_hstore(info, connection.connection.driver_connection)

    @classmethod
    def import_dbapi(cls):
        import psycopg

        return psycopg

    @classmethod
    def get_async_dialect_cls(cls, url):
        return PGDialectAsync_psycopg

    @util.memoized_property
    def _isolation_lookup(self):
        return {
            "READ COMMITTED": self.dbapi.IsolationLevel.READ_COMMITTED,
            "READ UNCOMMITTED": self.dbapi.IsolationLevel.READ_UNCOMMITTED,
            "REPEATABLE READ": self.dbapi.IsolationLevel.REPEATABLE_READ,
            "SERIALIZABLE": self.dbapi.IsolationLevel.SERIALIZABLE,
        }

    @util.memoized_property
    def _psycopg_Json(self):
        from psycopg.types import json

        return json.Json

    @util.memoized_property
    def _psycopg_Jsonb(self):
        from psycopg.types import json

        return json.Jsonb

    @util.memoized_property
    def _psycopg_TransactionStatus(self):
        from psycopg.pq import TransactionStatus

        return TransactionStatus

    @util.memoized_property
    def _psycopg_Range(self):
        from psycopg.types.range import Range

        return Range

    @util.memoized_property
    def _psycopg_Multirange(self):
        from psycopg.types.multirange import Multirange

        return Multirange

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        connection.autocommit = autocommit
        connection.isolation_level = isolation_level

    def get_isolation_level(self, dbapi_connection):
        status_before = dbapi_connection.info.transaction_status
        value = super().get_isolation_level(dbapi_connection)

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if status_before == self._psycopg_TransactionStatus.IDLE:
            dbapi_connection.rollback()
        return value

    def set_isolation_level(self, dbapi_connection, level):
        if level == "AUTOCOMMIT":
            self._do_isolation_level(
                dbapi_connection, autocommit=True, isolation_level=None
            )
        else:
            self._do_isolation_level(
                dbapi_connection,
                autocommit=False,
                isolation_level=self._isolation_lookup[level],
            )

    def set_readonly(self, connection, value):
        connection.read_only = value

    def get_readonly(self, connection):
        return connection.read_only

    def on_connect(self):
        def notices(conn):
            conn.add_notice_handler(_log_notices)

        fns = [notices]

        if self.isolation_level is not None:

            def on_connect(conn):
                self.set_isolation_level(conn, self.isolation_level)

            fns.append(on_connect)

        # fns always has the notices function
        def on_connect(conn):
            for fn in fns:
                fn(conn)

        return on_connect

    def is_disconnect(self, e, connection, cursor):
        if isinstance(e, self.dbapi.Error) and connection is not None:
            if connection.closed or connection.broken:
                return True
        return False

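    # Two-phase transaction support: COMMIT PREPARED / ROLLBACK PREPARED must
    # run outside of an explicit transaction block, so the helper below rolls
    # back any pending work and temporarily switches the connection to
    # autocommit while the command executes.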
    def _do_prepared_twophase(self, connection, command, recover=False):
        dbapi_conn = connection.connection.dbapi_connection
        if (
            recover
            # don't rely on psycopg providing enum symbols, compare with
            # eq/ne
            or dbapi_conn.info.transaction_status
            != self._psycopg_TransactionStatus.IDLE
        ):
            dbapi_conn.rollback()
        before_autocommit = dbapi_conn.autocommit
        try:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, True)
            dbapi_conn.execute(command)
        finally:
            if not before_autocommit:
                self._do_autocommit(dbapi_conn, before_autocommit)

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        if is_prepared:
            self._do_prepared_twophase(
                connection, f"ROLLBACK PREPARED '{xid}'", recover=recover
            )
        else:
            self.do_rollback(connection.connection)

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        if is_prepared:
            self._do_prepared_twophase(
                connection, f"COMMIT PREPARED '{xid}'", recover=recover
            )
        else:
            self.do_commit(connection.connection)

    @util.memoized_property
    def _dialect_specific_select_one(self):
        return ";"
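    # Presumably the ";" returned above lets connection liveness checks issue
    # an empty statement instead of "SELECT 1", which psycopg 3 accepts as a
    # cheap no-op round trip; this is an inference from the property name,
    # not something stated in this module.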

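# The adapter classes below wrap psycopg's AsyncConnection / AsyncCursor in a
# blocking, DB-API (PEP 249) style facade: each call is driven to completion
# with await_only() (or await_fallback()), which lets SQLAlchemy's synchronous
# engine internals use the async driver underneath the asyncio extension.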
class AsyncAdapt_psycopg_cursor:
    __slots__ = ("_cursor", "await_", "_rows")

    _psycopg_ExecStatus = None

    def __init__(self, cursor, await_) -> None:
        self._cursor = cursor
        self.await_ = await_
        self._rows = deque()

    def __getattr__(self, name):
        return getattr(self._cursor, name)

    @property
    def arraysize(self):
        return self._cursor.arraysize

    @arraysize.setter
    def arraysize(self, value):
        self._cursor.arraysize = value

    def close(self):
        self._rows.clear()
        # the normal cursor just calls _close() in a non-async way
        self._cursor._close()

    def execute(self, query, params=None, **kw):
        result = self.await_(self._cursor.execute(query, params, **kw))
        # the SQLAlchemy result is not async, so all rows need to be
        # pulled here
        res = self._cursor.pgresult

        # don't rely on psycopg providing enum symbols, compare with
        # eq/ne
        if res and res.status == self._psycopg_ExecStatus.TUPLES_OK:
            rows = self.await_(self._cursor.fetchall())
            self._rows = deque(rows)
        return result

    def executemany(self, query, params_seq):
        return self.await_(self._cursor.executemany(query, params_seq))

    def __iter__(self):
        while self._rows:
            yield self._rows.popleft()

    def fetchone(self):
        if self._rows:
            return self._rows.popleft()
        else:
            return None

    def fetchmany(self, size=None):
        if size is None:
            size = self._cursor.arraysize

        rr = self._rows
        return [rr.popleft() for _ in range(min(size, len(rr)))]

    def fetchall(self):
        retval = list(self._rows)
        self._rows.clear()
        return retval

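# Server-side cursor variant: unlike the buffering cursor above, fetches are
# delegated to the underlying async cursor one call at a time rather than
# pre-loading all rows on execute().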
class AsyncAdapt_psycopg_ss_cursor(AsyncAdapt_psycopg_cursor):
    def execute(self, query, params=None, **kw):
        self.await_(self._cursor.execute(query, params, **kw))
        return self

    def close(self):
        self.await_(self._cursor.close())

    def fetchone(self):
        return self.await_(self._cursor.fetchone())

    def fetchmany(self, size=0):
        return self.await_(self._cursor.fetchmany(size))

    def fetchall(self):
        return self.await_(self._cursor.fetchall())

    def __iter__(self):
        iterator = self._cursor.__aiter__()
        while True:
            try:
                yield self.await_(iterator.__anext__())
            except StopAsyncIteration:
                break

class AsyncAdapt_psycopg_connection(AdaptedConnection):
    _connection: AsyncConnection
    __slots__ = ()
    await_ = staticmethod(await_only)

    def __init__(self, connection) -> None:
        self._connection = connection

    def __getattr__(self, name):
        return getattr(self._connection, name)

    def execute(self, query, params=None, **kw):
        cursor = self.await_(self._connection.execute(query, params, **kw))
        return AsyncAdapt_psycopg_cursor(cursor, self.await_)

    def cursor(self, *args, **kw):
        cursor = self._connection.cursor(*args, **kw)
        if hasattr(cursor, "name"):
            return AsyncAdapt_psycopg_ss_cursor(cursor, self.await_)
        else:
            return AsyncAdapt_psycopg_cursor(cursor, self.await_)

    def commit(self):
        self.await_(self._connection.commit())

    def rollback(self):
        self.await_(self._connection.rollback())

    def close(self):
        self.await_(self._connection.close())

    @property
    def autocommit(self):
        return self._connection.autocommit

    @autocommit.setter
    def autocommit(self, value):
        self.set_autocommit(value)

    def set_autocommit(self, value):
        self.await_(self._connection.set_autocommit(value))

    def set_isolation_level(self, value):
        self.await_(self._connection.set_isolation_level(value))

    def set_read_only(self, value):
        self.await_(self._connection.set_read_only(value))

    def set_deferrable(self, value):
        self.await_(self._connection.set_deferrable(value))

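# Variant used when the "async_fallback" connection mode is in effect:
# await_fallback() can drive the coroutines without the caller being inside
# SQLAlchemy's usual async/greenlet context (primarily a compatibility and
# testing aid).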
class AsyncAdaptFallback_psycopg_connection(AsyncAdapt_psycopg_connection):
    __slots__ = ()
    await_ = staticmethod(await_fallback)

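# PsycopgAdaptDBAPI re-exports the psycopg module namespace (exceptions,
# constants, etc.) while replacing connect() with a version that awaits
# AsyncConnection.connect() and returns the adapted connection above.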
class PsycopgAdaptDBAPI:
    def __init__(self, psycopg) -> None:
        self.psycopg = psycopg

        for k, v in self.psycopg.__dict__.items():
            if k != "connect":
                self.__dict__[k] = v

    def connect(self, *arg, **kw):
        async_fallback = kw.pop("async_fallback", False)
        creator_fn = kw.pop(
            "async_creator_fn", self.psycopg.AsyncConnection.connect
        )
        if util.asbool(async_fallback):
            return AsyncAdaptFallback_psycopg_connection(
                await_fallback(creator_fn(*arg, **kw))
            )
        else:
            return AsyncAdapt_psycopg_connection(
                await_only(creator_fn(*arg, **kw))
            )

class PGDialectAsync_psycopg(PGDialect_psycopg):
    is_async = True
    supports_statement_cache = True

    @classmethod
    def import_dbapi(cls):
        import psycopg
        from psycopg.pq import ExecStatus

        AsyncAdapt_psycopg_cursor._psycopg_ExecStatus = ExecStatus

        return PsycopgAdaptDBAPI(psycopg)

    @classmethod
    def get_pool_class(cls, url):
        async_fallback = url.query.get("async_fallback", False)

        if util.asbool(async_fallback):
            return pool.FallbackAsyncAdaptedQueuePool
        else:
            return pool.AsyncAdaptedQueuePool

    def _type_info_fetch(self, connection, name):
        from psycopg.types import TypeInfo

        adapted = connection.connection
        return adapted.await_(TypeInfo.fetch(adapted.driver_connection, name))

    def _do_isolation_level(self, connection, autocommit, isolation_level):
        connection.set_autocommit(autocommit)
        connection.set_isolation_level(isolation_level)

    def _do_autocommit(self, connection, value):
        connection.set_autocommit(value)

    def set_readonly(self, connection, value):
        connection.set_read_only(value)

    def set_deferrable(self, connection, value):
        connection.set_deferrable(value)

    def get_driver_connection(self, connection):
        return connection._connection


dialect = PGDialect_psycopg
dialect_async = PGDialectAsync_psycopg