Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/django/db/models/functions/datetime.py: 52%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

203 statements  

1from datetime import datetime 

2 

3from django.conf import settings 

4from django.db.models.expressions import Func 

5from django.db.models.fields import ( 

6 DateField, 

7 DateTimeField, 

8 DurationField, 

9 Field, 

10 IntegerField, 

11 TimeField, 

12) 

13from django.db.models.lookups import ( 

14 Transform, 

15 YearExact, 

16 YearGt, 

17 YearGte, 

18 YearLt, 

19 YearLte, 

20) 

21from django.utils import timezone 

22 

23 

24class TimezoneMixin: 

25 tzinfo = None 

26 

27 def get_tzname(self): 

28 # Timezone conversions must happen to the input datetime *before* 

29 # applying a function. 2015-12-31 23:00:00 -02:00 is stored in the 

30 # database as 2016-01-01 01:00:00 +00:00. Any results should be 

31 # based on the input datetime not the stored datetime. 

32 tzname = None 

33 if settings.USE_TZ: 

34 if self.tzinfo is None: 

35 tzname = timezone.get_current_timezone_name() 

36 else: 

37 tzname = timezone._get_timezone_name(self.tzinfo) 

38 return tzname 

39 

40 

41class Extract(TimezoneMixin, Transform): 

42 lookup_name = None 

43 output_field = IntegerField() 

44 

45 def __init__(self, expression, lookup_name=None, tzinfo=None, **extra): 

46 if self.lookup_name is None: 

47 self.lookup_name = lookup_name 

48 if self.lookup_name is None: 

49 raise ValueError("lookup_name must be provided") 

50 self.tzinfo = tzinfo 

51 super().__init__(expression, **extra) 

52 

53 def as_sql(self, compiler, connection): 

54 sql, params = compiler.compile(self.lhs) 

55 lhs_output_field = self.lhs.output_field 

56 if isinstance(lhs_output_field, DateTimeField): 

57 tzname = self.get_tzname() 

58 sql, params = connection.ops.datetime_extract_sql( 

59 self.lookup_name, sql, tuple(params), tzname 

60 ) 

61 elif self.tzinfo is not None: 

62 raise ValueError("tzinfo can only be used with DateTimeField.") 

63 elif isinstance(lhs_output_field, DateField): 

64 sql, params = connection.ops.date_extract_sql( 

65 self.lookup_name, sql, tuple(params) 

66 ) 

67 elif isinstance(lhs_output_field, TimeField): 

68 sql, params = connection.ops.time_extract_sql( 

69 self.lookup_name, sql, tuple(params) 

70 ) 

71 elif isinstance(lhs_output_field, DurationField): 

72 if not connection.features.has_native_duration_field: 

73 raise ValueError( 

74 "Extract requires native DurationField database support." 

75 ) 

76 sql, params = connection.ops.time_extract_sql( 

77 self.lookup_name, sql, tuple(params) 

78 ) 

79 else: 

80 # resolve_expression has already validated the output_field so this 

81 # assert should never be hit. 

82 assert False, "Tried to Extract from an invalid type." 

83 return sql, params 

84 

85 def resolve_expression( 

86 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

87 ): 

88 copy = super().resolve_expression( 

89 query, allow_joins, reuse, summarize, for_save 

90 ) 

91 field = getattr(copy.lhs, "output_field", None) 

92 if field is None: 

93 return copy 

94 if not isinstance(field, (DateField, DateTimeField, TimeField, DurationField)): 

95 raise ValueError( 

96 "Extract input expression must be DateField, DateTimeField, " 

97 "TimeField, or DurationField." 

98 ) 

99 # Passing dates to functions expecting datetimes is most likely a mistake. 

100 if type(field) is DateField and copy.lookup_name in ( 

101 "hour", 

102 "minute", 

103 "second", 

104 ): 

105 raise ValueError( 

106 "Cannot extract time component '%s' from DateField '%s'." 

107 % (copy.lookup_name, field.name) 

108 ) 

109 if isinstance(field, DurationField) and copy.lookup_name in ( 

110 "year", 

111 "iso_year", 

112 "month", 

113 "week", 

114 "week_day", 

115 "iso_week_day", 

116 "quarter", 

117 ): 

118 raise ValueError( 

119 "Cannot extract component '%s' from DurationField '%s'." 

120 % (copy.lookup_name, field.name) 

121 ) 

122 return copy 

123 

124 

125class ExtractYear(Extract): 

126 lookup_name = "year" 

127 

128 

129class ExtractIsoYear(Extract): 

130 """Return the ISO-8601 week-numbering year.""" 

131 

132 lookup_name = "iso_year" 

133 

134 

135class ExtractMonth(Extract): 

136 lookup_name = "month" 

137 

138 

139class ExtractDay(Extract): 

140 lookup_name = "day" 

141 

142 

143class ExtractWeek(Extract): 

144 """ 

145 Return 1-52 or 53, based on ISO-8601, i.e., Monday is the first of the 

146 week. 

147 """ 

