Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/functions/datetime.py: 51%
202 statements
« prev ^ index » next coverage.py v7.0.5, created at 2023-01-17 06:13 +0000
1from datetime import datetime
3from django.conf import settings
4from django.db.models.expressions import Func
5from django.db.models.fields import (
6 DateField,
7 DateTimeField,
8 DurationField,
9 Field,
10 IntegerField,
11 TimeField,
12)
13from django.db.models.lookups import (
14 Transform,
15 YearExact,
16 YearGt,
17 YearGte,
18 YearLt,
19 YearLte,
20)
21from django.utils import timezone
class TimezoneMixin:
    """Mixin that resolves the time zone name used by datetime functions."""

    # Explicit time zone for the function; None selects the current one.
    tzinfo = None

    def get_tzname(self):
        """
        Return the time zone name to convert to, or None when USE_TZ is off.

        Timezone conversions must happen to the input datetime *before*
        applying a function. 2015-12-31 23:00:00 -02:00 is stored in the
        database as 2016-01-01 01:00:00 +00:00. Any results should be based
        on the input datetime, not the stored datetime.
        """
        if not settings.USE_TZ:
            return None
        if self.tzinfo is None:
            return timezone.get_current_timezone_name()
        return timezone._get_timezone_name(self.tzinfo)
class Extract(TimezoneMixin, Transform):
    """
    Extract one component (named by ``lookup_name``) of a date, time,
    datetime, or duration expression.

    Subclasses pin ``lookup_name`` as a class attribute; when it is left as
    None the caller must pass ``lookup_name`` to the constructor.
    """

    lookup_name = None
    output_field = IntegerField()

    def __init__(self, expression, lookup_name=None, tzinfo=None, **extra):
        """
        Store the component name and optional time zone.

        Raises ValueError if neither the class nor the caller supplies a
        ``lookup_name``.
        """
        # A class-level lookup_name takes precedence over the argument.
        if self.lookup_name is None:
            self.lookup_name = lookup_name
        if self.lookup_name is None:
            raise ValueError("lookup_name must be provided")
        self.tzinfo = tzinfo
        super().__init__(expression, **extra)

    def as_sql(self, compiler, connection):
        """
        Compile the left-hand side and wrap it in the backend's extraction
        SQL chosen by the type of the left-hand side's output field.

        Raises ValueError when ``tzinfo`` is set for a non-DateTimeField
        input, or when extracting from a DurationField on a backend without
        native duration support.
        """
        sql, params = compiler.compile(self.lhs)
        lhs_output_field = self.lhs.output_field
        # DateTimeField must be tested before DateField (its base class).
        if isinstance(lhs_output_field, DateTimeField):
            tzname = self.get_tzname()
            sql, params = connection.ops.datetime_extract_sql(
                self.lookup_name, sql, tuple(params), tzname
            )
        elif self.tzinfo is not None:
            raise ValueError("tzinfo can only be used with DateTimeField.")
        elif isinstance(lhs_output_field, DateField):
            sql, params = connection.ops.date_extract_sql(
                self.lookup_name, sql, tuple(params)
            )
        elif isinstance(lhs_output_field, TimeField):
            sql, params = connection.ops.time_extract_sql(
                self.lookup_name, sql, tuple(params)
            )
        elif isinstance(lhs_output_field, DurationField):
            if not connection.features.has_native_duration_field:
                raise ValueError(
                    "Extract requires native DurationField database support."
                )
            # Durations go through the same backend hook as times.
            sql, params = connection.ops.time_extract_sql(
                self.lookup_name, sql, tuple(params)
            )
        else:
            # resolve_expression has already validated the output_field so
            # this should never be hit. Raise explicitly rather than using an
            # ``assert`` statement, which is stripped under ``python -O`` and
            # would let the un-extracted SQL fall through to the return.
            raise AssertionError("Tried to Extract from an invalid type.")
        return sql, params

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """
        Resolve the expression and validate that the requested component
        can be extracted from the input field type.

        Raises ValueError for unsupported input field types, for time
        components requested from a plain DateField, and for calendar
        components requested from a DurationField.
        """
        copy = super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        field = getattr(copy.lhs, "output_field", None)
        if field is None:
            return copy
        if not isinstance(field, (DateField, DateTimeField, TimeField, DurationField)):
            raise ValueError(
                "Extract input expression must be DateField, DateTimeField, "
                "TimeField, or DurationField."
            )
        # Passing dates to functions expecting datetimes is most likely a
        # mistake. An exact type check is required here: isinstance() would
        # also match DateTimeField, which subclasses DateField.
        if type(field) is DateField and copy.lookup_name in (
            "hour",
            "minute",
            "second",
        ):
            raise ValueError(
                "Cannot extract time component '%s' from DateField '%s'."
                % (copy.lookup_name, field.name)
            )
        if isinstance(field, DurationField) and copy.lookup_name in (
            "year",
            "iso_year",
            "month",
            "week",
            "week_day",
            "iso_week_day",
            "quarter",
        ):
            raise ValueError(
                "Cannot extract component '%s' from DurationField '%s'."
                % (copy.lookup_name, field.name)
            )
        return copy
