Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/time_machine/__init__.py: 33%
305 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 06:11 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-30 06:11 +0000
1from __future__ import annotations
3import datetime as dt
4import functools
5import inspect
6import os
7import sys
8import uuid
9from collections.abc import Generator
10from time import gmtime as orig_gmtime
11from time import struct_time
12from types import TracebackType
13from typing import Any
14from typing import Awaitable
15from typing import Callable
16from typing import cast
17from typing import Generator as TypingGenerator
18from typing import overload
19from typing import Tuple
20from typing import Type
21from typing import TypeVar
22from typing import Union
23from unittest import mock
24from unittest import TestCase
26import _time_machine
27from dateutil.parser import parse as parse_datetime
# time.clock_gettime and time.CLOCK_REALTIME are not always available,
# e.g. on builds against old macOS such as the official Python.org installer.
try:
    from time import CLOCK_REALTIME
except ImportError:
    # Placeholder value that won't compare equal to any value
    CLOCK_REALTIME = sys.maxsize

# time.tzset is used to apply a destination's time zone; record whether this
# platform provides it.
try:
    from time import tzset
except ImportError:  # pragma: no cover
    # Windows
    HAVE_TZSET = False
else:
    HAVE_TZSET = True

# ZoneInfo comes from the standard library on Python 3.9+, and from the
# optional backports.zoneinfo package before that.
if sys.version_info < (3, 9):
    try:
        from backports.zoneinfo import ZoneInfo
    except ImportError:  # pragma: no cover
        HAVE_ZONEINFO = False
    else:
        HAVE_ZONEINFO = True
else:
    from zoneinfo import ZoneInfo

    HAVE_ZONEINFO = True

# The pytest plugin further down is only defined when pytest is importable.
try:
    import pytest

    HAVE_PYTEST = True
except ImportError:  # pragma: no cover
    HAVE_PYTEST = False
NANOSECONDS_PER_SECOND = 1_000_000_000

# Windows' time epoch is not unix epoch but in 1601. This constant helps us
# translate to it.
# It is the system epoch (what gmtime(0) reports) expressed as a UTC timestamp
# in nanoseconds.
_system_epoch = orig_gmtime(0)
_system_epoch_utc = dt.datetime(
    year=_system_epoch.tm_year,
    month=_system_epoch.tm_mon,
    day=_system_epoch.tm_mday,
    hour=_system_epoch.tm_hour,
    minute=_system_epoch.tm_min,
    second=_system_epoch.tm_sec,
    tzinfo=dt.timezone.utc,
)
SYSTEM_EPOCH_TIMESTAMP_NS = int(
    _system_epoch_utc.timestamp() * NANOSECONDS_PER_SECOND
)
# A single concrete destination value, as handled by
# extract_timestamp_tzname().
DestinationBaseType = Union[int, float, dt.datetime, dt.timedelta, dt.date, str]

# A destination may also be supplied lazily: a zero-argument callable that
# returns one, or a generator whose next value is one.
DestinationType = Union[
    DestinationBaseType,
    Callable[[], DestinationBaseType],
    TypingGenerator[DestinationBaseType, None, None],
]

# Type variables used by the overloads of travel.__call__: plain callables,
# callables returning awaitables, and unittest.TestCase subclasses.
_F = TypeVar("_F", bound=Callable[..., Any])
_AF = TypeVar("_AF", bound=Callable[..., Awaitable[Any]])
TestCaseType = TypeVar("TestCaseType", bound=Type[TestCase])

