Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

306 statements  

1# dialects/postgresql/pg8000.py 

2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql+pg8000 

11 :name: pg8000 

12 :dbapi: pg8000 

13 :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...] 

14 :url: https://pypi.org/project/pg8000/ 

15 

16.. versionchanged:: 1.4 The pg8000 dialect has been updated for version 

17 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration 

18 with full feature support. 

19 

20.. _pg8000_unicode: 

21 

22Unicode 

23------- 

24 

25pg8000 will encode / decode string values between it and the server using the 

26PostgreSQL ``client_encoding`` parameter; by default this is the value in 

27the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``. 

28Typically, this can be changed to ``utf-8``, as a more useful default:: 

29 

30 # client_encoding = sql_ascii # actually, defaults to database encoding 

31 client_encoding = utf8 

32 

33The ``client_encoding`` can be overridden for a session by executing the SQL: 

34 

35.. sourcecode:: sql 

36 

37 SET CLIENT_ENCODING TO 'utf8'; 

38 

39SQLAlchemy will execute this SQL on all new connections based on the value 

40passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter:: 

41 

42 engine = create_engine( 

43 "postgresql+pg8000://user:pass@host/dbname", client_encoding="utf8" 

44 ) 

45 

46.. _pg8000_ssl: 

47 

48SSL Connections 

49--------------- 

50 

51pg8000 accepts a Python ``SSLContext`` object which may be specified using the 

52:paramref:`_sa.create_engine.connect_args` dictionary:: 

53 

54 import ssl 

55 

56 ssl_context = ssl.create_default_context() 

57 engine = sa.create_engine( 

58 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 

59 connect_args={"ssl_context": ssl_context}, 

60 ) 

61 

62If the server uses an automatically-generated certificate that is self-signed 

63or does not match the host name (as seen from the client), it may also be 

64necessary to disable hostname checking:: 

65 

66 import ssl 

67 

68 ssl_context = ssl.create_default_context() 

69 ssl_context.check_hostname = False 

70 ssl_context.verify_mode = ssl.CERT_NONE 

71 engine = sa.create_engine( 

72 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 

73 connect_args={"ssl_context": ssl_context}, 

74 ) 

75 

76.. _pg8000_isolation_level: 

77 

78pg8000 Transaction Isolation Level 

79------------------------------------- 

80 

81The pg8000 dialect offers the same isolation level settings as that 

82of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect: 

83 

84* ``READ COMMITTED`` 

85* ``READ UNCOMMITTED`` 

86* ``REPEATABLE READ`` 

87* ``SERIALIZABLE`` 

88* ``AUTOCOMMIT`` 

89 

90.. seealso:: 

91 

92 :ref:`postgresql_isolation_level` 

93 

94 :ref:`psycopg2_isolation_level` 

95 

96 

97""" # noqa 

98 

99import decimal 

100import re 

101 

102from . import ranges 

103from .array import ARRAY as PGARRAY 

104from .base import _DECIMAL_TYPES 

105from .base import _FLOAT_TYPES 

106from .base import _INT_TYPES 

107from .base import ENUM 

108from .base import INTERVAL 

109from .base import PGCompiler 

110from .base import PGDialect 

111from .base import PGExecutionContext 

112from .base import PGIdentifierPreparer 

113from .json import JSON 

114from .json import JSONB 

115from .json import JSONPathType 

116from .pg_catalog import _SpaceVector 

117from .pg_catalog import OIDVECTOR 

118from .types import CITEXT 

119from ... import exc 

120from ... import util 

121from ...engine import processors 

122from ...sql import sqltypes 

123from ...sql.elements import quoted_name 

124 

125 

class _PGString(sqltypes.String):
    """String type variant used by the pg8000 dialect."""

    # NOTE(review): presumably makes the compiler render bound parameters
    # of this type with an explicit SQL cast -- confirm in the base compiler.
    render_bind_cast = True

128 

129 

class _PGNumeric(sqltypes.Numeric):
    """Numeric type variant used by the pg8000 dialect."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return the result-row converter for ``coltype``, or ``None``.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: type code of the result column; checked against
         ``_FLOAT_TYPES``, ``_DECIMAL_TYPES`` and ``_INT_TYPES``.
        :return: a callable converting the driver value, or ``None`` when
         the driver value is used unchanged.
        :raises InvalidRequestError: if ``coltype`` is in none of the
         known numeric type-code collections.
        """
        if coltype in _FLOAT_TYPES:
            if not self.asdecimal:
                # pg8000 returns float natively for 701
                return None
            return processors.to_decimal_processor_factory(
                decimal.Decimal, self._effective_decimal_return_scale
            )

        if coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
            if self.asdecimal:
                # pg8000 returns Decimal natively for 1700
                return None
            return processors.to_float

        raise exc.InvalidRequestError(
            "Unknown PG numeric type: %d" % coltype
        )

156 

157 

class _PGFloat(_PGNumeric, sqltypes.Float):
    """Float type variant; inherits result handling from ``_PGNumeric``."""

    # Compiled through the "float" visit name.
    __visit_name__ = "float"
    render_bind_cast = True

161 

162 

class _PGNumericNoBind(_PGNumeric):
    """``_PGNumeric`` variant that installs no bind processor."""

    def bind_processor(self, dialect):
        """Return ``None``: bound values are passed on unconverted."""
        return None

166 

167 

class _PGJSON(JSON):
    """JSON type variant used by the pg8000 dialect."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return ``None``: no result conversion is applied by this type."""
        return None

