Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/sqlalchemy/dialects/postgresql/pygresql.py: 30%

168 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# postgresql/pygresql.py 

2# Copyright (C) 2005-2022 the SQLAlchemy authors and contributors 

3# <see AUTHORS file> 

4# 

5# This module is part of SQLAlchemy and is released under 

6# the MIT License: https://www.opensource.org/licenses/mit-license.php 

7""" 

8.. dialect:: postgresql+pygresql 

9 :name: pygresql 

10 :dbapi: pgdb 

11 :connectstring: postgresql+pygresql://user:password@host:port/dbname[?key=value&key=value...] 

12 :url: https://www.pygresql.org/ 

13 

14.. note:: 

15 

16 The pygresql dialect is **not tested as part of SQLAlchemy's continuous 

17 integration** and may have unresolved issues. The recommended PostgreSQL 

18 dialect is psycopg2. 

19 

20.. deprecated:: 1.4 The pygresql DBAPI is deprecated and will be removed 

21 in a future version. Please use one of the supported DBAPIs to 

22 connect to PostgreSQL. 

23 

24""" # noqa 

25 

26import decimal 

27import re 

28 

29from .base import _DECIMAL_TYPES 

30from .base import _FLOAT_TYPES 

31from .base import _INT_TYPES 

32from .base import PGCompiler 

33from .base import PGDialect 

34from .base import PGIdentifierPreparer 

35from .base import UUID 

36from .hstore import HSTORE 

37from .json import JSON 

38from .json import JSONB 

39from ... import exc 

40from ... import processors 

41from ... import util 

42from ...sql.elements import Null 

43from ...types import JSON as Json 

44from ...types import Numeric 

45 

46 

47class _PGNumeric(Numeric): 

48 def bind_processor(self, dialect): 

49 return None 

50 

51 def result_processor(self, dialect, coltype): 

52 if not isinstance(coltype, int): 

53 coltype = coltype.oid 

54 if self.asdecimal: 

55 if coltype in _FLOAT_TYPES: 

56 return processors.to_decimal_processor_factory( 

57 decimal.Decimal, self._effective_decimal_return_scale 

58 ) 

59 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 

60 # PyGreSQL returns Decimal natively for 1700 (numeric) 

61 return None 

62 else: 

63 raise exc.InvalidRequestError( 

64 "Unknown PG numeric type: %d" % coltype 

65 ) 

66 else: 

67 if coltype in _FLOAT_TYPES: 

68 # PyGreSQL returns float natively for 701 (float8) 

69 return None 

70 elif coltype in _DECIMAL_TYPES or coltype in _INT_TYPES: 

71 return processors.to_float 

72 else: 

73 raise exc.InvalidRequestError( 

74 "Unknown PG numeric type: %d" % coltype 

75 ) 

76 

77 

78class _PGHStore(HSTORE): 

79 def bind_processor(self, dialect): 

80 if not dialect.has_native_hstore: 

81 return super(_PGHStore, self).bind_processor(dialect) 

82 hstore = dialect.dbapi.Hstore 

83 

84 def process(value): 

85 if isinstance(value, dict): 

86 return hstore(value) 

87 return value 

88 

89 return process 

90 

91 def result_processor(self, dialect, coltype): 

92 if not dialect.has_native_hstore: 

93 return super(_PGHStore, self).result_processor(dialect, coltype) 

94 

95 

96class _PGJSON(JSON): 

97 def bind_processor(self, dialect): 

98 if not dialect.has_native_json: 

99 return super(_PGJSON, self).bind_processor(dialect) 

100 json = dialect.dbapi.Json 

101 

102 def process(value): 

103 if value is self.NULL: 

104 value = None 

105 elif isinstance(value, Null) or ( 

106 value is None and self.none_as_null 

107 ): 

108 return None 

109 if value is None or isinstance(value, (dict, list)): 

110 return json(value) 

111 return value 

112 

113 return process 

114 

115 def result_processor(self, dialect, coltype): 

116 if not dialect.has_native_json: 

117 return super(_PGJSON, self).result_processor(dialect, coltype) 

118 

119 

120class _PGJSONB(JSONB): 

121 def bind_processor(self, dialect): 

122 if not dialect.has_native_json: 

123 return super(_PGJSONB, self).bind_processor(dialect) 

124 json = dialect.dbapi.Json 

125 

126 def process(value): 

127 if value is self.NULL: 

128 value = None 

129 elif isinstance(value, Null) or ( 

130 value is None and self.none_as_null 

131 ): 

132 return None 

133 if value is None or isinstance(value, (dict, list)): 

