# dialects/postgresql/_psycopg_common.py
# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors
# <see AUTHORS file>
#
# This module is part of SQLAlchemy and is released under
# the MIT License: https://www.opensource.org/licenses/mit-license.php
# mypy: ignore-errors
8from __future__ import annotations
9
10import decimal
11
12from .array import ARRAY as PGARRAY
13from .base import _DECIMAL_TYPES
14from .base import _FLOAT_TYPES
15from .base import _INT_TYPES
16from .base import PGDialect
17from .base import PGExecutionContext
18from .hstore import HSTORE
19from .pg_catalog import _SpaceVector
20from .pg_catalog import INT2VECTOR
21from .pg_catalog import OIDVECTOR
22from ... import exc
23from ... import types as sqltypes
24from ... import util
25from ...engine import processors
26
# Module-level counter; each call supplies the trailing component of a
# server-side cursor name (see
# _PGExecutionContext_common_psycopg.create_server_side_cursor).
# NOTE(review): util.counter() presumably yields increasing integers on
# successive calls -- confirm in sqlalchemy.util.
_server_side_id = util.counter()
28
29
class _PsycopgNumeric(sqltypes.Numeric):
    """Numeric type whose processors account for psycopg's native values."""

    def bind_processor(self, dialect):
        """Return ``None``; no bind-side conversion is installed."""
        return None

    def result_processor(self, dialect, coltype):
        """Choose the result converter for a column of type ``coltype``.

        Returns ``None`` when no conversion is applied to the value handed
        back by the driver, otherwise a callable converting to ``Decimal``
        or ``float`` according to ``self.asdecimal``.

        :raises InvalidRequestError: if ``coltype`` is not found in the
         float, decimal or integer type code collections.
        """
        wants_decimal = self.asdecimal

        if coltype in _FLOAT_TYPES:
            if not wants_decimal:
                # psycopg returns float natively for 701
                return None
            return processors.to_decimal_processor_factory(
                decimal.Decimal, self._effective_decimal_return_scale
            )

        if coltype in _DECIMAL_TYPES or coltype in _INT_TYPES:
            # psycopg returns Decimal natively for 1700, so only the
            # float-returning configuration needs a converter
            return None if wants_decimal else processors.to_float

        raise exc.InvalidRequestError("Unknown PG numeric type: %d" % coltype)
57
58
class _PsycopgFloat(_PsycopgNumeric):
    # Float counterpart of _PsycopgNumeric: bind/result processing is
    # inherited unchanged; only the visit name differs, so the type is
    # dispatched under "float".  Registered for sqltypes.Float in
    # _PGDialect_common_psycopg.colspecs.
    __visit_name__ = "float"
61
62
class _PsycopgHStore(HSTORE):
    """HSTORE that skips its own processors when the dialect reports
    native hstore handling.
    """

    def bind_processor(self, dialect):
        """Return the inherited bind processor, or ``None`` when
        ``dialect._has_native_hstore`` is set.
        """
        if not dialect._has_native_hstore:
            return super().bind_processor(dialect)
        return None

    def result_processor(self, dialect, coltype):
        """Return the inherited result processor, or ``None`` when
        ``dialect._has_native_hstore`` is set.
        """
        if not dialect._has_native_hstore:
            return super().result_processor(dialect, coltype)
        return None
75
76
class _PsycopgARRAY(PGARRAY):
    # PostgreSQL ARRAY as used by the psycopg dialects; registered for
    # sqltypes.ARRAY in _PGDialect_common_psycopg.colspecs.
    # NOTE(review): render_bind_cast presumably asks the compiler to render
    # an explicit cast around bound parameters of this type -- confirm
    # against the TypeEngine documentation.
    render_bind_cast = True
79
80
class _PsycopgINT2VECTOR(_SpaceVector, INT2VECTOR):
    # INT2VECTOR combined with the _SpaceVector mixin; adds no behavior of
    # its own.  Registered for INT2VECTOR in the dialect's colspecs.
    # NOTE(review): _SpaceVector presumably handles the space-separated
    # text form of the vector -- confirm in pg_catalog.
    pass
83
84
class _PsycopgOIDVECTOR(_SpaceVector, OIDVECTOR):
    # OIDVECTOR combined with the _SpaceVector mixin; adds no behavior of
    # its own.  Registered for OIDVECTOR in the dialect's colspecs.
    # NOTE(review): _SpaceVector presumably handles the space-separated
    # text form of the vector -- confirm in pg_catalog.
    pass