173 

174 

class _PGJSONB(JSONB):
    """JSONB type variant used by the pg8000 dialect."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return ``None``: no result conversion is applied by this type."""
        return None

180 

181 

class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
    """Generic JSON index type; its DBAPI type must never be requested."""

    def get_dbapi_type(self, dbapi):
        """Always raise ``NotImplementedError``.

        :raises NotImplementedError: unconditionally.
        """
        raise NotImplementedError("should not be here")

185 

186 

class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    """Integer JSON index type variant used by the pg8000 dialect."""

    __visit_name__ = "json_int_index"

    render_bind_cast = True

191 

192 

class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    """String JSON index type variant used by the pg8000 dialect."""

    __visit_name__ = "json_str_index"

    render_bind_cast = True

197 

198 

class _PGJSONPathType(JSONPathType):
    """JSON path type for pg8000; adds nothing to ``JSONPathType``."""

    # DBAPI type 1009

203 

204 

class _PGEnum(ENUM):
    """ENUM type variant used by the pg8000 dialect."""

    def get_dbapi_type(self, dbapi):
        """Return the driver's ``UNKNOWN`` type object.

        :param dbapi: the imported DBAPI module.
        """
        return dbapi.UNKNOWN

208 

209 

class _PGInterval(INTERVAL):
    """INTERVAL type variant used by the pg8000 dialect."""

    render_bind_cast = True

    def get_dbapi_type(self, dbapi):
        """Return the driver's ``INTERVAL`` type object.

        :param dbapi: the imported DBAPI module.
        """
        return dbapi.INTERVAL

    @classmethod
    def adapt_emulated_to_native(cls, interval, **kw):
        """Build a ``_PGInterval`` from an emulated interval type.

        :param interval: source type; only its ``second_precision`` is read.
        :param kw: accepted and ignored.
        :return: a new ``_PGInterval`` whose ``precision`` is the source's
         ``second_precision``.
        """
        precision = interval.second_precision
        return _PGInterval(precision=precision)

219 

220 

class _PGTimeStamp(sqltypes.DateTime):
    """DateTime type variant used by the pg8000 dialect."""

    render_bind_cast = True

223 

224 

class _PGDate(sqltypes.Date):
    """Date type variant used by the pg8000 dialect."""

    render_bind_cast = True

227 

228 

class _PGTime(sqltypes.Time):
    """Time type variant used by the pg8000 dialect."""

    render_bind_cast = True

231 

232 

class _PGInteger(sqltypes.Integer):
    """Integer type variant used by the pg8000 dialect."""

    render_bind_cast = True

235 

236 

class _PGSmallInteger(sqltypes.SmallInteger):
    """SmallInteger type variant used by the pg8000 dialect."""

    render_bind_cast = True

239 

240 

class _PGNullType(sqltypes.NullType):
    """NullType for pg8000; adds nothing to ``sqltypes.NullType``."""

243 