148 

149 lookup_name = "week" 

150 

151 

152class ExtractWeekDay(Extract): 

153 """ 

154 Return Sunday=1 through Saturday=7. 

155 

156 To replicate this in Python: (mydatetime.isoweekday() % 7) + 1 

157 """ 

158 

159 lookup_name = "week_day" 

160 

161 

162class ExtractIsoWeekDay(Extract): 

163 """Return Monday=1 through Sunday=7, based on ISO-8601.""" 

164 

165 lookup_name = "iso_week_day" 

166 

167 

168class ExtractQuarter(Extract): 

169 lookup_name = "quarter" 

170 

171 

172class ExtractHour(Extract): 

173 lookup_name = "hour" 

174 

175 

176class ExtractMinute(Extract): 

177 lookup_name = "minute" 

178 

179 

180class ExtractSecond(Extract): 

181 lookup_name = "second" 

182 

183 

184DateField.register_lookup(ExtractYear) 

185DateField.register_lookup(ExtractMonth) 

186DateField.register_lookup(ExtractDay) 

187DateField.register_lookup(ExtractWeekDay) 

188DateField.register_lookup(ExtractIsoWeekDay) 

189DateField.register_lookup(ExtractWeek) 

190DateField.register_lookup(ExtractIsoYear) 

191DateField.register_lookup(ExtractQuarter) 

192 

193TimeField.register_lookup(ExtractHour) 

194TimeField.register_lookup(ExtractMinute) 

195TimeField.register_lookup(ExtractSecond) 

196 

197DateTimeField.register_lookup(ExtractHour) 

198DateTimeField.register_lookup(ExtractMinute) 

199DateTimeField.register_lookup(ExtractSecond) 

200 

201ExtractYear.register_lookup(YearExact) 

202ExtractYear.register_lookup(YearGt) 

203ExtractYear.register_lookup(YearGte) 

204ExtractYear.register_lookup(YearLt) 

205ExtractYear.register_lookup(YearLte) 

206 

207ExtractIsoYear.register_lookup(YearExact) 

208ExtractIsoYear.register_lookup(YearGt) 

209ExtractIsoYear.register_lookup(YearGte) 

210ExtractIsoYear.register_lookup(YearLt) 

211ExtractIsoYear.register_lookup(YearLte) 

212 

213 

214class Now(Func): 

215 template = "CURRENT_TIMESTAMP" 

216 output_field = DateTimeField() 

217 

218 def as_postgresql(self, compiler, connection, **extra_context): 

219 # PostgreSQL's CURRENT_TIMESTAMP means "the time at the start of the 

220 # transaction". Use STATEMENT_TIMESTAMP to be cross-compatible with 

221 # other databases. 

222 return self.as_sql( 

223 compiler, connection, template="STATEMENT_TIMESTAMP()", **extra_context 

224 ) 

225 

226 def as_mysql(self, compiler, connection, **extra_context): 

227 return self.as_sql( 

228 compiler, connection, template="CURRENT_TIMESTAMP(6)", **extra_context 

229 ) 

230 

231 def as_sqlite(self, compiler, connection, **extra_context): 

232 return self.as_sql( 

233 compiler, 

234 connection, 

235 template="STRFTIME('%%%%Y-%%%%m-%%%%d %%%%H:%%%%M:%%%%f', 'NOW')", 

236 **extra_context, 

237 ) 

238 

239 def as_oracle(self, compiler, connection, **extra_context): 

240 return self.as_sql( 

241 compiler, connection, template="LOCALTIMESTAMP", **extra_context 

242 ) 

243 

244 

245class TruncBase(TimezoneMixin, Transform): 

246 kind = None 

247 tzinfo = None 

248 

249 def __init__( 

250 self, 

251 expression, 

252 output_field=None, 

253 tzinfo=None, 

254 **extra, 

255 ): 

256 self.tzinfo = tzinfo 

257 super().__init__(expression, output_field=output_field, **extra) 

258 

259 def as_sql(self, compiler, connection): 

260 sql, params = compiler.compile(self.lhs) 

261 tzname = None 

262 if isinstance(self.lhs.output_field, DateTimeField): 

263 tzname = self.get_tzname() 

264 elif self.tzinfo is not None: 

265 raise ValueError("tzinfo can only be used with DateTimeField.") 

266 if isinstance(self.output_field, DateTimeField): 

267 sql, params = connection.ops.datetime_trunc_sql( 

268 self.kind, sql, tuple(params), tzname 

269 ) 

270 elif isinstance(self.output_field, DateField): 

271 sql, params = connection.ops.date_trunc_sql( 

272 self.kind, sql, tuple(params), tzname 

273 ) 

274 elif isinstance(self.output_field, TimeField): 

275 sql, params = connection.ops.time_trunc_sql( 

276 self.kind, sql, tuple(params), tzname 

277 ) 

278 else: 

279 raise ValueError( 

280 "Trunc only valid on DateField, TimeField, or DateTimeField." 

281 ) 

282 return sql, params 

