Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

306 statements  

1# dialects/postgresql/pg8000.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql+pg8000 

11 :name: pg8000 

12 :dbapi: pg8000 

13 :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...] 

14 :url: https://pypi.org/project/pg8000/ 

15 

16.. versionchanged:: 1.4 The pg8000 dialect has been updated for version 

17 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration 

18 with full feature support. 

19 

20.. _pg8000_unicode: 

21 

22Unicode 

23------- 

24 

25pg8000 will encode / decode string values between it and the server using the 

26PostgreSQL ``client_encoding`` parameter; by default this is the value in 

27the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``. 

28Typically, this can be changed to ``utf-8``, as a more useful default:: 

29 

30 # client_encoding = sql_ascii # actually, defaults to database encoding 

31 client_encoding = utf8 

32 

33The ``client_encoding`` can be overridden for a session by executing the SQL: 

34 

35.. sourcecode:: sql 

36 

37 SET CLIENT_ENCODING TO 'utf8'; 

38 

39SQLAlchemy will execute this SQL on all new connections based on the value 

40passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter:: 

41 

42 engine = create_engine( 

43 "postgresql+pg8000://user:pass@host/dbname", client_encoding="utf8" 

44 ) 

45 

46.. _pg8000_ssl: 

47 

48SSL Connections 

49--------------- 

50 

51pg8000 accepts a Python ``SSLContext`` object which may be specified using the 

52:paramref:`_sa.create_engine.connect_args` dictionary:: 

53 

54 import ssl 

55 

56 ssl_context = ssl.create_default_context() 

57 engine = sa.create_engine( 

58 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 

59 connect_args={"ssl_context": ssl_context}, 

60 ) 

61 

62If the server uses an automatically-generated certificate that is self-signed 

63or does not match the host name (as seen from the client), it may also be 

64necessary to disable hostname checking:: 

65 

66 import ssl 

67 

68 ssl_context = ssl.create_default_context() 

69 ssl_context.check_hostname = False 

70 ssl_context.verify_mode = ssl.CERT_NONE 

71 engine = sa.create_engine( 

72 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 

73 connect_args={"ssl_context": ssl_context}, 

74 ) 

75 

76.. _pg8000_isolation_level: 

77 

78pg8000 Transaction Isolation Level 

79------------------------------------- 

80 

81The pg8000 dialect offers the same isolation level settings as that 

82of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect: 

83 

84* ``READ COMMITTED`` 

85* ``READ UNCOMMITTED`` 

86* ``REPEATABLE READ`` 

87* ``SERIALIZABLE`` 

88* ``AUTOCOMMIT`` 

89 

90.. seealso:: 

91 

92 :ref:`postgresql_isolation_level` 

93 

94 :ref:`psycopg2_isolation_level` 

95 

96 

