1# dialects/postgresql/pg8000.py
2# Copyright (C) 2005-2026 the SQLAlchemy authors and contributors <see AUTHORS
3# file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: postgresql+pg8000
11 :name: pg8000
12 :dbapi: pg8000
13 :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
14 :url: https://pypi.org/project/pg8000/
15
16.. versionchanged:: 1.4 The pg8000 dialect has been updated for version
17 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration
18 with full feature support.
19
20.. _pg8000_unicode:
21
22Unicode
23-------
24
25pg8000 will encode / decode string values between it and the server using the
26PostgreSQL ``client_encoding`` parameter; by default this is the value in
27the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
28Typically, this can be changed to ``utf-8``, as a more useful default::
29
30 # client_encoding = sql_ascii # actually, defaults to database encoding
31 client_encoding = utf8
32
33The ``client_encoding`` can be overridden for a session by executing the SQL:
34
35.. sourcecode:: sql
36
37 SET CLIENT_ENCODING TO 'utf8';
38
39SQLAlchemy will execute this SQL on all new connections based on the value
40passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter::
41
42 engine = create_engine(
43 "postgresql+pg8000://user:pass@host/dbname", client_encoding="utf8"
44 )
45
46.. _pg8000_ssl:
47
48SSL Connections
49---------------
50
51pg8000 accepts a Python ``SSLContext`` object which may be specified using the
52:paramref:`_sa.create_engine.connect_args` dictionary::
53
54 import ssl
55
56 ssl_context = ssl.create_default_context()
57 engine = sa.create_engine(
58 "postgresql+pg8000://scott:tiger@192.168.0.199/test",
59 connect_args={"ssl_context": ssl_context},
60 )
61
62If the server uses an automatically-generated certificate that is self-signed
63or does not match the host name (as seen from the client), it may also be
64necessary to disable hostname checking::
65
66 import ssl
67
68 ssl_context = ssl.create_default_context()
69 ssl_context.check_hostname = False
70 ssl_context.verify_mode = ssl.CERT_NONE
71 engine = sa.create_engine(
72 "postgresql+pg8000://scott:tiger@192.168.0.199/test",
73 connect_args={"ssl_context": ssl_context},
74 )
75
76.. _pg8000_isolation_level:
77
78pg8000 Transaction Isolation Level
79-------------------------------------
80
The pg8000 dialect offers the same isolation level settings as the
:ref:`psycopg2 <psycopg2_isolation_level>` dialect:
83
84* ``READ COMMITTED``
85* ``READ UNCOMMITTED``
86* ``REPEATABLE READ``
87* ``SERIALIZABLE``
88* ``AUTOCOMMIT``
89
90.. seealso::
91
92 :ref:`postgresql_isolation_level`
93
94 :ref:`psycopg2_isolation_level`
95
96
97""" # noqa
98
99import decimal
100import re
101
102from . import ranges
103from .array import ARRAY as PGARRAY
104from .base import _DECIMAL_TYPES
105from .base import _FLOAT_TYPES
106from .base import _INT_TYPES
107from .base import ENUM
108from .base import INTERVAL
109from .base import PGCompiler
110from .base import PGDialect
111from .base import PGExecutionContext
112from .base import PGIdentifierPreparer
113from .json import JSON
114from .json import JSONB
115from .json import JSONPathType
116from .pg_catalog import _SpaceVector
117from .pg_catalog import OIDVECTOR
118from .types import CITEXT
119from ... import exc
120from ... import util
121from ...engine import processors
122from ...sql import sqltypes
123from ...sql.elements import quoted_name
124
125
class _PGString(sqltypes.String):
    """pg8000 variant of the generic ``String`` type.

    Sets ``render_bind_cast`` and otherwise inherits everything from the
    base type.
    """

    # NOTE(review): presumably makes the compiler render an explicit SQL
    # cast around bound parameters of this type -- confirm against the
    # base type's documentation.
    render_bind_cast = True
128
129
class _PGNumeric(sqltypes.Numeric):
    """pg8000 variant of ``Numeric`` with driver-aware result conversion."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return the row-value converter for ``coltype``, or ``None``.

        ``None`` is returned whenever the value handed back by pg8000 is
        already of the requested Python type, so no conversion is needed.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: the DBAPI type code of the result column.
        :raises InvalidRequestError: if ``coltype`` is neither a float,
         decimal nor integer type code.
        """
        want_decimal = self.asdecimal

        if coltype in _FLOAT_TYPES:
            if not want_decimal:
                # pg8000 returns float natively for 701
                return None
            return processors.to_decimal_processor_factory(
                decimal.Decimal, self._effective_decimal_return_scale
            )

        if coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
            if want_decimal:
                # pg8000 returns Decimal natively for 1700
                return None
            return processors.to_float

        raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)
156
157
class _PGFloat(_PGNumeric, sqltypes.Float):
    """pg8000 variant of ``Float``.

    Inherits the result handling of :class:`._PGNumeric` while keeping the
    generic ``Float`` type in its bases.
    """

    # NOTE(review): presumably keeps this type dispatched to the compiler's
    # "float" visitor despite _PGNumeric coming first in the MRO -- confirm.
    __visit_name__ = "float"
    render_bind_cast = True
161
162
class _PGNumericNoBind(_PGNumeric):
    """Variant of :class:`._PGNumeric` that installs no bind processor.

    This is the implementation the dialect's ``colspecs`` maps the generic
    ``Numeric`` type to.
    """

    def bind_processor(self, dialect):
        """Return ``None`` so bound values are passed through unconverted."""
        return None
166
167
class _PGJSON(JSON):
    """pg8000 variant of the PostgreSQL ``JSON`` type."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return ``None``; no per-row conversion is applied by this type.

        The dialect's ``on_connect`` hook registers its JSON deserializer
        with the pg8000 connection for the json type OID when one is
        configured.
        """
        return None
173
174
class _PGJSONB(JSONB):
    """pg8000 variant of the PostgreSQL ``JSONB`` type."""

    render_bind_cast = True

    def result_processor(self, dialect, coltype):
        """Return ``None``; no per-row conversion is applied by this type.

        The dialect's ``on_connect`` hook registers its JSON deserializer
        with the pg8000 connection for the jsonb type OID when one is
        configured.
        """
        return None
180
181
class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
    """pg8000 implementation of the generic JSON index type.

    The dialect's ``colspecs`` maps the integer and string index types to
    their own classes; asking this generic one for a DBAPI type is treated
    as an error.
    """

    def get_dbapi_type(self, dbapi):
        """Always raise ``NotImplementedError``."""
        # NOTE(review): presumably index expressions are always resolved to
        # the int/str specific types before reaching here -- confirm.
        raise NotImplementedError("should not be here")
185
186
class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
    """pg8000 implementation of the integer JSON index type."""

    __visit_name__ = "json_int_index"

    render_bind_cast = True
191
192
class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
    """pg8000 implementation of the string JSON index type."""

    __visit_name__ = "json_str_index"

    render_bind_cast = True
197
198
class _PGJSONPathType(JSONPathType):
    """pg8000 implementation of the JSON path type; adds no behavior."""

    pass

    # DBAPI type 1009
203
204
class _PGEnum(ENUM):
    """pg8000 variant of the PostgreSQL ``ENUM`` type."""

    def get_dbapi_type(self, dbapi):
        """Return the driver's ``UNKNOWN`` type object for enum values."""
        return dbapi.UNKNOWN
