Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/django/db/models/functions/datetime.py: 51%

202 statements  

« prev     ^ index     » next       coverage.py v7.0.5, created at 2023-01-17 06:13 +0000

1from datetime import datetime 

2 

3from django.conf import settings 

4from django.db.models.expressions import Func 

5from django.db.models.fields import ( 

6 DateField, 

7 DateTimeField, 

8 DurationField, 

9 Field, 

10 IntegerField, 

11 TimeField, 

12) 

13from django.db.models.lookups import ( 

14 Transform, 

15 YearExact, 

16 YearGt, 

17 YearGte, 

18 YearLt, 

19 YearLte, 

20) 

21from django.utils import timezone 

22 

23 

24class TimezoneMixin: 

25 tzinfo = None 

26 

27 def get_tzname(self): 

28 # Timezone conversions must happen to the input datetime *before* 

29 # applying a function. 2015-12-31 23:00:00 -02:00 is stored in the 

30 # database as 2016-01-01 01:00:00 +00:00. Any results should be 

31 # based on the input datetime not the stored datetime. 

32 tzname = None 

33 if settings.USE_TZ: 

34 if self.tzinfo is None: 

35 tzname = timezone.get_current_timezone_name() 

36 else: 

37 tzname = timezone._get_timezone_name(self.tzinfo) 

38 return tzname 

39 

40 

41class Extract(TimezoneMixin, Transform): 

42 lookup_name = None 

43 output_field = IntegerField() 

44 

45 def __init__(self, expression, lookup_name=None, tzinfo=None, **extra): 

46 if self.lookup_name is None: 

47 self.lookup_name = lookup_name 

48 if self.lookup_name is None: 

49 raise ValueError("lookup_name must be provided") 

50 self.tzinfo = tzinfo 

51 super().__init__(expression, **extra) 

52 

53 def as_sql(self, compiler, connection): 

54 sql, params = compiler.compile(self.lhs) 

55 lhs_output_field = self.lhs.output_field 

56 if isinstance(lhs_output_field, DateTimeField): 

57 tzname = self.get_tzname() 

58 sql, params = connection.ops.datetime_extract_sql( 

59 self.lookup_name, sql, tuple(params), tzname 

60 ) 

61 elif self.tzinfo is not None: 

62 raise ValueError("tzinfo can only be used with DateTimeField.") 

63 elif isinstance(lhs_output_field, DateField): 

64 sql, params = connection.ops.date_extract_sql( 

65 self.lookup_name, sql, tuple(params) 

66 ) 

67 elif isinstance(lhs_output_field, TimeField): 

68 sql, params = connection.ops.time_extract_sql( 

69 self.lookup_name, sql, tuple(params) 

70 ) 

71 elif isinstance(lhs_output_field, DurationField): 

72 if not connection.features.has_native_duration_field: 

73 raise ValueError( 

74 "Extract requires native DurationField database support." 

75 ) 

76 sql, params = connection.ops.time_extract_sql( 

77 self.lookup_name, sql, tuple(params) 

78 ) 

79 else: 

80 # resolve_expression has already validated the output_field so this 

81 # assert should never be hit. 

82 assert False, "Tried to Extract from an invalid type." 

83 return sql, params 

84 

85 def resolve_expression( 

86 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

87 ): 

88 copy = super().resolve_expression( 

89 query, allow_joins, reuse, summarize, for_save 

90 ) 

91 field = getattr(copy.lhs, "output_field", None) 

92 if field is None: 

93 return copy 

94 if not isinstance(field, (DateField, DateTimeField, TimeField, DurationField)): 

95 raise ValueError( 

96 "Extract input expression must be DateField, DateTimeField, " 

97 "TimeField, or DurationField." 

98 ) 

99 # Passing dates to functions expecting datetimes is most likely a mistake. 

100 if type(field) == DateField and copy.lookup_name in ( 

101 "hour", 

102 "minute", 

103 "second", 

104 ): 

105 raise ValueError( 

106 "Cannot extract time component '%s' from DateField '%s'." 

107 % (copy.lookup_name, field.name) 

108 ) 

109 if isinstance(field, DurationField) and copy.lookup_name in ( 

110 "year", 

111 "iso_year", 

112 "month", 

113 "week", 

114 "week_day", 

115 "iso_week_day", 

116 "quarter", 

117 ): 

118 raise ValueError( 

119 "Cannot extract component '%s' from DurationField '%s'." 

120 % (copy.lookup_name, field.name) 

121 ) 

122 return copy 

123 

124 

125class ExtractYear(Extract): 

126 lookup_name = "year" 

127 

128 

129class ExtractIsoYear(Extract): 

130 """Return the ISO-8601 week-numbering year.""" 

131 

132 lookup_name = "iso_year" 

133 

134 

135class ExtractMonth(Extract): 

136 lookup_name = "month" 

137 

138 

139class ExtractDay(Extract): 

140 lookup_name = "day" 

141 

142 

143class ExtractWeek(Extract): 

144 """ 

145 Return 1-52 or 53, based on ISO-8601, i.e., Monday is the first of the 

146 week. 

147 """ 

148 