97""" # noqa 

98import decimal 

99import re 

100 

101from . import ranges 

102from .array import ARRAY as PGARRAY 

103from .base import _DECIMAL_TYPES 

104from .base import _FLOAT_TYPES 

105from .base import _INT_TYPES 

106from .base import ENUM 

107from .base import INTERVAL 

108from .base import PGCompiler 

109from .base import PGDialect 

110from .base import PGExecutionContext 

111from .base import PGIdentifierPreparer 

112from .json import JSON 

113from .json import JSONB 

114from .json import JSONPathType 

115from .pg_catalog import _SpaceVector 

116from .pg_catalog import OIDVECTOR 

117from .types import CITEXT 

118from ... import exc 

119from ... import util 

120from ...engine import processors 

121from ...sql import sqltypes 

122from ...sql.elements import quoted_name 

123 

124 

class _PGString(sqltypes.String):
    """pg8000 variant of ``String`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

127 

128 

class _PGNumeric(sqltypes.Numeric):
    """pg8000 variant of ``Numeric`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return the result converter for ``coltype``, or None if not needed.

        The converter depends on ``self.asdecimal`` and on which of the
        ``_FLOAT_TYPES`` / ``_DECIMAL_TYPES`` / ``_INT_TYPES`` groups
        ``coltype`` belongs to.

        :raises exc.InvalidRequestError: if ``coltype`` is in none of the
         three groups.
        """
        if coltype in _FLOAT_TYPES:
            if not self.asdecimal:
                # pg8000 returns float natively for 701
                return None
            return processors.to_decimal_processor_factory(
                decimal.Decimal, self._effective_decimal_return_scale
            )

        if coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
            if self.asdecimal:
                # pg8000 returns Decimal natively for 1700
                return None
            return processors.to_float

        raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)

155 

156 

class _PGFloat(_PGNumeric, sqltypes.Float):
    """pg8000 ``Float`` type that reuses the ``_PGNumeric`` result handling."""

    __visit_name__ = "float"
    render_bind_cast = True

160 

161 

class _PGNumericNoBind(_PGNumeric):
    """``_PGNumeric`` variant that installs no bind processor."""

    def bind_processor(self, dialect):
        """Return None so that bound values are passed through unconverted."""
        return None

165 

166 

class _PGJSON(JSON):
    """pg8000 variant of ``JSON`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return None: no SQLAlchemy-side result conversion is installed."""
        return None

172 

173 

class _PGJSONB(JSONB):
    """pg8000 variant of ``JSONB`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return None: no SQLAlchemy-side result conversion is installed."""
        return None

179 

180 

class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
    """pg8000 variant of the generic JSON index type."""

    def get_dbapi_type(self, dbapi):
        """Always raise; this method is not expected to be called.

        :raises NotImplementedError: unconditionally.
        """
        raise NotImplementedError("should not be here")

184 

185 

class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    """pg8000 integer JSON index type with ``render_bind_cast`` enabled."""

    __visit_name__ = "json_int_index"

    render_bind_cast = True

190 

191 

class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    """pg8000 string JSON index type with ``render_bind_cast`` enabled."""

    __visit_name__ = "json_str_index"

    render_bind_cast = True

196 

197 

class _PGJSONPathType(JSONPathType):
    """pg8000 variant of ``JSONPathType``; adds no behavior of its own."""

    # DBAPI type 1009

202 

203 

class _PGEnum(ENUM):
    """pg8000 variant of ``ENUM``."""

    def get_dbapi_type(self, dbapi):
        """Return the ``UNKNOWN`` type object of the given DBAPI module."""
        return dbapi.UNKNOWN

207 

208 

class _PGInterval(INTERVAL):
    """pg8000 variant of ``INTERVAL`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

    def get_dbapi_type(self, dbapi):
        """Return the ``INTERVAL`` type object of the given DBAPI module."""
        return dbapi.INTERVAL

    @classmethod
    def adapt_emulated_to_native(cls, interval, **kw):
        """Build a ``_PGInterval`` from a generic interval type.

        Only ``interval.second_precision`` is carried over, as the new
        type's ``precision``; extra keyword arguments are ignored.
        """
        native_precision = interval.second_precision
        return _PGInterval(precision=native_precision)

218 

219 

class _PGTimeStamp(sqltypes.DateTime):
    """pg8000 variant of ``DateTime`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

222 

223 

class _PGDate(sqltypes.Date):
    """pg8000 variant of ``Date`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

226 

227 

class _PGTime(sqltypes.Time):
    """pg8000 variant of ``Time`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

230 

231 

class _PGInteger(sqltypes.Integer):
    """pg8000 variant of ``Integer`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

234 

235 

class _PGSmallInteger(sqltypes.SmallInteger):
    """pg8000 variant of ``SmallInteger`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

238 

239 

class _PGNullType(sqltypes.NullType):
    """pg8000 variant of ``NullType``; adds no behavior of its own."""

242 

243 

class _PGBigInteger(sqltypes.BigInteger):
    """pg8000 variant of ``BigInteger`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

246 

247 

class _PGBoolean(sqltypes.Boolean):
    """pg8000 variant of ``Boolean`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True

250 

251 

class _PGARRAY(PGARRAY):
    """pg8000 variant of the PostgreSQL ``ARRAY`` with bind casts enabled."""

    render_bind_cast = True

254 

255 

class _PGOIDVECTOR(_SpaceVector, OIDVECTOR):
    """``OIDVECTOR`` combined with the ``_SpaceVector`` mixin for pg8000."""

258 

259 

class _Pg8000Range(ranges.AbstractSingleRangeImpl):
    """Converts between SQLAlchemy ``Range`` objects and pg8000 ``Range``."""

    def bind_processor(self, dialect):
        """Return a converter from ``ranges.Range`` to the DBAPI ``Range``.

        Values that are not ``ranges.Range`` instances pass through
        unchanged.
        """
        # Looked up once here so the converter does not repeat the lookup.
        dbapi_range_cls = dialect.dbapi.Range

        def to_range(value):
            if not isinstance(value, ranges.Range):
                return value
            return dbapi_range_cls(
                value.lower, value.upper, value.bounds, value.empty
            )

        return to_range

    def result_processor(self, dialect, coltype):
        """Return a converter from a DBAPI range to ``ranges.Range``.

        ``None`` passes through unchanged.
        """

        def to_range(value):
            if value is None:
                return value
            return ranges.Range(
                value.lower,
                value.upper,
                bounds=value.bounds,
                empty=value.is_empty,
            )

        return to_range

