Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/sqlalchemy/dialects/postgresql/_psycopg_common.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

101 statements  

1# dialects/postgresql/_psycopg_common.py 

2# Copyright (C) 2005-2025 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7# mypy: ignore-errors 

8from __future__ import annotations 

9 

10import decimal 

11 

12from .array import ARRAY as PGARRAY 

13from .base import _DECIMAL_TYPES 

14from .base import _FLOAT_TYPES 

15from .base import _INT_TYPES 

16from .base import PGDialect 

17from .base import PGExecutionContext 

18from .hstore import HSTORE 

19from .pg_catalog import _SpaceVector 

20from .pg_catalog import INT2VECTOR 

21from .pg_catalog import OIDVECTOR 

22from ... import exc 

23from ... import types as sqltypes 

24from ... import util 

25from ...engine import processors 

26 

# Module-level callable obtained from ``util.counter()``; each call's result
# is hex-formatted into the name of a server-side cursor (see
# ``_PGExecutionContext_common_psycopg.create_server_side_cursor``).
_server_side_id = util.counter()

28 

29 

class _PsycopgNumeric(sqltypes.Numeric):
    """Numeric type whose result conversion depends on the column type code.

    No bind-side processing is applied.  On the result side a converter is
    only installed when the value category implied by ``coltype`` does not
    already match what ``asdecimal`` asks for.
    """

    def bind_processor(self, dialect):
        """Return ``None``: bound values are passed through unchanged."""
        return None

    def result_processor(self, dialect, coltype):
        """Return the result-row converter for ``coltype``, or ``None``.

        :param dialect: the dialect in use (not consulted here).
        :param coltype: type code of the result column; it is looked up in
         ``_FLOAT_TYPES``, ``_DECIMAL_TYPES`` and ``_INT_TYPES``.
        :return: a callable converting a fetched value, or ``None`` when no
         conversion is required.
        :raises exc.InvalidRequestError: if ``coltype`` is in none of the
         three known type-code collections.
        """
        float_column = coltype in _FLOAT_TYPES
        if not float_column and not (
            coltype in _DECIMAL_TYPES or coltype in _INT_TYPES
        ):
            # Same failure regardless of the ``asdecimal`` setting.
            raise exc.InvalidRequestError(
                "Unknown PG numeric type: %d" % coltype
            )

        if self.asdecimal:
            if float_column:
                # Float columns need an explicit float -> Decimal step.
                return processors.to_decimal_processor_factory(
                    decimal.Decimal, self._effective_decimal_return_scale
                )
            # psycopg returns Decimal natively for 1700
            return None

        if float_column:
            # psycopg returns float natively for 701
            return None
        return processors.to_float

57 

58 

class _PsycopgFloat(_PsycopgNumeric):
    """Float variant of :class:`_PsycopgNumeric`.

    Inherits the bind/result processing of the numeric type and only
    overrides ``__visit_name__`` to ``"float"``.
    """

    __visit_name__ = "float"

61 

62 

class _PsycopgHStore(HSTORE):
    """HSTORE type that skips SQLAlchemy-side conversion when possible.

    When the dialect reports ``_has_native_hstore`` no bind or result
    processor is installed; otherwise the processors of the parent
    ``HSTORE`` type are used.
    """

    def bind_processor(self, dialect):
        """Return the parent bind processor, or ``None`` for native hstore."""
        if not dialect._has_native_hstore:
            return super().bind_processor(dialect)
        return None

    def result_processor(self, dialect, coltype):
        """Return the parent result processor, or ``None`` for native hstore."""
        if not dialect._has_native_hstore:
            return super().result_processor(dialect, coltype)
        return None

75 

76 

class _PsycopgARRAY(PGARRAY):
    """PostgreSQL ARRAY type with ``render_bind_cast`` switched on.

    NOTE(review): the flag presumably makes bound array parameters render
    with an explicit SQL cast -- confirm against the base type's handling.
    """

    render_bind_cast = True

79 

80 

class _PsycopgINT2VECTOR(_SpaceVector, INT2VECTOR):
    """``INT2VECTOR`` combined with the ``_SpaceVector`` mixin.

    Adds no behavior of its own; everything comes from the two bases.
    """

83 

84 

class _PsycopgOIDVECTOR(_SpaceVector, OIDVECTOR):
    """``OIDVECTOR`` combined with the ``_SpaceVector`` mixin.

    Adds no behavior of its own; everything comes from the two bases.
    """

87 

88 