class ExtractYear(Extract):
    """Extract the ``year`` component."""

    lookup_name = "year"
class ExtractIsoYear(Extract):
    """Extract the ISO-8601 week-numbering year."""

    lookup_name = "iso_year"
class ExtractMonth(Extract):
    """Extract the ``month`` component."""

    lookup_name = "month"
class ExtractDay(Extract):
    """Extract the ``day`` component."""

    lookup_name = "day"
class ExtractWeek(Extract):
    """
    Extract the ISO-8601 week number: 1-52 or 53, with Monday as the first
    day of the week.
    """

    lookup_name = "week"
class ExtractWeekDay(Extract):
    """
    Extract the day of the week, numbered Sunday=1 through Saturday=7.

    The Python equivalent is: (mydatetime.isoweekday() % 7) + 1
    """

    lookup_name = "week_day"
class ExtractIsoWeekDay(Extract):
    """Extract the ISO-8601 day of the week, Monday=1 through Sunday=7."""

    lookup_name = "iso_week_day"
class ExtractQuarter(Extract):
    """Extract the ``quarter`` component."""

    lookup_name = "quarter"
class ExtractHour(Extract):
    """Extract the ``hour`` component."""

    lookup_name = "hour"
class ExtractMinute(Extract):
    """Extract the ``minute`` component."""

    lookup_name = "minute"
class ExtractSecond(Extract):
    """Extract the ``second`` component."""

    lookup_name = "second"
# Register the date-component extracts as lookups on DateField.
for _extract in (
    ExtractYear,
    ExtractMonth,
    ExtractDay,
    ExtractWeekDay,
    ExtractIsoWeekDay,
    ExtractWeek,
    ExtractIsoYear,
    ExtractQuarter,
):
    DateField.register_lookup(_extract)

# Register the time-component extracts on TimeField, then on DateTimeField.
for _field in (TimeField, DateTimeField):
    for _extract in (ExtractHour, ExtractMinute, ExtractSecond):
        _field.register_lookup(_extract)

# Register the year comparison lookups on both year extracts.
for _extract in (ExtractYear, ExtractIsoYear):
    for _year_lookup in (YearExact, YearGt, YearGte, YearLt, YearLte):
        _extract.register_lookup(_year_lookup)

# Keep the loop variables out of the module namespace.
del _extract, _field, _year_lookup
class Now(Func):
    """SQL expression for the database's current timestamp."""

    template = "CURRENT_TIMESTAMP"
    output_field = DateTimeField()

    def as_postgresql(self, compiler, connection, **extra_context):
        """Compile for PostgreSQL using the statement timestamp."""
        # PostgreSQL's CURRENT_TIMESTAMP means "the time at the start of the
        # transaction". Use STATEMENT_TIMESTAMP to be cross-compatible with
        # other databases.
        statement_template = "STATEMENT_TIMESTAMP()"
        return self.as_sql(
            compiler, connection, template=statement_template, **extra_context
        )

    def as_mysql(self, compiler, connection, **extra_context):
        """Compile for MySQL using ``CURRENT_TIMESTAMP(6)``."""
        mysql_template = "CURRENT_TIMESTAMP(6)"
        return self.as_sql(
            compiler, connection, template=mysql_template, **extra_context
        )

    def as_sqlite(self, compiler, connection, **extra_context):
        """Compile for SQLite using a ``STRFTIME`` call on ``'NOW'``."""
        sqlite_template = "STRFTIME('%%%%Y-%%%%m-%%%%d %%%%H:%%%%M:%%%%f', 'NOW')"
        return self.as_sql(
            compiler,
            connection,
            template=sqlite_template,
            **extra_context,
        )
