Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/time_machine/__init__.py: 33%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import datetime as dt
4import functools
5import inspect
6import os
7import sys
8import time as time_module
9import uuid
10from collections.abc import Generator
11from time import gmtime as orig_gmtime
12from time import struct_time
13from types import TracebackType
14from typing import Any
15from typing import Awaitable
16from typing import Callable
17from typing import Generator as TypingGenerator
18from typing import Tuple
19from typing import Type
20from typing import TypeVar
21from typing import Union
22from typing import cast
23from typing import overload
24from unittest import TestCase
25from unittest import mock
27import _time_machine
28from dateutil.parser import parse as parse_datetime
30# time.clock_gettime and time.CLOCK_REALTIME not always available
31# e.g. on builds against old macOS = official Python.org installer
32try:
33 from time import CLOCK_REALTIME
34except ImportError:
35 # Dummy value that won't compare equal to any value
36 CLOCK_REALTIME = sys.maxsize
38try:
39 from time import tzset
41 HAVE_TZSET = True
42except ImportError: # pragma: no cover
43 # Windows
44 HAVE_TZSET = False
46if sys.version_info >= (3, 9):
47 from zoneinfo import ZoneInfo
49 HAVE_ZONEINFO = True
50else:
51 try:
52 from backports.zoneinfo import ZoneInfo
54 HAVE_ZONEINFO = True
55 except ImportError: # pragma: no cover
56 HAVE_ZONEINFO = False
59try:
60 import pytest
61except ImportError: # pragma: no cover
62 HAVE_PYTEST = False
63else:
64 HAVE_PYTEST = True
66NANOSECONDS_PER_SECOND = 1_000_000_000
68# Windows' time epoch is not unix epoch but in 1601. This constant helps us
69# translate to it.
70_system_epoch = orig_gmtime(0)
71SYSTEM_EPOCH_TIMESTAMP_NS = int(
72 dt.datetime(
73 _system_epoch.tm_year,
74 _system_epoch.tm_mon,
75 _system_epoch.tm_mday,
76 _system_epoch.tm_hour,
77 _system_epoch.tm_min,
78 _system_epoch.tm_sec,
79 tzinfo=dt.timezone.utc,
80 ).timestamp()
81 * NANOSECONDS_PER_SECOND
82)
84DestinationBaseType = Union[
85 int,
86 float,
87 dt.datetime,
88 dt.timedelta,
89 dt.date,
90 str,
91]
92DestinationType = Union[
93 DestinationBaseType,
94 Callable[[], DestinationBaseType],
95 TypingGenerator[DestinationBaseType, None, None],
96]
98_F = TypeVar("_F", bound=Callable[..., Any])
99_AF = TypeVar("_AF", bound=Callable[..., Awaitable[Any]])
100TestCaseType = TypeVar("TestCaseType", bound=Type[TestCase])
102# copied from typeshed:
103_TimeTuple = Tuple[int, int, int, int, int, int, int, int, int]
106def extract_timestamp_tzname(
107 destination: DestinationType,
108) -> tuple[float, str | None]:
109 dest: DestinationBaseType
110 if isinstance(destination, Generator):
111 dest = next(destination)
112 elif callable(destination):
113 dest = destination()
114 else:
115 dest = destination
117 timestamp: float
118 tzname: str | None = None
119 if isinstance(dest, int):
120 timestamp = float(dest)
121 elif isinstance(dest, float):
122 timestamp = dest
123 elif isinstance(dest, dt.datetime):
124 if HAVE_ZONEINFO and isinstance(dest.tzinfo, ZoneInfo):
125 tzname = dest.tzinfo.key
126 if dest.tzinfo is None:
127 dest = dest.replace(tzinfo=dt.timezone.utc)
128 timestamp = dest.timestamp()
129 elif isinstance(dest, dt.timedelta):
130 timestamp = time_module.time() + dest.total_seconds()
131 elif isinstance(dest, dt.date):
132 timestamp = dt.datetime.combine(
133 dest, dt.time(0, 0), tzinfo=dt.timezone.utc
134 ).timestamp()
135 elif isinstance(dest, str):
136 timestamp = parse_datetime(dest).timestamp()
137 else:
138 raise TypeError(f"Unsupported destination {dest!r}")
140 return timestamp, tzname
143class Coordinates:
144 def __init__(
145 self,
146 destination_timestamp: float,
147 destination_tzname: str | None,
148 tick: bool,
149 ) -> None:
150 self._destination_timestamp_ns = int(
151 destination_timestamp * NANOSECONDS_PER_SECOND
152 )
153 self._destination_tzname = destination_tzname
154 self._tick = tick
155 self._requested = False
157 def time(self) -> float:
158 return self.time_ns() / NANOSECONDS_PER_SECOND
160 def time_ns(self) -> int:
161 if not self._tick:
162 return self._destination_timestamp_ns
164 base = SYSTEM_EPOCH_TIMESTAMP_NS + self._destination_timestamp_ns
165 now_ns: int = _time_machine.original_time_ns()
167 if not self._requested:
168 self._requested = True
169 self._real_start_timestamp_ns = now_ns
170 return base
172 return base + (now_ns - self._real_start_timestamp_ns)
174 def shift(self, delta: dt.timedelta | int | float) -> None:
175 if isinstance(delta, dt.timedelta):
176 total_seconds = delta.total_seconds()
177 elif isinstance(delta, (int, float)):
178 total_seconds = delta
179 else:
180 raise TypeError(f"Unsupported type for delta argument: {delta!r}")
182 self._destination_timestamp_ns += int(total_seconds * NANOSECONDS_PER_SECOND)
184 def move_to(
185 self,
186 destination: DestinationType,
187 tick: bool | None = None,
188 ) -> None:
189 self._stop()
190 timestamp, self._destination_tzname = extract_timestamp_tzname(destination)
191 self._destination_timestamp_ns = int(timestamp * NANOSECONDS_PER_SECOND)
192 self._requested = False
193 self._start()
194 if tick is not None:
195 self._tick = tick
197 def _start(self) -> None:
198 if HAVE_TZSET and self._destination_tzname is not None:
199 self._orig_tz = os.environ.get("TZ")
200 os.environ["TZ"] = self._destination_tzname
201 tzset()
203 def _stop(self) -> None:
204 if HAVE_TZSET and self._destination_tzname is not None:
205 if self._orig_tz is None:
206 del os.environ["TZ"]
207 else:
208 os.environ["TZ"] = self._orig_tz
209 tzset()
212coordinates_stack: list[Coordinates] = []
214# During time travel, patch the uuid module's time-based generation function to
215# None, which makes it use time.time(). Otherwise it makes a system call to
216# find the current datetime. The time it finds is stored in generated UUID1
217# values.
218uuid_generate_time_attr = "_generate_time_safe"
219uuid_generate_time_patcher = mock.patch.object(uuid, uuid_generate_time_attr, new=None)
220uuid_uuid_create_patcher = mock.patch.object(uuid, "_UuidCreate", new=None)
223class travel:
224 def __init__(self, destination: DestinationType, *, tick: bool = True) -> None:
225 self.destination_timestamp, self.destination_tzname = extract_timestamp_tzname(
226 destination
227 )
228 self.tick = tick
230 def start(self) -> Coordinates:
231 global coordinates_stack
233 _time_machine.patch_if_needed()
235 if not coordinates_stack:
236 if sys.version_info < (3, 9):
237 # We need to cause the functions to be loaded before we patch
238 # them out, which is done by this internal function before:
239 # https://github.com/python/cpython/pull/19948
240 uuid._load_system_functions()
241 uuid_generate_time_patcher.start()
242 uuid_uuid_create_patcher.start()
244 coordinates = Coordinates(
245 destination_timestamp=self.destination_timestamp,
246 destination_tzname=self.destination_tzname,
247 tick=self.tick,
248 )
249 coordinates_stack.append(coordinates)
250 coordinates._start()
252 return coordinates
254 def stop(self) -> None:
255 global coordinates_stack
256 coordinates_stack.pop()._stop()
258 if not coordinates_stack:
259 uuid_generate_time_patcher.stop()
260 uuid_uuid_create_patcher.stop()
262 def __enter__(self) -> Coordinates:
263 return self.start()
265 def __exit__(
266 self,
267 exc_type: type[BaseException] | None,
268 exc_val: BaseException | None,
269 exc_tb: TracebackType | None,
270 ) -> None:
271 self.stop()
273 @overload
274 def __call__(self, wrapped: TestCaseType) -> TestCaseType: # pragma: no cover
275 ...
277 @overload
278 def __call__(self, wrapped: _AF) -> _AF: # pragma: no cover
279 ...
281 @overload
282 def __call__(self, wrapped: _F) -> _F: # pragma: no cover
283 ...
285 # 'Any' below is workaround for Mypy error:
286 # Overloaded function implementation does not accept all possible arguments
287 # of signature
288 def __call__(
289 self, wrapped: TestCaseType | _AF | _F | Any
290 ) -> TestCaseType | _AF | _F | Any:
291 if isinstance(wrapped, type):
292 # Class decorator
293 if not issubclass(wrapped, TestCase):
294 raise TypeError("Can only decorate unittest.TestCase subclasses.")
296 # Modify the setUpClass method
297 orig_setUpClass = wrapped.setUpClass.__func__ # type: ignore[attr-defined]
299 @functools.wraps(orig_setUpClass)
300 def setUpClass(cls: type[TestCase]) -> None:
301 self.__enter__()
302 try:
303 orig_setUpClass(cls)
304 except Exception:
305 self.__exit__(*sys.exc_info())
306 raise
308 wrapped.setUpClass = classmethod(setUpClass) # type: ignore[assignment]
310 orig_tearDownClass = (
311 wrapped.tearDownClass.__func__ # type: ignore[attr-defined]
312 )
314 @functools.wraps(orig_tearDownClass)
315 def tearDownClass(cls: type[TestCase]) -> None:
316 orig_tearDownClass(cls)
317 self.__exit__(None, None, None)
319 wrapped.tearDownClass = classmethod( # type: ignore[assignment]
320 tearDownClass
321 )
322 return cast(TestCaseType, wrapped)
323 elif inspect.iscoroutinefunction(wrapped):
325 @functools.wraps(wrapped)
326 async def wrapper(*args: Any, **kwargs: Any) -> Any:
327 with self:
328 return await wrapped(*args, **kwargs)
330 return cast(_AF, wrapper)
331 else:
332 assert callable(wrapped)
334 @functools.wraps(wrapped)
335 def wrapper(*args: Any, **kwargs: Any) -> Any:
336 with self:
337 return wrapped(*args, **kwargs)
339 return cast(_F, wrapper)
342# datetime module
345def now(tz: dt.tzinfo | None = None) -> dt.datetime:
346 if not coordinates_stack:
347 result: dt.datetime = _time_machine.original_now(tz)
348 return result
349 return dt.datetime.fromtimestamp(time(), tz)
352def utcnow() -> dt.datetime:
353 if not coordinates_stack:
354 result: dt.datetime = _time_machine.original_utcnow()
355 return result
356 return dt.datetime.fromtimestamp(time(), dt.timezone.utc).replace(tzinfo=None)
359# time module
362def clock_gettime(clk_id: int) -> float:
363 if not coordinates_stack or clk_id != CLOCK_REALTIME:
364 result: float = _time_machine.original_clock_gettime(clk_id)
365 return result
366 return time()
369def clock_gettime_ns(clk_id: int) -> int:
370 if not coordinates_stack or clk_id != CLOCK_REALTIME:
371 result: int = _time_machine.original_clock_gettime_ns(clk_id)
372 return result
373 return time_ns()
376def gmtime(secs: float | None = None) -> struct_time:
377 result: struct_time
378 if not coordinates_stack or secs is not None:
379 result = _time_machine.original_gmtime(secs)
380 else:
381 result = _time_machine.original_gmtime(coordinates_stack[-1].time())
382 return result
385def localtime(secs: float | None = None) -> struct_time:
386 result: struct_time
387 if not coordinates_stack or secs is not None:
388 result = _time_machine.original_localtime(secs)
389 else:
390 result = _time_machine.original_localtime(coordinates_stack[-1].time())
391 return result
394def strftime(format: str, t: _TimeTuple | struct_time | None = None) -> str:
395 result: str
396 if t is not None:
397 result = _time_machine.original_strftime(format, t)
398 elif not coordinates_stack:
399 result = _time_machine.original_strftime(format)
400 else:
401 result = _time_machine.original_strftime(format, localtime())
402 return result
405def time() -> float:
406 if not coordinates_stack:
407 result: float = _time_machine.original_time()
408 return result
409 return coordinates_stack[-1].time()
412def time_ns() -> int:
413 if not coordinates_stack:
414 result: int = _time_machine.original_time_ns()
415 return result
416 return coordinates_stack[-1].time_ns()
419# pytest plugin
421if HAVE_PYTEST: # pragma: no branch
423 class TimeMachineFixture:
424 traveller: travel | None
425 coordinates: Coordinates | None
427 def __init__(self) -> None:
428 self.traveller = None
429 self.coordinates = None
431 def move_to(
432 self,
433 destination: DestinationType,
434 tick: bool | None = None,
435 ) -> None:
436 if self.traveller is None:
437 if tick is None:
438 tick = True
439 self.traveller = travel(destination, tick=tick)
440 self.coordinates = self.traveller.start()
441 else:
442 assert self.coordinates is not None
443 self.coordinates.move_to(destination, tick=tick)
445 def shift(self, delta: dt.timedelta | int | float) -> None:
446 if self.traveller is None:
447 raise RuntimeError(
448 "Initialize time_machine with move_to() before using shift()."
449 )
450 assert self.coordinates is not None
451 self.coordinates.shift(delta=delta)
453 def stop(self) -> None:
454 if self.traveller is not None:
455 self.traveller.stop()
457 @pytest.fixture(name="time_machine")
458 def time_machine_fixture() -> TypingGenerator[TimeMachineFixture, None, None]:
459 fixture = TimeMachineFixture()
460 yield fixture
461 fixture.stop()
464# escape hatch
467class _EscapeHatchDatetimeDatetime:
468 def now(self, tz: dt.tzinfo | None = None) -> dt.datetime:
469 result: dt.datetime = _time_machine.original_now(tz)
470 return result
472 def utcnow(self) -> dt.datetime:
473 result: dt.datetime = _time_machine.original_utcnow()
474 return result
477class _EscapeHatchDatetime:
478 def __init__(self) -> None:
479 self.datetime = _EscapeHatchDatetimeDatetime()
482class _EscapeHatchTime:
483 def clock_gettime(self, clk_id: int) -> float:
484 result: float = _time_machine.original_clock_gettime(clk_id)
485 return result
487 def clock_gettime_ns(self, clk_id: int) -> int:
488 result: int = _time_machine.original_clock_gettime_ns(clk_id)
489 return result
491 def gmtime(self, secs: float | None = None) -> struct_time:
492 result: struct_time = _time_machine.original_gmtime(secs)
493 return result
495 def localtime(self, secs: float | None = None) -> struct_time:
496 result: struct_time = _time_machine.original_localtime(secs)
497 return result
499 def monotonic(self) -> float:
500 result: float = _time_machine.original_monotonic()
501 return result
503 def monotonic_ns(self) -> int:
504 result: int = _time_machine.original_monotonic_ns()
505 return result
507 def strftime(self, format: str, t: _TimeTuple | struct_time | None = None) -> str:
508 result: str
509 if t is not None:
510 result = _time_machine.original_strftime(format, t)
511 else:
512 result = _time_machine.original_strftime(format)
513 return result
515 def time(self) -> float:
516 result: float = _time_machine.original_time()
517 return result
519 def time_ns(self) -> int:
520 result: int = _time_machine.original_time_ns()
521 return result
524class _EscapeHatch:
525 def __init__(self) -> None:
526 self.datetime = _EscapeHatchDatetime()
527 self.time = _EscapeHatchTime()
529 def is_travelling(self) -> bool:
530 return bool(coordinates_stack)
533escape_hatch = _EscapeHatch()