# copied from typeshed:
_TimeTuple = Tuple[int, int, int, int, int, int, int, int, int]
def extract_timestamp_tzname(
    destination: DestinationType,
) -> tuple[float, str | None]:
    """Resolve a destination into ``(timestamp, tzname)``.

    A generator is advanced once and a callable is called once to obtain the
    concrete value. The value is then converted as follows:

    * ``int`` / ``float``: used as the timestamp.
    * ``datetime``: its ``timestamp()``; a naive value is taken to be UTC.
      If its tzinfo is a ``ZoneInfo``, that zone's key is returned as tzname.
    * ``timedelta``: offset in seconds from the current ``time()``.
    * ``date``: midnight UTC at the start of that date.
    * ``str``: parsed with ``parse_datetime`` and converted with
      ``timestamp()``.

    tzname is ``None`` for everything except a ``ZoneInfo``-aware datetime.

    Raises:
        TypeError: if the resolved value is none of the supported types.
    """
    target: DestinationBaseType
    if isinstance(destination, Generator):
        target = next(destination)
    elif callable(destination):
        target = destination()
    else:
        target = destination

    if isinstance(target, int):
        return float(target), None

    if isinstance(target, float):
        return target, None

    # datetime must be tested before date: datetime is a subclass of date.
    if isinstance(target, dt.datetime):
        zone = target.tzinfo
        zone_key: str | None = None
        if HAVE_ZONEINFO and isinstance(zone, ZoneInfo):
            zone_key = zone.key
        if zone is None:
            target = target.replace(tzinfo=dt.timezone.utc)
        return target.timestamp(), zone_key

    if isinstance(target, dt.timedelta):
        return time() + target.total_seconds(), None

    if isinstance(target, dt.date):
        midnight_utc = dt.datetime.combine(
            target, dt.time(0, 0), tzinfo=dt.timezone.utc
        )
        return midnight_utc.timestamp(), None

    if isinstance(target, str):
        return parse_datetime(target).timestamp(), None

    raise TypeError(f"Unsupported destination {target!r}")
class Coordinates:
    """State of one active travel: the destination and how the clock moves.

    Instances are created by ``travel.start()`` and kept on
    ``coordinates_stack``; the patched time functions read the current time
    from the last entry of that stack.
    """

    def __init__(
        self,
        destination_timestamp: float,
        destination_tzname: str | None,
        tick: bool,
    ) -> None:
        """Store the destination (converted to nanoseconds) and tick mode.

        Args:
            destination_timestamp: destination in seconds.
            destination_tzname: value to place in the ``TZ`` environment
                variable while travelling, or ``None`` to leave it alone.
            tick: whether time advances from the destination (``True``) or
                stays frozen at it (``False``).
        """
        self._destination_timestamp_ns = int(
            destination_timestamp * NANOSECONDS_PER_SECOND
        )
        self._destination_tzname = destination_tzname
        self._tick = tick
        # Set on the first ticking time_ns() call, which records the real
        # clock reading that later calls measure elapsed time from.
        self._requested = False

    def time(self) -> float:
        """Return the travelled time in seconds."""
        return self.time_ns() / NANOSECONDS_PER_SECOND

    def time_ns(self) -> int:
        """Return the travelled time in nanoseconds.

        When not ticking, this is the stored destination. When ticking, the
        first call returns the destination and records the real clock; later
        calls add the real time elapsed since that first call.
        """
        if not self._tick:
            # NOTE(review): this path does not add SYSTEM_EPOCH_TIMESTAMP_NS
            # while the ticking path below does - confirm this is intended.
            return self._destination_timestamp_ns

        base = SYSTEM_EPOCH_TIMESTAMP_NS + self._destination_timestamp_ns
        now_ns: int = _time_machine.original_time_ns()

        if not self._requested:
            self._requested = True
            self._real_start_timestamp_ns = now_ns
            return base

        return base + (now_ns - self._real_start_timestamp_ns)

    def shift(self, delta: dt.timedelta | int | float) -> None:
        """Move the destination by ``delta`` (a timedelta or seconds).

        Raises:
            TypeError: if ``delta`` is not a timedelta, int or float.
        """
        if isinstance(delta, dt.timedelta):
            total_seconds = delta.total_seconds()
        elif isinstance(delta, (int, float)):
            total_seconds = delta
        else:
            raise TypeError(f"Unsupported type for delta argument: {delta!r}")

        self._destination_timestamp_ns += int(total_seconds * NANOSECONDS_PER_SECOND)

    def move_to(
        self,
        destination: DestinationType,
        tick: bool | None = None,
    ) -> None:
        """Replace the destination, and optionally the tick mode.

        The time zone applied for the previous destination is undone before
        the new destination's one is applied. ``tick=None`` keeps the
        current tick mode.
        """
        self._stop()
        timestamp, self._destination_tzname = extract_timestamp_tzname(destination)
        self._destination_timestamp_ns = int(timestamp * NANOSECONDS_PER_SECOND)
        # Forget the recorded real clock so ticking restarts from the new
        # destination on the next time_ns() call.
        self._requested = False
        self._start()
        if tick is not None:
            self._tick = tick

    def _start(self) -> None:
        """Apply the destination time zone through ``TZ``, saving the old value."""
        if HAVE_TZSET and self._destination_tzname is not None:
            self._orig_tz = os.environ.get("TZ")
            os.environ["TZ"] = self._destination_tzname
            tzset()

    def _stop(self) -> None:
        """Restore the ``TZ`` value saved by ``_start()``."""
        if HAVE_TZSET and self._destination_tzname is not None:
            if self._orig_tz is None:
                # TZ was unset before travelling. Use pop() rather than del
                # so that a TZ already removed during the travel does not
                # raise KeyError and abort the rest of the teardown.
                os.environ.pop("TZ", None)
            else:
                os.environ["TZ"] = self._orig_tz
            tzset()