149 lookup_name = "week" 

150 

151 

152class ExtractWeekDay(Extract): 

153 """ 

154 Return Sunday=1 through Saturday=7. 

155 

156 To replicate this in Python: (mydatetime.isoweekday() % 7) + 1 

157 """ 

158 

159 lookup_name = "week_day" 

160 

161 

162class ExtractIsoWeekDay(Extract): 

163 """Return Monday=1 through Sunday=7, based on ISO-8601.""" 

164 

165 lookup_name = "iso_week_day" 

166 

167 

168class ExtractQuarter(Extract): 

169 lookup_name = "quarter" 

170 

171 

172class ExtractHour(Extract): 

173 lookup_name = "hour" 

174 

175 

176class ExtractMinute(Extract): 

177 lookup_name = "minute" 

178 

179 

180class ExtractSecond(Extract): 

181 lookup_name = "second" 

182 

183 

184DateField.register_lookup(ExtractYear) 

185DateField.register_lookup(ExtractMonth) 

186DateField.register_lookup(ExtractDay) 

187DateField.register_lookup(ExtractWeekDay) 

188DateField.register_lookup(ExtractIsoWeekDay) 

189DateField.register_lookup(ExtractWeek) 

190DateField.register_lookup(ExtractIsoYear) 

191DateField.register_lookup(ExtractQuarter) 

192 

193TimeField.register_lookup(ExtractHour) 

194TimeField.register_lookup(ExtractMinute) 

195TimeField.register_lookup(ExtractSecond) 

196 

197DateTimeField.register_lookup(ExtractHour) 

198DateTimeField.register_lookup(ExtractMinute) 

199DateTimeField.register_lookup(ExtractSecond) 

200 

201ExtractYear.register_lookup(YearExact) 

202ExtractYear.register_lookup(YearGt) 

203ExtractYear.register_lookup(YearGte) 

204ExtractYear.register_lookup(YearLt) 

205ExtractYear.register_lookup(YearLte) 

206 

207ExtractIsoYear.register_lookup(YearExact) 

208ExtractIsoYear.register_lookup(YearGt) 

209ExtractIsoYear.register_lookup(YearGte) 

210ExtractIsoYear.register_lookup(YearLt) 

211ExtractIsoYear.register_lookup(YearLte) 

212 

213 

214class Now(Func): 

215 template = "CURRENT_TIMESTAMP" 

216 output_field = DateTimeField() 

217 

218 def as_postgresql(self, compiler, connection, **extra_context): 

219 # PostgreSQL's CURRENT_TIMESTAMP means "the time at the start of the 

220 # transaction". Use STATEMENT_TIMESTAMP to be cross-compatible with 

221 # other databases. 

222 return self.as_sql( 

223 compiler, connection, template="STATEMENT_TIMESTAMP()", **extra_context 

224 ) 

225 

226 def as_mysql(self, compiler, connection, **extra_context): 

227 return self.as_sql( 

228 compiler, connection, template="CURRENT_TIMESTAMP(6)", **extra_context 

229 ) 

230 

231 def as_sqlite(self, compiler, connection, **extra_context): 

232 return self.as_sql( 

233 compiler, 

234 connection, 

235 template="STRFTIME('%%%%Y-%%%%m-%%%%d %%%%H:%%%%M:%%%%f', 'NOW')", 

236 **extra_context, 

237 ) 

238 

239 

240class TruncBase(TimezoneMixin, Transform): 

241 kind = None 

242 tzinfo = None 

243 

244 # RemovedInDjango50Warning: when the deprecation ends, remove is_dst 

245 # argument. 

246 def __init__( 

247 self, 

248 expression, 

249 output_field=None, 

250 tzinfo=None, 

251 is_dst=timezone.NOT_PASSED, 

252 **extra, 

253 ): 

254 self.tzinfo = tzinfo 

255 self.is_dst = is_dst 

256 super().__init__(expression, output_field=output_field, **extra) 

257 

258 def as_sql(self, compiler, connection): 

259 sql, params = compiler.compile(self.lhs) 

260 tzname = None 

261 if isinstance(self.lhs.output_field, DateTimeField): 

262 tzname = self.get_tzname() 

263 elif self.tzinfo is not None: 

264 raise ValueError("tzinfo can only be used with DateTimeField.") 

265 if isinstance(self.output_field, DateTimeField): 

266 sql, params = connection.ops.datetime_trunc_sql( 

267 self.kind, sql, tuple(params), tzname 

268 ) 

269 elif isinstance(self.output_field, DateField): 

270 sql, params = connection.ops.date_trunc_sql( 

271 self.kind, sql, tuple(params), tzname 

272 ) 

273 elif isinstance(self.output_field, TimeField): 

274 sql, params = connection.ops.time_trunc_sql( 

275 self.kind, sql, tuple(params), tzname 

276 ) 

277 else: 

278 raise ValueError( 

279 "Trunc only valid on DateField, TimeField, or DateTimeField." 

280 ) 

281 return sql, params 

282 

283 def resolve_expression( 

284 self, query=None, allow_joins=True, reuse=None, summarize=False, for_save=False 

285 ): 