244 

class _PGBigInteger(sqltypes.BigInteger):
    """BigInteger type variant used by the pg8000 dialect."""

    render_bind_cast = True

247 

248 

class _PGBoolean(sqltypes.Boolean):
    """Boolean type variant used by the pg8000 dialect."""

    render_bind_cast = True

251 

252 

class _PGARRAY(PGARRAY):
    """PostgreSQL ARRAY type variant used by the pg8000 dialect."""

    render_bind_cast = True

255 

256 

class _PGOIDVECTOR(_SpaceVector, OIDVECTOR):
    """OIDVECTOR for pg8000; combines ``_SpaceVector`` with ``OIDVECTOR``."""

259 

260 

class _Pg8000Range(ranges.AbstractSingleRangeImpl):
    """Single-range type converting between ``ranges.Range`` and the
    driver's own ``Range`` class."""

    def bind_processor(self, dialect):
        """Return a converter turning ``ranges.Range`` into the driver Range.

        :param dialect: dialect whose ``dbapi.Range`` class is used.
        :return: a callable; values that are not ``ranges.Range`` instances
         are passed through unchanged.
        """
        # Resolved once, when the processor is created.
        driver_range_cls = dialect.dbapi.Range

        def to_range(value):
            if not isinstance(value, ranges.Range):
                return value
            return driver_range_cls(
                value.lower, value.upper, value.bounds, value.empty
            )

        return to_range

    def result_processor(self, dialect, coltype):
        """Return a converter turning driver ranges into ``ranges.Range``.

        :param dialect: not consulted.
        :param coltype: not consulted.
        :return: a callable; ``None`` is passed through unchanged.
        """

        def to_range(value):
            if value is None:
                return value
            # The driver object exposes emptiness as ``is_empty``.
            return ranges.Range(
                value.lower,
                value.upper,
                bounds=value.bounds,
                empty=value.is_empty,
            )

        return to_range

286 

287 

class _Pg8000MultiRange(ranges.AbstractMultiRangeImpl):
    """Multirange type converting between lists of ``ranges.Range`` and
    lists of the driver's own ``Range`` objects."""

    def bind_processor(self, dialect):
        """Return a converter for bound multirange values.

        :param dialect: dialect whose ``dbapi.Range`` class is used.
        :return: a callable. A ``list`` input yields a new list in which
         each ``ranges.Range`` element is replaced by a driver Range and
         every other element is kept as is; non-list input is returned
         unchanged.
        """
        # Resolved once, when the processor is created.
        driver_range_cls = dialect.dbapi.Range

        def to_multirange(value):
            if not isinstance(value, list):
                return value
            return [
                (
                    driver_range_cls(
                        item.lower, item.upper, item.bounds, item.empty
                    )
                    if isinstance(item, ranges.Range)
                    else item
                )
                for item in value
            ]

        return to_multirange

    def result_processor(self, dialect, coltype):
        """Return a converter producing ``ranges.MultiRange`` from results.

        :param dialect: not consulted.
        :param coltype: not consulted.
        :return: a callable; ``None`` is passed through unchanged.
        """

        def to_multirange(value):
            if value is None:
                return None
            # The driver objects expose emptiness as ``is_empty``.
            return ranges.MultiRange(
                ranges.Range(
                    item.lower,
                    item.upper,
                    bounds=item.bounds,
                    empty=item.is_empty,
                )
                for item in value
            )

        return to_multirange

321 

322 

# Module-wide counter; each call supplies the second component of a
# server-side cursor identifier.
_server_side_id = util.counter()

324 

325 

