Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/_pytest/pathlib.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

482 statements  

1from __future__ import annotations 

2 

3from collections.abc import Callable 

4from collections.abc import Iterable 

5from collections.abc import Iterator 

6import contextlib 

7from enum import Enum 

8from errno import EBADF 

9from errno import ELOOP 

10from errno import ENOENT 

11from errno import ENOTDIR 

12import fnmatch 

13from functools import partial 

14from importlib.machinery import ModuleSpec 

15from importlib.machinery import PathFinder 

16import importlib.util 

17import itertools 

18import os 

19from os.path import expanduser 

20from os.path import expandvars 

21from os.path import isabs 

22from os.path import sep 

23from pathlib import Path 

24from pathlib import PurePath 

25from posixpath import sep as posix_sep 

26import shutil 

27import sys 

28import types 

29from types import ModuleType 

30from typing import Any 

31from typing import TypeVar 

32import uuid 

33import warnings 

34 

35from _pytest.compat import assert_never 

36from _pytest.outcomes import skip 

37from _pytest.warning_types import PytestWarning 

38 

39 

40if sys.version_info < (3, 11): 

41 from importlib._bootstrap_external import _NamespaceLoader as NamespaceLoader 

42else: 

43 from importlib.machinery import NamespaceLoader 

44 

45LOCK_TIMEOUT = 60 * 60 * 24 * 3 

46 

47_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath) 

48 

49# The following function, variables and comments were 

50# copied from cpython 3.9 Lib/pathlib.py file. 

51 

52# EBADF - guard against macOS `stat` throwing EBADF 

53_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP) 

54 

55_IGNORED_WINERRORS = ( 

56 21, # ERROR_NOT_READY - drive exists but is not accessible 

57 1921, # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself 

58) 

59 

60 

61def _ignore_error(exception: Exception) -> bool: 

62 return ( 

63 getattr(exception, "errno", None) in _IGNORED_ERRORS 

64 or getattr(exception, "winerror", None) in _IGNORED_WINERRORS 

65 ) 

66 

67 

68def get_lock_path(path: _AnyPurePath) -> _AnyPurePath: 

69 return path.joinpath(".lock") 

70 

71 

72def on_rm_rf_error( 

73 func: Callable[..., Any] | None, 

74 path: str, 

75 excinfo: BaseException 

76 | tuple[type[BaseException], BaseException, types.TracebackType | None], 

77 *, 

78 start_path: Path, 

79) -> bool: 

80 """Handle known read-only errors during rmtree. 

81 

82 The returned value is used only by our own tests. 

83 """ 

84 if isinstance(excinfo, BaseException): 

85 exc = excinfo 

86 else: 

87 exc = excinfo[1] 

88 

89 # Another process removed the file in the middle of the "rm_rf" (xdist for example). 

90 # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 

91 if isinstance(exc, FileNotFoundError): 

92 return False 

93 

94 if not isinstance(exc, PermissionError): 

95 warnings.warn( 

96 PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}") 

97 ) 

98 return False 

99 

100 if func not in (os.rmdir, os.remove, os.unlink): 

101 if func not in (os.open,): 

102 warnings.warn( 

103 PytestWarning( 

104 f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}" 

105 ) 

106 ) 

107 return False 

108 

109 # Chmod + retry. 

110 import stat 

111 

112 def chmod_rw(p: str) -> None: 

113 mode = os.stat(p).st_mode 

114 os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) 

115 

116 # For files, we need to recursively go upwards in the directories to 

117 # ensure they all are also writable. 

118 p = Path(path) 

119 if p.is_file(): 

120 for parent in p.parents: 

121 chmod_rw(str(parent)) 

122 # Stop when we reach the original path passed to rm_rf. 

123 if parent == start_path: 

124 break 

125 chmod_rw(str(path)) 

126 

127 func(path) 

128 return True 

129 

130 

131def ensure_extended_length_path(path: Path) -> Path: 

132 """Get the extended-length version of a path (Windows). 

133 

134 On Windows, by default, the maximum length of a path (MAX_PATH) is 260 

135 characters, and operations on paths longer than that fail. But it is possible 

136 to overcome this by converting the path to "extended-length" form before 

137 performing the operation: 

138 https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation 

139 

140 On Windows, this function returns the extended-length absolute version of path. 

141 On other platforms it returns path unchanged. 

142 """ 

143 if sys.platform.startswith("win32"): 

144 path = path.resolve() 

145 path = Path(get_extended_length_path_str(str(path))) 

146 return path 

147 

148 

149def get_extended_length_path_str(path: str) -> str: 

150 """Convert a path to a Windows extended length path.""" 

151 long_path_prefix = "\\\\?\\" 

