Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/time_machine/__init__.py: 33%

305 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-30 06:11 +0000

1from __future__ import annotations 

2 

3import datetime as dt 

4import functools 

5import inspect 

6import os 

7import sys 

8import uuid 

9from collections.abc import Generator 

10from time import gmtime as orig_gmtime 

11from time import struct_time 

12from types import TracebackType 

13from typing import Any 

14from typing import Awaitable 

15from typing import Callable 

16from typing import cast 

17from typing import Generator as TypingGenerator 

18from typing import overload 

19from typing import Tuple 

20from typing import Type 

21from typing import TypeVar 

22from typing import Union 

23from unittest import mock 

24from unittest import TestCase 

25 

26import _time_machine 

27from dateutil.parser import parse as parse_datetime 

28 

29# time.clock_gettime and time.CLOCK_REALTIME not always available 

30# e.g. on builds against old macOS = official Python.org installer 

31try: 

32 from time import CLOCK_REALTIME 

33except ImportError: 

34 # Dummy value that won't compare equal to any value 

35 CLOCK_REALTIME = sys.maxsize 

36 

37try: 

38 from time import tzset 

39 

40 HAVE_TZSET = True 

41except ImportError: # pragma: no cover 

42 # Windows 

43 HAVE_TZSET = False 

44 

45if sys.version_info >= (3, 9): 

46 from zoneinfo import ZoneInfo 

47 

48 HAVE_ZONEINFO = True 

49else: 

50 try: 

51 from backports.zoneinfo import ZoneInfo 

52 

53 HAVE_ZONEINFO = True 

54 except ImportError: # pragma: no cover 

55 HAVE_ZONEINFO = False 

56 

57 

58try: 

59 import pytest 

60except ImportError: # pragma: no cover 

61 HAVE_PYTEST = False 

62else: 

63 HAVE_PYTEST = True 

64 

65NANOSECONDS_PER_SECOND = 1_000_000_000 

66 

67# Windows' time epoch is not unix epoch but in 1601. This constant helps us 

68# translate to it. 

69_system_epoch = orig_gmtime(0) 

70SYSTEM_EPOCH_TIMESTAMP_NS = int( 

71 dt.datetime( 

72 _system_epoch.tm_year, 

73 _system_epoch.tm_mon, 

74 _system_epoch.tm_mday, 

75 _system_epoch.tm_hour, 

76 _system_epoch.tm_min, 

77 _system_epoch.tm_sec, 

78 tzinfo=dt.timezone.utc, 

79 ).timestamp() 

80 * NANOSECONDS_PER_SECOND 

81) 

82 

83DestinationBaseType = Union[ 

84 int, 

85 float, 

86 dt.datetime, 

87 dt.timedelta, 

88 dt.date, 

89 str, 

90] 

91DestinationType = Union[ 

92 DestinationBaseType, 

93 Callable[[], DestinationBaseType], 

94 TypingGenerator[DestinationBaseType, None, None], 

95] 

96 

97_F = TypeVar("_F", bound=Callable[..., Any]) 

98_AF = TypeVar("_AF", bound=Callable[..., Awaitable[Any]]) 

99TestCaseType = TypeVar("TestCaseType", bound=Type[TestCase]) 

100 

101# copied from typeshed: 

102_TimeTuple = Tuple[int, int, int, int, int, int, int, int, int] 

103 

104 

105def extract_timestamp_tzname( 

106 destination: DestinationType, 

107) -> tuple[float, str | None]: 

108 dest: DestinationBaseType 

109 if isinstance(destination, Generator): 

110 dest = next(destination) 

111 elif callable(destination): 

112 dest = destination() 

113 else: 

114 dest = destination 

115 

116 timestamp: float 

117 tzname: str | None = None 

118 if isinstance(dest, int): 

119 timestamp = float(dest) 

120 elif isinstance(dest, float): 

121 timestamp = dest 

122 elif isinstance(dest, dt.datetime): 

123 if HAVE_ZONEINFO and isinstance(dest.tzinfo, ZoneInfo): 

124 tzname = dest.tzinfo.key 

125 if dest.tzinfo is None: 

126 dest = dest.replace(tzinfo=dt.timezone.utc) 

127 timestamp = dest.timestamp() 

128 elif isinstance(dest, dt.timedelta): 

129 timestamp = time() + dest.total_seconds() 

130 elif isinstance(dest, dt.date): 

131 timestamp = dt.datetime.combine( 

132 dest, dt.time(0, 0), tzinfo=dt.timezone.utc 

133 ).timestamp() 

134 elif isinstance(dest, str): 