285 

286 

class _Pg8000MultiRange(ranges.AbstractMultiRangeImpl):
    """Converts between SQLAlchemy multiranges and lists of pg8000 ranges."""

    def bind_processor(self, dialect):
        """Return a converter applied to bound multirange values.

        A ``list`` is rebuilt with every ``ranges.Range`` element replaced
        by a DBAPI ``Range``; other elements, and any non-list value, pass
        through unchanged.
        """
        # Looked up once here so the converter does not repeat the lookup.
        dbapi_range_cls = dialect.dbapi.Range

        def to_multirange(value):
            if not isinstance(value, list):
                return value
            return [
                (
                    dbapi_range_cls(
                        item.lower, item.upper, item.bounds, item.empty
                    )
                    if isinstance(item, ranges.Range)
                    else item
                )
                for item in value
            ]

        return to_multirange

    def result_processor(self, dialect, coltype):
        """Return a converter from DBAPI ranges to ``ranges.MultiRange``.

        ``None`` passes through unchanged.
        """

        def to_multirange(value):
            if value is None:
                return None
            return ranges.MultiRange(
                ranges.Range(
                    item.lower,
                    item.upper,
                    bounds=item.bounds,
                    empty=item.is_empty,
                )
                for item in value
            )

        return to_multirange

320 

321 

# Module-level counter; each call supplies a number that is embedded in a
# server-side cursor name.
_server_side_id = util.counter()


class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context used by the pg8000 dialect."""

    def create_server_side_cursor(self):
        """Return a ``ServerSideCursor`` wrapping a new DBAPI cursor.

        The cursor name combines the hex ``id()`` of this context with the
        next value of the module-level counter.
        """
        context_part = hex(id(self))[2:]
        counter_part = hex(_server_side_id())[2:]
        cursor_name = f"c_{context_part}_{counter_part}"
        return ServerSideCursor(self._dbapi_connection.cursor(), cursor_name)

    def pre_exec(self):
        """Hook run before execution; does nothing in either case."""
        if not self.compiled:
            return

333 

334 

class ServerSideCursor:
    """Wrap a DBAPI cursor so statements run through a named SQL cursor.

    ``execute()`` prefixes the statement with
    ``DECLARE <ident> NO SCROLL CURSOR FOR`` and the fetch methods issue
    ``FETCH FORWARD ... FROM <ident>`` on the wrapped cursor before reading
    rows from it.
    """

    server_side = True

    def __init__(self, cursor, ident):
        """Store the wrapped DBAPI ``cursor`` and the SQL cursor name."""
        self.ident = ident
        self.cursor = cursor

    @property
    def connection(self):
        """The ``connection`` attribute of the wrapped cursor."""
        return self.cursor.connection

    @property
    def rowcount(self):
        """The ``rowcount`` attribute of the wrapped cursor."""
        return self.cursor.rowcount

    @property
    def description(self):
        """The ``description`` attribute of the wrapped cursor."""
        return self.cursor.description

    def execute(self, operation, args=(), stream=None):
        """Declare the named cursor for ``operation`` and return ``self``."""
        op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
        self.cursor.execute(op, args, stream=stream)
        return self

    def executemany(self, operation, param_sets):
        """Delegate to the wrapped cursor (no DECLARE) and return ``self``."""
        self.cursor.executemany(operation, param_sets)
        return self

    def fetchone(self):
        """Fetch one row forward from the named cursor."""
        self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
        return self.cursor.fetchone()

    def fetchmany(self, num=None):
        """Fetch ``num`` rows forward, or all remaining rows if ``num`` is None."""
        if num is None:
            return self.fetchall()

        # int() keeps anything but a number out of the SQL text.
        self.cursor.execute(
            "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
        )
        return self.cursor.fetchall()

    def fetchall(self):
        """Fetch all remaining rows from the named cursor."""
        self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
        return self.cursor.fetchall()

    def close(self):
        """Close the named SQL cursor, then the wrapped DBAPI cursor.

        The DBAPI cursor is closed even if the ``CLOSE`` statement raises
        (for example on a broken connection); the error still propagates.
        """
        try:
            self.cursor.execute("CLOSE " + self.ident)
        finally:
            self.cursor.close()

    def setinputsizes(self, *sizes):
        """Delegate to the wrapped cursor."""
        self.cursor.setinputsizes(*sizes)

    def setoutputsize(self, size, column=None):
        """Accepted for DBAPI compatibility; does nothing."""
        pass

389 

390 

class PGCompiler_pg8000(PGCompiler):
    """SQL compiler used by the pg8000 dialect."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with a doubled percent sign.

        NOTE(review): the doubling presumably escapes ``%`` for the
        "format" paramstyle -- confirm.
        """
        left_sql = self.process(binary.left, **kw)
        right_sql = self.process(binary.right, **kw)
        return left_sql + " %% " + right_sql

398 

399 

class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer for pg8000 with ``_double_percents`` turned off."""

    def __init__(self, *args, **kwargs):
        """Initialize the base preparer, then clear ``_double_percents``."""
        super().__init__(*args, **kwargs)
        self._double_percents = False