152 unc_long_path_prefix = "\\\\?\\UNC\\" 

153 if path.startswith((long_path_prefix, unc_long_path_prefix)): 

154 return path 

155 # UNC 

156 if path.startswith("\\\\"): 

157 return unc_long_path_prefix + path[2:] 

158 return long_path_prefix + path 

159 

160 

161def rm_rf(path: Path) -> None: 

162 """Remove the path contents recursively, even if some elements 

163 are read-only.""" 

164 path = ensure_extended_length_path(path) 

165 onerror = partial(on_rm_rf_error, start_path=path) 

166 if sys.version_info >= (3, 12): 

167 shutil.rmtree(str(path), onexc=onerror) 

168 else: 

169 shutil.rmtree(str(path), onerror=onerror) 

170 

171 

172def find_prefixed(root: Path, prefix: str) -> Iterator[os.DirEntry[str]]: 

173 """Find all elements in root that begin with the prefix, case-insensitive.""" 

174 l_prefix = prefix.lower() 

175 for x in os.scandir(root): 

176 if x.name.lower().startswith(l_prefix): 

177 yield x 

178 

179 

180def extract_suffixes(iter: Iterable[os.DirEntry[str]], prefix: str) -> Iterator[str]: 

181 """Return the parts of the paths following the prefix. 

182 

183 :param iter: Iterator over path names. 

184 :param prefix: Expected prefix of the path names. 

185 """ 

186 p_len = len(prefix) 

187 for entry in iter: 

188 yield entry.name[p_len:] 

189 

190 

191def find_suffixes(root: Path, prefix: str) -> Iterator[str]: 

192 """Combine find_prefixes and extract_suffixes.""" 

193 return extract_suffixes(find_prefixed(root, prefix), prefix) 

194 

195 

196def parse_num(maybe_num: str) -> int: 

197 """Parse number path suffixes, returns -1 on error.""" 

198 try: 

199 return int(maybe_num) 

200 except ValueError: 

201 return -1 

202 

203 

204def _force_symlink(root: Path, target: str | PurePath, link_to: str | Path) -> None: 

205 """Helper to create the current symlink. 

206 

207 It's full of race conditions that are reasonably OK to ignore 

208 for the context of best effort linking to the latest test run. 

209 

210 The presumption being that in case of much parallelism 

211 the inaccuracy is going to be acceptable. 

212 """ 

213 current_symlink = root.joinpath(target) 

214 try: 

215 current_symlink.unlink() 

216 except OSError: 

217 pass 

218 try: 

219 current_symlink.symlink_to(link_to) 

220 except Exception: 

221 pass 

222 

223 

224def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path: 

225 """Create a directory with an increased number as suffix for the given prefix.""" 

226 for i in range(10): 

227 # try up to 10 times to create the directory 

228 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) 

229 new_number = max_existing + 1 

230 new_path = root.joinpath(f"{prefix}{new_number}") 

231 try: 

232 new_path.mkdir(mode=mode) 

233 except Exception: 

234 pass 

235 else: 

236 _force_symlink(root, prefix + "current", new_path) 

237 return new_path 

238 else: 

239 raise OSError( 

240 "could not create numbered dir with prefix " 

241 f"{prefix} in {root} after 10 tries" 

242 ) 

243 

244 

245def create_cleanup_lock(p: Path) -> Path: 

246 """Create a lock to prevent premature directory cleanup.""" 

247 lock_path = get_lock_path(p) 

248 try: 

249 fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644) 

250 except FileExistsError as e: 

251 raise OSError(f"cannot create lockfile in {p}") from e 

252 else: 

253 pid = os.getpid() 

254 spid = str(pid).encode() 

255 os.write(fd, spid) 

256 os.close(fd) 

257 if not lock_path.is_file(): 

258 raise OSError("lock path got renamed after successful creation") 

259 return lock_path 

260 

261 

262def register_cleanup_lock_removal(lock_path: Path, register: Any) -> Any: 

263 """Register a cleanup function for removing a lock.""" 

264 pid = os.getpid() 

265 

266 def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = pid) -> None: 

267 current_pid = os.getpid() 

268 if current_pid != original_pid: 

269 # fork 

270 return 

271 try: 

272 lock_path.unlink() 

273 except OSError: 

274 pass 

275 

276 return register(cleanup_on_exit) 

277 

278 

279def maybe_delete_a_numbered_dir(path: Path) -> None: 

280 """Remove a numbered directory if its lock can be obtained and it does 

281 not seem to be in use.""" 

282 path = ensure_extended_length_path(path) 

283 lock_path = None 

284 try: 

285 lock_path = create_cleanup_lock(path) 

286 parent = path.parent 

287 

288 garbage = parent.joinpath(f"garbage-{uuid.uuid4()}") 