135 timestamp = parse_datetime(dest).timestamp() 

136 else: 

137 raise TypeError(f"Unsupported destination {dest!r}") 

138 

139 return timestamp, tzname 

140 

141 

142class Coordinates: 

143 def __init__( 

144 self, 

145 destination_timestamp: float, 

146 destination_tzname: str | None, 

147 tick: bool, 

148 ) -> None: 

149 self._destination_timestamp_ns = int( 

150 destination_timestamp * NANOSECONDS_PER_SECOND 

151 ) 

152 self._destination_tzname = destination_tzname 

153 self._tick = tick 

154 self._requested = False 

155 

156 def time(self) -> float: 

157 return self.time_ns() / NANOSECONDS_PER_SECOND 

158 

159 def time_ns(self) -> int: 

160 if not self._tick: 

161 return self._destination_timestamp_ns 

162 

163 base = SYSTEM_EPOCH_TIMESTAMP_NS + self._destination_timestamp_ns 

164 now_ns: int = _time_machine.original_time_ns() 

165 

166 if not self._requested: 

167 self._requested = True 

168 self._real_start_timestamp_ns = now_ns 

169 return base 

170 

171 return base + (now_ns - self._real_start_timestamp_ns) 

172 

173 def shift(self, delta: dt.timedelta | int | float) -> None: 

174 if isinstance(delta, dt.timedelta): 

175 total_seconds = delta.total_seconds() 

176 elif isinstance(delta, (int, float)): 

177 total_seconds = delta 

178 else: 

179 raise TypeError(f"Unsupported type for delta argument: {delta!r}") 

180 

181 self._destination_timestamp_ns += int(total_seconds * NANOSECONDS_PER_SECOND) 

182 

183 def move_to( 

184 self, 

185 destination: DestinationType, 

186 tick: bool | None = None, 

187 ) -> None: 

188 self._stop() 

189 timestamp, self._destination_tzname = extract_timestamp_tzname(destination) 

190 self._destination_timestamp_ns = int(timestamp * NANOSECONDS_PER_SECOND) 

191 self._requested = False 

192 self._start() 

193 if tick is not None: 

194 self._tick = tick 

195 

196 def _start(self) -> None: 

197 if HAVE_TZSET and self._destination_tzname is not None: 

198 self._orig_tz = os.environ.get("TZ") 

199 os.environ["TZ"] = self._destination_tzname 

200 tzset() 

201 

202 def _stop(self) -> None: 

203 if HAVE_TZSET and self._destination_tzname is not None: 

204 if self._orig_tz is None: 

205 del os.environ["TZ"] 

206 else: 

207 os.environ["TZ"] = self._orig_tz 

208 tzset() 

209 

210 

211coordinates_stack: list[Coordinates] = [] 

212 

213# During time travel, patch the uuid module's time-based generation function to 

214# None, which makes it use time.time(). Otherwise it makes a system call to 

215# find the current datetime. The time it finds is stored in generated UUID1 

216# values. 

217uuid_generate_time_attr = "_generate_time_safe" 

218uuid_generate_time_patcher = mock.patch.object(uuid, uuid_generate_time_attr, new=None) 

219uuid_uuid_create_patcher = mock.patch.object(uuid, "_UuidCreate", new=None) 

220# We need to cause the functions to be loaded before we try patch them out, 

221# which is done by this internal function 

222uuid_idempotent_load_system_functions = ( 

223 uuid._load_system_functions # type: ignore[attr-defined] 

224) 

225 

226 

227class travel: 

228 def __init__(self, destination: DestinationType, *, tick: bool = True) -> None: 

229 self.destination_timestamp, self.destination_tzname = extract_timestamp_tzname( 

230 destination 

231 ) 

232 self.tick = tick 

233 

234 def start(self) -> Coordinates: 

235 global coordinates_stack 

236 

237 _time_machine.patch_if_needed() 

238 

239 if not coordinates_stack: 

240 uuid_idempotent_load_system_functions() 

241 uuid_generate_time_patcher.start() 

242 uuid_uuid_create_patcher.start() 

243 

244 coordinates = Coordinates( 

245 destination_timestamp=self.destination_timestamp, 

246 destination_tzname=self.destination_tzname, 

247 tick=self.tick, 

248 ) 

249 coordinates_stack.append(coordinates) 

250 coordinates._start() 

251 

252 return coordinates 

253 

254 def stop(self) -> None: 

255 global coordinates_stack 

256 coordinates_stack.pop()._stop() 

257 

258 if not coordinates_stack: 