# Stack of active travels, innermost last. travel.start() appends to it,
# travel.stop() pops from it, and the patched time functions read the
# current time from its last entry.
coordinates_stack: list[Coordinates] = []

# During time travel, patch the uuid module's time-based generation function to
# None, which makes it use time.time(). Otherwise it makes a system call to
# find the current datetime. The time it finds is stored in generated UUID1
# values.
uuid_generate_time_attr = "_generate_time_safe"
uuid_generate_time_patcher = mock.patch.object(uuid, uuid_generate_time_attr, new=None)
uuid_uuid_create_patcher = mock.patch.object(uuid, "_UuidCreate", new=None)
# We need to cause the functions to be loaded before we try to patch them out,
# which is done by this internal function
uuid_idempotent_load_system_functions = (
    uuid._load_system_functions  # type: ignore[attr-defined]
)
class travel:
    """Travel to a destination time, as a context manager or a decorator.

    Usable with ``with``, or applied to a function, a coroutine function or
    a ``unittest.TestCase`` subclass. Travels nest: each ``start()`` pushes a
    ``Coordinates`` onto ``coordinates_stack`` and each ``stop()`` pops one.
    """

    def __init__(self, destination: DestinationType, *, tick: bool = True) -> None:
        """Resolve ``destination`` and remember the tick mode.

        The destination is resolved here, once, not on every ``start()``.
        """
        self.destination_timestamp, self.destination_tzname = extract_timestamp_tzname(
            destination
        )
        self.tick = tick

    def start(self) -> Coordinates:
        """Begin travelling and return the new ``Coordinates``."""
        _time_machine.patch_if_needed()

        # Only the outermost travel installs the uuid patches.
        if not coordinates_stack:
            uuid_idempotent_load_system_functions()
            uuid_generate_time_patcher.start()
            uuid_uuid_create_patcher.start()

        coordinates = Coordinates(
            destination_timestamp=self.destination_timestamp,
            destination_tzname=self.destination_tzname,
            tick=self.tick,
        )
        coordinates_stack.append(coordinates)
        coordinates._start()

        return coordinates

    def stop(self) -> None:
        """End the innermost travel; the outermost one also removes the uuid patches."""
        coordinates_stack.pop()._stop()

        if not coordinates_stack:
            uuid_generate_time_patcher.stop()
            uuid_uuid_create_patcher.stop()

    def __enter__(self) -> Coordinates:
        return self.start()

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> None:
        self.stop()

    @staticmethod
    def _call_class_fixture(fixture: Callable[..., Any], cls: type[TestCase]) -> None:
        """Run a class-level fixture against ``cls``.

        ``fixture`` is normally a classmethod already bound to the decorated
        class. Calling it as-is would ignore ``cls`` when a subclass inherits
        the wrapped fixture, so the underlying function is re-bound to
        ``cls``. Anything without ``__func__`` is called unchanged.
        """
        func = getattr(fixture, "__func__", None)
        if func is None:
            fixture()
        else:
            func(cls)

    @overload
    def __call__(self, wrapped: TestCaseType) -> TestCaseType:  # pragma: no cover
        ...

    @overload
    def __call__(self, wrapped: _AF) -> _AF:  # pragma: no cover
        ...

    @overload
    def __call__(self, wrapped: _F) -> _F:  # pragma: no cover
        ...

    # 'Any' below is workaround for Mypy error:
    # Overloaded function implementation does not accept all possible arguments
    # of signature
    def __call__(
        self, wrapped: TestCaseType | _AF | _F | Any
    ) -> TestCaseType | _AF | _F | Any:
        """Decorate ``wrapped`` so that it runs while travelling.

        * A ``TestCase`` subclass has ``setUpClass`` / ``tearDownClass``
          replaced so the travel spans the whole class; the same class object
          is returned.
        * A coroutine function or plain callable is wrapped so each call runs
          inside ``with self``.

        Raises:
            TypeError: if ``wrapped`` is a class that is not a ``TestCase``
                subclass.
        """
        if isinstance(wrapped, type):
            # Class decorator
            if not issubclass(wrapped, TestCase):
                raise TypeError("Can only decorate unittest.TestCase subclasses.")

            # Modify the setUpClass method
            orig_setUpClass = wrapped.setUpClass

            @functools.wraps(orig_setUpClass)
            def setUpClass(cls: type[TestCase]) -> None:
                self.__enter__()
                try:
                    self._call_class_fixture(orig_setUpClass, cls)
                except BaseException:
                    # unittest does not run tearDownClass when setUpClass
                    # fails, so the travel has to be undone here.
                    self.__exit__(*sys.exc_info())
                    raise

            wrapped.setUpClass = classmethod(setUpClass)  # type: ignore[assignment]

            orig_tearDownClass = wrapped.tearDownClass

            @functools.wraps(orig_tearDownClass)
            def tearDownClass(cls: type[TestCase]) -> None:
                try:
                    self._call_class_fixture(orig_tearDownClass, cls)
                finally:
                    # Always stop travelling, even when the original
                    # tearDownClass raises; otherwise the travel and the
                    # uuid patches leak into later tests.
                    self.__exit__(*sys.exc_info())

            wrapped.tearDownClass = classmethod(  # type: ignore[assignment]
                tearDownClass
            )
            return cast(TestCaseType, wrapped)
        elif inspect.iscoroutinefunction(wrapped):

            @functools.wraps(wrapped)
            async def wrapper(*args: Any, **kwargs: Any) -> Any:
                with self:
                    return await wrapped(*args, **kwargs)

            return cast(_AF, wrapper)
        else:
            assert callable(wrapped)

            @functools.wraps(wrapped)
            def wrapper(*args: Any, **kwargs: Any) -> Any:
                with self:
                    return wrapped(*args, **kwargs)

            return cast(_F, wrapper)