289 path.rename(garbage) 

290 rm_rf(garbage) 

291 except OSError: 

292 # known races: 

293 # * other process did a cleanup at the same time 

294 # * deletable directory was found 

295 # * process cwd (Windows) 

296 return 

297 finally: 

298 # If we created the lock, ensure we remove it even if we failed 

299 # to properly remove the numbered dir. 

300 if lock_path is not None: 

301 try: 

302 lock_path.unlink() 

303 except OSError: 

304 pass 

305 

306 

307def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool: 

308 """Check if `path` is deletable based on whether the lock file is expired.""" 

309 if path.is_symlink(): 

310 return False 

311 lock = get_lock_path(path) 

312 try: 

313 if not lock.is_file(): 

314 return True 

315 except OSError: 

316 # we might not have access to the lock file at all, in this case assume 

317 # we don't have access to the entire directory (#7491). 

318 return False 

319 try: 

320 lock_time = lock.stat().st_mtime 

321 except Exception: 

322 return False 

323 else: 

324 if lock_time < consider_lock_dead_if_created_before: 

325 # We want to ignore any errors while trying to remove the lock such as: 

326 # - PermissionDenied, like the file permissions have changed since the lock creation; 

327 # - FileNotFoundError, in case another pytest process got here first; 

328 # and any other cause of failure. 

329 with contextlib.suppress(OSError): 

330 lock.unlink() 

331 return True 

332 return False 

333 

334 

335def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None: 

336 """Try to cleanup a directory if we can ensure it's deletable.""" 

337 if ensure_deletable(path, consider_lock_dead_if_created_before): 

338 maybe_delete_a_numbered_dir(path) 

339 

340 

341def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]: 

342 """List candidates for numbered directories to be removed - follows py.path.""" 

343 max_existing = max(map(parse_num, find_suffixes(root, prefix)), default=-1) 

344 max_delete = max_existing - keep 

345 entries = find_prefixed(root, prefix) 

346 entries, entries2 = itertools.tee(entries) 

347 numbers = map(parse_num, extract_suffixes(entries2, prefix)) 

348 for entry, number in zip(entries, numbers, strict=True): 

349 if number <= max_delete: 

350 yield Path(entry) 

351 

352 

353def cleanup_dead_symlinks(root: Path) -> None: 

354 for left_dir in root.iterdir(): 

355 if left_dir.is_symlink(): 

356 if not left_dir.resolve().exists(): 

357 left_dir.unlink() 

358 

359 

360def cleanup_numbered_dir( 

361 root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float 

362) -> None: 

363 """Cleanup for lock driven numbered directories.""" 

364 if not root.exists(): 

365 return 

366 for path in cleanup_candidates(root, prefix, keep): 

367 try_cleanup(path, consider_lock_dead_if_created_before) 

368 for path in root.glob("garbage-*"): 

369 try_cleanup(path, consider_lock_dead_if_created_before) 

370 

371 cleanup_dead_symlinks(root) 

372 

373 

374def make_numbered_dir_with_cleanup( 

375 *, 

376 root: Path, 

377 prefix: str, 

378 mode: int, 

379 keep: int, 

380 lock_timeout: float, 

381 register: Any, 

382) -> Path: 

383 """Create a numbered dir and register its cleanup. 

384 

385 Similar to make_numbered_dir, but also maintains a lock file indicating that 

386 the directory is currently in use, and registers the cleanup of the lock and 

387 of stale numbered directories. 

388 

389 :param keep: 

390 The number of sessions to retain the directory. 

391 :param lock_timeout: 

392 In case of a crash, the lock remains "stuck". The timeout is a time 

393 limit after which the lock is considered stale and can be removed. 

394 :param register: 

395 Called as register(cleanup_func, params...). Should schedule to call 

396 passed cleanup functions on session finish. 

397 """ 

398 e = None 

399 for i in range(10): 

400 try: 

401 p = make_numbered_dir(root, prefix, mode) 

402 # Only lock the current dir when keep is not 0 

403 if keep != 0: 

404 lock_path = create_cleanup_lock(p) 

405 register_cleanup_lock_removal(lock_path, register) 

406 except Exception as exc: 

407 e = exc 

408 else: 

409 consider_lock_dead_if_created_before = p.stat().st_mtime - lock_timeout 

410 # Register a cleanup for program exit 

411 register( 

412 cleanup_numbered_dir, 

413 root, 

414 prefix, 

415 keep, 

416 consider_lock_dead_if_created_before, 

417 ) 

418 return p 

419 assert e is not None 

420 raise e 

421 

422 

423def resolve_from_str(input: str, rootpath: Path) -> Path: 

424 input = expanduser(input) 

425 input = expandvars(input) 