208
209
class _PGInterval(INTERVAL):
    """pg8000 variant of the PostgreSQL ``INTERVAL`` type."""

    render_bind_cast = True

    def get_dbapi_type(self, dbapi):
        """Return the driver's ``INTERVAL`` type object."""
        return dbapi.INTERVAL

    @classmethod
    def adapt_emulated_to_native(cls, interval, **kw):
        """Build a ``_PGInterval`` from a generic interval type.

        Only ``interval.second_precision`` is carried over, as the
        ``precision`` of the new type; extra keyword arguments are ignored.
        """
        return _PGInterval(precision=interval.second_precision)
219
220
class _PGTimeStamp(sqltypes.DateTime):
    """pg8000 variant of ``DateTime`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True
223
224
class _PGDate(sqltypes.Date):
    """pg8000 variant of ``Date`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True
227
228
class _PGTime(sqltypes.Time):
    """pg8000 variant of ``Time`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True
231
232
class _PGInteger(sqltypes.Integer):
    """pg8000 variant of ``Integer`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True
235
236
class _PGSmallInteger(sqltypes.SmallInteger):
    """pg8000 variant of ``SmallInteger`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True
239
240
class _PGNullType(sqltypes.NullType):
    """pg8000 implementation of ``NullType``; adds no behavior."""

    pass
243
244
class _PGBigInteger(sqltypes.BigInteger):
    """pg8000 variant of ``BigInteger`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True