class _PGExecutionContext_common_psycopg(PGExecutionContext):
    """Execution context shared by the psycopg-family dialects."""

    def create_server_side_cursor(self):
        """Open and return a named cursor on the DBAPI connection.

        The name has the form ``c_<hex id of this context>_<hex counter>``,
        where the counter value comes from the module-level
        ``_server_side_id`` callable.
        """
        # use server-side cursors:
        # psycopg
        # https://www.psycopg.org/psycopg3/docs/advanced/cursors.html#server-side-cursors
        # psycopg2
        # https://www.psycopg.org/docs/usage.html#server-side-cursors
        context_part = hex(id(self))[2:]  # strip the "0x" prefix
        sequence_part = hex(_server_side_id())[2:]
        cursor_name = f"c_{context_part}_{sequence_part}"
        return self._dbapi_connection.cursor(cursor_name)

98 

99 

class _PGDialect_common_psycopg(PGDialect):
    """Dialect behavior shared by the psycopg-family PostgreSQL drivers.

    Supplies the common type overrides (``colspecs``), connection-argument
    construction, isolation-level names, and the autocommit / deferrable /
    ping helpers that operate on the DBAPI connection object.
    """

    supports_statement_cache = True
    supports_server_side_cursors = True

    default_paramstyle = "pyformat"

    # Class-level default; ``__init__`` overrides it per instance when the
    # caller passes ``use_native_hstore=False``.
    _has_native_hstore = True

    # Start from the base PostgreSQL type map and substitute the
    # psycopg-specific implementations defined in this module.
    colspecs = util.update_copy(
        PGDialect.colspecs,
        {
            sqltypes.Numeric: _PsycopgNumeric,
            sqltypes.Float: _PsycopgFloat,
            HSTORE: _PsycopgHStore,
            sqltypes.ARRAY: _PsycopgARRAY,
            INT2VECTOR: _PsycopgINT2VECTOR,
            OIDVECTOR: _PsycopgOIDVECTOR,
        },
    )

    def __init__(
        self,
        client_encoding=None,
        use_native_hstore=True,
        **kwargs,
    ):
        """Initialize the dialect.

        :param client_encoding: stored as ``self.client_encoding``.
        :param use_native_hstore: stored as ``self.use_native_hstore``; a
         false value also sets ``self._has_native_hstore`` to ``False``.
        :param kwargs: forwarded unchanged to ``PGDialect.__init__``.
        """
        PGDialect.__init__(self, **kwargs)
        if not use_native_hstore:
            # Only ever switched off here; a true value leaves the
            # class-level setting untouched.
            self._has_native_hstore = False
        self.use_native_hstore = use_native_hstore
        self.client_encoding = client_encoding

    def create_connect_args(self, url):
        """Build the ``(args, kwargs)`` pair used to connect for ``url``.

        :param url: URL object providing ``translate_connect_args()`` and
         ``query``.
        :return: ``([], opts)`` when any connection option is present, or
         ``([""], opts)`` when there is none.
        """
        opts = url.translate_connect_args(username="user", database="dbname")

        multihosts, multiports = self._split_multihost_from_url(url)

        if not (opts or url.query):
            # no connection arguments whatsoever; psycopg2.connect()
            # requires that "dsn" be present as a blank string.
            return ([""], opts)

        opts = opts or {}
        if "port" in opts:
            opts["port"] = int(opts["port"])
        # Query-string entries are applied last, so they win over the
        # values translated from the URL itself.
        opts.update(url.query)

        if multihosts:
            # Multiple hosts/ports are passed as comma-separated strings;
            # a missing port becomes an empty slot.
            opts["host"] = ",".join(multihosts)
            port_fields = [str(port) if port else "" for port in multiports]
            comma_ports = ",".join(port_fields)
            if comma_ports:
                opts["port"] = comma_ports
        return ([], opts)

    def get_isolation_level_values(self, dbapi_connection):
        """Return the tuple of isolation level names this dialect accepts."""
        return (
            "AUTOCOMMIT",
            "READ COMMITTED",
            "READ UNCOMMITTED",
            "REPEATABLE READ",
            "SERIALIZABLE",
        )

    def set_deferrable(self, connection, value):
        """Assign ``value`` to the connection's ``deferrable`` attribute."""
        connection.deferrable = value

    def get_deferrable(self, connection):
        """Return the connection's ``deferrable`` attribute."""
        return connection.deferrable

    def _do_autocommit(self, connection, value):
        """Assign ``value`` to the connection's ``autocommit`` attribute."""
        connection.autocommit = value

    def do_ping(self, dbapi_connection):
        """Execute the dialect's "select one" statement on the connection.

        Autocommit is switched on for the duration of the statement if it
        was off, and put back afterwards unless the connection reports
        itself closed.  Any exception raised along the way propagates to
        the caller.

        :return: ``True`` when the statement ran without raising.
        """
        original_autocommit = dbapi_connection.autocommit
        toggled = not original_autocommit

        if toggled:
            dbapi_connection.autocommit = True
        ping_cursor = dbapi_connection.cursor()
        try:
            ping_cursor.execute(self._dialect_specific_select_one)
        finally:
            ping_cursor.close()
            if toggled and not dbapi_connection.closed:
                dbapi_connection.autocommit = original_autocommit

        return True