1from __future__ import annotations
2
3from datetime import timedelta
4from typing import TYPE_CHECKING
5from typing import cast
6from typing import overload
7
8import pendulum
9
10from pendulum.constants import SECONDS_PER_DAY
11from pendulum.constants import SECONDS_PER_HOUR
12from pendulum.constants import SECONDS_PER_MINUTE
13from pendulum.constants import US_PER_SECOND
14from pendulum.utils._compat import PYPY
15
16
17if TYPE_CHECKING:
18 from typing_extensions import Self
19
20
21def _divide_and_round(a: float, b: float) -> int:
22 """divide a by b and round result to the nearest integer
23
24 When the ratio is exactly half-way between two integers,
25 the even integer is returned.
26 """
27 # Based on the reference implementation for divmod_near
28 # in Objects/longobject.c.
29 q, r = divmod(a, b)
30
31 # The output of divmod() is either a float or an int,
32 # but we always want it to be an int.
33 q = int(q)
34
35 # round up if either r / b > 0.5, or r / b == 0.5 and q is odd.
36 # The expression r / b > 0.5 is equivalent to 2 * r > b if b is
37 # positive, 2 * r < b if b negative.
38 r *= 2
39 greater_than_half = r > b if b > 0 else r < b
40 if greater_than_half or r == b and q % 2 == 1:
41 q += 1
42
43 return q
44
45
class Duration(timedelta):
    """
    Replacement for the standard timedelta class.

    Provides several improvements over the base class:

    - ``years`` and ``months`` components, which must be integers and are
      counted as 365 and 30 days when handed to the underlying timedelta;
    - a weeks / remaining days / hours / minutes / remaining seconds
      breakdown of the rest of the duration;
    - a localized, human readable representation (see ``in_words()``).
    """

    # Signed number of seconds, years and months excluded.
    _total: float = 0
    _years: int = 0
    _months: int = 0
    _weeks: int = 0
    # Whole days, years and months excluded.
    _days: int = 0
    # Days left once whole weeks have been taken out of ``_days``.
    _remaining_days: int = 0
    # Seconds within the last (partial) day.
    _seconds: int = 0
    _microseconds: int = 0

    # Cache slots; only _h, _i, _s and _invert are filled by the
    # properties below.
    _y = None
    _m = None
    _w = None
    _d = None
    _h = None
    _i = None
    _s = None
    _invert = None

    def __new__(
        cls,
        days: float = 0,
        seconds: float = 0,
        microseconds: float = 0,
        milliseconds: float = 0,
        minutes: float = 0,
        hours: float = 0,
        weeks: float = 0,
        years: float = 0,
        months: float = 0,
    ) -> Self:
        """
        Create a new duration.

        Every argument except ``years`` and ``months`` is forwarded to
        ``timedelta.__new__``; ``years`` and ``months`` are added to the
        day count as 365 and 30 days respectively.

        :raises ValueError: if ``years`` or ``months`` is not an ``int``.
        """
        if not isinstance(years, int) or not isinstance(months, int):
            raise ValueError("Float year and months are not supported")

        self = timedelta.__new__(
            cls,
            days + years * 365 + months * 30,
            seconds,
            microseconds,
            milliseconds,
            minutes,
            hours,
            weeks,
        )

        # Intuitive normalization: years and months are kept apart, so
        # their day equivalent is taken back out of the total.
        total = self.total_seconds() - (years * 365 + months * 30) * SECONDS_PER_DAY
        self._total = total

        # Every component below carries the sign of the total.
        m = 1
        if total < 0:
            m = -1

        self._microseconds = round(total % m * 1e6)
        self._seconds = abs(int(total)) % SECONDS_PER_DAY * m

        _days = abs(int(total)) // SECONDS_PER_DAY * m
        self._days = _days
        self._remaining_days = abs(_days) % 7 * m
        self._weeks = abs(_days) // 7 * m
        self._months = months
        self._years = years

        # Arguments as they were given (milliseconds folded into
        # microseconds).
        self._signature = {  # type: ignore[attr-defined]
            "years": years,
            "months": months,
            "weeks": weeks,
            "days": days,
            "hours": hours,
            "minutes": minutes,
            "seconds": seconds,
            "microseconds": microseconds + milliseconds * 1000,
        }

        return self

    def total_minutes(self) -> float:
        """Return the total duration expressed in minutes."""
        return self.total_seconds() / SECONDS_PER_MINUTE

    def total_hours(self) -> float:
        """Return the total duration expressed in hours."""
        return self.total_seconds() / SECONDS_PER_HOUR

    def total_days(self) -> float:
        """Return the total duration expressed in days."""
        return self.total_seconds() / SECONDS_PER_DAY

    def total_weeks(self) -> float:
        """Return the total duration expressed in weeks."""
        return self.total_days() / 7

    if PYPY:

        def total_seconds(self) -> float:
            """
            Return the total duration in seconds, rebuilt from the
            components stored by ``__new__`` (years as 365 days, months
            as 30 days).
            """
            days = 0

            if hasattr(self, "_years"):
                days += self._years * 365

            if hasattr(self, "_months"):
                days += self._months * 30

            if hasattr(self, "_remaining_days"):
                days += self._weeks * 7 + self._remaining_days
            else:
                days += self._days

            return (
                (days * SECONDS_PER_DAY + self._seconds) * US_PER_SECOND
                + self._microseconds
            ) / US_PER_SECOND

    @property
    def years(self) -> int:
        """Years component, as passed to the constructor."""
        return self._years

    @property
    def months(self) -> int:
        """Months component, as passed to the constructor."""
        return self._months

    @property
    def weeks(self) -> int:
        """Whole weeks contained in the day count (years/months excluded)."""
        return self._weeks

    if PYPY:
        # NOTE(review): presumably needed because PyPy's pure-Python
        # timedelta reads ``_days``, which __new__ overwrites - confirm.

        @property
        def days(self) -> int:
            """Day count including years (365 days) and months (30 days)."""
            return self._years * 365 + self._months * 30 + self._days

    @property
    def remaining_days(self) -> int:
        """Days left once whole weeks have been removed."""
        return self._remaining_days

    @property
    def hours(self) -> int:
        """Hours within the last partial day (signed, computed lazily)."""
        if self._h is None:
            seconds = self._seconds
            self._h = 0
            if abs(seconds) >= 3600:
                self._h = (abs(seconds) // 3600 % 24) * self._sign(seconds)

        return self._h

    @property
    def minutes(self) -> int:
        """Minutes within the last partial hour (signed, computed lazily)."""
        if self._i is None:
            seconds = self._seconds
            self._i = 0
            if abs(seconds) >= 60:
                self._i = (abs(seconds) // 60 % 60) * self._sign(seconds)

        return self._i

    @property
    def seconds(self) -> int:
        """Seconds within the last partial day."""
        return self._seconds

    @property
    def remaining_seconds(self) -> int:
        """Seconds within the last partial minute (signed, computed lazily)."""
        if self._s is None:
            self._s = self._seconds
            self._s = abs(self._s) % 60 * self._sign(self._s)

        return self._s

    @property
    def microseconds(self) -> int:
        """Microseconds component."""
        return self._microseconds

    @property
    def invert(self) -> bool:
        """Whether the total duration is negative (computed lazily)."""
        if self._invert is None:
            self._invert = self.total_seconds() < 0

        return self._invert

    def in_weeks(self) -> int:
        """Return the total duration in weeks, truncated towards zero."""
        return int(self.total_weeks())

    def in_days(self) -> int:
        """Return the total duration in days, truncated towards zero."""
        return int(self.total_days())

    def in_hours(self) -> int:
        """Return the total duration in hours, truncated towards zero."""
        return int(self.total_hours())

    def in_minutes(self) -> int:
        """Return the total duration in minutes, truncated towards zero."""
        return int(self.total_minutes())

    def in_seconds(self) -> int:
        """Return the total duration in seconds, truncated towards zero."""
        return int(self.total_seconds())

    def in_words(self, locale: str | None = None, separator: str = " ") -> str:
        """
        Get the duration in words, one part per non-zero unit.

        Ex: 6 jours 23 heures 58 minutes

        When every unit from years down to seconds is zero, a single part
        is produced instead: the fraction of a second (two decimals) if
        there are microseconds, otherwise a zero count of microseconds.

        :param locale: The locale to use. Defaults to current locale.
        :param separator: The separator to use between each unit
        """
        intervals = [
            ("year", self.years),
            ("month", self.months),
            ("week", self.weeks),
            ("day", self.remaining_days),
            ("hour", self.hours),
            ("minute", self.minutes),
            ("second", self.remaining_seconds),
        ]

        if locale is None:
            locale = pendulum.get_locale()

        loaded_locale = pendulum.locale(locale)

        parts = []
        for unit, interval_count in intervals:
            if abs(interval_count) > 0:
                translation = loaded_locale.translation(
                    f"units.{unit}.{loaded_locale.plural(abs(interval_count))}"
                )
                parts.append(translation.format(interval_count))

        if not parts:
            count: int | str = 0
            if abs(self.microseconds) > 0:
                unit = f"units.second.{loaded_locale.plural(1)}"
                count = f"{abs(self.microseconds) / 1e6:.2f}"
            else:
                unit = f"units.microsecond.{loaded_locale.plural(0)}"
            translation = loaded_locale.translation(unit)
            parts.append(translation.format(count))

        return separator.join(parts)

    def _sign(self, value: float) -> int:
        """Return -1 for a negative value, 1 otherwise."""
        if value < 0:
            return -1

        return 1

    def as_timedelta(self) -> timedelta:
        """
        Return the interval as a native timedelta.
        """
        return timedelta(seconds=self.total_seconds())

    def __str__(self) -> str:
        return self.in_words()

    def __repr__(self) -> str:
        """Return ``ClassName(unit=value, ...)`` listing non-zero units only."""
        components = [
            ("years", self._years),
            ("months", self._months),
            ("weeks", self._weeks),
            # Test the value that is printed: testing ``_days`` instead
            # would emit "days=0" for a whole number of weeks.
            ("days", self._remaining_days),
            ("hours", self.hours),
            ("minutes", self.minutes),
            ("seconds", self.remaining_seconds),
            ("microseconds", self.microseconds),
        ]
        arguments = ", ".join(f"{name}={value}" for name, value in components if value)

        return f"{self.__class__.__name__}({arguments})"

    def __add__(self, other: timedelta) -> Self:
        """Add another timedelta; the result is rebuilt from total seconds."""
        if isinstance(other, timedelta):
            return self.__class__(seconds=self.total_seconds() + other.total_seconds())

        return NotImplemented

    __radd__ = __add__

    def __sub__(self, other: timedelta) -> Self:
        """Subtract another timedelta; the result is rebuilt from total seconds."""
        if isinstance(other, timedelta):
            return self.__class__(seconds=self.total_seconds() - other.total_seconds())

        return NotImplemented

    def __neg__(self) -> Self:
        """Return a duration with every component negated."""
        return self.__class__(
            years=-self._years,
            months=-self._months,
            weeks=-self._weeks,
            days=-self._remaining_days,
            seconds=-self._seconds,
            microseconds=-self._microseconds,
        )

    def _to_microseconds(self) -> int:
        """Return days, seconds and microseconds (not years/months) in microseconds."""
        return (self._days * (24 * 3600) + self._seconds) * 1000000 + self._microseconds

    @staticmethod
    def _timedelta_microseconds(delta: timedelta) -> int:
        """
        Return ``delta`` expressed as a whole number of microseconds.

        Durations go through ``_to_microseconds()``; any other timedelta is
        converted from its public ``days``, ``seconds`` and ``microseconds``
        attributes, because ``_to_microseconds()`` is not part of the public
        timedelta interface and may be missing on native instances.
        """
        if isinstance(delta, Duration):
            return delta._to_microseconds()

        return (
            delta.days * SECONDS_PER_DAY + delta.seconds
        ) * US_PER_SECOND + delta.microseconds

    def __mul__(self, other: int | float) -> Self:
        """
        Multiply the duration by a number.

        With a float factor, years and months are rounded to the nearest
        integer (ties to even), like the microsecond part.
        """
        if isinstance(other, int):
            return self.__class__(
                years=self._years * other,
                months=self._months * other,
                seconds=self._total * other,
            )

        if isinstance(other, float):
            usec = self._to_microseconds()
            a, b = other.as_integer_ratio()

            # _to_microseconds() leaves years and months out, so they are
            # scaled separately to avoid silently dropping them.
            return self.__class__(
                0,
                0,
                _divide_and_round(usec * a, b),
                years=_divide_and_round(self._years * a, b),
                months=_divide_and_round(self._months * a, b),
            )

        return NotImplemented

    __rmul__ = __mul__

    @overload
    def __floordiv__(self, other: timedelta) -> int:
        ...

    @overload
    def __floordiv__(self, other: int) -> Self:
        ...

    def __floordiv__(self, other: int | timedelta) -> int | Duration:
        """
        Floor-divide by a timedelta (giving an int) or by an int (giving a
        duration whose years, months and microsecond part are each
        floor-divided).
        """
        if not isinstance(other, (int, timedelta)):
            return NotImplemented

        usec = self._to_microseconds()
        if isinstance(other, timedelta):
            return cast(int, usec // self._timedelta_microseconds(other))

        return self.__class__(
            0,
            0,
            usec // other,
            years=self._years // other,
            months=self._months // other,
        )

    @overload
    def __truediv__(self, other: timedelta) -> float:
        ...

    @overload
    def __truediv__(self, other: float) -> Self:
        ...

    def __truediv__(self, other: int | float | timedelta) -> Self | float:
        """
        Divide by a timedelta (giving a float) or by a number (giving a
        duration whose years, months and microsecond part are each rounded
        to the nearest integer, ties to even).
        """
        if not isinstance(other, (int, float, timedelta)):
            return NotImplemented

        usec = self._to_microseconds()
        if isinstance(other, timedelta):
            return cast(float, usec / self._timedelta_microseconds(other))

        # Express the divisor as the exact ratio a / b so that every
        # component is scaled the same way: x / other == x * b / a.
        if isinstance(other, int):
            a, b = other, 1
        else:
            a, b = other.as_integer_ratio()

        return self.__class__(
            0,
            0,
            _divide_and_round(b * usec, a),
            years=_divide_and_round(self._years * b, a),
            months=_divide_and_round(self._months * b, a),
        )

    __div__ = __floordiv__

    def __mod__(self, other: timedelta) -> Self:
        """Return the remainder of the division by another timedelta."""
        if isinstance(other, timedelta):
            r = self._to_microseconds() % self._timedelta_microseconds(other)

            return self.__class__(0, 0, r)

        return NotImplemented

    def __divmod__(self, other: timedelta) -> tuple[int, Duration]:
        """Return ``(self // other, self % other)`` for another timedelta."""
        if isinstance(other, timedelta):
            q, r = divmod(
                self._to_microseconds(),
                self._timedelta_microseconds(other),
            )

            return q, self.__class__(0, 0, r)

        return NotImplemented

    def __deepcopy__(self, _: dict[int, Self]) -> Self:
        """Return a new duration rebuilt from every component of this one."""
        return self.__class__(
            # remaining_days excludes whole weeks, so weeks must be passed
            # as well or they would be lost in the copy.
            weeks=self.weeks,
            days=self.remaining_days,
            seconds=self.remaining_seconds,
            microseconds=self.microseconds,
            minutes=self.minutes,
            hours=self.hours,
            years=self.years,
            months=self.months,
        )
473
474
# Class-level constants using the same names as timedelta's ``min``, ``max``
# and ``resolution``. They are assigned after the class body because each
# one is itself a Duration instance.
Duration.resolution = Duration(microseconds=1)
Duration.min = Duration(days=-999999999)
Duration.max = Duration(
    days=999999999,
    hours=23,
    minutes=59,
    seconds=59,
    microseconds=999999,
)
480
481
class AbsoluteDuration(Duration):
    """
    Duration that expresses a time difference in absolute values.

    The signed total is kept in ``_total``; the stored components and
    ``total_seconds()`` are non-negative, and ``invert`` reports whether
    the signed total was negative.
    """

    def __new__(
        cls,
        days: float = 0,
        seconds: float = 0,
        microseconds: float = 0,
        milliseconds: float = 0,
        minutes: float = 0,
        hours: float = 0,
        weeks: float = 0,
        years: float = 0,
        months: float = 0,
    ) -> AbsoluteDuration:
        """
        Create a new absolute duration.

        :raises ValueError: if ``years`` or ``months`` is not an ``int``.
        """
        if not (isinstance(years, int) and isinstance(months, int)):
            raise ValueError("Float year and months are not supported")

        native_args = (days, seconds, microseconds, milliseconds, minutes, hours, weeks)
        self = timedelta.__new__(cls, *native_args)

        # The signed total is taken from a native timedelta object,
        # not from this instance.
        self._total = timedelta(*native_args).total_seconds()

        # Intuitive normalization, performed on the magnitude only.
        magnitude = abs(self._total)
        self._microseconds = round(magnitude % 1 * 1e6)

        whole_days, leftover_seconds = divmod(int(magnitude), SECONDS_PER_DAY)
        self._seconds = leftover_seconds
        self._days = abs(whole_days + years * 365 + months * 30)

        whole_weeks, leftover_days = divmod(whole_days, 7)
        self._weeks = whole_weeks
        self._remaining_days = leftover_days

        self._months = abs(months)
        self._years = abs(years)

        return self

    def total_seconds(self) -> float:
        """Return the magnitude of the signed total, in seconds."""
        return abs(self._total)

    @property
    def invert(self) -> bool:
        """Whether the signed total is negative (computed lazily)."""
        negative = self._invert
        if negative is None:
            negative = self._invert = self._total < 0

        return negative