Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/pg8000.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

304 statements  

1# dialects/postgresql/pg8000.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS 

3# file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8 

9r""" 

10.. dialect:: postgresql+pg8000 

11 :name: pg8000 

12 :dbapi: pg8000 

13 :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...] 

14 :url: https://pypi.org/project/pg8000/ 

15 

16.. versionchanged:: 1.4 The pg8000 dialect has been updated for version 

17 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration 

18 with full feature support. 

19 

20.. _pg8000_unicode: 

21 

22Unicode 

23------- 

24 

25pg8000 will encode / decode string values between it and the server using the 

26PostgreSQL ``client_encoding`` parameter; by default this is the value in 

27the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``. 

28Typically, this can be changed to ``utf-8``, as a more useful default:: 

29 

30 # client_encoding = sql_ascii # actually, defaults to database encoding 

31 client_encoding = utf8 

32 

33The ``client_encoding`` can be overridden for a session by executing the SQL: 

34 

35.. sourcecode:: sql 

36 

37 SET CLIENT_ENCODING TO 'utf8'; 

38 

39SQLAlchemy will execute this SQL on all new connections based on the value 

40passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter:: 

41 

42 engine = create_engine( 

43 "postgresql+pg8000://user:pass@host/dbname", client_encoding="utf8" 

44 ) 

45 

46.. _pg8000_ssl: 

47 

48SSL Connections 

49--------------- 

50 

51pg8000 accepts a Python ``SSLContext`` object which may be specified using the 

52:paramref:`_sa.create_engine.connect_args` dictionary:: 

53 

54 import ssl 

55 

56 ssl_context = ssl.create_default_context() 

57 engine = sa.create_engine( 

58 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 

59 connect_args={"ssl_context": ssl_context}, 

60 ) 

61 

62If the server uses an automatically-generated certificate that is self-signed 

63or does not match the host name (as seen from the client), it may also be 

64necessary to disable hostname checking:: 

65 

66 import ssl 

67 

68 ssl_context = ssl.create_default_context() 

69 ssl_context.check_hostname = False 

70 ssl_context.verify_mode = ssl.CERT_NONE 

71 engine = sa.create_engine( 

72 "postgresql+pg8000://scott:tiger@192.168.0.199/test", 

73 connect_args={"ssl_context": ssl_context}, 

74 ) 

75 

76.. _pg8000_isolation_level: 

77 

78pg8000 Transaction Isolation Level 

79------------------------------------- 

80 

81The pg8000 dialect offers the same isolation level settings as that 

82of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect: 

83 

84* ``READ COMMITTED`` 

85* ``READ UNCOMMITTED`` 

86* ``REPEATABLE READ`` 

87* ``SERIALIZABLE`` 

88* ``AUTOCOMMIT`` 

89 

90.. seealso:: 

91 

92 :ref:`postgresql_isolation_level` 

93 

94 :ref:`psycopg2_isolation_level` 

95 

96 

97""" # noqa 

98import decimal 

99import re 

100 

101from . import ranges 

102from .array import ARRAY as PGARRAY 

103from .base import _DECIMAL_TYPES 

104from .base import _FLOAT_TYPES 

105from .base import _INT_TYPES 

106from .base import ENUM 

107from .base import INTERVAL 

108from .base import PGCompiler 

109from .base import PGDialect 

110from .base import PGExecutionContext 

111from .base import PGIdentifierPreparer 

112from .json import JSON 

113from .json import JSONB 

114from .json import JSONPathType 

115from .pg_catalog import _SpaceVector 

116from .pg_catalog import OIDVECTOR 

117from .types import CITEXT 

118from ... import exc 

119from ... import util 

120from ...engine import processors 

121from ...sql import sqltypes 

122from ...sql.elements import quoted_name 

123 

124 