426 if isabs(input): 

427 return Path(input) 

428 else: 

429 return rootpath.joinpath(input) 

430 

431 

432def fnmatch_ex(pattern: str, path: str | os.PathLike[str]) -> bool: 

433 """A port of FNMatcher from py.path.common which works with PurePath() instances. 

434 

435 The difference between this algorithm and PurePath.match() is that the 

436 latter matches "**" glob expressions for each part of the path, while 

437 this algorithm uses the whole path instead. 

438 

439 For example: 

440 "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" 

441 with this algorithm, but not with PurePath.match(). 

442 

443 This algorithm was ported to keep backward-compatibility with existing 

444 settings which assume paths match according this logic. 

445 

446 References: 

447 * https://bugs.python.org/issue29249 

448 * https://bugs.python.org/issue34731 

449 """ 

450 path = PurePath(path) 

451 iswin32 = sys.platform.startswith("win") 

452 

453 if iswin32 and sep not in pattern and posix_sep in pattern: 

454 # Running on Windows, the pattern has no Windows path separators, 

455 # and the pattern has one or more Posix path separators. Replace 

456 # the Posix path separators with the Windows path separator. 

457 pattern = pattern.replace(posix_sep, sep) 

458 

459 if sep not in pattern: 

460 name = path.name 

461 else: 

462 name = str(path) 

463 if path.is_absolute() and not os.path.isabs(pattern): 

464 pattern = f"*{os.sep}{pattern}" 

465 return fnmatch.fnmatch(name, pattern) 

466 

467 

468def parts(s: str) -> set[str]: 

469 parts = s.split(sep) 

470 return {sep.join(parts[: i + 1]) or sep for i in range(len(parts))} 

471 

472 

473def symlink_or_skip( 

474 src: os.PathLike[str] | str, 

475 dst: os.PathLike[str] | str, 

476 **kwargs: Any, 

477) -> None: 

478 """Make a symlink, or skip the test in case symlinks are not supported.""" 

479 try: 

480 os.symlink(src, dst, **kwargs) 

481 except OSError as e: 

482 skip(f"symlinks not supported: {e}") 

483 

484 

485class ImportMode(Enum): 

486 """Possible values for `mode` parameter of `import_path`.""" 

487 

488 prepend = "prepend" 

489 append = "append" 

490 importlib = "importlib" 

491 

492 

493class ImportPathMismatchError(ImportError): 

494 """Raised on import_path() if there is a mismatch of __file__'s. 

495 

496 This can happen when `import_path` is called multiple times with different filenames that has 

497 the same basename but reside in packages 

498 (for example "/tests1/test_foo.py" and "/tests2/test_foo.py"). 

499 """ 

500 

501 

502def import_path( 

503 path: str | os.PathLike[str], 

504 *, 

505 mode: str | ImportMode = ImportMode.prepend, 

506 root: Path, 

507 consider_namespace_packages: bool, 

508) -> ModuleType: 

509 """ 

510 Import and return a module from the given path, which can be a file (a module) or 

511 a directory (a package). 

512 

513 :param path: 

514 Path to the file to import. 

515 

516 :param mode: 

517 Controls the underlying import mechanism that will be used: 

518 

519 * ImportMode.prepend: the directory containing the module (or package, taking 

520 `__init__.py` files into account) will be put at the *start* of `sys.path` before 

521 being imported with `importlib.import_module`. 

522 

523 * ImportMode.append: same as `prepend`, but the directory will be appended 

524 to the end of `sys.path`, if not already in `sys.path`. 

525 

526 * ImportMode.importlib: uses more fine control mechanisms provided by `importlib` 

527 to import the module, which avoids having to muck with `sys.path` at all. It effectively 

528 allows having same-named test modules in different places. 

529 

530 :param root: 

531 Used as an anchor when mode == ImportMode.importlib to obtain 

532 a unique name for the module being imported so it can safely be stored 

533 into ``sys.modules``. 

534 

535 :param consider_namespace_packages: 

536 If True, consider namespace packages when resolving module names. 

537 

538 :raises ImportPathMismatchError: 

539 If after importing the given `path` and the module `__file__` 

540 are different. Only raised in `prepend` and `append` modes. 

541 """ 

542 path = Path(path) 

543 mode = ImportMode(mode) 

544 

545 if not path.exists(): 

546 raise ImportError(path) 

547 

548 if mode is ImportMode.importlib: 

549 # Try to import this module using the standard import mechanisms, but 

550 # without touching sys.path. 

551 try: 

552 _, module_name = resolve_pkg_root_and_module_name( 

553 path, consider_namespace_packages=consider_namespace_packages 

554 ) 

555 except CouldNotResolvePathError: 

556 pass 

557 else: 