404 

405 

class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect for the pg8000 DBAPI driver."""

    driver = "pg8000"
    supports_statement_cache = True

    supports_unicode_statements = True

    supports_unicode_binds = True

    default_paramstyle = "format"
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    supports_server_side_cursors = True

    render_bind_cast = True

    # reversed as of pg8000 1.16.6.  1.16.5 and lower
    # are no longer compatible
    description_encoding = None
    # description_encoding = "use_encoding"

    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.String: _PGString,
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGFloat,
            sqltypes.JSON: _PGJSON,
            sqltypes.Boolean: _PGBoolean,
            sqltypes.NullType: _PGNullType,
            JSONB: _PGJSONB,
            CITEXT: CITEXT,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Date: _PGDate,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            sqltypes.Enum: _PGEnum,
            sqltypes.ARRAY: _PGARRAY,
            OIDVECTOR: _PGOIDVECTOR,
            ranges.INT4RANGE: _Pg8000Range,
            ranges.INT8RANGE: _Pg8000Range,
            ranges.NUMRANGE: _Pg8000Range,
            ranges.DATERANGE: _Pg8000Range,
            ranges.TSRANGE: _Pg8000Range,
            ranges.TSTZRANGE: _Pg8000Range,
            ranges.INT4MULTIRANGE: _Pg8000MultiRange,
            ranges.INT8MULTIRANGE: _Pg8000MultiRange,
            ranges.NUMMULTIRANGE: _Pg8000MultiRange,
            ranges.DATEMULTIRANGE: _Pg8000MultiRange,
            ranges.TSMULTIRANGE: _Pg8000MultiRange,
            ranges.TSTZMULTIRANGE: _Pg8000MultiRange,
        },
    )

    def __init__(self, client_encoding=None, **kwargs):
        """Construct the dialect.

        :param client_encoding: if not None, ``SET CLIENT_ENCODING`` is
         issued with this value on each new connection (see
         ``on_connect``).
        :raises NotImplementedError: if the pg8000 version is below 1.16.6
         or if ``_native_inet_types`` is truthy.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

        if self._dbapi_version < (1, 16, 6):
            raise NotImplementedError("pg8000 1.16.6 or greater is required")

        if self._native_inet_types:
            raise NotImplementedError(
                "The pg8000 dialect does not fully implement "
                "ipaddress type handling; INET is supported by default, "
                "CIDR is not"
            )

    @util.memoized_property
    def _dbapi_version(self):
        """Version of the DBAPI module as a tuple of ints.

        Parsed from ``self.dbapi.__version__``; ``(99, 99, 99)`` is
        returned when there is no DBAPI module or it has no
        ``__version__``.
        """
        if self.dbapi and hasattr(self.dbapi, "__version__"):
            return tuple(
                int(x)
                for x in re.findall(
                    r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
                )
            )
        else:
            return (99, 99, 99)

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__("pg8000")

    def create_connect_args(self, url):
        """Translate ``url`` into ``(args, kwargs)`` for the DBAPI connect.

        The URL username is passed as ``user``, ``port`` is converted to
        ``int`` and query-string entries are merged into the keyword
        arguments.  The positional list is always empty.
        """
        opts = url.translate_connect_args(username="user")
        if "port" in opts:
            opts["port"] = int(opts["port"])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True if exception ``e`` indicates a lost connection."""
        if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
            e
        ):
            # new as of pg8000 1.19.0 for broken connections
            return True

        # connection was closed normally
        return "connection is closed" in str(e)

    def get_isolation_level_values(self, dbapi_connection):
        """Return the isolation level names accepted by this dialect."""
        return (
            "AUTOCOMMIT",
            "READ COMMITTED",
            "READ UNCOMMITTED",
            "REPEATABLE READ",
            "SERIALIZABLE",
        )

    def set_isolation_level(self, dbapi_connection, level):
        """Apply ``level`` to ``dbapi_connection``.

        Underscores in ``level`` are replaced by spaces.  ``AUTOCOMMIT``
        only turns on the connection's ``autocommit`` flag; any other level
        turns it off and issues ``SET SESSION CHARACTERISTICS AS
        TRANSACTION ISOLATION LEVEL ...`` followed by ``COMMIT``.
        """
        level = level.replace("_", " ")

        if level == "AUTOCOMMIT":
            dbapi_connection.autocommit = True
            return

        dbapi_connection.autocommit = False
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION "
                f"ISOLATION LEVEL {level}"
            )
            cursor.execute("COMMIT")
        finally:
            # Release the cursor even when the statement fails, as the
            # other session-characteristic setters do.
            cursor.close()

    def detect_autocommit_setting(self, dbapi_conn) -> bool:
        """Return the connection's ``autocommit`` attribute as a bool."""
        return bool(dbapi_conn.autocommit)

    def set_readonly(self, connection, value):
        """Set the session to READ ONLY if ``value`` is truthy, else READ WRITE."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("READ ONLY" if value else "READ WRITE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_readonly(self, connection):
        """Return True if ``show transaction_read_only`` reports ``on``."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_read_only")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def set_deferrable(self, connection, value):
        """Set the session to DEFERRABLE if ``value`` is truthy, else NOT DEFERRABLE."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("DEFERRABLE" if value else "NOT DEFERRABLE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_deferrable(self, connection):
        """Return True if ``show transaction_deferrable`` reports ``on``."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_deferrable")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def _set_client_encoding(self, dbapi_connection, client_encoding):
        """Issue ``SET CLIENT_ENCODING`` on the connection, then ``COMMIT``.

        Single quotes in ``client_encoding`` are doubled before the value
        is placed inside the quoted SQL literal.
        """
        escaped = client_encoding.replace("'", "''")
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(f"SET CLIENT_ENCODING TO '{escaped}'")
            cursor.execute("COMMIT")
        finally:
            # Release the cursor even when the statement fails.
            cursor.close()

    def do_begin_twophase(self, connection, xid):
        """Begin a two-phase transaction using ``(0, xid, "")`` as the id."""
        connection.connection.tpc_begin((0, xid, ""))

    def do_prepare_twophase(self, connection, xid):
        """Prepare the current two-phase transaction; ``xid`` is unused."""
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Roll back two-phase transaction ``xid``.

        ``is_prepared`` and ``recover`` are accepted but not used.
        """
        connection.connection.tpc_rollback((0, xid, ""))

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Commit two-phase transaction ``xid``.

        ``is_prepared`` and ``recover`` are accepted but not used.
        """
        connection.connection.tpc_commit((0, xid, ""))

    def do_recover_twophase(self, connection):
        """Return element 1 of each row reported by ``tpc_recover()``."""
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Return a callable run on each new DBAPI connection.

        The callable runs, in order: registration of ``quoted_name`` in
        ``py_types``, the optional client-encoding statement, the
        pass-through adapters for INET / CIDR, and the JSON / JSONB
        deserializer registration.
        """
        fns = []

        def register_quoted_name(conn):
            # Give quoted_name the same py_types entry as plain str.
            conn.py_types[quoted_name] = conn.py_types[str]

        fns.append(register_quoted_name)

        if self.client_encoding is not None:

            def set_client_encoding(conn):
                self._set_client_encoding(conn, self.client_encoding)

            fns.append(set_client_encoding)

        if self._native_inet_types is False:

            def register_inet_passthrough(conn):
                # inet
                conn.register_in_adapter(869, lambda s: s)

                # cidr
                conn.register_in_adapter(650, lambda s: s)

            fns.append(register_inet_passthrough)

        if self._json_deserializer:

            def register_json_deserializer(conn):
                # json
                conn.register_in_adapter(114, self._json_deserializer)

                # jsonb
                conn.register_in_adapter(3802, self._json_deserializer)

            fns.append(register_json_deserializer)

        if not fns:
            return None

        def on_connect(conn):
            for fn in fns:
                fn(conn)

        return on_connect

    @util.memoized_property
    def _dialect_specific_select_one(self):
        """The string ``";"``."""
        return ";"


dialect = PGDialect_pg8000