class _PGString(sqltypes.String):
    """String type used by the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

127 

128 

class _PGNumeric(sqltypes.Numeric):
    """Numeric type for pg8000 that picks a result converter per column OID."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return the row-value converter for ``coltype``, or ``None``.

        ``None`` is returned when the value delivered by the driver already
        has the Python type requested through ``self.asdecimal``.

        :raises exc.InvalidRequestError: when ``coltype`` is not one of the
         float, decimal or integer type codes.
        """
        if coltype in _FLOAT_TYPES:
            if self.asdecimal:
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale
                )
            # pg8000 returns float natively for 701
            return None

        if coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
            if self.asdecimal:
                # pg8000 returns Decimal natively for 1700
                return None
            return processors.to_float

        raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)

155 

156 

class _PGFloat(_PGNumeric, sqltypes.Float):
    """Float type for pg8000; reuses the ``_PGNumeric`` result handling."""

    render_bind_cast = True
    __visit_name__ = "float"

160 

161 

class _PGNumericNoBind(_PGNumeric):
    """``_PGNumeric`` variant that applies no bind-parameter processing."""

    def bind_processor(self, dialect):
        """Return ``None``: bound values are passed through unchanged."""
        return None

165 

166 

class _PGJSON(JSON):
    """JSON type for pg8000 with no SQLAlchemy-level result processing."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return ``None``: no converter is applied to fetched values."""
        return None

172 

173 

class _PGJSONB(JSONB):
    """JSONB type for pg8000 with no SQLAlchemy-level result processing."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return ``None``: no converter is applied to fetched values."""
        return None

179 

180 

class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
    """Generic JSON index type; asking it for a DBAPI type is an error."""

    def get_dbapi_type(self, dbapi):
        """Always raise ``NotImplementedError``."""
        raise NotImplementedError("should not be here")

184 

185 

class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    """Integer JSON index type for pg8000, with ``render_bind_cast`` on."""

    __visit_name__ = "json_int_index"
    render_bind_cast = True

190 

191 

class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    """String JSON index type for pg8000, with ``render_bind_cast`` on."""

    __visit_name__ = "json_str_index"
    render_bind_cast = True

196 

197 

class _PGJSONPathType(JSONPathType):
    """JSON path type for pg8000; adds nothing to the base type."""

    # DBAPI type 1009

202 

203 

class _PGEnum(ENUM):
    """ENUM type for pg8000 that reports the driver's ``UNKNOWN`` type."""

    def get_dbapi_type(self, dbapi):
        """Return ``dbapi.UNKNOWN`` as the DBAPI type for enum values."""
        return dbapi.UNKNOWN

207 

208 

class _PGInterval(INTERVAL):
    """INTERVAL type for pg8000, also used for the generic ``Interval``."""

    render_bind_cast = True

    def get_dbapi_type(self, dbapi):
        """Return the driver's ``INTERVAL`` type object."""
        return dbapi.INTERVAL

    @classmethod
    def adapt_emulated_to_native(cls, interval, **kw):
        """Build a ``_PGInterval`` carrying over ``interval.second_precision``.

        Extra keyword arguments are accepted and ignored.
        """
        precision = interval.second_precision
        return _PGInterval(precision=precision)

218 

219 

class _PGTimeStamp(sqltypes.DateTime):
    """DateTime type used by the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

222 

223 

class _PGDate(sqltypes.Date):
    """Date type used by the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

226 

227 

class _PGTime(sqltypes.Time):
    """Time type used by the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

230 

231 

class _PGInteger(sqltypes.Integer):
    """Integer type used by the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

234 

235 

class _PGSmallInteger(sqltypes.SmallInteger):
    """SmallInteger type for the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

238 

239 

class _PGNullType(sqltypes.NullType):
    """NullType used by the pg8000 dialect; adds nothing to the base type."""

242 

243 

class _PGBigInteger(sqltypes.BigInteger):
    """BigInteger type for the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

246 

247 