259 uuid_generate_time_patcher.stop() 

260 uuid_uuid_create_patcher.stop() 

261 

262 def __enter__(self) -> Coordinates: 

263 return self.start() 

264 

265 def __exit__( 

266 self, 

267 exc_type: type[BaseException] | None, 

268 exc_val: BaseException | None, 

269 exc_tb: TracebackType | None, 

270 ) -> None: 

271 self.stop() 

272 

273 @overload 

274 def __call__(self, wrapped: TestCaseType) -> TestCaseType: # pragma: no cover 

275 ... 

276 

277 @overload 

278 def __call__(self, wrapped: _AF) -> _AF: # pragma: no cover 

279 ... 

280 

281 @overload 

282 def __call__(self, wrapped: _F) -> _F: # pragma: no cover 

283 ... 

284 

285 # 'Any' below is workaround for Mypy error: 

286 # Overloaded function implementation does not accept all possible arguments 

287 # of signature 

288 def __call__( 

289 self, wrapped: TestCaseType | _AF | _F | Any 

290 ) -> TestCaseType | _AF | _F | Any: 

291 if isinstance(wrapped, type): 

292 # Class decorator 

293 if not issubclass(wrapped, TestCase): 

294 raise TypeError("Can only decorate unittest.TestCase subclasses.") 

295 

296 # Modify the setUpClass method 

297 orig_setUpClass = wrapped.setUpClass 

298 

299 @functools.wraps(orig_setUpClass) 

300 def setUpClass(cls: type[TestCase]) -> None: 

301 self.__enter__() 

302 try: 

303 orig_setUpClass() 

304 except Exception: 

305 self.__exit__(*sys.exc_info()) 

306 raise 

307 

308 wrapped.setUpClass = classmethod(setUpClass) # type: ignore[assignment] 

309 

310 orig_tearDownClass = wrapped.tearDownClass 

311 

312 @functools.wraps(orig_tearDownClass) 

313 def tearDownClass(cls: type[TestCase]) -> None: 

314 orig_tearDownClass() 

315 self.__exit__(None, None, None) 

316 

317 wrapped.tearDownClass = classmethod( # type: ignore[assignment] 

318 tearDownClass 

319 ) 

320 return cast(TestCaseType, wrapped) 

321 elif inspect.iscoroutinefunction(wrapped): 

322 

323 @functools.wraps(wrapped) 

324 async def wrapper(*args: Any, **kwargs: Any) -> Any: 

325 with self: 

326 return await wrapped(*args, **kwargs) 

327 

328 return cast(_AF, wrapper) 

329 else: 

330 assert callable(wrapped) 

331 

332 @functools.wraps(wrapped) 

333 def wrapper(*args: Any, **kwargs: Any) -> Any: 

334 with self: 

335 return wrapped(*args, **kwargs) 

336 

337 return cast(_F, wrapper) 

338 

339 

340# datetime module 

341 

342 

343def now(tz: dt.tzinfo | None = None) -> dt.datetime: 

344 if not coordinates_stack: 

345 result: dt.datetime = _time_machine.original_now(tz) 

346 return result 

347 return dt.datetime.fromtimestamp(time(), tz) 

348 

349 

350def utcnow() -> dt.datetime: 

351 if not coordinates_stack: 

352 result: dt.datetime = _time_machine.original_utcnow() 

353 return result 

354 return dt.datetime.utcfromtimestamp(time()) 

355 

356 

357# time module 

358 

359 

360def clock_gettime(clk_id: int) -> float: 

361 if not coordinates_stack or clk_id != CLOCK_REALTIME: 

362 result: float = _time_machine.original_clock_gettime(clk_id) 

363 return result 

364 return time() 

365 

366 

367def clock_gettime_ns(clk_id: int) -> int: 

368 if not coordinates_stack or clk_id != CLOCK_REALTIME: 

369 result: int = _time_machine.original_clock_gettime_ns(clk_id) 

370 return result 

371 return time_ns() 

372 

373 

374def gmtime(secs: float | None = None) -> struct_time: 

375 result: struct_time 

376 if not coordinates_stack or secs is not None: 

377 result = _time_machine.original_gmtime(secs) 

378 else: 

379 result = _time_machine.original_gmtime(coordinates_stack[-1].time()) 

380 return result 

381 

382 

383def localtime(secs: float | None = None) -> struct_time: 

384 result: struct_time 

385 if not coordinates_stack or secs is not None: 

386 result = _time_machine.original_localtime(secs) 

387 else: 

388 result = _time_machine.original_localtime(coordinates_stack[-1].time()) 

389 return result 