247
248
class _PGBoolean(sqltypes.Boolean):
    """pg8000 variant of ``Boolean`` with ``render_bind_cast`` enabled."""

    render_bind_cast = True
251
252
class _PGARRAY(PGARRAY):
    """pg8000 variant of the PostgreSQL ``ARRAY`` type."""

    render_bind_cast = True
255
256
class _PGOIDVECTOR(_SpaceVector, OIDVECTOR):
    """pg8000 implementation of ``OIDVECTOR``.

    Combines the ``_SpaceVector`` mixin with ``OIDVECTOR``; adds nothing of
    its own.
    """

    pass
259
260
class _Pg8000Range(ranges.AbstractSingleRangeImpl):
    """Converts single range values to and from pg8000's own range class."""

    def bind_processor(self, dialect):
        """Return a converter turning ``ranges.Range`` into the driver's range.

        Any value that is not a ``ranges.Range`` is passed through as is.
        """
        driver_range_cls = dialect.dbapi.Range

        def to_range(value):
            if not isinstance(value, ranges.Range):
                return value
            return driver_range_cls(
                value.lower, value.upper, value.bounds, value.empty
            )

        return to_range

    def result_processor(self, dialect, coltype):
        """Return a converter turning driver ranges into ``ranges.Range``.

        ``None`` is passed through unchanged.
        """

        def to_range(value):
            if value is None:
                return value
            return ranges.Range(
                value.lower,
                value.upper,
                bounds=value.bounds,
                empty=value.is_empty,
            )

        return to_range
286
287
class _Pg8000MultiRange(ranges.AbstractMultiRangeImpl):
    """Converts multirange values to and from lists of pg8000 ranges."""

    def bind_processor(self, dialect):
        """Return a converter for multirange bind values.

        A list is rebuilt element by element, replacing each
        ``ranges.Range`` with the driver's range object and keeping other
        elements untouched.  Non-list values are passed through as is.
        """
        driver_range_cls = dialect.dbapi.Range

        def to_multirange(value):
            if not isinstance(value, list):
                return value
            return [
                (
                    driver_range_cls(
                        item.lower, item.upper, item.bounds, item.empty
                    )
                    if isinstance(item, ranges.Range)
                    else item
                )
                for item in value
            ]

        return to_multirange

    def result_processor(self, dialect, coltype):
        """Return a converter building ``ranges.MultiRange`` from driver rows.

        ``None`` is passed through unchanged.
        """

        def to_multirange(value):
            if value is None:
                return None
            converted = (
                ranges.Range(
                    item.lower,
                    item.upper,
                    bounds=item.bounds,
                    empty=item.is_empty,
                )
                for item in value
            )
            return ranges.MultiRange(converted)

        return to_multirange
321
322
# Module-wide counter; each call supplies the sequence part of a generated
# server-side cursor name (see PGExecutionContext_pg8000).
_server_side_id = util.counter()
324
325
class PGExecutionContext_pg8000(PGExecutionContext):
    """Execution context that supplies pg8000 server-side cursors."""

    def create_server_side_cursor(self):
        """Wrap a new DBAPI cursor in a uniquely named ``ServerSideCursor``.

        The name combines the hex ``id()`` of this context with the next
        value of the module-level counter.
        """
        context_part = hex(id(self))[2:]
        sequence_part = hex(_server_side_id())[2:]
        cursor_name = "c_%s_%s" % (context_part, sequence_part)
        raw_cursor = self._dbapi_connection.cursor()
        return ServerSideCursor(raw_cursor, cursor_name)

    def pre_exec(self):
        """Hook run before execution; performs no work in either case."""
        if not self.compiled:
            return