class _PGBoolean(sqltypes.Boolean):
    """Boolean type used by the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

250 

251 

class _PGARRAY(PGARRAY):
    """ARRAY type used by the pg8000 dialect, with ``render_bind_cast`` on."""

    render_bind_cast = True

254 

255 

class _PGOIDVECTOR(_SpaceVector, OIDVECTOR):
    """OIDVECTOR for pg8000, combining ``_SpaceVector`` with ``OIDVECTOR``."""

258 

259 

class _Pg8000Range(ranges.AbstractSingleRangeImpl):
    """Converts single range values between SQLAlchemy and pg8000 objects."""

    def bind_processor(self, dialect):
        """Return a converter turning ``ranges.Range`` into the driver's Range.

        Values that are not ``ranges.Range`` instances are returned unchanged.
        """
        driver_range_cls = dialect.dbapi.Range

        def to_range(value):
            if not isinstance(value, ranges.Range):
                return value
            return driver_range_cls(
                value.lower, value.upper, value.bounds, value.empty
            )

        return to_range

    def result_processor(self, dialect, coltype):
        """Return a converter turning a driver range into ``ranges.Range``.

        ``None`` is returned unchanged.
        """

        def to_range(value):
            if value is None:
                return value
            return ranges.Range(
                value.lower,
                value.upper,
                bounds=value.bounds,
                empty=value.is_empty,
            )

        return to_range

285 

286 

class _Pg8000MultiRange(ranges.AbstractMultiRangeImpl):
    """Converts multirange values between SQLAlchemy and pg8000 objects."""

    def bind_processor(self, dialect):
        """Return a converter for lists that may contain ``ranges.Range``.

        For a ``list`` input a new list is built in which every
        ``ranges.Range`` element is replaced by the driver's Range object;
        other elements are kept. Non-list inputs are returned unchanged.
        """
        driver_range_cls = dialect.dbapi.Range

        def to_multirange(value):
            if not isinstance(value, list):
                return value
            return [
                (
                    driver_range_cls(
                        item.lower, item.upper, item.bounds, item.empty
                    )
                    if isinstance(item, ranges.Range)
                    else item
                )
                for item in value
            ]

        return to_multirange

    def result_processor(self, dialect, coltype):
        """Return a converter building ``ranges.MultiRange`` from driver rows.

        ``None`` is returned unchanged.
        """

        def to_multirange(value):
            if value is None:
                return None
            return ranges.MultiRange(
                ranges.Range(
                    item.lower,
                    item.upper,
                    bounds=item.bounds,
                    empty=item.is_empty,
                )
                for item in value
            )

        return to_multirange

320 

321 

# Module-wide counter used when building server-side cursor identifiers.
_server_side_id = util.counter()

323 

324 

class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context for pg8000 that supplies server-side cursors."""

    def create_server_side_cursor(self):
        """Wrap a new DBAPI cursor in a ``ServerSideCursor``.

        The cursor identifier is built from the hex ``id()`` of this context
        and the next value of the module-level counter.
        """
        context_part = hex(id(self))[2:]
        sequence_part = hex(_server_side_id())[2:]
        ident = "c_%s_%s" % (context_part, sequence_part)
        return ServerSideCursor(self._dbapi_connection.cursor(), ident)

    def pre_exec(self):
        """Do nothing; the hook returns ``None`` in every case."""
        if not self.compiled:
            return None
        return None

333 

334 

class ServerSideCursor:
    """Cursor-like wrapper that reads rows through a named SQL cursor.

    ``execute()`` wraps the statement in ``DECLARE <ident> NO SCROLL CURSOR
    FOR ...`` and the ``fetch*`` methods issue ``FETCH FORWARD`` statements
    against that identifier on the wrapped DBAPI cursor.
    """

    server_side = True

    def __init__(self, cursor, ident):
        """Store the wrapped DBAPI ``cursor`` and the SQL cursor name."""
        self.ident = ident
        self.cursor = cursor

    @property
    def connection(self):
        """The wrapped cursor's ``connection``."""
        return self.cursor.connection

    @property
    def rowcount(self):
        """The wrapped cursor's ``rowcount``."""
        return self.cursor.rowcount

    @property
    def description(self):
        """The wrapped cursor's ``description``."""
        return self.cursor.description

    def execute(self, operation, args=(), stream=None):
        """Declare the SQL cursor for ``operation`` and return ``self``."""
        op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
        self.cursor.execute(op, args, stream=stream)
        return self

    def executemany(self, operation, param_sets):
        """Forward to the wrapped cursor's ``executemany``; return ``self``."""
        self.cursor.executemany(operation, param_sets)
        return self

    def fetchone(self):
        """Fetch one row forward from the SQL cursor."""
        self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
        return self.cursor.fetchone()

    def fetchmany(self, num=None):
        """Fetch ``num`` rows forward, or all remaining rows when ``None``."""
        if num is None:
            return self.fetchall()
        # int() ensures only a number is spliced into the SQL text.
        self.cursor.execute(
            "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
        )
        return self.cursor.fetchall()

    def fetchall(self):
        """Fetch every remaining row from the SQL cursor."""
        self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
        return self.cursor.fetchall()

    def close(self):
        """Close the SQL cursor, then the wrapped DBAPI cursor.

        The wrapped cursor is closed even when the ``CLOSE`` statement
        raises; the exception still propagates to the caller.
        """
        try:
            self.cursor.execute("CLOSE " + self.ident)
        finally:
            self.cursor.close()

    def setinputsizes(self, *sizes):
        """Forward to the wrapped cursor's ``setinputsizes``."""
        self.cursor.setinputsizes(*sizes)

    def setoutputsize(self, size, column=None):
        """Accepted for DBAPI compatibility; does nothing."""
        pass