class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context for the pg8000 dialect."""

    def create_server_side_cursor(self):
        """Wrap a fresh DBAPI cursor in a named ``ServerSideCursor``.

        The name has the form ``c_<hex id of self>_<hex counter value>``.
        """
        context_part = hex(id(self))[2:]
        counter_part = hex(_server_side_id())[2:]
        ident = "c_%s_%s" % (context_part, counter_part)
        dbapi_cursor = self._dbapi_connection.cursor()
        return ServerSideCursor(dbapi_cursor, ident)

    def pre_exec(self):
        """Pre-execution hook; performs no work on either path."""
        if not self.compiled:
            # Not a compiled statement: leave immediately.
            return

334 

335 

class ServerSideCursor:
    """Cursor wrapper that emulates a server-side cursor with SQL commands.

    ``execute()`` wraps the statement in ``DECLARE <ident> NO SCROLL CURSOR
    FOR ...``; the fetch methods issue ``FETCH FORWARD ... FROM <ident>`` on
    the wrapped cursor and return what it fetched; ``close()`` issues
    ``CLOSE <ident>`` and then closes the wrapped cursor.

    :param cursor: the DBAPI cursor that every command is run on.
    :param ident: name of the server-side cursor; concatenated verbatim
     into the SQL text.
    """

    server_side = True

    def __init__(self, cursor, ident):
        self.ident = ident
        self.cursor = cursor

    @property
    def connection(self):
        """The wrapped cursor's ``connection``."""
        return self.cursor.connection

    @property
    def rowcount(self):
        """The wrapped cursor's ``rowcount``."""
        return self.cursor.rowcount

    @property
    def description(self):
        """The wrapped cursor's ``description``."""
        return self.cursor.description

    def execute(self, operation, args=(), stream=None):
        """Declare the named cursor for ``operation`` and return ``self``.

        :param operation: SQL text the cursor is declared for.
        :param args: parameters forwarded to the wrapped cursor.
        :param stream: forwarded to the wrapped cursor as ``stream=``.
        """
        op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
        self.cursor.execute(op, args, stream=stream)
        return self

    def executemany(self, operation, param_sets):
        """Forward to the wrapped cursor unchanged (no DECLARE); return
        ``self``."""
        self.cursor.executemany(operation, param_sets)
        return self

    def fetchone(self):
        """Fetch one row forward from the named cursor."""
        self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
        return self.cursor.fetchone()

    def fetchmany(self, num=None):
        """Fetch ``num`` rows forward, or all remaining rows if ``num`` is
        ``None``.

        ``num`` is passed through ``int()`` before being placed in the SQL.
        """
        if num is None:
            return self.fetchall()

        self.cursor.execute(
            "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
        )
        return self.cursor.fetchall()

    def fetchall(self):
        """Fetch all remaining rows from the named cursor."""
        self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
        return self.cursor.fetchall()

    def close(self):
        """Close the named cursor, then the wrapped DBAPI cursor.

        The wrapped cursor is closed even when the ``CLOSE`` statement
        raises, so a failed ``CLOSE`` does not leave the DBAPI cursor open;
        the exception still propagates to the caller.
        """
        try:
            self.cursor.execute("CLOSE " + self.ident)
        finally:
            self.cursor.close()

    def setinputsizes(self, *sizes):
        """Forward to the wrapped cursor's ``setinputsizes``."""
        self.cursor.setinputsizes(*sizes)

    def setoutputsize(self, size, column=None):
        """Accepted for interface compatibility; does nothing."""
        pass

390 

391 

class PGCompiler_pg8000(PGCompiler):
    """Statement compiler for the pg8000 dialect."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with a doubled percent sign.

        :param binary: expression whose ``left`` and ``right`` operands are
         compiled, in that order.
        :param operator: not consulted.
        :param kw: forwarded to ``self.process`` for both operands.
        :return: ``<left> %% <right>`` as SQL text.
        """
        lhs = self.process(binary.left, **kw)
        rhs = self.process(binary.right, **kw)
        return lhs + " %% " + rhs

399 

400 