334
335
class ServerSideCursor:
    """Cursor wrapper that drives a named PostgreSQL cursor through pg8000.

    ``execute()`` wraps the statement in
    ``DECLARE <ident> NO SCROLL CURSOR FOR ...`` and the fetch methods pull
    rows with ``FETCH FORWARD ... FROM <ident>`` on the wrapped DBAPI cursor.

    :param cursor: the underlying DBAPI cursor all statements run on.
    :param ident: name of the server-side cursor; interpolated verbatim
     into the SQL, so it must be a safe identifier.
    """

    server_side = True

    def __init__(self, cursor, ident):
        self.ident = ident
        self.cursor = cursor

    @property
    def connection(self):
        """The wrapped cursor's connection."""
        return self.cursor.connection

    @property
    def rowcount(self):
        """The wrapped cursor's ``rowcount``."""
        return self.cursor.rowcount

    @property
    def description(self):
        """The wrapped cursor's ``description``."""
        return self.cursor.description

    def execute(self, operation, args=(), stream=None):
        """Declare the named cursor for ``operation`` and return ``self``."""
        op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
        self.cursor.execute(op, args, stream=stream)
        return self

    def executemany(self, operation, param_sets):
        """Delegate directly to the wrapped cursor (no DECLARE) and return
        ``self``."""
        self.cursor.executemany(operation, param_sets)
        return self

    def fetchone(self):
        """Fetch the next row from the named cursor."""
        self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
        return self.cursor.fetchone()

    def fetchmany(self, num=None):
        """Fetch up to ``num`` rows; with ``num=None`` fetch all of them."""
        if num is None:
            return self.fetchall()
        # int() guards the interpolated row count against non-numeric input
        self.cursor.execute(
            "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
        )
        return self.cursor.fetchall()

    def fetchall(self):
        """Fetch every remaining row from the named cursor."""
        self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
        return self.cursor.fetchall()

    def close(self):
        """Close the named cursor, then the wrapped DBAPI cursor.

        The wrapped cursor is closed even when the ``CLOSE`` statement
        raises, so a failing statement cannot leak it; the error still
        propagates to the caller.
        """
        try:
            self.cursor.execute("CLOSE " + self.ident)
        finally:
            self.cursor.close()

    def setinputsizes(self, *sizes):
        """Forward ``setinputsizes`` to the wrapped cursor."""
        self.cursor.setinputsizes(*sizes)

    def setoutputsize(self, size, column=None):
        """Accepted for DBAPI compatibility; does nothing."""
        pass
390
391
class PGCompiler_pg8000(PGCompiler):
    """SQL compiler for the pg8000 dialect."""

    def visit_mod_binary(self, binary, operator, **kw):
        """Render a modulo expression with a doubled percent sign.

        Both operands are compiled with the given keyword arguments and
        joined by ``" %% "``.
        """
        left_sql = self.process(binary.left, **kw)
        right_sql = self.process(binary.right, **kw)
        return left_sql + " %% " + right_sql
399
400
class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
    """Identifier preparer for pg8000 with percent-doubling switched off."""

    def __init__(self, *args, **kwargs):
        """Initialize the base preparer, then clear ``_double_percents``."""
        PGIdentifierPreparer.__init__(self, *args, **kwargs)
        # NOTE(review): presumably stops "%" inside quoted identifiers from
        # being escaped as "%%" -- confirm in the base preparer.
        self._double_percents = False