389 

390 

class PGCompiler_pg8000(PGCompiler):
    """Statement compiler for pg8000."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with a doubled percent sign."""
        left_sql = self.process(binary.left, **kw)
        right_sql = self.process(binary.right, **kw)
        return left_sql + " %% " + right_sql

398 

399 

class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer for pg8000 with percent-doubling switched off."""

    def __init__(self, *args, **kwargs):
        """Initialise the base preparer, then clear ``_double_percents``."""
        super().__init__(*args, **kwargs)
        self._double_percents = False

404 

405 

class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect implementation for the pg8000 DBAPI driver."""

    driver = "pg8000"
    supports_statement_cache = True

    supports_unicode_statements = True

    supports_unicode_binds = True

    default_paramstyle = "format"
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    supports_server_side_cursors = True

    render_bind_cast = True

    # reversed as of pg8000 1.16.6.  1.16.5 and lower
    # are no longer compatible
    description_encoding = None
    # description_encoding = "use_encoding"

    # Generic / PostgreSQL types mapped to the pg8000-specific
    # implementations defined in this module.
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.String: _PGString,
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGFloat,
            sqltypes.JSON: _PGJSON,
            sqltypes.Boolean: _PGBoolean,
            sqltypes.NullType: _PGNullType,
            JSONB: _PGJSONB,
            CITEXT: CITEXT,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Date: _PGDate,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            sqltypes.Enum: _PGEnum,
            sqltypes.ARRAY: _PGARRAY,
            OIDVECTOR: _PGOIDVECTOR,
            ranges.INT4RANGE: _Pg8000Range,
            ranges.INT8RANGE: _Pg8000Range,
            ranges.NUMRANGE: _Pg8000Range,
            ranges.DATERANGE: _Pg8000Range,
            ranges.TSRANGE: _Pg8000Range,
            ranges.TSTZRANGE: _Pg8000Range,
            ranges.INT4MULTIRANGE: _Pg8000MultiRange,
            ranges.INT8MULTIRANGE: _Pg8000MultiRange,
            ranges.NUMMULTIRANGE: _Pg8000MultiRange,
            ranges.DATEMULTIRANGE: _Pg8000MultiRange,
            ranges.TSMULTIRANGE: _Pg8000MultiRange,
            ranges.TSTZMULTIRANGE: _Pg8000MultiRange,
        },
    )

    def __init__(self, client_encoding=None, **kwargs):
        """Create the dialect.

        :param client_encoding: optional encoding name; when not ``None`` a
         ``SET CLIENT_ENCODING`` statement is run on each new connection.
        :raises NotImplementedError: if the installed pg8000 is older than
         1.16.6, or if ``_native_inet_types`` is truthy.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

        if self._dbapi_version < (1, 16, 6):
            raise NotImplementedError("pg8000 1.16.6 or greater is required")

        if self._native_inet_types:
            raise NotImplementedError(
                "The pg8000 dialect does not fully implement "
                "ipaddress type handling; INET is supported by default, "
                "CIDR is not"
            )

    @util.memoized_property
    def _dbapi_version(self):
        """Driver version as a tuple of ints.

        Falls back to ``(99, 99, 99)`` when no DBAPI module is set or it has
        no ``__version__`` attribute.
        """
        if self.dbapi and hasattr(self.dbapi, "__version__"):
            return tuple(
                int(x)
                for x in re.findall(
                    r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
                )
            )
        else:
            return (99, 99, 99)

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__("pg8000")

    def create_connect_args(self, url):
        """Build ``([], opts)`` connect arguments from ``url``.

        The URL's username is passed as ``user``, ``port`` is converted to
        ``int`` when present, and query-string entries are merged last.
        """
        opts = url.translate_connect_args(username="user")
        if "port" in opts:
            opts["port"] = int(opts["port"])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True when exception ``e`` indicates a dropped connection."""
        if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
            e
        ):
            # new as of pg8000 1.19.0 for broken connections
            return True

        # connection was closed normally
        return "connection is closed" in str(e)

    def get_isolation_level_values(self, dbapi_connection):
        """Return the isolation level names accepted by this dialect."""
        return (
            "AUTOCOMMIT",
            "READ COMMITTED",
            "READ UNCOMMITTED",
            "REPEATABLE READ",
            "SERIALIZABLE",
        )

    def set_isolation_level(self, dbapi_connection, level):
        """Apply ``level`` to ``dbapi_connection``.

        ``AUTOCOMMIT`` only switches the driver's ``autocommit`` flag on;
        any other level switches it off and issues ``SET SESSION
        CHARACTERISTICS`` followed by ``COMMIT``.
        """
        level = level.replace("_", " ")

        if level == "AUTOCOMMIT":
            dbapi_connection.autocommit = True
        else:
            dbapi_connection.autocommit = False
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute(
                    "SET SESSION CHARACTERISTICS AS TRANSACTION "
                    f"ISOLATION LEVEL {level}"
                )
                cursor.execute("COMMIT")
            finally:
                # close the cursor even when a statement fails
                cursor.close()

    def set_readonly(self, connection, value):
        """Set the session to READ ONLY or READ WRITE, then COMMIT."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("READ ONLY" if value else "READ WRITE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_readonly(self, connection):
        """Return True when ``show transaction_read_only`` reports "on"."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_read_only")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def set_deferrable(self, connection, value):
        """Set the session to DEFERRABLE or NOT DEFERRABLE, then COMMIT."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("DEFERRABLE" if value else "NOT DEFERRABLE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_deferrable(self, connection):
        """Return True when ``show transaction_deferrable`` reports "on"."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_deferrable")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def _set_client_encoding(self, dbapi_connection, client_encoding):
        """Run ``SET CLIENT_ENCODING`` on the connection, then COMMIT.

        Single quotes in ``client_encoding`` are doubled before the value is
        placed inside the quoted SQL literal.
        """
        escaped = client_encoding.replace("'", "''")
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(f"SET CLIENT_ENCODING TO '{escaped}'")
            cursor.execute("COMMIT")
        finally:
            # close the cursor even when a statement fails
            cursor.close()

    def do_begin_twophase(self, connection, xid):
        """Begin a two-phase transaction identified by ``(0, xid, "")``."""
        connection.connection.tpc_begin((0, xid, ""))

    def do_prepare_twophase(self, connection, xid):
        """Prepare the current two-phase transaction."""
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Roll back the two-phase transaction ``(0, xid, "")``.

        ``is_prepared`` and ``recover`` are accepted but not used here.
        """
        connection.connection.tpc_rollback((0, xid, ""))

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        """Commit the two-phase transaction ``(0, xid, "")``.

        ``is_prepared`` and ``recover`` are accepted but not used here.
        """
        connection.connection.tpc_commit((0, xid, ""))

    def do_recover_twophase(self, connection):
        """Return element 1 of each row reported by ``tpc_recover()``."""
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Return a callable that configures each new DBAPI connection.

        The callable runs, in order: the ``quoted_name`` adapter setup, the
        optional client-encoding statement, the inet/cidr pass-through
        adapters, and the optional JSON deserializer registration.
        """
        fns = []

        def on_connect(conn):
            # send quoted_name values using the same adapter as plain str
            conn.py_types[quoted_name] = conn.py_types[str]

        fns.append(on_connect)

        if self.client_encoding is not None:

            def on_connect(conn):
                self._set_client_encoding(conn, self.client_encoding)

            fns.append(on_connect)

        if self._native_inet_types is False:

            def on_connect(conn):
                # inet
                conn.register_in_adapter(869, lambda s: s)

                # cidr
                conn.register_in_adapter(650, lambda s: s)

            fns.append(on_connect)

        if self._json_deserializer:

            def on_connect(conn):
                # json
                conn.register_in_adapter(114, self._json_deserializer)

                # jsonb
                conn.register_in_adapter(3802, self._json_deserializer)

            fns.append(on_connect)

        if fns:

            def on_connect(conn):
                for fn in fns:
                    fn(conn)

            return on_connect
        else:
            return None

    @util.memoized_property
    def _dialect_specific_select_one(self):
        """Return ``";"`` as this dialect's "select one" statement."""
        return ";"

664 

665 

# Module-level alias exposing the dialect class under the name ``dialect``.
dialect = PGDialect_pg8000