87
88
class _PGExecutionContext_common_psycopg(PGExecutionContext):
    """Execution context shared by the psycopg-family dialects."""

    def create_server_side_cursor(self):
        """Open a named cursor on the DBAPI connection and return it.

        The name has the form ``c_<hex id of this context>_<hex counter>``,
        where the counter value comes from the module-level
        ``_server_side_id``.
        """
        # use server-side cursors:
        # psycopg
        # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#server-side-cursors
        # psycopg2
        # https://www.psycopg.org/docs/usage.html#server-side-cursors
        context_part = hex(id(self))[2:]
        sequence_part = hex(_server_side_id())[2:]
        cursor_name = "c_%s_%s" % (context_part, sequence_part)
        return self._dbapi_connection.cursor(cursor_name)
98
99
class _PGDialect_common_psycopg(PGDialect):
    """Dialect behavior shared by the psycopg-family PostgreSQL drivers."""

    supports_statement_cache = True
    supports_server_side_cursors = True

    default_paramstyle = "pyformat"

    # class-level default; __init__ shadows it on the instance when the
    # dialect is created with use_native_hstore set to a false value
    _has_native_hstore = True

    # type overrides layered on top of the base PostgreSQL colspecs
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PsycopgNumeric,
            sqltypes.Float: _PsycopgFloat,
            HSTORE: _PsycopgHStore,
            sqltypes.ARRAY: _PsycopgARRAY,
            INT2VECTOR: _PsycopgINT2VECTOR,
            OIDVECTOR: _PsycopgOIDVECTOR,
        },
    )

    def __init__(
        self,
        client_encoding=None,
        use_native_hstore=True,
        **kwargs,
    ):
        """Construct the dialect.

        :param client_encoding: stored as ``self.client_encoding``.
        :param use_native_hstore: stored as ``self.use_native_hstore``;
         a false value also turns off ``self._has_native_hstore``.
        :param kwargs: forwarded to ``PGDialect.__init__``.
        """
        PGDialect.__init__(self, **kwargs)
        self.client_encoding = client_encoding
        self.use_native_hstore = use_native_hstore
        if not use_native_hstore:
            self._has_native_hstore = False

    def create_connect_args(self, url):
        """Translate ``url`` into ``(args, kwargs)`` for the DBAPI connect.

        URL query entries are merged over the translated URL fields.  When
        the URL carries multiple hosts, ``host`` (and ``port``, if any port
        was given) become comma-separated strings.
        """
        conn_kw = url.translate_connect_args(
            username="user", database="dbname"
        )

        multihosts, multiports = self._split_multihost_from_url(url)

        if not conn_kw and not url.query:
            # no connection arguments whatsoever; psycopg2.connect()
            # requires that "dsn" be present as a blank string.
            return ([""], conn_kw)

        if not conn_kw:
            conn_kw = {}
        if "port" in conn_kw:
            conn_kw["port"] = int(conn_kw["port"])
        conn_kw.update(url.query)

        if multihosts:
            conn_kw["host"] = ",".join(multihosts)
            # a missing port is kept as an empty slot so positions still
            # line up with the host list
            port_fields = [str(port) if port else "" for port in multiports]
            comma_ports = ",".join(port_fields)
            if comma_ports:
                conn_kw["port"] = comma_ports
        return ([], conn_kw)

    def get_isolation_level_values(self, dbapi_connection):
        """Return the isolation level names offered by this dialect."""
        return (
            "AUTOCOMMIT",
            "READ COMMITTED",
            "READ UNCOMMITTED",
            "REPEATABLE READ",
            "SERIALIZABLE",
        )

    def set_deferrable(self, connection, value):
        """Assign ``value`` to the connection's ``deferrable`` attribute."""
        connection.deferrable = value

    def get_deferrable(self, connection):
        """Return the connection's ``deferrable`` attribute."""
        return connection.deferrable

    def _do_autocommit(self, connection, value):
        """Assign ``value`` to the connection's ``autocommit`` attribute."""
        connection.autocommit = value

    def do_ping(self, dbapi_connection):
        """Execute the dialect's "select one" statement and return True.

        Autocommit is switched on for the duration of the ping if it was
        off, and put back afterwards unless the connection reports itself
        closed.  Errors raised by the statement propagate to the caller.
        """
        was_autocommit = dbapi_connection.autocommit
        must_restore = not was_autocommit

        if must_restore:
            dbapi_connection.autocommit = True
        ping_cursor = dbapi_connection.cursor()
        try:
            ping_cursor.execute(self._dialect_specific_select_one)
        finally:
            ping_cursor.close()
            # restore even when the ping failed, but never touch a
            # connection that is already closed
            if must_restore and not dbapi_connection.closed:
                dbapi_connection.autocommit = was_autocommit

        return True