Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/backends/utils.py: 25%
157 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1import datetime
2import decimal
3import functools
4import logging
5import time
6from contextlib import contextmanager
8from django.db import NotSupportedError
9from django.utils.crypto import md5
10from django.utils.dateparse import parse_time
12logger = logging.getLogger("django.db.backends")
15class CursorWrapper:
16 def __init__(self, cursor, db):
17 self.cursor = cursor
18 self.db = db
20 WRAP_ERROR_ATTRS = frozenset(["fetchone", "fetchmany", "fetchall", "nextset"])
22 def __getattr__(self, attr):
23 cursor_attr = getattr(self.cursor, attr)
24 if attr in CursorWrapper.WRAP_ERROR_ATTRS:
25 return self.db.wrap_database_errors(cursor_attr)
26 else:
27 return cursor_attr
29 def __iter__(self):
30 with self.db.wrap_database_errors:
31 yield from self.cursor
33 def __enter__(self):
34 return self
36 def __exit__(self, type, value, traceback):
37 # Close instead of passing through to avoid backend-specific behavior
38 # (#17671). Catch errors liberally because errors in cleanup code
39 # aren't useful.
40 try:
41 self.close()
42 except self.db.Database.Error:
43 pass
45 # The following methods cannot be implemented in __getattr__, because the
46 # code must run when the method is invoked, not just when it is accessed.
48 def callproc(self, procname, params=None, kparams=None):
49 # Keyword parameters for callproc aren't supported in PEP 249, but the
50 # database driver may support them (e.g. cx_Oracle).
51 if kparams is not None and not self.db.features.supports_callproc_kwargs:
52 raise NotSupportedError(
53 "Keyword parameters for callproc are not supported on this "
54 "database backend."
55 )
56 self.db.validate_no_broken_transaction()
57 with self.db.wrap_database_errors:
58 if params is None and kparams is None:
59 return self.cursor.callproc(procname)
60 elif kparams is None:
61 return self.cursor.callproc(procname, params)
62 else:
63 params = params or ()
64 return self.cursor.callproc(procname, params, kparams)
66 def execute(self, sql, params=None):
67 return self._execute_with_wrappers(
68 sql, params, many=False, executor=self._execute
69 )
71 def executemany(self, sql, param_list):
72 return self._execute_with_wrappers(
73 sql, param_list, many=True, executor=self._executemany
74 )
76 def _execute_with_wrappers(self, sql, params, many, executor):
77 context = {"connection": self.db, "cursor": self}
78 for wrapper in reversed(self.db.execute_wrappers):
79 executor = functools.partial(wrapper, executor)
80 return executor(sql, params, many, context)
82 def _execute(self, sql, params, *ignored_wrapper_args):
83 self.db.validate_no_broken_transaction()
84 with self.db.wrap_database_errors:
85 if params is None:
86 # params default might be backend specific.
87 return self.cursor.execute(sql)
88 else:
89 return self.cursor.execute(sql, params)
91 def _executemany(self, sql, param_list, *ignored_wrapper_args):
92 self.db.validate_no_broken_transaction()
93 with self.db.wrap_database_errors:
94 return self.cursor.executemany(sql, param_list)
97class CursorDebugWrapper(CursorWrapper):
99 # XXX callproc isn't instrumented at this time.
101 def execute(self, sql, params=None):
102 with self.debug_sql(sql, params, use_last_executed_query=True):
103 return super().execute(sql, params)
105 def executemany(self, sql, param_list):
106 with self.debug_sql(sql, param_list, many=True):
107 return super().executemany(sql, param_list)
109 @contextmanager
110 def debug_sql(
111 self, sql=None, params=None, use_last_executed_query=False, many=False
112 ):
113 start = time.monotonic()
114 try:
115 yield
116 finally:
117 stop = time.monotonic()
118 duration = stop - start
119 if use_last_executed_query:
120 sql = self.db.ops.last_executed_query(self.cursor, sql, params)
121 try:
122 times = len(params) if many else ""
123 except TypeError:
124 # params could be an iterator.
125 times = "?"
126 self.db.queries_log.append(
127 {
128 "sql": "%s times: %s" % (times, sql) if many else sql,
129 "time": "%.3f" % duration,
130 }
131 )
132 logger.debug(
133 "(%.3f) %s; args=%s; alias=%s",
134 duration,
135 sql,
136 params,
137 self.db.alias,
138 extra={
139 "duration": duration,
140 "sql": sql,
141 "params": params,
142 "alias": self.db.alias,
143 },
144 )
147@contextmanager
148def debug_transaction(connection, sql):
149 start = time.monotonic()
150 try:
151 yield
152 finally:
153 if connection.queries_logged:
154 stop = time.monotonic()
155 duration = stop - start
156 connection.queries_log.append(
157 {
158 "sql": "%s" % sql,
159 "time": "%.3f" % duration,
160 }
161 )
162 logger.debug(
163 "(%.3f) %s; args=%s; alias=%s",
164 duration,
165 sql,
166 None,
167 connection.alias,
168 extra={
169 "duration": duration,
170 "sql": sql,
171 "alias": connection.alias,
172 },
173 )
176def split_tzname_delta(tzname):
177 """
178 Split a time zone name into a 3-tuple of (name, sign, offset).
179 """
180 for sign in ["+", "-"]:
181 if sign in tzname:
182 name, offset = tzname.rsplit(sign, 1)
183 if offset and parse_time(offset):
184 return name, sign, offset
185 return tzname, None, None
188###############################################
189# Converters from database (string) to Python #
190###############################################
193def typecast_date(s):
194 return (
195 datetime.date(*map(int, s.split("-"))) if s else None
196 ) # return None if s is null
199def typecast_time(s): # does NOT store time zone information
200 if not s:
201 return None
202 hour, minutes, seconds = s.split(":")
203 if "." in seconds: # check whether seconds have a fractional part
204 seconds, microseconds = seconds.split(".")
205 else:
206 microseconds = "0"
207 return datetime.time(
208 int(hour), int(minutes), int(seconds), int((microseconds + "000000")[:6])
209 )
212def typecast_timestamp(s): # does NOT store time zone information
213 # "2005-07-29 15:48:00.590358-05"
214 # "2005-07-29 09:56:00-05"
215 if not s:
216 return None
217 if " " not in s:
218 return typecast_date(s)
219 d, t = s.split()
220 # Remove timezone information.
221 if "-" in t:
222 t, _ = t.split("-", 1)
223 elif "+" in t:
224 t, _ = t.split("+", 1)
225 dates = d.split("-")
226 times = t.split(":")
227 seconds = times[2]
228 if "." in seconds: # check whether seconds have a fractional part
229 seconds, microseconds = seconds.split(".")
230 else:
231 microseconds = "0"
232 return datetime.datetime(
233 int(dates[0]),
234 int(dates[1]),
235 int(dates[2]),
236 int(times[0]),
237 int(times[1]),
238 int(seconds),
239 int((microseconds + "000000")[:6]),
240 )
243###############################################
244# Converters from Python to database (string) #
245###############################################
248def split_identifier(identifier):
249 """
250 Split an SQL identifier into a two element tuple of (namespace, name).
252 The identifier could be a table, column, or sequence name might be prefixed
253 by a namespace.
254 """
255 try:
256 namespace, name = identifier.split('"."')
257 except ValueError:
258 namespace, name = "", identifier
259 return namespace.strip('"'), name.strip('"')
262def truncate_name(identifier, length=None, hash_len=4):
263 """
264 Shorten an SQL identifier to a repeatable mangled version with the given
265 length.
267 If a quote stripped name contains a namespace, e.g. USERNAME"."TABLE,
268 truncate the table portion only.
269 """
270 namespace, name = split_identifier(identifier)
272 if length is None or len(name) <= length:
273 return identifier
275 digest = names_digest(name, length=hash_len)
276 return "%s%s%s" % (
277 '%s"."' % namespace if namespace else "",
278 name[: length - hash_len],
279 digest,
280 )
283def names_digest(*args, length):
284 """
285 Generate a 32-bit digest of a set of arguments that can be used to shorten
286 identifying names.
287 """
288 h = md5(usedforsecurity=False)
289 for arg in args:
290 h.update(arg.encode())
291 return h.hexdigest()[:length]
294def format_number(value, max_digits, decimal_places):
295 """
296 Format a number into a string with the requisite number of digits and
297 decimal places.
298 """
299 if value is None:
300 return None
301 context = decimal.getcontext().copy()
302 if max_digits is not None:
303 context.prec = max_digits
304 if decimal_places is not None:
305 value = value.quantize(
306 decimal.Decimal(1).scaleb(-decimal_places), context=context
307 )
308 else:
309 context.traps[decimal.Rounded] = 1
310 value = context.create_decimal(value)
311 return "{:f}".format(value)
314def strip_quotes(table_name):
315 """
316 Strip quotes off of quoted table names to make them safe for use in index
317 names, sequence names, etc. For example '"USER"."TABLE"' (an Oracle naming
318 scheme) becomes 'USER"."TABLE'.
319 """
320 has_quotes = table_name.startswith('"') and table_name.endswith('"')
321 return table_name[1:-1] if has_quotes else table_name