558 # If the given module name is already in sys.modules, do not import it again. 

559 with contextlib.suppress(KeyError): 

560 return sys.modules[module_name] 

561 

562 mod = _import_module_using_spec(module_name, path, insert_modules=False) 

563 if mod is not None: 

564 return mod 

565 

566 # Could not import the module with the current sys.path, so we fall back 

567 # to importing the file as a single module, not being a part of a package. 

568 module_name = module_name_from_path(path, root) 

569 with contextlib.suppress(KeyError): 

570 return sys.modules[module_name] 

571 

572 mod = _import_module_using_spec(module_name, path, insert_modules=True) 

573 if mod is None: 

574 raise ImportError(f"Can't find module {module_name} at location {path}") 

575 return mod 

576 

577 try: 

578 pkg_root, module_name = resolve_pkg_root_and_module_name( 

579 path, consider_namespace_packages=consider_namespace_packages 

580 ) 

581 except CouldNotResolvePathError: 

582 pkg_root, module_name = path.parent, path.stem 

583 

584 # Change sys.path permanently: restoring it at the end of this function would cause surprising 

585 # problems because of delayed imports: for example, a conftest.py file imported by this function 

586 # might have local imports, which would fail at runtime if we restored sys.path. 

587 if mode is ImportMode.append: 

588 if str(pkg_root) not in sys.path: 

589 sys.path.append(str(pkg_root)) 

590 elif mode is ImportMode.prepend: 

591 if str(pkg_root) != sys.path[0]: 

592 sys.path.insert(0, str(pkg_root)) 

593 else: 

594 assert_never(mode) 

595 

596 importlib.import_module(module_name) 

597 

598 mod = sys.modules[module_name] 

599 if path.name == "__init__.py": 

600 return mod 

601 

602 ignore = os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") 

603 if ignore != "1": 

604 module_file = mod.__file__ 

605 if module_file is None: 

606 raise ImportPathMismatchError(module_name, module_file, path) 

607 

608 if module_file.endswith((".pyc", ".pyo")): 

609 module_file = module_file[:-1] 

610 if module_file.endswith(os.sep + "__init__.py"): 

611 module_file = module_file[: -(len(os.sep + "__init__.py"))] 

612 

613 try: 

614 is_same = _is_same(str(path), module_file) 

615 except FileNotFoundError: 

616 is_same = False 

617 

618 if not is_same: 

619 raise ImportPathMismatchError(module_name, module_file, path) 

620 

621 return mod 

622 

623 

624def _import_module_using_spec( 

625 module_name: str, module_path: Path, *, insert_modules: bool 

626) -> ModuleType | None: 

627 """ 

628 Tries to import a module by its canonical name, path, and its parent location. 

629 

630 :param module_name: 

631 The expected module name, will become the key of `sys.modules`. 

632 

633 :param module_path: 

634 The file path of the module, for example `/foo/bar/test_demo.py`. 

635 If module is a package, pass the path to the `__init__.py` of the package. 

636 If module is a namespace package, pass directory path. 

637 

638 :param insert_modules: 

639 If True, will call `insert_missing_modules` to create empty intermediate modules 

640 with made-up module names (when importing test files not reachable from `sys.path`). 

641 

642 Example 1 of parent_module_*: 

643 

644 module_name: "a.b.c.demo" 

645 module_path: Path("a/b/c/demo.py") 

646 if "a.b.c" is package ("a/b/c/__init__.py" exists), then 

647 parent_module_name: "a.b.c" 

648 parent_module_path: Path("a/b/c/__init__.py") 

649 else: 

650 parent_module_name: "a.b.c" 

651 parent_module_path: Path("a/b/c") 

652 

653 Example 2 of parent_module_*: 

654 

655 module_name: "a.b.c" 

656 module_path: Path("a/b/c/__init__.py") 

657 if "a.b" is package ("a/b/__init__.py" exists), then 

658 parent_module_name: "a.b" 

659 parent_module_path: Path("a/b/__init__.py") 

660 else: 

661 parent_module_name: "a.b" 

662 parent_module_path: Path("a/b/") 

663 """ 

664 # Attempt to import the parent module, seems is our responsibility: 

665 # https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311 

666 parent_module_name, _, name = module_name.rpartition(".") 

667 parent_module: ModuleType | None = None 

668 if parent_module_name: 

669 parent_module = sys.modules.get(parent_module_name) 

670 # If the parent_module lacks the `__path__` attribute, AttributeError when finding a submodule's spec, 

671 # requiring re-import according to the path. 

672 need_reimport = not hasattr(parent_module, "__path__") 

673 if parent_module is None or need_reimport: 

674 # Get parent_location based on location, get parent_path based on path. 

675 if module_path.name == "__init__.py": 

