Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/_pytest/pathlib.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

484 statements  

1from __future__ import annotations 

2 

3import atexit 

4from collections.abc import Callable 

5from collections.abc import Iterable 

6from collections.abc import Iterator 

7import contextlib 

8from enum import Enum 

9from errno import EBADF 

10from errno import ELOOP 

11from errno import ENOENT 

12from errno import ENOTDIR 

13import fnmatch 

14from functools import partial 

15from importlib.machinery import ModuleSpec 

16from importlib.machinery import PathFinder 

17import importlib.util 

18import itertools 

19import os 

20from os.path import expanduser 

21from os.path import expandvars 

22from os.path import isabs 

23from os.path import sep 

24from pathlib import Path 

25from pathlib import PurePath 

26from posixpath import sep as posix_sep 

27import shutil 

28import sys 

29import types 

30from types import ModuleType 

31from typing import Any 

32from typing import TypeVar 

33import uuid 

34import warnings 

35 

36from _pytest.compat import assert_never 

37from _pytest.outcomes import skip 

38from _pytest.warning_types import PytestWarning 

39 

40 

41if sys.version_info < (3, 11): 

42 from importlib._bootstrap_external import _NamespaceLoader as NamespaceLoader 

43else: 

44 from importlib.machinery import NamespaceLoader 

45 

# Three days, expressed in seconds.
# NOTE(review): presumably the default ``lock_timeout`` used for numbered
# directory cleanup -- it is not referenced in this part of the file; confirm.
LOCK_TIMEOUT = 3 * 24 * 60 * 60

# Type variable bound to PurePath so helpers can return the same concrete
# path type they were given.
_AnyPurePath = TypeVar("_AnyPurePath", bound=PurePath)

49 

# The following function, variables and comments were
# copied from cpython 3.9 Lib/pathlib.py file.

# ``errno`` values that `_ignore_error` reports as ignorable.
# EBADF - guard against macOS `stat` throwing EBADF
_IGNORED_ERRORS = (ENOENT, ENOTDIR, EBADF, ELOOP)

# ``winerror`` values that `_ignore_error` reports as ignorable.
_IGNORED_WINERRORS = (
    21,  # ERROR_NOT_READY - drive exists but is not accessible
    1921,  # ERROR_CANT_RESOLVE_FILENAME - fix for broken symlink pointing to itself
)


def _ignore_error(exception: Exception) -> bool:
    """Tell whether *exception* carries one of the ignorable error codes.

    The exception matches when its ``errno`` attribute is listed in
    ``_IGNORED_ERRORS`` or its ``winerror`` attribute is listed in
    ``_IGNORED_WINERRORS``. Exceptions lacking both attributes never match.
    """
    if getattr(exception, "errno", None) in _IGNORED_ERRORS:
        return True
    return getattr(exception, "winerror", None) in _IGNORED_WINERRORS

67 

68 

def get_lock_path(path: _AnyPurePath) -> _AnyPurePath:
    """Return the ``.lock`` entry located directly inside *path*."""
    return path / ".lock"

71 

72 

73def on_rm_rf_error( 

74 func: Callable[..., Any] | None, 

75 path: str, 

76 excinfo: BaseException 

77 | tuple[type[BaseException], BaseException, types.TracebackType | None], 

78 *, 

79 start_path: Path, 

80) -> bool: 

81 """Handle known read-only errors during rmtree. 

82 

83 The returned value is used only by our own tests. 

84 """ 

85 if isinstance(excinfo, BaseException): 

86 exc = excinfo 

87 else: 

88 exc = excinfo[1] 

89 

90 # Another process removed the file in the middle of the "rm_rf" (xdist for example). 

91 # More context: https://github.com/pytest-dev/pytest/issues/5974#issuecomment-543799018 

92 if isinstance(exc, FileNotFoundError): 

93 return False 

94 

95 if not isinstance(exc, PermissionError): 

96 warnings.warn( 

97 PytestWarning(f"(rm_rf) error removing {path}\n{type(exc)}: {exc}") 

98 ) 

99 return False 

100 

101 if func not in (os.rmdir, os.remove, os.unlink): 

102 if func not in (os.open,): 

103 warnings.warn( 

104 PytestWarning( 

105 f"(rm_rf) unknown function {func} when removing {path}:\n{type(exc)}: {exc}" 

106 ) 

107 ) 

108 return False 

109 

110 # Chmod + retry. 

111 import stat 

112 

113 def chmod_rw(p: str) -> None: 

114 mode = os.stat(p).st_mode 

115 os.chmod(p, mode | stat.S_IRUSR | stat.S_IWUSR) 

116 

117 # For files, we need to recursively go upwards in the directories to 

118 # ensure they all are also writable. 

119 p = Path(path) 

120 if p.is_file(): 

121 for parent in p.parents: 

122 chmod_rw(str(parent)) 

123 # Stop when we reach the original path passed to rm_rf. 

124 if parent == start_path: 

125 break 

126 chmod_rw(str(path)) 

127 

128 func(path) 

129 return True 

130 

131 