390 

391 

392def strftime(format: str, t: _TimeTuple | struct_time | None = None) -> str: 

393 result: str 

394 if t is not None: 

395 result = _time_machine.original_strftime(format, t) 

396 elif not coordinates_stack: 

397 result = _time_machine.original_strftime(format) 

398 else: 

399 result = _time_machine.original_strftime(format, localtime()) 

400 return result 

401 

402 

403def time() -> float: 

404 if not coordinates_stack: 

405 result: float = _time_machine.original_time() 

406 return result 

407 return coordinates_stack[-1].time() 

408 

409 

410def time_ns() -> int: 

411 if not coordinates_stack: 

412 result: int = _time_machine.original_time_ns() 

413 return result 

414 return coordinates_stack[-1].time_ns() 

415 

416 

417# pytest plugin 

418 

419if HAVE_PYTEST: # pragma: no branch 

420 

421 class TimeMachineFixture: 

422 traveller: travel | None 

423 coordinates: Coordinates | None 

424 

425 def __init__(self) -> None: 

426 self.traveller = None 

427 self.coordinates = None 

428 

429 def move_to( 

430 self, 

431 destination: DestinationType, 

432 tick: bool | None = None, 

433 ) -> None: 

434 if self.traveller is None: 

435 if tick is None: 

436 tick = True 

437 self.traveller = travel(destination, tick=tick) 

438 self.coordinates = self.traveller.start() 

439 else: 

440 assert self.coordinates is not None 

441 self.coordinates.move_to(destination, tick=tick) 

442 

443 def shift(self, delta: dt.timedelta | int | float) -> None: 

444 if self.traveller is None: 

445 raise RuntimeError( 

446 "Initialize time_machine with move_to() before using shift()." 

447 ) 

448 assert self.coordinates is not None 

449 self.coordinates.shift(delta=delta) 

450 

451 def stop(self) -> None: 

452 if self.traveller is not None: 

453 self.traveller.stop() 

454 

455 @pytest.fixture(name="time_machine") 

456 def time_machine_fixture() -> TypingGenerator[TimeMachineFixture, None, None]: 

457 fixture = TimeMachineFixture() 

458 yield fixture 

459 fixture.stop() 

460 

461 

462# escape hatch 

463 

464 

465class _EscapeHatchDatetimeDatetime: 

466 def now(self, tz: dt.tzinfo | None = None) -> dt.datetime: 

467 result: dt.datetime = _time_machine.original_now(tz) 

468 return result 

469 

470 def utcnow(self) -> dt.datetime: 

471 result: dt.datetime = _time_machine.original_utcnow() 

472 return result 

473 

474 

475class _EscapeHatchDatetime: 

476 def __init__(self) -> None: 

477 self.datetime = _EscapeHatchDatetimeDatetime() 

478 

479 

480class _EscapeHatchTime: 

481 def clock_gettime(self, clk_id: int) -> float: 

482 result: float = _time_machine.original_clock_gettime(clk_id) 

483 return result 

484 

485 def clock_gettime_ns(self, clk_id: int) -> int: 

486 result: int = _time_machine.original_clock_gettime_ns(clk_id) 

487 return result 

488 

489 def gmtime(self, secs: float | None = None) -> struct_time: 

490 result: struct_time = _time_machine.original_gmtime(secs) 

491 return result 

492 

493 def localtime(self, secs: float | None = None) -> struct_time: 

494 result: struct_time = _time_machine.original_localtime(secs) 

495 return result 

496 

497 def monotonic(self) -> float: 

498 result: float = _time_machine.original_monotonic() 

499 return result 

500 

501 def monotonic_ns(self) -> int: 

502 result: int = _time_machine.original_monotonic_ns() 

503 return result 

504 

505 def strftime(self, format: str, t: _TimeTuple | struct_time | None = None) -> str: 

506 result: str 

507 if t is not None: 

508 result = _time_machine.original_strftime(format, t) 

509 else: 

510 result = _time_machine.original_strftime(format) 

511 return result 

512 

513 def time(self) -> float: 

514 result: float = _time_machine.original_time() 

515 return result 

516 

517 def time_ns(self) -> int: 

518 result: int = _time_machine.original_time_ns() 

519 return result 

520 

521 

522class _EscapeHatch: 

523 def __init__(self) -> None: 

524 self.datetime = _EscapeHatchDatetime() 

525 self.time = _EscapeHatchTime() 

526 

527 def is_travelling(self) -> bool: 

528 return bool(coordinates_stack) 

529 

530 

531escape_hatch = _EscapeHatch()