676 # If the current module is in a package, 

677 # need to leave the package first and then enter the parent module. 

678 parent_module_path = module_path.parent.parent 

679 else: 

680 parent_module_path = module_path.parent 

681 

682 if (parent_module_path / "__init__.py").is_file(): 

683 # If the parent module is a package, loading by __init__.py file. 

684 parent_module_path = parent_module_path / "__init__.py" 

685 

686 parent_module = _import_module_using_spec( 

687 parent_module_name, 

688 parent_module_path, 

689 insert_modules=insert_modules, 

690 ) 

691 

692 # Checking with sys.meta_path first in case one of its hooks can import this module, 

693 # such as our own assertion-rewrite hook. 

694 find_spec_path = [str(module_path.parent)] 

695 for meta_importer in sys.meta_path: 

696 spec = meta_importer.find_spec(module_name, find_spec_path) 

697 

698 if spec_matches_module_path(spec, module_path): 

699 break 

700 else: 

701 loader = None 

702 if module_path.is_dir(): 

703 # The `spec_from_file_location` matches a loader based on the file extension by default. 

704 # For a namespace package, need to manually specify a loader. 

705 loader = NamespaceLoader(name, module_path, PathFinder()) # type: ignore[arg-type] 

706 

707 spec = importlib.util.spec_from_file_location( 

708 module_name, str(module_path), loader=loader 

709 ) 

710 

711 if spec_matches_module_path(spec, module_path): 

712 assert spec is not None 

713 # Find spec and import this module. 

714 mod = importlib.util.module_from_spec(spec) 

715 sys.modules[module_name] = mod 

716 spec.loader.exec_module(mod) # type: ignore[union-attr] 

717 

718 # Set this module as an attribute of the parent module (#12194). 

719 if parent_module is not None: 

720 setattr(parent_module, name, mod) 

721 

722 if insert_modules: 

723 insert_missing_modules(sys.modules, module_name) 

724 return mod 

725 

726 return None 

727 

728 

729def spec_matches_module_path(module_spec: ModuleSpec | None, module_path: Path) -> bool: 

730 """Return true if the given ModuleSpec can be used to import the given module path.""" 

731 if module_spec is None: 

732 return False 

733 

734 if module_spec.origin: 

735 return Path(module_spec.origin) == module_path 

736 

737 # Compare the path with the `module_spec.submodule_Search_Locations` in case 

738 # the module is part of a namespace package. 

739 # https://docs.python.org/3/library/importlib.html#importlib.machinery.ModuleSpec.submodule_search_locations 

740 if module_spec.submodule_search_locations: # can be None. 

741 for path in module_spec.submodule_search_locations: 

742 if Path(path) == module_path: 

743 return True 

744 

745 return False 

746 

747 

748# Implement a special _is_same function on Windows which returns True if the two filenames 

749# compare equal, to circumvent os.path.samefile returning False for mounts in UNC (#7678). 

750if sys.platform.startswith("win"): 

751 

752 def _is_same(f1: str, f2: str) -> bool: 

753 return Path(f1) == Path(f2) or os.path.samefile(f1, f2) 

754 

755else: 

756 

757 def _is_same(f1: str, f2: str) -> bool: 

758 return os.path.samefile(f1, f2) 

759 

760 

761def module_name_from_path(path: Path, root: Path) -> str: 

762 """ 

763 Return a dotted module name based on the given path, anchored on root. 

764 

765 For example: path="projects/src/tests/test_foo.py" and root="/projects", the 

766 resulting module name will be "src.tests.test_foo". 

767 """ 

768 path = path.with_suffix("") 

769 try: 

770 relative_path = path.relative_to(root) 

771 except ValueError: 

772 # If we can't get a relative path to root, use the full path, except 

773 # for the first part ("d:\\" or "/" depending on the platform, for example). 

774 path_parts = path.parts[1:] 

775 else: 

776 # Use the parts for the relative path to the root path. 

777 path_parts = relative_path.parts 

778 

779 # Module name for packages do not contain the __init__ file, unless 

780 # the `__init__.py` file is at the root. 

781 if len(path_parts) >= 2 and path_parts[-1] == "__init__": 

782 path_parts = path_parts[:-1] 

783 

784 # Module names cannot contain ".", normalize them to "_". This prevents 

785 # a directory having a "." in the name (".env.310" for example) causing extra intermediate modules. 

786 # Also, important to replace "." at the start of paths, as those are considered relative imports. 

787 path_parts = tuple(x.replace(".", "_") for x in path_parts) 

788 

789 return ".".join(path_parts) 

790 

791 

792def insert_missing_modules(modules: dict[str, ModuleType], module_name: str) -> None: 

