Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/backends/utils.py: 25%

157 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1import datetime 

2import decimal 

3import functools 

4import logging 

5import time 

6from contextlib import contextmanager 

7 

8from django.db import NotSupportedError 

9from django.utils.crypto import md5 

10from django.utils.dateparse import parse_time 

11 

12logger = logging.getLogger("django.db.backends") 

13 

14 

15class CursorWrapper: 

16 def __init__(self, cursor, db): 

17 self.cursor = cursor 

18 self.db = db 

19 

20 WRAP_ERROR_ATTRS = frozenset(["fetchone", "fetchmany", "fetchall", "nextset"]) 

21 

22 def __getattr__(self, attr): 

23 cursor_attr = getattr(self.cursor, attr) 

24 if attr in CursorWrapper.WRAP_ERROR_ATTRS: 

25 return self.db.wrap_database_errors(cursor_attr) 

26 else: 

27 return cursor_attr 

28 

29 def __iter__(self): 

30 with self.db.wrap_database_errors: 

31 yield from self.cursor 

32 

33 def __enter__(self): 

34 return self 

35 

36 def __exit__(self, type, value, traceback): 

37 # Close instead of passing through to avoid backend-specific behavior 

38 # (#17671). Catch errors liberally because errors in cleanup code 

39 # aren't useful. 

40 try: 

41 self.close() 

42 except self.db.Database.Error: 

43 pass 

44 

45 # The following methods cannot be implemented in __getattr__, because the 

46 # code must run when the method is invoked, not just when it is accessed. 

47 

48 def callproc(self, procname, params=None, kparams=None): 

49 # Keyword parameters for callproc aren't supported in PEP 249, but the 

50 # database driver may support them (e.g. cx_Oracle). 

51 if kparams is not None and not self.db.features.supports_callproc_kwargs: 

52 raise NotSupportedError( 

53 "Keyword parameters for callproc are not supported on this " 

54 "database backend." 

55 ) 

56 self.db.validate_no_broken_transaction() 

57 with self.db.wrap_database_errors: 

58 if params is None and kparams is None: 

59 return self.cursor.callproc(procname) 

60 elif kparams is None: 

61 return self.cursor.callproc(procname, params) 

62 else: 

63 params = params or () 

64 return self.cursor.callproc(procname, params, kparams) 

65 

66 def execute(self, sql, params=None): 

67 return self._execute_with_wrappers( 

68 sql, params, many=False, executor=self._execute 

69 ) 

70 

71 def executemany(self, sql, param_list): 

72 return self._execute_with_wrappers( 

73 sql, param_list, many=True, executor=self._executemany 

74 ) 

75 

76 def _execute_with_wrappers(self, sql, params, many, executor): 

77 context = {"connection": self.db, "cursor": self} 

78 for wrapper in reversed(self.db.execute_wrappers): 

79 executor = functools.partial(wrapper, executor) 

80 return executor(sql, params, many, context) 

81 

82 def _execute(self, sql, params, *ignored_wrapper_args): 

83 self.db.validate_no_broken_transaction() 

84 with self.db.wrap_database_errors: 

85 if params is None: 

86 # params default might be backend specific. 

87 return self.cursor.execute(sql) 

88 else: 

89 return self.cursor.execute(sql, params) 

90 

91 def _executemany(self, sql, param_list, *ignored_wrapper_args): 

92 self.db.validate_no_broken_transaction() 

93 with self.db.wrap_database_errors: 

94 return self.cursor.executemany(sql, param_list) 

95 

96 

97class CursorDebugWrapper(CursorWrapper): 

98 

99 # XXX callproc isn't instrumented at this time. 

100 

101 def execute(self, sql, params=None): 

102 with self.debug_sql(sql, params, use_last_executed_query=True): 

103 return super().execute(sql, params) 

104 

105 def executemany(self, sql, param_list): 

106 with self.debug_sql(sql, param_list, many=True): 

107 return super().executemany(sql, param_list) 

108 

109 @contextmanager 

110 def debug_sql( 

111 self, sql=None, params=None, use_last_executed_query=False, many=False 

112 ): 

113 start = time.monotonic() 

114 try: 

115 yield 

116 finally: 

117 stop = time.monotonic() 

118 duration = stop - start 

119 if use_last_executed_query: 

120 sql = self.db.ops.last_executed_query(self.cursor, sql, params) 

121 try: 

122 times = len(params) if many else "" 

123 except TypeError: 

124 # params could be an iterator. 

125 times = "?" 

126 self.db.queries_log.append( 

127 { 

128 "sql": "%s times: %s" % (times, sql) if many else sql, 

129 "time": "%.3f" % duration, 

130 } 

131 ) 

132 logger.debug( 

133 "(%.3f) %s; args=%s; alias=%s", 

134 duration, 

135 sql, 

136 params, 

137 self.db.alias, 

138 extra={ 

139 "duration": duration, 

140 "sql": sql, 

141 "params": params, 

142 "alias": self.db.alias, 

143 }, 

144 ) 

145 

146 

147@contextmanager 

148def debug_transaction(connection, sql): 

149 start = time.monotonic() 

150 try: 

151 yield 

152 finally: 

153 if connection.queries_logged: 