283 

284 def resolve_expression( 

285 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

286 ): 

287 copy = super().resolve_expression( 

288 query, allow_joins, reuse, summarize, for_save 

289 ) 

290 field = copy.lhs.output_field 

291 # DateTimeField is a subclass of DateField so this works for both. 

292 if not isinstance(field, (DateField, TimeField)): 

293 raise TypeError( 

294 "%r isn't a DateField, TimeField, or DateTimeField." % field.name 

295 ) 

296 # If self.output_field was None, then accessing the field will trigger 

297 # the resolver to assign it to self.lhs.output_field. 

298 if not isinstance(copy.output_field, (DateField, DateTimeField, TimeField)): 

299 raise ValueError( 

300 "output_field must be either DateField, TimeField, or DateTimeField" 

301 ) 

302 # Passing dates or times to functions expecting datetimes is most 

303 # likely a mistake. 

304 class_output_field = ( 

305 self.__class__.output_field 

306 if isinstance(self.__class__.output_field, Field) 

307 else None 

308 ) 

309 output_field = class_output_field or copy.output_field 

310 has_explicit_output_field = ( 

311 class_output_field or field.__class__ is not copy.output_field.__class__ 

312 ) 

313 if type(field) is DateField and ( 

314 isinstance(output_field, DateTimeField) 

315 or copy.kind in ("hour", "minute", "second", "time") 

316 ): 

317 raise ValueError( 

318 "Cannot truncate DateField '%s' to %s." 

319 % ( 

320 field.name, 

321 ( 

322 output_field.__class__.__name__ 

323 if has_explicit_output_field 

324 else "DateTimeField" 

325 ), 

326 ) 

327 ) 

328 elif isinstance(field, TimeField) and ( 

329 isinstance(output_field, DateTimeField) 

330 or copy.kind in ("year", "quarter", "month", "week", "day", "date") 

331 ): 

332 raise ValueError( 

333 "Cannot truncate TimeField '%s' to %s." 

334 % ( 

335 field.name, 

336 ( 

337 output_field.__class__.__name__ 

338 if has_explicit_output_field 

339 else "DateTimeField" 

340 ), 

341 ) 

342 ) 

343 return copy 

344 

345 def convert_value(self, value, expression, connection): 

346 if isinstance(self.output_field, DateTimeField): 

347 if not settings.USE_TZ: 

348 pass 

349 elif value is not None: 

350 value = value.replace(tzinfo=None) 

351 value = timezone.make_aware(value, self.tzinfo) 

352 elif not connection.features.has_zoneinfo_database: 

353 raise ValueError( 

354 "Database returned an invalid datetime value. Are time " 

355 "zone definitions for your database installed?" 

356 ) 

357 elif isinstance(value, datetime): 

358 if value is None: 

359 pass 

360 elif isinstance(self.output_field, DateField): 

361 value = value.date() 

362 elif isinstance(self.output_field, TimeField): 

363 value = value.time() 

364 return value 

365 

366 

367class Trunc(TruncBase): 

368 def __init__( 

369 self, 

370 expression, 

371 kind, 

372 output_field=None, 

373 tzinfo=None, 

374 **extra, 

375 ): 

376 self.kind = kind 

377 super().__init__(expression, output_field=output_field, tzinfo=tzinfo, **extra) 

378 

379 

380class TruncYear(TruncBase): 

381 kind = "year" 

382 

383 

384class TruncQuarter(TruncBase): 

385 kind = "quarter" 

386 

387 

388class TruncMonth(TruncBase): 

389 kind = "month" 

390 

391 

392class TruncWeek(TruncBase): 

393 """Truncate to midnight on the Monday of the week.""" 

394 

395 kind = "week" 

396 

397 

398class TruncDay(TruncBase): 

399 kind = "day" 

400 

401 

402class TruncDate(TruncBase): 

403 kind = "date" 

404 lookup_name = "date" 

405 output_field = DateField() 

406 

407 def as_sql(self, compiler, connection): 

408 # Cast to date rather than truncate to date. 

409 sql, params = compiler.compile(self.lhs) 

410 tzname = self.get_tzname() 

411 return connection.ops.datetime_cast_date_sql(sql, tuple(params), tzname) 

412 

413 

414class TruncTime(TruncBase): 

415 kind = "time" 

416 lookup_name = "time" 

417 output_field = TimeField() 

418 

419 def as_sql(self, compiler, connection): 

420 # Cast to time rather than truncate to time. 

421 sql, params = compiler.compile(self.lhs) 

422 tzname = self.get_tzname() 

423 return connection.ops.datetime_cast_time_sql(sql, tuple(params), tzname) 

424 

425 

426class TruncHour(TruncBase): 

427 kind = "hour" 

428 

429 

430class TruncMinute(TruncBase): 

431 kind = "minute" 

432 

433 

434class TruncSecond(TruncBase): 

435 kind = "second" 

436 

437 

438DateTimeField.register_lookup(TruncDate) 

439DateTimeField.register_lookup(TruncTime)