793 """ 

794 Used by ``import_path`` to create intermediate modules when using mode=importlib. 

795 

796 When we want to import a module as "src.tests.test_foo" for example, we need 

797 to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo", 

798 otherwise "src.tests.test_foo" is not importable by ``__import__``. 

799 """ 

800 module_parts = module_name.split(".") 

801 while module_name: 

802 parent_module_name, _, child_name = module_name.rpartition(".") 

803 if parent_module_name: 

804 parent_module = modules.get(parent_module_name) 

805 if parent_module is None: 

806 try: 

807 # If sys.meta_path is empty, calling import_module will issue 

808 # a warning and raise ModuleNotFoundError. To avoid the 

809 # warning, we check sys.meta_path explicitly and raise the error 

810 # ourselves to fall back to creating a dummy module. 

811 if not sys.meta_path: 

812 raise ModuleNotFoundError 

813 parent_module = importlib.import_module(parent_module_name) 

814 except ModuleNotFoundError: 

815 parent_module = ModuleType( 

816 module_name, 

817 doc="Empty module created by pytest's importmode=importlib.", 

818 ) 

819 modules[parent_module_name] = parent_module 

820 

821 # Add child attribute to the parent that can reference the child 

822 # modules. 

823 if not hasattr(parent_module, child_name): 

824 setattr(parent_module, child_name, modules[module_name]) 

825 

826 module_parts.pop(-1) 

827 module_name = ".".join(module_parts) 

828 

829 

830def resolve_package_path(path: Path) -> Path | None: 

831 """Return the Python package path by looking for the last 

832 directory upwards which still contains an __init__.py. 

833 

834 Returns None if it cannot be determined. 

835 """ 

836 result = None 

837 for parent in itertools.chain((path,), path.parents): 

838 if parent.is_dir(): 

839 if not (parent / "__init__.py").is_file(): 

840 break 

841 if not parent.name.isidentifier(): 

842 break 

843 result = parent 

844 return result 

845 

846 

847def resolve_pkg_root_and_module_name( 

848 path: Path, *, consider_namespace_packages: bool = False 

849) -> tuple[Path, str]: 

850 """ 

851 Return the path to the directory of the root package that contains the 

852 given Python file, and its module name: 

853 

854 src/ 

855 app/ 

856 __init__.py 

857 core/ 

858 __init__.py 

859 models.py 

860 

861 Passing the full path to `models.py` will yield Path("src") and "app.core.models". 

862 

863 If consider_namespace_packages is True, then we additionally check upwards in the hierarchy 

864 for namespace packages: 

865 

866 https://packaging.python.org/en/latest/guides/packaging-namespace-packages 

867 

868 Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files). 

869 """ 

870 pkg_root: Path | None = None 

871 pkg_path = resolve_package_path(path) 

872 if pkg_path is not None: 

873 pkg_root = pkg_path.parent 

874 if consider_namespace_packages: 

875 start = pkg_root if pkg_root is not None else path.parent 

876 for candidate in (start, *start.parents): 

877 module_name = compute_module_name(candidate, path) 

878 if module_name and is_importable(module_name, path): 

879 # Point the pkg_root to the root of the namespace package. 

880 pkg_root = candidate 

881 break 

882 

883 if pkg_root is not None: 

884 module_name = compute_module_name(pkg_root, path) 

885 if module_name: 

886 return pkg_root, module_name 

887 

888 raise CouldNotResolvePathError(f"Could not resolve for {path}") 

889 

890 

891def is_importable(module_name: str, module_path: Path) -> bool: 

892 """ 

893 Return if the given module path could be imported normally by Python, akin to the user 

894 entering the REPL and importing the corresponding module name directly, and corresponds 

895 to the module_path specified. 

896 

897 :param module_name: 

898 Full module name that we want to check if is importable. 

899 For example, "app.models". 

900 

901 :param module_path: 

902 Full path to the python module/package we want to check if is importable. 

903 For example, "/projects/src/app/models.py". 

904 """ 

905 try: 

906 # Note this is different from what we do in ``_import_module_using_spec``, where we explicitly search through 

907 # sys.meta_path to be able to pass the path of the module that we want to import (``meta_importer.find_spec``). 

908 # Using importlib.util.find_spec() is different, it gives the same results as trying to import 

909 # the module normally in the REPL. 

910 spec = importlib.util.find_spec(module_name) 

911 except (ImportError, ValueError, ImportWarning): 

912 return False 

913 else: 

914 return spec_matches_module_path(spec, module_path) 

915 

916 

917def compute_module_name(root: Path, module_path: Path) -> str | None: 

918 """Compute a module name based on a path and a root anchor.""" 

919 try: 

920 path_without_suffix = module_path.with_suffix("") 

921 except ValueError: 