def ensure_extended_length_path(path: Path) -> Path:
    """Get the extended-length version of a path (Windows).

    On Windows, by default, the maximum length of a path (MAX_PATH) is 260
    characters, and operations on paths longer than that fail. But it is possible
    to overcome this by converting the path to "extended-length" form before
    performing the operation:
    https://docs.microsoft.com/en-us/windows/win32/fileio/naming-a-file#maximum-path-length-limitation

    On Windows, this function returns the extended-length absolute version of path.
    On other platforms it returns path unchanged.
    """
    if not sys.platform.startswith("win32"):
        return path
    # The path is resolved first, then the extended-length prefix is added.
    resolved = path.resolve()
    return Path(get_extended_length_path_str(str(resolved)))

148 

149 

def get_extended_length_path_str(path: str) -> str:
    """Return *path* with the Windows extended-length prefix applied.

    Paths that already carry the prefix are returned untouched. Paths starting
    with two backslashes get the UNC variant of the prefix in place of those
    two backslashes; everything else simply gets the prefix prepended.
    """
    long_path_prefix = "\\\\?\\"
    unc_long_path_prefix = "\\\\?\\UNC\\"
    already_extended = path.startswith((long_path_prefix, unc_long_path_prefix))
    if already_extended:
        return path
    is_unc = path.startswith("\\\\")
    return unc_long_path_prefix + path[2:] if is_unc else long_path_prefix + path

160 

161 

def rm_rf(path: Path) -> None:
    """Recursively delete *path*, coping with read-only entries.

    Failures raised while deleting are routed to ``on_rm_rf_error``, bound to
    the (extended-length) path the removal started from.
    """
    path = ensure_extended_length_path(path)
    error_handler = partial(on_rm_rf_error, start_path=path)
    # ``onexc`` is used on Python 3.12+, ``onerror`` on older versions.
    handler_keyword = "onexc" if sys.version_info >= (3, 12) else "onerror"
    shutil.rmtree(str(path), **{handler_keyword: error_handler})

171 

172 

def find_prefixed(root: Path, prefix: str) -> Iterator[os.DirEntry[str]]:
    """Yield the entries directly inside *root* whose name starts with *prefix*.

    The comparison ignores case. The directory is only scanned once iteration starts.
    """
    wanted = prefix.lower()
    yield from (
        entry for entry in os.scandir(root) if entry.name.lower().startswith(wanted)
    )

179 

180 

def extract_suffixes(iter: Iterable[os.DirEntry[str]], prefix: str) -> Iterator[str]:
    """Yield what remains of each entry name once the prefix is cut off.

    Only the length of *prefix* is used: that many leading characters are
    dropped from every name, without checking that they match.

    :param iter: Iterator over path names.
    :param prefix: Expected prefix of the path names.
    """
    offset = len(prefix)
    yield from (item.name[offset:] for item in iter)

190 

191 

def find_suffixes(root: Path, prefix: str) -> Iterator[str]:
    """Combine find_prefixed and extract_suffixes.

    Returns the name remainders (after *prefix*) of the entries of *root*
    whose name starts with *prefix*.
    """
    matching_entries = find_prefixed(root, prefix)
    return extract_suffixes(matching_entries, prefix)

195 

196 

def parse_num(maybe_num: str) -> int:
    """Convert a path suffix to an integer; anything ``int()`` rejects gives -1."""
    with contextlib.suppress(ValueError):
        return int(maybe_num)
    return -1

203 

204 

205def _force_symlink(root: Path, target: str | PurePath, link_to: str | Path) -> None: 

206 """Helper to create the current symlink. 

207 

208 It's full of race conditions that are reasonably OK to ignore 

209 for the context of best effort linking to the latest test run. 

210 

211 The presumption being that in case of much parallelism 

212 the inaccuracy is going to be acceptable. 

213 """ 

214 current_symlink = root.joinpath(target) 

215 try: 

216 current_symlink.unlink() 

217 except OSError: 

218 pass 

219 try: 

220 current_symlink.symlink_to(link_to) 

221 except Exception: 

222 pass 

223 

224 

def make_numbered_dir(root: Path, prefix: str, mode: int = 0o700) -> Path:
    """Create ``<prefix><N>`` inside *root*, N being one above the highest existing number.

    On success the ``<prefix>current`` symlink is refreshed (best effort) and
    the new directory is returned.

    :raises OSError: If no directory could be created after 10 attempts.
    """
    for _ in range(10):
        # Recompute the highest number on every attempt.
        highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
        candidate = root / f"{prefix}{highest + 1}"
        try:
            candidate.mkdir(mode=mode)
        except Exception:
            # Any failure (for example another process taking the same number)
            # just triggers another attempt.
            continue
        _force_symlink(root, prefix + "current", candidate)
        return candidate
    raise OSError(
        "could not create numbered dir with prefix "
        f"{prefix} in {root} after 10 tries"
    )

244 

245 