class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer for pg8000 with ``_double_percents`` disabled."""

    def __init__(self, *args, **kwargs):
        """Initialise the base preparer, then clear ``_double_percents``.

        All positional and keyword arguments go to
        ``PGIdentifierPreparer.__init__`` unchanged.
        """
        PGIdentifierPreparer.__init__(self, *args, **kwargs)
        # Assigned after base initialisation so it is the final value.
        self._double_percents = False

405 

406 

class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect for the pg8000 DBAPI driver.

    Binds the pg8000-specific execution context, compiler, identifier
    preparer and type overrides (``colspecs``), and implements connection
    setup, isolation level / read-only / deferrable handling and two-phase
    commit on top of the raw pg8000 connection.
    """

    driver = "pg8000"
    supports_statement_cache = True

    supports_unicode_statements = True

    supports_unicode_binds = True

    default_paramstyle = "format"
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    supports_server_side_cursors = True

    render_bind_cast = True

    # reversed as of pg8000 1.16.6.  1.16.5 and lower
    # are no longer compatible
    description_encoding = None
    # description_encoding = "use_encoding"

    # Generic / PostgreSQL types mapped to their pg8000 implementations.
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.String: _PGString,
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGFloat,
            sqltypes.JSON: _PGJSON,
            sqltypes.Boolean: _PGBoolean,
            sqltypes.NullType: _PGNullType,
            JSONB: _PGJSONB,
            CITEXT: CITEXT,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Date: _PGDate,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            sqltypes.Enum: _PGEnum,
            sqltypes.ARRAY: _PGARRAY,
            OIDVECTOR: _PGOIDVECTOR,
            ranges.INT4RANGE: _Pg8000Range,
            ranges.INT8RANGE: _Pg8000Range,
            ranges.NUMRANGE: _Pg8000Range,
            ranges.DATERANGE: _Pg8000Range,
            ranges.TSRANGE: _Pg8000Range,
            ranges.TSTZRANGE: _Pg8000Range,
            ranges.INT4MULTIRANGE: _Pg8000MultiRange,
            ranges.INT8MULTIRANGE: _Pg8000MultiRange,
            ranges.NUMMULTIRANGE: _Pg8000MultiRange,
            ranges.DATEMULTIRANGE: _Pg8000MultiRange,
            ranges.TSMULTIRANGE: _Pg8000MultiRange,
            ranges.TSTZMULTIRANGE: _Pg8000MultiRange,
        },
    )

    def __init__(self, client_encoding=None, **kwargs):
        """Construct the dialect.

        :param client_encoding: optional encoding name; when not ``None``
         it is applied with ``SET CLIENT_ENCODING`` on each new connection
         (see ``on_connect``).
        :param kwargs: forwarded to ``PGDialect.__init__``.
        :raises NotImplementedError: if the detected pg8000 version is
         older than 1.16.6, or if ``_native_inet_types`` is truthy.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

        if self._dbapi_version < (1, 16, 6):
            raise NotImplementedError("pg8000 1.16.6 or greater is required")

        if self._native_inet_types:
            raise NotImplementedError(
                "The pg8000 dialect does not fully implement "
                "ipaddress type handling; INET is supported by default, "
                "CIDR is not"
            )

    @util.memoized_property
    def _dbapi_version(self):
        """Driver version as a tuple of ints parsed from ``__version__``.

        Falls back to ``(99, 99, 99)`` when no DBAPI module is set or it
        has no ``__version__`` attribute.
        """
        if self.dbapi and hasattr(self.dbapi, "__version__"):
            return tuple(
                [
                    int(x)
                    for x in re.findall(
                        r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
                    )
                ]
            )
        else:
            return (99, 99, 99)

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__("pg8000")

    def create_connect_args(self, url):
        """Translate ``url`` into ``(args, kwargs)`` for the driver connect.

        The URL username is passed under the ``user`` key, ``port`` is
        converted to ``int`` when present, and the URL query entries are
        merged into the keyword arguments. The positional list is empty.
        """
        opts = url.translate_connect_args(username="user")
        if "port" in opts:
            opts["port"] = int(opts["port"])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True if exception ``e`` indicates a lost connection.

        ``connection`` and ``cursor`` are not consulted; the decision is
        based on the exception type and message only.
        """
        if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
            e
        ):
            # new as of pg8000 1.19.0 for broken connections
            return True

        # connection was closed normally
        return "connection is closed" in str(e)

    def get_isolation_level_values(self, dbapi_connection):
        """Return the isolation level names this dialect accepts."""
        return (
            "AUTOCOMMIT",
            "READ COMMITTED",
            "READ UNCOMMITTED",
            "REPEATABLE READ",
            "SERIALIZABLE",
        )

    def set_isolation_level(self, dbapi_connection, level):
        """Apply ``level`` to the raw DBAPI connection.

        Underscores in ``level`` are replaced by spaces. ``AUTOCOMMIT``
        only switches the driver's ``autocommit`` flag on; any other level
        switches it off and sets the session transaction characteristics,
        followed by ``COMMIT``.
        """
        level = level.replace("_", " ")

        if level == "AUTOCOMMIT":
            dbapi_connection.autocommit = True
        else:
            dbapi_connection.autocommit = False
            cursor = dbapi_connection.cursor()
            # try/finally so the cursor is released even if a statement
            # fails, matching set_readonly() / set_deferrable().
            try:
                cursor.execute(
                    "SET SESSION CHARACTERISTICS AS TRANSACTION "
                    f"ISOLATION LEVEL {level}"
                )
                cursor.execute("COMMIT")
            finally:
                cursor.close()

    def detect_autocommit_setting(self, dbapi_conn) -> bool:
        """Return the connection's ``autocommit`` flag as a bool."""
        return bool(dbapi_conn.autocommit)

    def set_readonly(self, connection, value):
        """Set the session to READ ONLY (truthy) or READ WRITE (falsy)."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("READ ONLY" if value else "READ WRITE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_readonly(self, connection):
        """Return True if ``show transaction_read_only`` reports ``on``."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_read_only")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def set_deferrable(self, connection, value):
        """Set the session to DEFERRABLE (truthy) or NOT DEFERRABLE."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("DEFERRABLE" if value else "NOT DEFERRABLE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_deferrable(self, connection):
        """Return True if ``show transaction_deferrable`` reports ``on``."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_deferrable")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def _set_client_encoding(self, dbapi_connection, client_encoding):
        """Run ``SET CLIENT_ENCODING`` on the connection, then ``COMMIT``.

        Single quotes in ``client_encoding`` are doubled before the value
        is placed inside the quoted SQL literal.
        """
        cursor = dbapi_connection.cursor()
        # try/finally so the cursor is released even if a statement fails,
        # matching set_readonly() / set_deferrable().
        try:
            cursor.execute(f"""
                SET CLIENT_ENCODING TO '{client_encoding.replace("'", "''")}'""")
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def do_begin_twophase(self, connection, xid):
        """Begin a two-phase transaction under ``(0, xid, "")``."""
        connection.connection.tpc_begin((0, xid, ""))

    def do_prepare_twophase(self, connection, xid):
        """Prepare the current two-phase transaction (``xid`` is unused)."""
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Roll back the two-phase transaction ``(0, xid, "")``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_rollback((0, xid, ""))

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Commit the two-phase transaction ``(0, xid, "")``.

        ``is_prepared`` and ``recover`` are accepted but not consulted.
        """
        connection.connection.tpc_commit((0, xid, ""))

    def do_recover_twophase(self, connection):
        """Return element ``[1]`` of each entry from ``tpc_recover()``."""
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Build the callable run against each new raw DBAPI connection.

        The returned function applies, in order: the ``quoted_name``
        Python-type registration, the optional client encoding, the
        passthrough inet/cidr adapters (when ``_native_inet_types`` is
        ``False``) and the JSON deserializer registration (when
        ``_json_deserializer`` is set).
        """
        fns = []

        def register_quoted_name(conn):
            # quoted_name values are sent the same way as plain str.
            conn.py_types[quoted_name] = conn.py_types[str]

        fns.append(register_quoted_name)

        if self.client_encoding is not None:

            def apply_client_encoding(conn):
                self._set_client_encoding(conn, self.client_encoding)

            fns.append(apply_client_encoding)

        if self._native_inet_types is False:

            def register_inet_passthrough(conn):
                # inet
                conn.register_in_adapter(869, lambda s: s)

                # cidr
                conn.register_in_adapter(650, lambda s: s)

            fns.append(register_inet_passthrough)

        if self._json_deserializer:

            def register_json_deserializer(conn):
                # json
                conn.register_in_adapter(114, self._json_deserializer)

                # jsonb
                conn.register_in_adapter(3802, self._json_deserializer)

            fns.append(register_json_deserializer)

        if fns:

            def on_connect(conn):
                for fn in fns:
                    fn(conn)

            return on_connect
        else:
            return None

    @util.memoized_property
    def _dialect_specific_select_one(self):
        """Return ``";"``, the statement text used for this property."""
        return ";"

665 

666 

# Module-level alias for the dialect class defined in this module.
dialect = PGDialect_pg8000