405
406
class PGDialect_pg8000(PGDialect):
    """PostgreSQL dialect for the pg8000 DBAPI driver.

    Wires up the pg8000-specific execution context, compiler, identifier
    preparer and type implementations, and provides the connection-level
    operations (isolation level, read-only / deferrable flags, client
    encoding, two-phase commit) through plain DBAPI cursors.

    :param client_encoding: optional encoding name; when given, it is
     applied with ``SET CLIENT_ENCODING`` on each new connection.
    :raises NotImplementedError: if the installed pg8000 is older than
     1.16.6, or if native inet types were requested.
    """

    driver = "pg8000"
    supports_statement_cache = True

    supports_unicode_statements = True

    supports_unicode_binds = True

    default_paramstyle = "format"
    supports_sane_multi_rowcount = True
    execution_ctx_cls = PGExecutionContext_pg8000
    statement_compiler = PGCompiler_pg8000
    preparer = PGIdentifierPreparer_pg8000
    supports_server_side_cursors = True

    render_bind_cast = True

    # reversed as of pg8000 1.16.6.  1.16.5 and lower
    # are no longer compatible
    description_encoding = None
    # description_encoding = "use_encoding"

    # generic / PostgreSQL types mapped to their pg8000 implementations
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.String: _PGString,
            sqltypes.Numeric: _PGNumericNoBind,
            sqltypes.Float: _PGFloat,
            sqltypes.JSON: _PGJSON,
            sqltypes.Boolean: _PGBoolean,
            sqltypes.NullType: _PGNullType,
            JSONB: _PGJSONB,
            CITEXT: CITEXT,
            sqltypes.JSON.JSONPathType: _PGJSONPathType,
            sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
            sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
            sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
            sqltypes.Interval: _PGInterval,
            INTERVAL: _PGInterval,
            sqltypes.DateTime: _PGTimeStamp,
            sqltypes.Date: _PGDate,
            sqltypes.Time: _PGTime,
            sqltypes.Integer: _PGInteger,
            sqltypes.SmallInteger: _PGSmallInteger,
            sqltypes.BigInteger: _PGBigInteger,
            sqltypes.Enum: _PGEnum,
            sqltypes.ARRAY: _PGARRAY,
            OIDVECTOR: _PGOIDVECTOR,
            ranges.INT4RANGE: _Pg8000Range,
            ranges.INT8RANGE: _Pg8000Range,
            ranges.NUMRANGE: _Pg8000Range,
            ranges.DATERANGE: _Pg8000Range,
            ranges.TSRANGE: _Pg8000Range,
            ranges.TSTZRANGE: _Pg8000Range,
            ranges.INT4MULTIRANGE: _Pg8000MultiRange,
            ranges.INT8MULTIRANGE: _Pg8000MultiRange,
            ranges.NUMMULTIRANGE: _Pg8000MultiRange,
            ranges.DATEMULTIRANGE: _Pg8000MultiRange,
            ranges.TSMULTIRANGE: _Pg8000MultiRange,
            ranges.TSTZMULTIRANGE: _Pg8000MultiRange,
        },
    )

    def __init__(self, client_encoding=None, **kwargs):
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding

        if self._dbapi_version < (1, 16, 6):
            raise NotImplementedError("pg8000 1.16.6 or greater is required")

        if self._native_inet_types:
            raise NotImplementedError(
                "The pg8000 dialect does not fully implement "
                "ipaddress type handling; INET is supported by default, "
                "CIDR is not"
            )

    @util.memoized_property
    def _dbapi_version(self):
        """Version of the loaded pg8000 module as a tuple of ints.

        Falls back to ``(99, 99, 99)`` when no DBAPI module is set or it
        exposes no ``__version__``, so the minimum-version check passes.
        """
        if self.dbapi and hasattr(self.dbapi, "__version__"):
            return tuple(
                int(x)
                for x in re.findall(
                    r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
                )
            )
        else:
            return (99, 99, 99)

    @classmethod
    def import_dbapi(cls):
        """Import and return the ``pg8000`` module."""
        return __import__("pg8000")

    def create_connect_args(self, url):
        """Translate ``url`` into ``([], kwargs)`` for the driver's connect.

        The URL username is passed as ``user``, the port is coerced to
        ``int``, and query-string entries are merged in last.
        """
        opts = url.translate_connect_args(username="user")
        if "port" in opts:
            opts["port"] = int(opts["port"])
        opts.update(url.query)
        return ([], opts)

    def is_disconnect(self, e, connection, cursor):
        """Return True if exception ``e`` indicates a lost connection."""
        message = str(e)

        if isinstance(e, self.dbapi.InterfaceError) and (
            "network error" in message
        ):
            # new as of pg8000 1.19.0 for broken connections
            return True

        # connection was closed normally
        return "connection is closed" in message

    def get_isolation_level_values(self, dbapi_connection):
        """Return the isolation level names accepted by this dialect."""
        return (
            "AUTOCOMMIT",
            "READ COMMITTED",
            "READ UNCOMMITTED",
            "REPEATABLE READ",
            "SERIALIZABLE",
        )

    def set_isolation_level(self, dbapi_connection, level):
        """Apply ``level`` to ``dbapi_connection``.

        ``AUTOCOMMIT`` only toggles the driver's ``autocommit`` flag; any
        other level turns autocommit off and sets the session default
        isolation level, committing so that it takes effect.  The cursor
        is closed even if one of the statements fails.
        """
        level = level.replace("_", " ")

        if level == "AUTOCOMMIT":
            dbapi_connection.autocommit = True
        else:
            dbapi_connection.autocommit = False
            cursor = dbapi_connection.cursor()
            try:
                cursor.execute(
                    "SET SESSION CHARACTERISTICS AS TRANSACTION "
                    f"ISOLATION LEVEL {level}"
                )
                cursor.execute("COMMIT")
            finally:
                cursor.close()

    def detect_autocommit_setting(self, dbapi_conn) -> bool:
        """Report the driver connection's ``autocommit`` flag as a bool."""
        return bool(dbapi_conn.autocommit)

    def set_readonly(self, connection, value):
        """Set the session default to READ ONLY / READ WRITE and commit."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("READ ONLY" if value else "READ WRITE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_readonly(self, connection):
        """Return True if ``show transaction_read_only`` reports ``on``."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_read_only")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def set_deferrable(self, connection, value):
        """Set the session default to DEFERRABLE / NOT DEFERRABLE and
        commit."""
        cursor = connection.cursor()
        try:
            cursor.execute(
                "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
                % ("DEFERRABLE" if value else "NOT DEFERRABLE")
            )
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    def get_deferrable(self, connection):
        """Return True if ``show transaction_deferrable`` reports ``on``."""
        cursor = connection.cursor()
        try:
            cursor.execute("show transaction_deferrable")
            val = cursor.fetchone()[0]
        finally:
            cursor.close()

        return val == "on"

    def _set_client_encoding(self, dbapi_connection, client_encoding):
        """Run ``SET CLIENT_ENCODING`` on the connection and commit.

        Single quotes in ``client_encoding`` are doubled before the value
        is placed in the SQL literal.  The cursor is closed even if one of
        the statements fails.
        """
        escaped = client_encoding.replace("'", "''")
        cursor = dbapi_connection.cursor()
        try:
            cursor.execute(f"SET CLIENT_ENCODING TO '{escaped}'")
            cursor.execute("COMMIT")
        finally:
            cursor.close()

    # Two-phase hooks: the transaction id is handed to the driver as the
    # tuple (0, xid, ""), so xid sits at index 1 of each recovered entry.

    def do_begin_twophase(self, connection, xid):
        connection.connection.tpc_begin((0, xid, ""))

    def do_prepare_twophase(self, connection, xid):
        connection.connection.tpc_prepare()

    def do_rollback_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        connection.connection.tpc_rollback((0, xid, ""))

    def do_commit_twophase(
        self, connection, xid, is_prepared=True, recover=False
    ):
        connection.connection.tpc_commit((0, xid, ""))

    def do_recover_twophase(self, connection):
        """Return the xid of every transaction reported by ``tpc_recover``."""
        return [row[1] for row in connection.connection.tpc_recover()]

    def on_connect(self):
        """Build the callable run against each new DBAPI connection.

        The returned function applies, in order: the ``quoted_name``
        adapter registration, the optional client encoding, pass-through
        adapters for inet / cidr, and the optional JSON deserializer.
        """
        fns = []

        def register_quoted_name(conn):
            # send quoted_name values the same way as plain str
            conn.py_types[quoted_name] = conn.py_types[str]

        fns.append(register_quoted_name)

        if self.client_encoding is not None:

            def apply_client_encoding(conn):
                self._set_client_encoding(conn, self.client_encoding)

            fns.append(apply_client_encoding)

        if self._native_inet_types is False:

            def register_inet_passthrough(conn):
                # inet
                conn.register_in_adapter(869, lambda s: s)

                # cidr
                conn.register_in_adapter(650, lambda s: s)

            fns.append(register_inet_passthrough)

        if self._json_deserializer:

            def register_json_deserializer(conn):
                # json
                conn.register_in_adapter(114, self._json_deserializer)

                # jsonb
                conn.register_in_adapter(3802, self._json_deserializer)

            fns.append(register_json_deserializer)

        # fns always holds at least register_quoted_name, so a composite
        # hook is always returned
        def on_connect(conn):
            for fn in fns:
                fn(conn)

        return on_connect

    @util.memoized_property
    def _dialect_specific_select_one(self):
        # NOTE(review): presumably the minimal statement used to ping the
        # server; pg8000 accepts a bare ";" -- confirm against the base
        # dialect.
        return ";"
665
666
# Module-level alias for this module's dialect class.
# NOTE(review): presumably the name the dialect loader looks up -- confirm.
dialect = PGDialect_pg8000