922 # Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter). 

923 return None 

924 

925 try: 

926 relative = path_without_suffix.relative_to(root) 

927 except ValueError: # pragma: no cover 

928 return None 

929 names = list(relative.parts) 

930 if not names: 

931 return None 

932 if names[-1] == "__init__": 

933 names.pop() 

934 return ".".join(names) 

935 

936 

937class CouldNotResolvePathError(Exception): 

938 """Custom exception raised by resolve_pkg_root_and_module_name.""" 

939 

940 

941def scandir( 

942 path: str | os.PathLike[str], 

943 sort_key: Callable[[os.DirEntry[str]], object] = lambda entry: entry.name, 

944) -> list[os.DirEntry[str]]: 

945 """Scan a directory recursively, in breadth-first order. 

946 

947 The returned entries are sorted according to the given key. 

948 The default is to sort by name. 

949 If the directory does not exist, return an empty list. 

950 """ 

951 entries = [] 

952 # Attempt to create a scandir iterator for the given path. 

953 try: 

954 scandir_iter = os.scandir(path) 

955 except FileNotFoundError: 

956 # If the directory does not exist, return an empty list. 

957 return [] 

958 # Use the scandir iterator in a context manager to ensure it is properly closed. 

959 with scandir_iter as s: 

960 for entry in s: 

961 try: 

962 entry.is_file() 

963 except OSError as err: 

964 if _ignore_error(err): 

965 continue 

966 # Reraise non-ignorable errors to avoid hiding issues. 

967 raise 

968 entries.append(entry) 

969 entries.sort(key=sort_key) # type: ignore[arg-type] 

970 return entries 

971 

972 

973def visit( 

974 path: str | os.PathLike[str], recurse: Callable[[os.DirEntry[str]], bool] 

975) -> Iterator[os.DirEntry[str]]: 

976 """Walk a directory recursively, in breadth-first order. 

977 

978 The `recurse` predicate determines whether a directory is recursed. 

979 

980 Entries at each directory level are sorted. 

981 """ 

982 entries = scandir(path) 

983 yield from entries 

984 for entry in entries: 

985 if entry.is_dir() and recurse(entry): 

986 yield from visit(entry.path, recurse) 

987 

988 

989def absolutepath(path: str | os.PathLike[str]) -> Path: 

990 """Convert a path to an absolute path using os.path.abspath. 

991 

992 Prefer this over Path.resolve() (see #6523). 

993 Prefer this over Path.absolute() (not public, doesn't normalize). 

994 """ 

995 return Path(os.path.abspath(path)) 

996 

997 

998def commonpath(path1: Path, path2: Path) -> Path | None: 

999 """Return the common part shared with the other path, or None if there is 

1000 no common part. 

1001 

1002 If one path is relative and one is absolute, returns None. 

1003 """ 

1004 try: 

1005 return Path(os.path.commonpath((str(path1), str(path2)))) 

1006 except ValueError: 

1007 return None 

1008 

1009 

1010def bestrelpath(directory: Path, dest: Path) -> str: 

1011 """Return a string which is a relative path from directory to dest such 

1012 that directory/bestrelpath == dest. 

1013 

1014 The paths must be either both absolute or both relative. 

1015 

1016 If no such path can be determined, returns dest. 

1017 """ 

1018 assert isinstance(directory, Path) 

1019 assert isinstance(dest, Path) 

1020 if dest == directory: 

1021 return os.curdir 

1022 # Find the longest common directory. 

1023 base = commonpath(directory, dest) 

1024 # Can be the case on Windows for two absolute paths on different drives. 

1025 # Can be the case for two relative paths without common prefix. 

1026 # Can be the case for a relative path and an absolute path. 

1027 if not base: 

1028 return str(dest) 

1029 reldirectory = directory.relative_to(base) 

1030 reldest = dest.relative_to(base) 

1031 return os.path.join( 

1032 # Back from directory to base. 

1033 *([os.pardir] * len(reldirectory.parts)), 

1034 # Forward from base to dest. 

1035 *reldest.parts, 

1036 ) 

1037 

1038 

1039def safe_exists(p: Path) -> bool: 

1040 """Like Path.exists(), but account for input arguments that might be too long (#11394).""" 

1041 try: 

1042 return p.exists() 

1043 except (ValueError, OSError): 

1044 # ValueError: stat: path too long for Windows 

1045 # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect 

1046 return False 

1047 

1048 

1049def samefile_nofollow(p1: Path, p2: Path) -> bool: 

1050 """Test whether two paths reference the same actual file or directory. 

1051 

1052 Unlike Path.samefile(), does not resolve symlinks. 

1053 """ 

1054 return os.path.samestat(p1.lstat(), p2.lstat())