class TruncBase(TimezoneMixin, Transform):
    """
    Base class for truncating a date, time, or datetime expression to the
    precision named by ``kind``.
    """

    kind = None
    tzinfo = None

    # RemovedInDjango50Warning: when the deprecation ends, remove is_dst
    # argument.
    def __init__(
        self,
        expression,
        output_field=None,
        tzinfo=None,
        is_dst=timezone.NOT_PASSED,
        **extra,
    ):
        """Store the time zone options and hand the rest to the parent."""
        self.tzinfo = tzinfo
        self.is_dst = is_dst
        super().__init__(expression, output_field=output_field, **extra)

    def as_sql(self, compiler, connection):
        """
        Compile the left-hand side and wrap it in the backend's truncation
        SQL chosen by the type of this expression's output field.

        Raises ValueError when ``tzinfo`` is set for a non-DateTimeField
        input, or when the output field is not a date/time/datetime field.
        """
        sql, params = compiler.compile(self.lhs)
        tzname = None
        if isinstance(self.lhs.output_field, DateTimeField):
            tzname = self.get_tzname()
        elif self.tzinfo is not None:
            raise ValueError("tzinfo can only be used with DateTimeField.")
        # DateTimeField must be tested before DateField (its base class).
        if isinstance(self.output_field, DateTimeField):
            sql, params = connection.ops.datetime_trunc_sql(
                self.kind, sql, tuple(params), tzname
            )
        elif isinstance(self.output_field, DateField):
            sql, params = connection.ops.date_trunc_sql(
                self.kind, sql, tuple(params), tzname
            )
        elif isinstance(self.output_field, TimeField):
            sql, params = connection.ops.time_trunc_sql(
                self.kind, sql, tuple(params), tzname
            )
        else:
            raise ValueError(
                "Trunc only valid on DateField, TimeField, or DateTimeField."
            )
        return sql, params

    def resolve_expression(
        self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False
    ):
        """
        Resolve the expression and validate the input/output field pairing.

        Raises TypeError if the input is not a date/time/datetime field, and
        ValueError if the output field is invalid or the requested
        truncation makes no sense for the input type.
        """
        copy = super().resolve_expression(
            query, allow_joins, reuse, summarize, for_save
        )
        field = copy.lhs.output_field
        # DateTimeField is a subclass of DateField so this works for both.
        if not isinstance(field, (DateField, TimeField)):
            raise TypeError(
                "%r isn't a DateField, TimeField, or DateTimeField." % field.name
            )
        # If self.output_field was None, then accessing the field will trigger
        # the resolver to assign it to self.lhs.output_field.
        if not isinstance(copy.output_field, (DateField, DateTimeField, TimeField)):
            raise ValueError(
                "output_field must be either DateField, TimeField, or DateTimeField"
            )
        # Passing dates or times to functions expecting datetimes is most
        # likely a mistake.
        # Only a Field instance declared on the class itself counts as a
        # class-level output field.
        class_output_field = (
            self.__class__.output_field
            if isinstance(self.__class__.output_field, Field)
            else None
        )
        output_field = class_output_field or copy.output_field
        has_explicit_output_field = (
            class_output_field or field.__class__ is not copy.output_field.__class__
        )
        # An exact type check is required: isinstance() would also match
        # DateTimeField, which subclasses DateField.
        if type(field) is DateField and (
            isinstance(output_field, DateTimeField)
            or copy.kind in ("hour", "minute", "second", "time")
        ):
            raise ValueError(
                "Cannot truncate DateField '%s' to %s."
                % (
                    field.name,
                    output_field.__class__.__name__
                    if has_explicit_output_field
                    else "DateTimeField",
                )
            )
        elif isinstance(field, TimeField) and (
            isinstance(output_field, DateTimeField)
            or copy.kind in ("year", "quarter", "month", "week", "day", "date")
        ):
            raise ValueError(
                "Cannot truncate TimeField '%s' to %s."
                % (
                    field.name,
                    output_field.__class__.__name__
                    if has_explicit_output_field
                    else "DateTimeField",
                )
            )
        return copy

    def convert_value(self, value, expression, connection):
        """
        Convert the value returned by the database to the output field type.

        For a DateTimeField output with USE_TZ enabled, the value is made
        aware in ``self.tzinfo``; a None value on a backend without a
        zoneinfo database raises ValueError. For other outputs, a datetime
        value is narrowed to a date or a time to match the output field.
        """
        if isinstance(self.output_field, DateTimeField):
            if settings.USE_TZ:
                if value is not None:
                    value = value.replace(tzinfo=None)
                    value = timezone.make_aware(
                        value, self.tzinfo, is_dst=self.is_dst
                    )
                elif not connection.features.has_zoneinfo_database:
                    raise ValueError(
                        "Database returned an invalid datetime value. Are time "
                        "zone definitions for your database installed?"
                    )
        elif isinstance(value, datetime):
            # value cannot be None here (None is not a datetime instance), so
            # no separate None check is needed.
            if isinstance(self.output_field, DateField):
                value = value.date()
            elif isinstance(self.output_field, TimeField):
                value = value.time()
        return value