340# datetime module
def now(tz: dt.tzinfo | None = None) -> dt.datetime:
    """Replacement for ``datetime.now``.

    While travelling, the result is built from the travelled ``time()`` in
    the requested ``tz``; otherwise the original implementation is called.
    """
    if coordinates_stack:
        return dt.datetime.fromtimestamp(time(), tz)
    real_now: dt.datetime = _time_machine.original_now(tz)
    return real_now
def utcnow() -> dt.datetime:
    """Replacement for ``datetime.utcnow``.

    Returns a naive datetime expressing UTC. While travelling it is derived
    from the travelled ``time()``; otherwise the original implementation is
    called.
    """
    if not coordinates_stack:
        result: dt.datetime = _time_machine.original_utcnow()
        return result
    # datetime.utcfromtimestamp() is deprecated since Python 3.12. Building
    # an aware UTC datetime and dropping its tzinfo gives the same naive
    # value without the DeprecationWarning.
    return dt.datetime.fromtimestamp(time(), dt.timezone.utc).replace(tzinfo=None)
357# time module
def clock_gettime(clk_id: int) -> float:
    """Replacement for ``time.clock_gettime``.

    Only ``CLOCK_REALTIME`` is affected by travelling; every other clock id,
    and every call made while not travelling, goes to the original function.
    """
    if coordinates_stack and clk_id == CLOCK_REALTIME:
        return time()
    real_value: float = _time_machine.original_clock_gettime(clk_id)
    return real_value
