1# dialects/postgresql/pg8000.py
2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors <see AUTHORS
3# file>
4#
5# This module is part of SQLAlchemy and is released under
6# the MIT License: https://www.opensource.org/licenses/mit-license.php
7# mypy: ignore-errors
8
9r"""
10.. dialect:: postgresql+pg8000
11 :name: pg8000
12 :dbapi: pg8000
13 :connectstring: postgresql+pg8000://user:password@host:port/dbname[?key=value&key=value...]
14 :url: https://pypi.org/project/pg8000/
15
16.. versionchanged:: 1.4 The pg8000 dialect has been updated for version
17 1.16.6 and higher, and is again part of SQLAlchemy's continuous integration
18 with full feature support.
19
20.. _pg8000_unicode:
21
22Unicode
23-------
24
25pg8000 will encode / decode string values between it and the server using the
26PostgreSQL ``client_encoding`` parameter; by default this is the value in
27the ``postgresql.conf`` file, which often defaults to ``SQL_ASCII``.
28Typically, this can be changed to ``utf-8``, as a more useful default::
29
30 # client_encoding = sql_ascii # actually, defaults to database encoding
31 client_encoding = utf8
32
33The ``client_encoding`` can be overridden for a session by executing the SQL:
34
35.. sourcecode:: sql
36
37 SET CLIENT_ENCODING TO 'utf8';
38
39SQLAlchemy will execute this SQL on all new connections based on the value
40passed to :func:`_sa.create_engine` using the ``client_encoding`` parameter::
41
42 engine = create_engine(
43 "postgresql+pg8000://user:pass@host/dbname", client_encoding="utf8"
44 )
45
46.. _pg8000_ssl:
47
48SSL Connections
49---------------
50
51pg8000 accepts a Python ``SSLContext`` object which may be specified using the
52:paramref:`_sa.create_engine.connect_args` dictionary::
53
54 import ssl
55
56 ssl_context = ssl.create_default_context()
57 engine = sa.create_engine(
58 "postgresql+pg8000://scott:tiger@192.168.0.199/test",
59 connect_args={"ssl_context": ssl_context},
60 )
61
62If the server uses an automatically-generated certificate that is self-signed
63or does not match the host name (as seen from the client), it may also be
64necessary to disable hostname checking::
65
66 import ssl
67
68 ssl_context = ssl.create_default_context()
69 ssl_context.check_hostname = False
70 ssl_context.verify_mode = ssl.CERT_NONE
71 engine = sa.create_engine(
72 "postgresql+pg8000://scott:tiger@192.168.0.199/test",
73 connect_args={"ssl_context": ssl_context},
74 )
75
76.. _pg8000_isolation_level:
77
78pg8000 Transaction Isolation Level
79-------------------------------------
80
81The pg8000 dialect offers the same isolation level settings as that
82of the :ref:`psycopg2 <psycopg2_isolation_level>` dialect:
83
84* ``READ COMMITTED``
85* ``READ UNCOMMITTED``
86* ``REPEATABLE READ``
87* ``SERIALIZABLE``
88* ``AUTOCOMMIT``
89
90.. seealso::
91
92 :ref:`postgresql_isolation_level`
93
94 :ref:`psycopg2_isolation_level`
95
96
97""" # noqa
98import decimal
99import re
100
101from . import ranges
102from .array import ARRAY as PGARRAY
103from .base import _DECIMAL_TYPES
104from .base import _FLOAT_TYPES
105from .base import _INT_TYPES
106from .base import ENUM
107from .base import INTERVAL
108from .base import PGCompiler
109from .base import PGDialect
110from .base import PGExecutionContext
111from .base import PGIdentifierPreparer
112from .json import JSON
113from .json import JSONB
114from .json import JSONPathType
115from .pg_catalog import _SpaceVector
116from .pg_catalog import OIDVECTOR
117from .types import CITEXT
118from ... import exc
119from ... import util
120from ...engine import processors
121from ...sql import sqltypes
122from ...sql.elements import quoted_name
123
124
125class _PGString(sqltypes.String):
126 render_bind_cast = True
127
128
129class _PGNumeric(sqltypes.Numeric):
130 render_bind_cast = True
131
132 def result_processor(self, dialect, coltype):
133 if self.asdecimal:
134 if coltype in _FLOAT_TYPES:
135 return processors.to_decimal_processor_factory(
136 decimal.Decimal, self._effective_decimal_return_scale
137 )
138 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
139 # pg8000 returns Decimal natively for 1700
140 return None
141 else:
142 raise exc.InvalidRequestError(
143 "Unknown PG numeric type: %d" % coltype
144 )
145 else:
146 if coltype in _FLOAT_TYPES:
147 # pg8000 returns float natively for 701
148 return None
149 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
150 return processors.to_float
151 else:
152 raise exc.InvalidRequestError(
153 "Unknown PG numeric type: %d" % coltype
154 )
155
156
157class _PGFloat(_PGNumeric, sqltypes.Float):
158 __visit_name__ = "float"
159 render_bind_cast = True
160
161
162class _PGNumericNoBind(_PGNumeric):
163 def bind_processor(self, dialect):
164 return None
165
166
167class _PGJSON(JSON):
168 render_bind_cast = True
169
170 def result_processor(self, dialect, coltype):
171 return None
172
173
174class _PGJSONB(JSONB):
175 render_bind_cast = True
176
177 def result_processor(self, dialect, coltype):
178 return None
179
180
181class _PGJSONIndexType(sqltypes.JSON.JSONIndexType):
182 def get_dbapi_type(self, dbapi):
183 raise NotImplementedError("should not be here")
184
185
186class _PGJSONIntIndexType(sqltypes.JSON.JSONIntIndexType):
187 __visit_name__ = "json_int_index"
188
189 render_bind_cast = True
190
191
192class _PGJSONStrIndexType(sqltypes.JSON.JSONStrIndexType):
193 __visit_name__ = "json_str_index"
194
195 render_bind_cast = True
196
197
198class _PGJSONPathType(JSONPathType):
199 pass
200
201 # DBAPI type 1009
202
203
204class _PGEnum(ENUM):
205 def get_dbapi_type(self, dbapi):
206 return dbapi.UNKNOWN
207
208
209class _PGInterval(INTERVAL):
210 render_bind_cast = True
211
212 def get_dbapi_type(self, dbapi):
213 return dbapi.INTERVAL
214
215 @classmethod
216 def adapt_emulated_to_native(cls, interval, **kw):
217 return _PGInterval(precision=interval.second_precision)
218
219
220class _PGTimeStamp(sqltypes.DateTime):
221 render_bind_cast = True
222
223
224class _PGDate(sqltypes.Date):
225 render_bind_cast = True
226
227
228class _PGTime(sqltypes.Time):
229 render_bind_cast = True
230
231
232class _PGInteger(sqltypes.Integer):
233 render_bind_cast = True
234
235
236class _PGSmallInteger(sqltypes.SmallInteger):
237 render_bind_cast = True
238
239
240class _PGNullType(sqltypes.NullType):
241 pass
242
243
244class _PGBigInteger(sqltypes.BigInteger):
245 render_bind_cast = True
246
247
248class _PGBoolean(sqltypes.Boolean):
249 render_bind_cast = True
250
251
252class _PGARRAY(PGARRAY):
253 render_bind_cast = True
254
255
256class _PGOIDVECTOR(_SpaceVector, OIDVECTOR):
257 pass
258
259
260class _Pg8000Range(ranges.AbstractSingleRangeImpl):
261 def bind_processor(self, dialect):
262 pg8000_Range = dialect.dbapi.Range
263
264 def to_range(value):
265 if isinstance(value, ranges.Range):
266 value = pg8000_Range(
267 value.lower, value.upper, value.bounds, value.empty
268 )
269 return value
270
271 return to_range
272
273 def result_processor(self, dialect, coltype):
274 def to_range(value):
275 if value is not None:
276 value = ranges.Range(
277 value.lower,
278 value.upper,
279 bounds=value.bounds,
280 empty=value.is_empty,
281 )
282 return value
283
284 return to_range
285
286
287class _Pg8000MultiRange(ranges.AbstractMultiRangeImpl):
288 def bind_processor(self, dialect):
289 pg8000_Range = dialect.dbapi.Range
290
291 def to_multirange(value):
292 if isinstance(value, list):
293 mr = []
294 for v in value:
295 if isinstance(v, ranges.Range):
296 mr.append(
297 pg8000_Range(v.lower, v.upper, v.bounds, v.empty)
298 )
299 else:
300 mr.append(v)
301 return mr
302 else:
303 return value
304
305 return to_multirange
306
307 def result_processor(self, dialect, coltype):
308 def to_multirange(value):
309 if value is None:
310 return None
311 else:
312 return ranges.MultiRange(
313 ranges.Range(
314 v.lower, v.upper, bounds=v.bounds, empty=v.is_empty
315 )
316 for v in value
317 )
318
319 return to_multirange
320
321
322_server_side_id = util.counter()
323
324
325class PGExecutionContext_pg8000(PGExecutionContext):
326 def create_server_side_cursor(self):
327 ident = "c_%s_%s" % (hex(id(self))[2:], hex(_server_side_id())[2:])
328 return ServerSideCursor(self._dbapi_connection.cursor(), ident)
329
330 def pre_exec(self):
331 if not self.compiled:
332 return
333
334
335class ServerSideCursor:
336 server_side = True
337
338 def __init__(self, cursor, ident):
339 self.ident = ident
340 self.cursor = cursor
341
342 @property
343 def connection(self):
344 return self.cursor.connection
345
346 @property
347 def rowcount(self):
348 return self.cursor.rowcount
349
350 @property
351 def description(self):
352 return self.cursor.description
353
354 def execute(self, operation, args=(), stream=None):
355 op = "DECLARE " + self.ident + " NO SCROLL CURSOR FOR " + operation
356 self.cursor.execute(op, args, stream=stream)
357 return self
358
359 def executemany(self, operation, param_sets):
360 self.cursor.executemany(operation, param_sets)
361 return self
362
363 def fetchone(self):
364 self.cursor.execute("FETCH FORWARD 1 FROM " + self.ident)
365 return self.cursor.fetchone()
366
367 def fetchmany(self, num=None):
368 if num is None:
369 return self.fetchall()
370 else:
371 self.cursor.execute(
372 "FETCH FORWARD " + str(int(num)) + " FROM " + self.ident
373 )
374 return self.cursor.fetchall()
375
376 def fetchall(self):
377 self.cursor.execute("FETCH FORWARD ALL FROM " + self.ident)
378 return self.cursor.fetchall()
379
380 def close(self):
381 self.cursor.execute("CLOSE " + self.ident)
382 self.cursor.close()
383
384 def setinputsizes(self, *sizes):
385 self.cursor.setinputsizes(*sizes)
386
387 def setoutputsize(self, size, column=None):
388 pass
389
390
391class PGCompiler_pg8000(PGCompiler):
392 def visit_mod_binary(self, binary, operator, **kw):
393 return (
394 self.process(binary.left, **kw)
395 + " %% "
396 + self.process(binary.right, **kw)
397 )
398
399
400class PGIdentifierPreparer_pg8000(PGIdentifierPreparer):
401 def __init__(self, *args, **kwargs):
402 PGIdentifierPreparer.__init__(self, *args, **kwargs)
403 self._double_percents = False
404
405
406class PGDialect_pg8000(PGDialect):
407 driver = "pg8000"
408 supports_statement_cache = True
409
410 supports_unicode_statements = True
411
412 supports_unicode_binds = True
413
414 default_paramstyle = "format"
415 supports_sane_multi_rowcount = True
416 execution_ctx_cls = PGExecutionContext_pg8000
417 statement_compiler = PGCompiler_pg8000
418 preparer = PGIdentifierPreparer_pg8000
419 supports_server_side_cursors = True
420
421 render_bind_cast = True
422
423 # reversed as of pg8000 1.16.6. 1.16.5 and lower
424 # are no longer compatible
425 description_encoding = None
426 # description_encoding = "use_encoding"
427
428 colspecs = util.update_copy(
429 PGDialect.colspecs,
430 {
431 sqltypes.String: _PGString,
432 sqltypes.Numeric: _PGNumericNoBind,
433 sqltypes.Float: _PGFloat,
434 sqltypes.JSON: _PGJSON,
435 sqltypes.Boolean: _PGBoolean,
436 sqltypes.NullType: _PGNullType,
437 JSONB: _PGJSONB,
438 CITEXT: CITEXT,
439 sqltypes.JSON.JSONPathType: _PGJSONPathType,
440 sqltypes.JSON.JSONIndexType: _PGJSONIndexType,
441 sqltypes.JSON.JSONIntIndexType: _PGJSONIntIndexType,
442 sqltypes.JSON.JSONStrIndexType: _PGJSONStrIndexType,
443 sqltypes.Interval: _PGInterval,
444 INTERVAL: _PGInterval,
445 sqltypes.DateTime: _PGTimeStamp,
446 sqltypes.DateTime: _PGTimeStamp,
447 sqltypes.Date: _PGDate,
448 sqltypes.Time: _PGTime,
449 sqltypes.Integer: _PGInteger,
450 sqltypes.SmallInteger: _PGSmallInteger,
451 sqltypes.BigInteger: _PGBigInteger,
452 sqltypes.Enum: _PGEnum,
453 sqltypes.ARRAY: _PGARRAY,
454 OIDVECTOR: _PGOIDVECTOR,
455 ranges.INT4RANGE: _Pg8000Range,
456 ranges.INT8RANGE: _Pg8000Range,
457 ranges.NUMRANGE: _Pg8000Range,
458 ranges.DATERANGE: _Pg8000Range,
459 ranges.TSRANGE: _Pg8000Range,
460 ranges.TSTZRANGE: _Pg8000Range,
461 ranges.INT4MULTIRANGE: _Pg8000MultiRange,
462 ranges.INT8MULTIRANGE: _Pg8000MultiRange,
463 ranges.NUMMULTIRANGE: _Pg8000MultiRange,
464 ranges.DATEMULTIRANGE: _Pg8000MultiRange,
465 ranges.TSMULTIRANGE: _Pg8000MultiRange,
466 ranges.TSTZMULTIRANGE: _Pg8000MultiRange,
467 },
468 )
469
470 def __init__(self, client_encoding=None, **kwargs):
471 PGDialect.__init__(self, **kwargs)
472 self.client_encoding = client_encoding
473
474 if self._dbapi_version < (1, 16, 6):
475 raise NotImplementedError("pg8000 1.16.6 or greater is required")
476
477 if self._native_inet_types:
478 raise NotImplementedError(
479 "The pg8000 dialect does not fully implement "
480 "ipaddress type handling; INET is supported by default, "
481 "CIDR is not"
482 )
483
484 @util.memoized_property
485 def _dbapi_version(self):
486 if self.dbapi and hasattr(self.dbapi, "__version__"):
487 return tuple(
488 [
489 int(x)
490 for x in re.findall(
491 r"(\d+)(?:[-\.]?|$)", self.dbapi.__version__
492 )
493 ]
494 )
495 else:
496 return (99, 99, 99)
497
498 @classmethod
499 def import_dbapi(cls):
500 return __import__("pg8000")
501
502 def create_connect_args(self, url):
503 opts = url.translate_connect_args(username="user")
504 if "port" in opts:
505 opts["port"] = int(opts["port"])
506 opts.update(url.query)
507 return ([], opts)
508
509 def is_disconnect(self, e, connection, cursor):
510 if isinstance(e, self.dbapi.InterfaceError) and "network error" in str(
511 e
512 ):
513 # new as of pg8000 1.19.0 for broken connections
514 return True
515
516 # connection was closed normally
517 return "connection is closed" in str(e)
518
519 def get_isolation_level_values(self, dbapi_connection):
520 return (
521 "AUTOCOMMIT",
522 "READ COMMITTED",
523 "READ UNCOMMITTED",
524 "REPEATABLE READ",
525 "SERIALIZABLE",
526 )
527
528 def set_isolation_level(self, dbapi_connection, level):
529 level = level.replace("_", " ")
530
531 if level == "AUTOCOMMIT":
532 dbapi_connection.autocommit = True
533 else:
534 dbapi_connection.autocommit = False
535 cursor = dbapi_connection.cursor()
536 cursor.execute(
537 "SET SESSION CHARACTERISTICS AS TRANSACTION "
538 f"ISOLATION LEVEL {level}"
539 )
540 cursor.execute("COMMIT")
541 cursor.close()
542
543 def set_readonly(self, connection, value):
544 cursor = connection.cursor()
545 try:
546 cursor.execute(
547 "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
548 % ("READ ONLY" if value else "READ WRITE")
549 )
550 cursor.execute("COMMIT")
551 finally:
552 cursor.close()
553
554 def get_readonly(self, connection):
555 cursor = connection.cursor()
556 try:
557 cursor.execute("show transaction_read_only")
558 val = cursor.fetchone()[0]
559 finally:
560 cursor.close()
561
562 return val == "on"
563
564 def set_deferrable(self, connection, value):
565 cursor = connection.cursor()
566 try:
567 cursor.execute(
568 "SET SESSION CHARACTERISTICS AS TRANSACTION %s"
569 % ("DEFERRABLE" if value else "NOT DEFERRABLE")
570 )
571 cursor.execute("COMMIT")
572 finally:
573 cursor.close()
574
575 def get_deferrable(self, connection):
576 cursor = connection.cursor()
577 try:
578 cursor.execute("show transaction_deferrable")
579 val = cursor.fetchone()[0]
580 finally:
581 cursor.close()
582
583 return val == "on"
584
585 def _set_client_encoding(self, dbapi_connection, client_encoding):
586 cursor = dbapi_connection.cursor()
587 cursor.execute(
588 f"""SET CLIENT_ENCODING TO '{
589 client_encoding.replace("'", "''")
590 }'"""
591 )
592 cursor.execute("COMMIT")
593 cursor.close()
594
595 def do_begin_twophase(self, connection, xid):
596 connection.connection.tpc_begin((0, xid, ""))
597
598 def do_prepare_twophase(self, connection, xid):
599 connection.connection.tpc_prepare()
600
601 def do_rollback_twophase(
602 self, connection, xid, is_prepared=True, recover=False
603 ):
604 connection.connection.tpc_rollback((0, xid, ""))
605
606 def do_commit_twophase(
607 self, connection, xid, is_prepared=True, recover=False
608 ):
609 connection.connection.tpc_commit((0, xid, ""))
610
611 def do_recover_twophase(self, connection):
612 return [row[1] for row in connection.connection.tpc_recover()]
613
614 def on_connect(self):
615 fns = []
616
617 def on_connect(conn):
618 conn.py_types[quoted_name] = conn.py_types[str]
619
620 fns.append(on_connect)
621
622 if self.client_encoding is not None:
623
624 def on_connect(conn):
625 self._set_client_encoding(conn, self.client_encoding)
626
627 fns.append(on_connect)
628
629 if self._native_inet_types is False:
630
631 def on_connect(conn):
632 # inet
633 conn.register_in_adapter(869, lambda s: s)
634
635 # cidr
636 conn.register_in_adapter(650, lambda s: s)
637
638 fns.append(on_connect)
639
640 if self._json_deserializer:
641
642 def on_connect(conn):
643 # json
644 conn.register_in_adapter(114, self._json_deserializer)
645
646 # jsonb
647 conn.register_in_adapter(3802, self._json_deserializer)
648
649 fns.append(on_connect)
650
651 if len(fns) > 0:
652
653 def on_connect(conn):
654 for fn in fns:
655 fn(conn)
656
657 return on_connect
658 else:
659 return None
660
661 @util.memoized_property
662 def _dialect_specific_select_one(self):
663 return ";"
664
665
666dialect = PGDialect_pg8000