class Trunc(TruncBase):
    """Truncation whose precision (``kind``) is chosen per instance."""

    # RemovedInDjango50Warning: when the deprecation ends, remove is_dst
    # argument.
    def __init__(
        self,
        expression,
        kind,
        output_field=None,
        tzinfo=None,
        is_dst=timezone.NOT_PASSED,
        **extra,
    ):
        """Record ``kind`` and forward every other option to TruncBase."""
        self.kind = kind
        super().__init__(
            expression,
            output_field=output_field,
            tzinfo=tzinfo,
            is_dst=is_dst,
            **extra,
        )
class TruncYear(TruncBase):
    """Truncate with ``kind`` fixed to ``"year"``."""

    kind = "year"
class TruncQuarter(TruncBase):
    """Truncate with ``kind`` fixed to ``"quarter"``."""

    kind = "quarter"
class TruncMonth(TruncBase):
    """Truncate with ``kind`` fixed to ``"month"``."""

    kind = "month"
class TruncWeek(TruncBase):
    """Truncate to midnight on the Monday of the week."""

    kind = "week"
class TruncDay(TruncBase):
    """Truncate with ``kind`` fixed to ``"day"``."""

    kind = "day"
class TruncDate(TruncBase):
    """Cast a datetime expression to a date (lookup name ``date``)."""

    kind = "date"
    lookup_name = "date"
    output_field = DateField()

    def as_sql(self, compiler, connection):
        """Compile the left-hand side and cast it to a date."""
        # Cast to date rather than truncate to date.
        lhs_sql, lhs_params = compiler.compile(self.lhs)
        tzname = self.get_tzname()
        return connection.ops.datetime_cast_date_sql(
            lhs_sql, tuple(lhs_params), tzname
        )
class TruncTime(TruncBase):
    """Cast a datetime expression to a time (lookup name ``time``)."""

    kind = "time"
    lookup_name = "time"
    output_field = TimeField()

    def as_sql(self, compiler, connection):
        """Compile the left-hand side and cast it to a time."""
        # Cast to time rather than truncate to time.
        lhs_sql, lhs_params = compiler.compile(self.lhs)
        tzname = self.get_tzname()
        return connection.ops.datetime_cast_time_sql(
            lhs_sql, tuple(lhs_params), tzname
        )
class TruncHour(TruncBase):
    """Truncate with ``kind`` fixed to ``"hour"``."""

    kind = "hour"
class TruncMinute(TruncBase):
    """Truncate with ``kind`` fixed to ``"minute"``."""

    kind = "minute"
class TruncSecond(TruncBase):
    """Truncate with ``kind`` fixed to ``"second"``."""

    kind = "second"
# Register the date and time casts as lookups on DateTimeField.
for _trunc in (TruncDate, TruncTime):
    DateTimeField.register_lookup(_trunc)

# Keep the loop variable out of the module namespace.
del _trunc