286 copy = super().resolve_expression( 

287 query, allow_joins, reuse, summarize, for_save 

288 ) 

289 field = copy.lhs.output_field 

290 # DateTimeField is a subclass of DateField so this works for both. 

291 if not isinstance(field, (DateField, TimeField)): 

292 raise TypeError( 

293 "%r isn't a DateField, TimeField, or DateTimeField." % field.name 

294 ) 

295 # If self.output_field was None, then accessing the field will trigger 

296 # the resolver to assign it to self.lhs.output_field. 

297 if not isinstance(copy.output_field, (DateField, DateTimeField, TimeField)): 

298 raise ValueError( 

299 "output_field must be either DateField, TimeField, or DateTimeField" 

300 ) 

301 # Passing dates or times to functions expecting datetimes is most 

302 # likely a mistake. 

303 class_output_field = ( 

304 self.__class__.output_field 

305 if isinstance(self.__class__.output_field, Field) 

306 else None 

307 ) 

308 output_field = class_output_field or copy.output_field 

309 has_explicit_output_field = ( 

310 class_output_field or field.__class__ is not copy.output_field.__class__ 

311 ) 

312 if type(field) == DateField and ( 

313 isinstance(output_field, DateTimeField) 

314 or copy.kind in ("hour", "minute", "second", "time") 

315 ): 

316 raise ValueError( 

317 "Cannot truncate DateField '%s' to %s." 

318 % ( 

319 field.name, 

320 output_field.__class__.__name__ 

321 if has_explicit_output_field 

322 else "DateTimeField", 

323 ) 

324 ) 

325 elif isinstance(field, TimeField) and ( 

326 isinstance(output_field, DateTimeField) 

327 or copy.kind in ("year", "quarter", "month", "week", "day", "date") 

328 ): 

329 raise ValueError( 

330 "Cannot truncate TimeField '%s' to %s." 

331 % ( 

332 field.name, 

333 output_field.__class__.__name__ 

334 if has_explicit_output_field 

335 else "DateTimeField", 

336 ) 

337 ) 

338 return copy 

339 

340 def convert_value(self, value, expression, connection): 

341 if isinstance(self.output_field, DateTimeField): 

342 if not settings.USE_TZ: 

343 pass 

344 elif value is not None: 

345 value = value.replace(tzinfo=None) 

346 value = timezone.make_aware(value, self.tzinfo, is_dst=self.is_dst) 

347 elif not connection.features.has_zoneinfo_database: 

348 raise ValueError( 

349 "Database returned an invalid datetime value. Are time " 

350 "zone definitions for your database installed?" 

351 ) 

352 elif isinstance(value, datetime): 

353 if value is None: 

354 pass 

355 elif isinstance(self.output_field, DateField): 

356 value = value.date() 

357 elif isinstance(self.output_field, TimeField): 

358 value = value.time() 

359 return value 

360 

361 

362class Trunc(TruncBase): 

363 

364 # RemovedInDjango50Warning: when the deprecation ends, remove is_dst 

365 # argument. 

366 def __init__( 

367 self, 

368 expression, 

369 kind, 

370 output_field=None, 

371 tzinfo=None, 

372 is_dst=timezone.NOT_PASSED, 

373 **extra, 

374 ): 

375 self.kind = kind 

376 super().__init__( 

377 expression, output_field=output_field, tzinfo=tzinfo, is_dst=is_dst, **extra 

378 ) 

379 

380 

381class TruncYear(TruncBase): 

382 kind = "year" 

383 

384 

385class TruncQuarter(TruncBase): 

386 kind = "quarter" 

387 

388 

389class TruncMonth(TruncBase): 

390 kind = "month" 

391 

392 

393class TruncWeek(TruncBase): 

394 """Truncate to midnight on the Monday of the week.""" 

395 

396 kind = "week" 

397 

398 

399class TruncDay(TruncBase): 

400 kind = "day" 

401 

402 

403class TruncDate(TruncBase): 

404 kind = "date" 

405 lookup_name = "date" 

406 output_field = DateField() 

407 

408 def as_sql(self, compiler, connection): 

409 # Cast to date rather than truncate to date. 

410 sql, params = compiler.compile(self.lhs) 

411 tzname = self.get_tzname() 

412 return connection.ops.datetime_cast_date_sql(sql, tuple(params), tzname) 

413 

414 

415class TruncTime(TruncBase): 

416 kind = "time" 

417 lookup_name = "time" 

418 output_field = TimeField() 

419 

420 def as_sql(self, compiler, connection): 

421 # Cast to time rather than truncate to time. 

422 sql, params = compiler.compile(self.lhs) 

423 tzname = self.get_tzname() 

424 return connection.ops.datetime_cast_time_sql(sql, tuple(params), tzname) 

425 

426 

427class TruncHour(TruncBase): 

428 kind = "hour" 

429 

430 

431class TruncMinute(TruncBase): 

432 kind = "minute" 

433 

434 

435class TruncSecond(TruncBase): 

436 kind = "second" 

437 

438 

439DateTimeField.register_lookup(TruncDate) 

440DateTimeField.register_lookup(TruncTime)