134 return json(value) 

135 return value 

136 

137 return process 

138 

139 def result_processor(self, dialect, coltype): 

140 if not dialect.has_native_json: 

141 return super(_PGJSONB, self).result_processor(dialect, coltype) 

142 

143 

144class _PGUUID(UUID): 

145 def bind_processor(self, dialect): 

146 if not dialect.has_native_uuid: 

147 return super(_PGUUID, self).bind_processor(dialect) 

148 uuid = dialect.dbapi.Uuid 

149 

150 def process(value): 

151 if value is None: 

152 return None 

153 if isinstance(value, (str, bytes)): 

154 if len(value) == 16: 

155 return uuid(bytes=value) 

156 return uuid(value) 

157 if isinstance(value, int): 

158 return uuid(int=value) 

159 return value 

160 

161 return process 

162 

163 def result_processor(self, dialect, coltype): 

164 if not dialect.has_native_uuid: 

165 return super(_PGUUID, self).result_processor(dialect, coltype) 

166 if not self.as_uuid: 

167 

168 def process(value): 

169 if value is not None: 

170 return str(value) 

171 

172 return process 

173 

174 

175class _PGCompiler(PGCompiler): 

176 def visit_mod_binary(self, binary, operator, **kw): 

177 return ( 

178 self.process(binary.left, **kw) 

179 + " %% " 

180 + self.process(binary.right, **kw) 

181 ) 

182 

183 def post_process_text(self, text): 

184 return text.replace("%", "%%") 

185 

186 

187class _PGIdentifierPreparer(PGIdentifierPreparer): 

188 def _escape_identifier(self, value): 

189 value = value.replace(self.escape_quote, self.escape_to_quote) 

190 return value.replace("%", "%%") 

191 

192 

193class PGDialect_pygresql(PGDialect): 

194 

195 driver = "pygresql" 

196 supports_statement_cache = True 

197 

198 statement_compiler = _PGCompiler 

199 preparer = _PGIdentifierPreparer 

200 

201 @classmethod 

202 def dbapi(cls): 

203 import pgdb 

204 

205 util.warn_deprecated( 

206 "The pygresql DBAPI is deprecated and will be removed " 

207 "in a future version. Please use one of the supported DBAPIs to " 

208 "connect to PostgreSQL.", 

209 version="1.4", 

210 ) 

211 

212 return pgdb 

213 

214 colspecs = util.update_copy( 

215 PGDialect.colspecs, 

216 { 

217 Numeric: _PGNumeric, 

218 HSTORE: _PGHStore, 

219 Json: _PGJSON, 

220 JSON: _PGJSON, 

221 JSONB: _PGJSONB, 

222 UUID: _PGUUID, 

223 }, 

224 ) 

225 

226 def __init__(self, **kwargs): 

227 super(PGDialect_pygresql, self).__init__(**kwargs) 

228 try: 

229 version = self.dbapi.version 

230 m = re.match(r"(\d+)\.(\d+)", version) 

231 version = (int(m.group(1)), int(m.group(2))) 

232 except (AttributeError, ValueError, TypeError): 

233 version = (0, 0) 

234 self.dbapi_version = version 

235 if version < (5, 0): 

236 has_native_hstore = has_native_json = has_native_uuid = False 

237 if version != (0, 0): 

238 util.warn( 

239 "PyGreSQL is only fully supported by SQLAlchemy" 

240 " since version 5.0." 

241 ) 

242 else: 

243 self.supports_unicode_statements = True 

244 self.supports_unicode_binds = True 

245 has_native_hstore = has_native_json = has_native_uuid = True 

246 self.has_native_hstore = has_native_hstore 

247 self.has_native_json = has_native_json 

248 self.has_native_uuid = has_native_uuid 

249 

250 def create_connect_args(self, url): 

251 opts = url.translate_connect_args(username="user") 

252 if "port" in opts: 

253 opts["host"] = "%s:%s" % ( 

254 opts.get("host", "").rsplit(":", 1)[0], 

255 opts.pop("port"), 

256 ) 

257 opts.update(url.query) 

258 return [], opts 

259 

260 def is_disconnect(self, e, connection, cursor): 

261 if isinstance(e, self.dbapi.Error): 

262 if not connection: 

263 return False 

264 try: 

265 connection = connection.connection 

266 except AttributeError: 

267 pass 

268 else: 

269 if not connection: 

270 return False 

271 try: 

272 return connection.closed 

273 except AttributeError: # PyGreSQL < 5.0 

274 return connection._cnx is None 

275 return False 

276 

277 

278dialect = PGDialect_pygresql