def clock_gettime_ns(clk_id: int) -> int:
    """Replacement for ``time.clock_gettime_ns``.

    Only ``CLOCK_REALTIME`` is affected by travelling; every other clock id,
    and every call made while not travelling, goes to the original function.
    """
    if coordinates_stack and clk_id == CLOCK_REALTIME:
        return time_ns()
    real_value: int = _time_machine.original_clock_gettime_ns(clk_id)
    return real_value
def gmtime(secs: float | None = None) -> struct_time:
    """Replacement for ``time.gmtime``.

    An explicit ``secs`` is always passed through unchanged. With no
    ``secs``, the travelled time is used while travelling.
    """
    if secs is None and coordinates_stack:
        secs = coordinates_stack[-1].time()
    broken_down: struct_time = _time_machine.original_gmtime(secs)
    return broken_down
def localtime(secs: float | None = None) -> struct_time:
    """Replacement for ``time.localtime``.

    An explicit ``secs`` is always passed through unchanged. With no
    ``secs``, the travelled time is used while travelling.
    """
    if secs is None and coordinates_stack:
        secs = coordinates_stack[-1].time()
    broken_down: struct_time = _time_machine.original_localtime(secs)
    return broken_down
def strftime(format: str, t: _TimeTuple | struct_time | None = None) -> str:
    """Replacement for ``time.strftime``.

    An explicit ``t`` is formatted as given. With no ``t``, the travelled
    local time is formatted while travelling; otherwise the original function
    is called with ``format`` alone.
    """
    if t is None and coordinates_stack:
        t = localtime()

    formatted: str
    if t is None:
        formatted = _time_machine.original_strftime(format)
    else:
        formatted = _time_machine.original_strftime(format, t)
    return formatted
def time() -> float:
    """Replacement for ``time.time``: the innermost travel's time, else the real one."""
    if coordinates_stack:
        return coordinates_stack[-1].time()
    real_seconds: float = _time_machine.original_time()
    return real_seconds
def time_ns() -> int:
    """Replacement for ``time.time_ns``: the innermost travel's time, else the real one."""
    if coordinates_stack:
        return coordinates_stack[-1].time_ns()
    real_nanoseconds: int = _time_machine.original_time_ns()
    return real_nanoseconds