154 stop = time.monotonic() 

155 duration = stop - start 

156 connection.queries_log.append( 

157 { 

158 "sql": "%s" % sql, 

159 "time": "%.3f" % duration, 

160 } 

161 ) 

162 logger.debug( 

163 "(%.3f) %s; args=%s; alias=%s", 

164 duration, 

165 sql, 

166 None, 

167 connection.alias, 

168 extra={ 

169 "duration": duration, 

170 "sql": sql, 

171 "alias": connection.alias, 

172 }, 

173 ) 

174 

175 

176def split_tzname_delta(tzname): 

177 """ 

178 Split a time zone name into a 3-tuple of (name, sign, offset). 

179 """ 

180 for sign in ["+", "-"]: 

181 if sign in tzname: 

182 name, offset = tzname.rsplit(sign, 1) 

183 if offset and parse_time(offset): 

184 return name, sign, offset 

185 return tzname, None, None 

186 

187 

188############################################### 

189# Converters from database (string) to Python # 

190############################################### 

191 

192 

193def typecast_date(s): 

194 return ( 

195 datetime.date(*map(int, s.split("-"))) if s else None 

196 ) # return None if s is null 

197 

198 

199def typecast_time(s): # does NOT store time zone information 

200 if not s: 

201 return None 

202 hour, minutes, seconds = s.split(":") 

203 if "." in seconds: # check whether seconds have a fractional part 

204 seconds, microseconds = seconds.split(".") 

205 else: 

206 microseconds = "0" 

207 return datetime.time( 

208 int(hour), int(minutes), int(seconds), int((microseconds + "000000")[:6]) 

209 ) 

210 

211 

212def typecast_timestamp(s): # does NOT store time zone information 

213 # "2005-07-29 15:48:00.590358-05" 

214 # "2005-07-29 09:56:00-05" 

215 if not s: 

216 return None 

217 if " " not in s: 

218 return typecast_date(s) 

219 d, t = s.split() 

220 # Remove timezone information. 

221 if "-" in t: 

222 t, _ = t.split("-", 1) 

223 elif "+" in t: 

224 t, _ = t.split("+", 1) 

225 dates = d.split("-") 

226 times = t.split(":") 

227 seconds = times[2] 

228 if "." in seconds: # check whether seconds have a fractional part 

229 seconds, microseconds = seconds.split(".") 

230 else: 

231 microseconds = "0" 

232 return datetime.datetime( 

233 int(dates[0]), 

234 int(dates[1]), 

235 int(dates[2]), 

236 int(times[0]), 

237 int(times[1]), 

238 int(seconds), 

239 int((microseconds + "000000")[:6]), 

240 ) 

241 

242 

243############################################### 

244# Converters from Python to database (string) # 

245############################################### 

246 

247 

248def split_identifier(identifier): 

249 """ 

250 Split an SQL identifier into a two element tuple of (namespace, name). 

251 

252 The identifier could be a table, column, or sequence name might be prefixed 

253 by a namespace. 

254 """ 

255 try: 

256 namespace, name = identifier.split('"."') 

257 except ValueError: 

258 namespace, name = "", identifier 

259 return namespace.strip('"'), name.strip('"') 

260 

261 

262def truncate_name(identifier, length=None, hash_len=4): 

263 """ 

264 Shorten an SQL identifier to a repeatable mangled version with the given 

265 length. 

266 

267 If a quote stripped name contains a namespace, e.g. USERNAME"."TABLE, 

268 truncate the table portion only. 

269 """ 

270 namespace, name = split_identifier(identifier) 

271 

272 if length is None or len(name) <= length: 

273 return identifier 

274 

275 digest = names_digest(name, length=hash_len) 

276 return "%s%s%s" % ( 

277 '%s"."' % namespace if namespace else "", 

278 name[: length - hash_len], 

279 digest, 

280 ) 

281 

282 

283def names_digest(*args, length): 

284 """ 

285 Generate a 32-bit digest of a set of arguments that can be used to shorten 

286 identifying names. 

287 """ 

288 h = md5(usedforsecurity=False) 

289 for arg in args: 

290 h.update(arg.encode()) 

291 return h.hexdigest()[:length] 

292 

293 

294def format_number(value, max_digits, decimal_places): 

295 """ 

296 Format a number into a string with the requisite number of digits and 

297 decimal places. 

298 """ 

299 if value is None: 

300 return None 

301 context = decimal.getcontext().copy() 

302 if max_digits is not None: 

303 context.prec = max_digits 

304 if decimal_places is not None: 

305 value = value.quantize( 

306 decimal.Decimal(1).scaleb(-decimal_places), context=context 

307 ) 

308 else: 

309 context.traps[decimal.Rounded] = 1 

310 value = context.create_decimal(value) 

311 return "{:f}".format(value) 

312 

313 

314def strip_quotes(table_name): 

315 """ 

316 Strip quotes off of quoted table names to make them safe for use in index 

317 names, sequence names, etc. For example '"USER"."TABLE"' (an Oracle naming 

318 scheme) becomes 'USER"."TABLE'. 

319 """ 

320 has_quotes = table_name.startswith('"') and table_name.endswith('"') 

321 return table_name[1:-1] if has_quotes else table_name