def create_cleanup_lock(p: Path) -> Path:
    """Create a lock to prevent premature folder cleanup.

    The lock file is created exclusively inside *p* and the current process
    id is written into it.

    :param p: Directory to lock.
    :returns: The path of the newly created lock file.
    :raises OSError:
        If the lock file already exists, or if it is no longer a file right
        after having been created.
    """
    lock_path = get_lock_path(p)
    try:
        # O_EXCL makes the creation fail if the lock file already exists.
        fd = os.open(str(lock_path), os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
    except FileExistsError as e:
        raise OSError(f"cannot create lockfile in {p}") from e
    try:
        os.write(fd, str(os.getpid()).encode())
    finally:
        # Always release the descriptor, even if writing the pid failed.
        os.close(fd)
    if not lock_path.is_file():
        raise OSError("lock path got renamed after successful creation")
    return lock_path

261 

262 

def register_cleanup_lock_removal(
    lock_path: Path, register: Any = atexit.register
) -> Any:
    """Register a callback that removes the lock file, by default through atexit.

    :param lock_path: The lock file to remove.
    :param register: Callable receiving the cleanup callback.
    :returns: Whatever ``register`` returns.
    """
    owner_pid = os.getpid()

    def cleanup_on_exit(lock_path: Path = lock_path, original_pid: int = owner_pid) -> None:
        if os.getpid() != original_pid:
            # fork: only the process that registered the callback removes the lock.
            return
        with contextlib.suppress(OSError):
            lock_path.unlink()

    return register(cleanup_on_exit)

280 

281 

def maybe_delete_a_numbered_dir(path: Path) -> None:
    """Delete a numbered directory, provided its lock can be taken.

    The directory is locked, renamed to a uniquely named ``garbage-*`` sibling
    and then removed recursively. Any ``OSError`` along the way silently
    aborts the operation.
    """
    path = ensure_extended_length_path(path)
    acquired_lock = None
    try:
        # known races:
        # * other process did a cleanup at the same time
        # * deletable folder was found
        # * process cwd (Windows)
        with contextlib.suppress(OSError):
            acquired_lock = create_cleanup_lock(path)
            trash = path.parent.joinpath(f"garbage-{uuid.uuid4()}")
            path.rename(trash)
            rm_rf(trash)
    finally:
        # If we created the lock, ensure we remove it even if we failed
        # to properly remove the numbered dir.
        if acquired_lock is not None:
            with contextlib.suppress(OSError):
                acquired_lock.unlink()

308 

309 

def ensure_deletable(path: Path, consider_lock_dead_if_created_before: float) -> bool:
    """Check if `path` is deletable based on whether the lock file is expired.

    Symlinks are never deletable. A directory without a lock file is deletable.
    A locked directory is deletable only when the lock's modification time is
    older than ``consider_lock_dead_if_created_before`` and the lock could be removed.
    """
    if path.is_symlink():
        return False

    lock = get_lock_path(path)
    try:
        lock_present = lock.is_file()
    except OSError:
        # we might not have access to the lock file at all, in this case assume
        # we don't have access to the entire directory (#7491).
        return False
    if not lock_present:
        return True

    try:
        lock_time = lock.stat().st_mtime
    except Exception:
        return False

    if lock_time < consider_lock_dead_if_created_before:
        # We want to ignore any errors while trying to remove the lock such as:
        # - PermissionDenied, like the file permissions have changed since the lock creation;
        # - FileNotFoundError, in case another pytest process got here first;
        # and any other cause of failure.
        with contextlib.suppress(OSError):
            lock.unlink()
            return True
    return False

336 

337 

def try_cleanup(path: Path, consider_lock_dead_if_created_before: float) -> None:
    """Delete the folder at *path*, but only when `ensure_deletable` allows it."""
    if not ensure_deletable(path, consider_lock_dead_if_created_before):
        return
    maybe_delete_a_numbered_dir(path)

342 

343 

def cleanup_candidates(root: Path, prefix: str, keep: int) -> Iterator[Path]:
    """List candidates for numbered directories to be removed - follows py.path.

    An entry of *root* whose name starts with *prefix* is a candidate when its
    number is at most ``<highest existing number> - keep``. Names whose suffix
    is not a number count as -1.
    """
    highest = max(map(parse_num, find_suffixes(root, prefix)), default=-1)
    threshold = highest - keep
    prefix_length = len(prefix)
    for entry in find_prefixed(root, prefix):
        if parse_num(entry.name[prefix_length:]) <= threshold:
            yield Path(entry)

354 

355 

def cleanup_dead_symlinks(root: Path) -> None:
    """Unlink every symlink directly inside *root* whose resolved target does not exist."""
    for entry in root.iterdir():
        if entry.is_symlink() and not entry.resolve().exists():
            entry.unlink()

361 

362 

def cleanup_numbered_dir(
    root: Path, prefix: str, keep: int, consider_lock_dead_if_created_before: float
) -> None:
    """Cleanup for lock driven numbered directories.

    Does nothing when *root* does not exist. Otherwise tries to clean up the
    outdated numbered directories, then any ``garbage-*`` entries, and finally
    removes dead symlinks.
    """
    if not root.exists():
        return

    lock_deadline = consider_lock_dead_if_created_before
    for outdated in cleanup_candidates(root, prefix, keep):
        try_cleanup(outdated, lock_deadline)
    # Globbed only after the loop above, which may itself leave garbage-* entries behind.
    for leftover in root.glob("garbage-*"):
        try_cleanup(leftover, lock_deadline)

    cleanup_dead_symlinks(root)

375 

376 

def make_numbered_dir_with_cleanup(
    root: Path,
    prefix: str,
    keep: int,
    lock_timeout: float,
    mode: int,
) -> Path:
    """Create a numbered dir with a cleanup lock and remove old ones.

    Up to 10 attempts are made; if all of them fail, the exception raised by
    the last attempt is re-raised. Removal of old directories is not done
    immediately: it is registered to run at interpreter exit.
    """
    last_error = None
    for _ in range(10):
        try:
            new_dir = make_numbered_dir(root, prefix, mode)
            # Only lock the current dir when keep is not 0
            if keep != 0:
                register_cleanup_lock_removal(create_cleanup_lock(new_dir))
        except Exception as exc:
            last_error = exc
            continue

        # Locks older than this timestamp are treated as dead during cleanup.
        stale_before = new_dir.stat().st_mtime - lock_timeout
        # Register a cleanup for program exit
        atexit.register(cleanup_numbered_dir, root, prefix, keep, stale_before)
        return new_dir

    assert last_error is not None
    raise last_error

408 

409 

def resolve_from_str(input: str, rootpath: Path) -> Path:
    """Turn *input* into a path, anchoring relative values on *rootpath*.

    ``~`` and environment variables are expanded first. An absolute result is
    returned as is; a relative one is joined onto *rootpath*.
    """
    expanded = expandvars(expanduser(input))
    return Path(expanded) if isabs(expanded) else rootpath.joinpath(expanded)

417 

418 

419def fnmatch_ex(pattern: str, path: str | os.PathLike[str]) -> bool: 

420 """A port of FNMatcher from py.path.common which works with PurePath() instances. 

421 

422 The difference between this algorithm and PurePath.match() is that the 

423 latter matches "**" glob expressions for each part of the path, while 

424 this algorithm uses the whole path instead. 

425 

426 For example: 

427 "tests/foo/bar/doc/test_foo.py" matches pattern "tests/**/doc/test*.py" 

428 with this algorithm, but not with PurePath.match(). 

429 

430 This algorithm was ported to keep backward-compatibility with existing 

431 settings which assume paths match according this logic. 

432 

433 References: 

434 * https://bugs.python.org/issue29249 

435 * https://bugs.python.org/issue34731 

436 """ 

437 path = PurePath(path) 

438 iswin32 = sys.platform.startswith("win") 

439 

440 if iswin32 and sep not in pattern and posix_sep in pattern: 

441 # Running on Windows, the pattern has no Windows path separators, 

442 # and the pattern has one or more Posix path separators. Replace 

443 # the Posix path separators with the Windows path separator. 

444 pattern = pattern.replace(posix_sep, sep) 

445 

446 if sep not in pattern: 

447 name = path.name 

448 else: 

449 name = str(path) 

450 if path.is_absolute() and not os.path.isabs(pattern): 

451 pattern = f"*{os.sep}{pattern}" 

452 return fnmatch.fnmatch(name, pattern) 

453 

454 

def parts(s: str) -> set[str]:
    """Return every leading sub-path of *s*, split on the OS path separator.

    An empty leading sub-path (as produced by a path starting with the
    separator) is represented by the separator itself.
    """
    segments = s.split(sep)
    prefixes = set()
    for end in range(1, len(segments) + 1):
        prefixes.add(sep.join(segments[:end]) or sep)
    return prefixes

458 

459 

460def symlink_or_skip( 

461 src: os.PathLike[str] | str, 

462 dst: os.PathLike[str] | str, 

463 **kwargs: Any, 

464) -> None: 

465 """Make a symlink, or skip the test in case symlinks are not supported.""" 

466 try: 

467 os.symlink(src, dst, **kwargs) 

468 except OSError as e: 

469 skip(f"symlinks not supported: {e}") 

470 

471 

class ImportMode(Enum):
    """Import strategies accepted by the `mode` parameter of `import_path`."""

    # Each member's value equals its name, so ``ImportMode("prepend")`` style
    # lookups by string work.
    prepend = "prepend"
    append = "append"
    importlib = "importlib"

478 

479 

class ImportPathMismatchError(ImportError):
    """Raised by import_path() when the imported module's `__file__` is not the requested path.

    This can happen when `import_path` is called multiple times with different filenames that
    have the same basename but reside in packages
    (for example "/tests1/test_foo.py" and "/tests2/test_foo.py").
    """

487 

488 

def import_path(
    path: str | os.PathLike[str],
    *,
    mode: str | ImportMode = ImportMode.prepend,
    root: Path,
    consider_namespace_packages: bool,
) -> ModuleType:
    """
    Import and return a module from the given path, which can be a file (a module) or
    a directory (a package).

    :param path:
        Path to the file to import.

    :param mode:
        Controls the underlying import mechanism that will be used:

        * ImportMode.prepend: the directory containing the module (or package, taking
          `__init__.py` files into account) will be put at the *start* of `sys.path` before
          being imported with `importlib.import_module`.

        * ImportMode.append: same as `prepend`, but the directory will be appended
          to the end of `sys.path`, if not already in `sys.path`.

        * ImportMode.importlib: uses more fine control mechanisms provided by `importlib`
          to import the module, which avoids having to muck with `sys.path` at all. It effectively
          allows having same-named test modules in different places.

    :param root:
        Used as an anchor when mode == ImportMode.importlib to obtain
        a unique name for the module being imported so it can safely be stored
        into ``sys.modules``.

    :param consider_namespace_packages:
        If True, consider namespace packages when resolving module names.

    :raises ImportError:
        If `path` does not exist, or if the module cannot be found in `importlib` mode.

    :raises ImportPathMismatchError:
        If after importing the given `path` and the module `__file__`
        are different. Only raised in `prepend` and `append` modes.
    """
    path = Path(path)
    mode = ImportMode(mode)

    if not path.exists():
        raise ImportError(path)

    if mode is ImportMode.importlib:
        # Try to import this module using the standard import mechanisms, but
        # without touching sys.path.
        try:
            pkg_root, module_name = resolve_pkg_root_and_module_name(
                path, consider_namespace_packages=consider_namespace_packages
            )
        except CouldNotResolvePathError:
            pass
        else:
            # If the given module name is already in sys.modules, do not import it again.
            try:
                return sys.modules[module_name]
            except KeyError:
                pass

            imported = _import_module_using_spec(
                module_name, path, pkg_root, insert_modules=False
            )
            if imported is not None:
                return imported

        # Could not import the module with the current sys.path, so we fall back
        # to importing the file as a single module, not being a part of a package.
        module_name = module_name_from_path(path, root)
        try:
            return sys.modules[module_name]
        except KeyError:
            pass

        imported = _import_module_using_spec(
            module_name, path, path.parent, insert_modules=True
        )
        if imported is None:
            raise ImportError(f"Can't find module {module_name} at location {path}")
        return imported

    try:
        pkg_root, module_name = resolve_pkg_root_and_module_name(
            path, consider_namespace_packages=consider_namespace_packages
        )
    except CouldNotResolvePathError:
        pkg_root, module_name = path.parent, path.stem

    # Change sys.path permanently: restoring it at the end of this function would cause surprising
    # problems because of delayed imports: for example, a conftest.py file imported by this function
    # might have local imports, which would fail at runtime if we restored sys.path.
    pkg_root_str = str(pkg_root)
    if mode is ImportMode.append:
        if pkg_root_str not in sys.path:
            sys.path.append(pkg_root_str)
    elif mode is ImportMode.prepend:
        if pkg_root_str != sys.path[0]:
            sys.path.insert(0, pkg_root_str)
    else:
        assert_never(mode)

    importlib.import_module(module_name)

    mod = sys.modules[module_name]
    if path.name == "__init__.py":
        return mod

    # The mismatch check can be turned off through the environment.
    if os.environ.get("PY_IGNORE_IMPORTMISMATCH", "") == "1":
        return mod

    module_file = mod.__file__
    if module_file is None:
        raise ImportPathMismatchError(module_name, module_file, path)

    # Normalize the module file before comparing: compiled files map back to
    # their ".py" name, and a package's "__init__.py" maps to its directory.
    if module_file.endswith((".pyc", ".pyo")):
        module_file = module_file[:-1]
    init_suffix = os.sep + "__init__.py"
    if module_file.endswith(init_suffix):
        module_file = module_file[: -len(init_suffix)]

    try:
        same_file = _is_same(str(path), module_file)
    except FileNotFoundError:
        same_file = False

    if not same_file:
        raise ImportPathMismatchError(module_name, module_file, path)

    return mod

613 

614 

def _import_module_using_spec(
    module_name: str, module_path: Path, module_location: Path, *, insert_modules: bool
) -> ModuleType | None:
    """
    Tries to import a module by its canonical name, path, and its parent location.

    Returns the imported module, or None when no spec matching `module_path` could be found.

    :param module_name:
        The expected module name, will become the key of `sys.modules`.

    :param module_path:
        The file path of the module, for example `/foo/bar/test_demo.py`.
        If module is a package, pass the path to the  `__init__.py` of the package.
        If module is a namespace package, pass directory path.

    :param module_location:
        The parent location of the module.
        If module is a package, pass the directory containing the `__init__.py` file.

    :param insert_modules:
        If True, will call `insert_missing_modules` to create empty intermediate modules
        with made-up module names (when importing test files not reachable from `sys.path`).

    Example 1 of parent_module_*:

        module_name:        "a.b.c.demo"
        module_path:        Path("a/b/c/demo.py")
        module_location:    Path("a/b/c/")
        if "a.b.c" is package ("a/b/c/__init__.py" exists), then
            parent_module_name:         "a.b.c"
            parent_module_path:         Path("a/b/c/__init__.py")
            parent_module_location:     Path("a/b/c/")
        else:
            parent_module_name:         "a.b.c"
            parent_module_path:         Path("a/b/c")
            parent_module_location:     Path("a/b/")

    Example 2 of parent_module_*:

        module_name:        "a.b.c"
        module_path:        Path("a/b/c/__init__.py")
        module_location:    Path("a/b/c/")
        if  "a.b" is package ("a/b/__init__.py" exists), then
            parent_module_name:         "a.b"
            parent_module_path:         Path("a/b/__init__.py")
            parent_module_location:     Path("a/b/")
        else:
            parent_module_name:         "a.b"
            parent_module_path:         Path("a/b/")
            parent_module_location:     Path("a/")
    """
    # Attempt to import the parent module, seems is our responsibility:
    # https://github.com/python/cpython/blob/73906d5c908c1e0b73c5436faeff7d93698fc074/Lib/importlib/_bootstrap.py#L1308-L1311
    parent_module_name, _, name = module_name.rpartition(".")
    parent_module: ModuleType | None = None
    if parent_module_name:
        parent_module = sys.modules.get(parent_module_name)
        # A missing parent (None) and a parent lacking `__path__` are handled alike:
        # without `__path__`, finding a submodule's spec raises AttributeError,
        # so the parent has to be (re-)imported according to the path.
        if not hasattr(parent_module, "__path__"):
            # If the current module is a package's `__init__.py`, leave the package
            # first and then enter the parent module.
            if module_path.name == "__init__.py":
                parent_module_path = module_path.parent.parent
            else:
                parent_module_path = module_path.parent

            # If the parent module is a package, load it through its __init__.py file.
            parent_init = parent_module_path / "__init__.py"
            if parent_init.is_file():
                parent_module_path = parent_init

            parent_module = _import_module_using_spec(
                parent_module_name,
                parent_module_path,
                parent_module_path.parent,
                insert_modules=insert_modules,
            )

    # Checking with sys.meta_path first in case one of its hooks can import this module,
    # such as our own assertion-rewrite hook.
    for meta_importer in sys.meta_path:
        importer_module = getattr(meta_importer.__class__, "__module__", "")
        find_spec_path = [str(module_location)]
        if importer_module == "_pytest.assertion.rewrite" and module_path.is_file():
            # Import modules in subdirectories by module_path
            # to ensure assertion rewrites are not missed (#12659).
            find_spec_path.append(str(module_path))

        spec = meta_importer.find_spec(module_name, find_spec_path)

        if spec_matches_module_path(spec, module_path):
            break
    else:
        # No meta path importer produced a matching spec: build one from the file location.
        loader = None
        if module_path.is_dir():
            # The `spec_from_file_location` matches a loader based on the file extension by default.
            # For a namespace package, need to manually specify a loader.
            loader = NamespaceLoader(name, module_path, PathFinder())  # type: ignore[arg-type]

        spec = importlib.util.spec_from_file_location(
            module_name, str(module_path), loader=loader
        )

    if not spec_matches_module_path(spec, module_path):
        return None

    assert spec is not None
    # Create the module from the spec, publish it in sys.modules, then execute it.
    mod = importlib.util.module_from_spec(spec)
    sys.modules[module_name] = mod
    spec.loader.exec_module(mod)  # type: ignore[union-attr]

    # Set this module as an attribute of the parent module (#12194).
    if parent_module is not None:
        setattr(parent_module, name, mod)

    if insert_modules:
        insert_missing_modules(sys.modules, module_name)
    return mod

736 

737 

738def spec_matches_module_path(module_spec: ModuleSpec | None, module_path: Path) -> bool: 

739 """Return true if the given ModuleSpec can be used to import the given module path.""" 

740 if module_spec is None: 

741 return False 

742 

743 if module_spec.origin: 

744 return Path(module_spec.origin) == module_path 

745 

746 # Compare the path with the `module_spec.submodule_Search_Locations` in case 

747 # the module is part of a namespace package. 

748 # https://docs.python.org/3/library/importlib.html#importlib.machinery.ModuleSpec.submodule_search_locations 

749 if module_spec.submodule_search_locations: # can be None. 

750 for path in module_spec.submodule_search_locations: 

751 if Path(path) == module_path: 

752 return True 

753 

754 return False 

755 

756 

# `_is_same` tells whether two filenames refer to the same file.
# On Windows a special version also returns True if the two filenames compare equal,
# to circumvent os.path.samefile returning False for mounts in UNC (#7678).
if not sys.platform.startswith("win"):

    def _is_same(f1: str, f2: str) -> bool:
        return os.path.samefile(f1, f2)

else:

    def _is_same(f1: str, f2: str) -> bool:
        if Path(f1) == Path(f2):
            return True
        return os.path.samefile(f1, f2)

768 

769 

def module_name_from_path(path: Path, root: Path) -> str:
    """
    Return a dotted module name based on the given path, anchored on root.

    For example: path="/projects/src/tests/test_foo.py" and root="/projects", the
    resulting module name will be "src.tests.test_foo".
    """
    stem_path = path.with_suffix("")
    try:
        # Use the parts for the relative path to the root path.
        name_parts = stem_path.relative_to(root).parts
    except ValueError:
        # If we can't get a relative path to root, use the full path, except
        # for the first part ("d:\\" or "/" depending on the platform, for example).
        name_parts = stem_path.parts[1:]

    # Module name for packages do not contain the __init__ file, unless
    # the `__init__.py` file is at the root.
    if len(name_parts) >= 2 and name_parts[-1] == "__init__":
        name_parts = name_parts[:-1]

    # Module names cannot contain ".", normalize them to "_". This prevents
    # a directory having a "." in the name (".env.310" for example) causing extra intermediate modules.
    # Also, important to replace "." at the start of paths, as those are considered relative imports.
    return ".".join(part.replace(".", "_") for part in name_parts)

799 

800 

def insert_missing_modules(modules: dict[str, ModuleType], module_name: str) -> None:
    """
    Used by ``import_path`` to create intermediate modules when using mode=importlib.

    When we want to import a module as "src.tests.test_foo" for example, we need
    to create empty modules "src" and "src.tests" after inserting "src.tests.test_foo",
    otherwise "src.tests.test_foo" is not importable by ``__import__``.

    :param modules:
        The module mapping to complete (``sys.modules`` when called from
        ``import_path``). It must already contain ``module_name``.

    :param module_name:
        Dotted name of the module whose parents must be present in ``modules``.
    """
    module_parts = module_name.split(".")
    # Walk from the full dotted name up to the top-level name.
    while module_name:
        parent_module_name, _, child_name = module_name.rpartition(".")
        if parent_module_name:
            parent_module = modules.get(parent_module_name)
            if parent_module is None:
                try:
                    # If sys.meta_path is empty, calling import_module will issue
                    # a warning and raise ModuleNotFoundError. To avoid the
                    # warning, we check sys.meta_path explicitly and raise the error
                    # ourselves to fall back to creating a dummy module.
                    if not sys.meta_path:
                        raise ModuleNotFoundError
                    parent_module = importlib.import_module(parent_module_name)
                except ModuleNotFoundError:
                    # The placeholder stands in for the parent, so it is named
                    # after the parent rather than after the child.
                    parent_module = ModuleType(
                        parent_module_name,
                        doc="Empty module created by pytest's importmode=importlib.",
                    )
                modules[parent_module_name] = parent_module

            # Add child attribute to the parent that can reference the child
            # modules.
            if not hasattr(parent_module, child_name):
                setattr(parent_module, child_name, modules[module_name])

        module_parts.pop(-1)
        module_name = ".".join(module_parts)

837 

838 

839def resolve_package_path(path: Path) -> Path | None: 

840 """Return the Python package path by looking for the last 

841 directory upwards which still contains an __init__.py. 

842 

843 Returns None if it cannot be determined. 

844 """ 

845 result = None 

846 for parent in itertools.chain((path,), path.parents): 

847 if parent.is_dir(): 

848 if not (parent / "__init__.py").is_file(): 

849 break 

850 if not parent.name.isidentifier(): 

851 break 

852 result = parent 

853 return result 

854 

855 

def resolve_pkg_root_and_module_name(
    path: Path, *, consider_namespace_packages: bool = False
) -> tuple[Path, str]:
    """
    Return the path to the directory of the root package that contains the
    given Python file, and its module name:

        src/
            app/
                __init__.py
                core/
                    __init__.py
                    models.py

    Passing the full path to `models.py` will yield Path("src") and "app.core.models".

    If consider_namespace_packages is True, then we additionally check upwards in the hierarchy
    for namespace packages:

    https://packaging.python.org/en/latest/guides/packaging-namespace-packages

    Raises CouldNotResolvePathError if the given path does not belong to a package (missing any __init__.py files).
    """
    pkg_path = resolve_package_path(path)
    pkg_root: Path | None = pkg_path.parent if pkg_path is not None else None

    if consider_namespace_packages:
        # Walk upwards from the regular package root, or from the file's own
        # directory when the file is not inside a regular package.
        start = pkg_root if pkg_root is not None else path.parent
        for candidate in itertools.chain((start,), start.parents):
            candidate_name = compute_module_name(candidate, path)
            if candidate_name and is_importable(candidate_name, path):
                # Point the pkg_root to the root of the namespace package.
                pkg_root = candidate
                break

    if pkg_root is not None:
        module_name = compute_module_name(pkg_root, path)
        if module_name:
            return pkg_root, module_name

    raise CouldNotResolvePathError(f"Could not resolve for {path}")

898 

899 

def is_importable(module_name: str, module_path: Path) -> bool:
    """
    Tell whether Python would import the given module normally (as if the
    user typed the import in the REPL) and whether the module found that way
    corresponds to module_path.

    :param module_name:
        Full module name that we want to check if is importable.
        For example, "app.models".

    :param module_path:
        Full path to the python module/package we want to check if is importable.
        For example, "/projects/src/app/models.py".
    """
    # importlib.util.find_spec() is used on purpose: unlike
    # ``_import_module_using_spec``, which walks sys.meta_path explicitly so
    # it can pass the module's location (``meta_importer.find_spec``), this
    # gives the same answer as a plain import of the name in the REPL.
    try:
        found_spec = importlib.util.find_spec(module_name)
    except (ImportError, ValueError, ImportWarning):
        return False
    return spec_matches_module_path(found_spec, module_path)

924 

925 

def compute_module_name(root: Path, module_path: Path) -> str | None:
    """Derive a dotted module name for ``module_path`` relative to ``root``.

    The suffix of ``module_path`` is dropped and a trailing ``__init__``
    component is removed. Returns None when the path has an empty name, is
    not located under ``root``, or is ``root`` itself.
    """
    try:
        stem_path = module_path.with_suffix("")
    except ValueError:
        # Empty paths (such as Path.cwd()) might break meta_path hooks (like our own assertion rewriter).
        return None

    try:
        components = list(stem_path.relative_to(root).parts)
    except ValueError:  # pragma: no cover
        return None

    if not components:
        return None
    if components[-1] == "__init__":
        components = components[:-1]
    return ".".join(components)

944 

945 

class CouldNotResolvePathError(Exception):
    """Error raised by resolve_pkg_root_and_module_name when it cannot
    determine a package root and module name for a path."""

948 

949 

def scandir(
    path: str | os.PathLike[str],
    sort_key: Callable[[os.DirEntry[str]], object] = lambda entry: entry.name,
) -> list[os.DirEntry[str]]:
    """Return the entries of the directory ``path`` as a sorted list.

    The entries are ordered by ``sort_key``; by default that is the entry name.
    If the directory does not exist, an empty list is returned.

    An entry whose ``is_file()`` check raises an OSError accepted by
    ``_ignore_error`` is left out of the result; any other OSError propagates.
    """
    try:
        directory_iter = os.scandir(path)
    except FileNotFoundError:
        # Missing directory: nothing to list.
        return []

    collected = []
    # The context manager guarantees the scandir iterator gets closed.
    with directory_iter as listing:
        for item in listing:
            try:
                item.is_file()
            except OSError as exc:
                if not _ignore_error(exc):
                    # Do not hide errors that are not known to be harmless.
                    raise
            else:
                collected.append(item)
    collected.sort(key=sort_key)  # type: ignore[arg-type]
    return collected

980 

981 

def visit(
    path: str | os.PathLike[str], recurse: Callable[[os.DirEntry[str]], bool]
) -> Iterator[os.DirEntry[str]]:
    """Yield the directory entries found below ``path``.

    All entries of a directory are yielded, in the sorted order produced by
    ``scandir``, before any of its subdirectories is descended into.

    A subdirectory is only walked when ``recurse(entry)`` returns a true value.
    """
    children = scandir(path)
    for child in children:
        yield child
    # Lazily pick the directories to descend into, so that the checks for one
    # entry only run after the previous subtree has been fully walked.
    to_descend = (
        child for child in children if child.is_dir() and recurse(child)
    )
    for subdir in to_descend:
        yield from visit(subdir.path, recurse)

996 

997 

def absolutepath(path: str | os.PathLike[str]) -> Path:
    """Return ``path`` as an absolute Path, computed with os.path.abspath.

    Prefer this over Path.resolve() (see #6523).
    Prefer this over Path.absolute() (not public, doesn't normalize).
    """
    absolute = os.path.abspath(path)
    return Path(absolute)

1005 

1006 

def commonpath(path1: Path, path2: Path) -> Path | None:
    """Return the leading part that both paths share, or None when
    os.path.commonpath cannot compute one.

    If one path is relative and one is absolute, returns None.
    """
    pair = [str(path1), str(path2)]
    try:
        shared = os.path.commonpath(pair)
    except ValueError:
        return None
    return Path(shared)

1017 

1018 

def bestrelpath(directory: Path, dest: Path) -> str:
    """Build a relative path string leading from ``directory`` to ``dest``,
    so that directory/bestrelpath == dest.

    The paths must be either both absolute or both relative.

    If no such path can be determined, returns dest.
    """
    assert isinstance(directory, Path)
    assert isinstance(dest, Path)
    if directory == dest:
        return os.curdir
    # Longest directory prefix shared by both paths.
    base = commonpath(directory, dest)
    if base is None:
        # Can be the case on Windows for two absolute paths on different drives.
        # Can be the case for a relative path and an absolute path.
        return str(dest)
    # Steps up from directory to base, then down from base to dest.
    climb = [os.pardir] * len(directory.relative_to(base).parts)
    descend = dest.relative_to(base).parts
    return os.path.join(*climb, *descend)

1046 

1047 

def safe_exists(p: Path) -> bool:
    """Check whether ``p`` exists, like Path.exists(), but report False instead
    of raising for arguments that might be too long (#11394)."""
    try:
        found = p.exists()
    except (ValueError, OSError):
        # ValueError: stat: path too long for Windows
        # OSError: [WinError 123] The filename, directory name, or volume label syntax is incorrect
        found = False
    return found