417# pytest plugin
if HAVE_PYTEST:  # pragma: no branch

    class TimeMachineFixture:
        """Object handed to tests by the ``time_machine`` pytest fixture.

        Travelling starts lazily on the first ``move_to()`` call and is
        ended by ``stop()``, which the fixture calls during teardown.
        """

        # Both are None until move_to() starts travelling, and again after
        # stop().
        traveller: travel | None
        coordinates: Coordinates | None

        def __init__(self) -> None:
            self.traveller = None
            self.coordinates = None

        def move_to(
            self,
            destination: DestinationType,
            tick: bool | None = None,
        ) -> None:
            """Start travelling to ``destination``, or move an active travel.

            On the first call ``tick=None`` means ``True``; on later calls
            it means "keep the current tick mode".
            """
            if self.traveller is None:
                if tick is None:
                    tick = True
                self.traveller = travel(destination, tick=tick)
                self.coordinates = self.traveller.start()
            else:
                assert self.coordinates is not None
                self.coordinates.move_to(destination, tick=tick)

        def shift(self, delta: dt.timedelta | int | float) -> None:
            """Shift the active travel by ``delta`` (a timedelta or seconds).

            Raises:
                RuntimeError: if no travel is active.
            """
            if self.traveller is None:
                raise RuntimeError(
                    "Initialize time_machine with move_to() before using shift()."
                )
            assert self.coordinates is not None
            self.coordinates.shift(delta=delta)

        def stop(self) -> None:
            """Stop travelling, if a travel is active.

            The state is cleared afterwards so that a second ``stop()`` (for
            example the fixture teardown after a manual call) does not pop
            ``coordinates_stack`` again, and so that a later ``move_to()``
            starts a fresh travel.
            """
            if self.traveller is not None:
                try:
                    self.traveller.stop()
                finally:
                    self.traveller = None
                    self.coordinates = None

    @pytest.fixture(name="time_machine")
    def time_machine_fixture() -> TypingGenerator[TimeMachineFixture, None, None]:
        """Yield a ``TimeMachineFixture`` and stop any travel it started."""
        fixture = TimeMachineFixture()
        yield fixture
        fixture.stop()
462# escape hatch
class _EscapeHatchDatetimeDatetime:
    """Access to the original ``datetime.datetime`` functions, ignoring any travel."""

    def now(self, tz: dt.tzinfo | None = None) -> dt.datetime:
        """Call the original ``now`` with ``tz``."""
        return cast(dt.datetime, _time_machine.original_now(tz))

    def utcnow(self) -> dt.datetime:
        """Call the original ``utcnow``."""
        return cast(dt.datetime, _time_machine.original_utcnow())
class _EscapeHatchDatetime:
    """Namespace mirroring the ``datetime`` module for the escape hatch."""

    def __init__(self) -> None:
        # Reached as escape_hatch.datetime.datetime.now() / .utcnow()
        originals = _EscapeHatchDatetimeDatetime()
        self.datetime = originals
class _EscapeHatchTime:
    """Access to the original ``time`` module functions, ignoring any travel.

    Every method forwards to the matching ``_time_machine.original_*``
    function.
    """

    def clock_gettime(self, clk_id: int) -> float:
        return cast(float, _time_machine.original_clock_gettime(clk_id))

    def clock_gettime_ns(self, clk_id: int) -> int:
        return cast(int, _time_machine.original_clock_gettime_ns(clk_id))

    def gmtime(self, secs: float | None = None) -> struct_time:
        return cast(struct_time, _time_machine.original_gmtime(secs))

    def localtime(self, secs: float | None = None) -> struct_time:
        return cast(struct_time, _time_machine.original_localtime(secs))

    def monotonic(self) -> float:
        return cast(float, _time_machine.original_monotonic())

    def monotonic_ns(self) -> int:
        return cast(int, _time_machine.original_monotonic_ns())

    def strftime(self, format: str, t: _TimeTuple | struct_time | None = None) -> str:
        # ``t`` is only forwarded when the caller supplied one.
        if t is None:
            return cast(str, _time_machine.original_strftime(format))
        return cast(str, _time_machine.original_strftime(format, t))

    def time(self) -> float:
        return cast(float, _time_machine.original_time())

    def time_ns(self) -> int:
        return cast(int, _time_machine.original_time_ns())
class _EscapeHatch:
    """Entry point for reaching the original time functions while travelling."""

    def __init__(self) -> None:
        self.time = _EscapeHatchTime()
        self.datetime = _EscapeHatchDatetime()

    def is_travelling(self) -> bool:
        """Return whether at least one travel is currently active."""
        return len(coordinates_stack) > 0


# Module-level instance of the escape hatch.
escape_hatch = _EscapeHatch()